Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions orchestration/flows/bl832/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ class FlowParameterMapper:
"alcf_recon_flow/alcf_recon_flow": [
"file_path",
"config"],
"alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": [
"file_path",
"config"],
"alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow": [
"file_path",
"config"],
# From move.py
"new_832_file_flow/new_file_832": [
"file_path",
Expand Down Expand Up @@ -67,6 +73,8 @@ class DecisionFlowInputModel(BaseModel):
@task(name="setup_decision_settings")
def setup_decision_settings(
alcf_recon: bool,
alcf_forge_recon_segment: bool,
alcf_forge_recon_multisegment: bool,
nersc_recon: bool,
nersc_petiole_segment: bool,
nersc_moon_segment: bool,
Expand All @@ -76,6 +84,8 @@ def setup_decision_settings(
This task is used to define the settings for the decision making process of the BL832 beamline.

:param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow.
:param alcf_forge_recon_segment: Boolean indicating whether to run the ALCF Forge reconstruction segment flow.
:param alcf_forge_recon_multisegment: Boolean indicating whether to run the ALCF Forge reconstruction multisegment flow.
:param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow.
:param nersc_petiole_segment: Boolean indicating whether to run the NERSC petiole segmentation flow.
:param nersc_moon_segment: Boolean indicating whether to run the NERSC moon segmentation flow.
Expand All @@ -85,13 +95,17 @@ def setup_decision_settings(
logger = get_run_logger()
try:
logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, "
f"alcf_forge_recon_segment={alcf_forge_recon_segment}, "
f"alcf_forge_recon_multisegment={alcf_forge_recon_multisegment}, "
f"nersc_recon={nersc_recon}, "
f"nersc_petiole_segment={nersc_petiole_segment}, "
f"nersc_moon_segment={nersc_moon_segment}, "
f"new_file_832={new_file_832}")
# Define which flows to run based on the input settings
settings = {
"alcf_recon_flow/alcf_recon_flow": alcf_recon,
"alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment,
"alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow": alcf_forge_recon_multisegment,
"nersc_recon_flow/nersc_recon_flow": nersc_recon,
"nersc_petiole_segment_flow/nersc_petiole_segment_flow": nersc_petiole_segment,
"nersc_moon_segment_flow/nersc_moon_segment_flow": nersc_moon_segment,
Expand Down Expand Up @@ -168,6 +182,21 @@ async def dispatcher(
alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params)
tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params))

if decision_settings.get("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow"):
alcf_forge_params = FlowParameterMapper.get_flow_parameters(
"alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow",
available_params
)
tasks.append(run_recon_flow_async("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", alcf_forge_params))

if decision_settings.get("alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow"):
alcf_forge_params = FlowParameterMapper.get_flow_parameters(
"alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow",
available_params
)
tasks.append(run_recon_flow_async("alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow",
alcf_forge_params))

if decision_settings.get("nersc_recon_flow/nersc_recon_flow"):
nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params)
tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params))
Expand Down Expand Up @@ -208,21 +237,15 @@ async def dispatcher(
# Setup decision settings based on input parameters
setup_decision_settings(
alcf_recon=True,
alcf_forge_recon_segment=False,
alcf_forge_recon_multisegment=False,
nersc_recon=True,
nersc_recon_multinode=True,
nersc_petiole_segment=False,
nersc_moon_segment=False,
nersc_forge_recon_segment=False,
nersc_forge_recon_multisegment=False,
new_file_832=True
)
# Run the main decision flow with the specified parameters
# asyncio.run(dispatcher(
# config={}, # PYTEST, ALCF, NERSC
# is_export_control=False, # ALCF & MOVE
# folder_name="folder", # ALCF
# file_name="file", # ALCF
# file_path="/path/to/file", # MOVE
# send_to_alcf=True, # ALCF
# send_to_nersc=True, # MOVE
# )
# )
except Exception as e:
logger = get_run_logger()
logger.error(f"Failed to execute main flow: {e}")
Loading