diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index d28a02eb..915a02cd 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -17,6 +17,12 @@ class FlowParameterMapper: "alcf_recon_flow/alcf_recon_flow": [ "file_path", "config"], + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": [ + "file_path", + "config"], + "alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow": [ + "file_path", + "config"], # From move.py "new_832_file_flow/new_file_832": [ "file_path", @@ -67,6 +73,8 @@ class DecisionFlowInputModel(BaseModel): @task(name="setup_decision_settings") def setup_decision_settings( alcf_recon: bool, + alcf_forge_recon_segment: bool, + alcf_forge_recon_multisegment: bool, nersc_recon: bool, nersc_petiole_segment: bool, nersc_moon_segment: bool, @@ -76,6 +84,8 @@ def setup_decision_settings( This task is used to define the settings for the decision making process of the BL832 beamline. :param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow. + :param alcf_forge_recon_segment: Boolean indicating whether to run the ALCF Forge reconstruction segment flow. + :param alcf_forge_recon_multisegment: Boolean indicating whether to run the ALCF Forge reconstruction multisegment flow. :param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow. :param nersc_petiole_segment: Boolean indicating whether to run the NERSC petiole segmentation flow. :param nersc_moon_segment: Boolean indicating whether to run the NERSC moon segmentation flow. @@ -85,6 +95,8 @@ def setup_decision_settings( logger = get_run_logger() try: logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, " + f"alcf_forge_recon_segment={alcf_forge_recon_segment}, " + f"alcf_forge_recon_multisegment={alcf_forge_recon_multisegment}, " f"nersc_recon={nersc_recon}, " f"nersc_petiole_segment={nersc_petiole_segment}, " f"nersc_moon_segment={nersc_moon_segment}, " @@ -92,6 +104,8 @@ def setup_decision_settings( # Define which flows to run based on the input settings settings = { "alcf_recon_flow/alcf_recon_flow": alcf_recon, + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment, + "alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow": alcf_forge_recon_multisegment, "nersc_recon_flow/nersc_recon_flow": nersc_recon, "nersc_petiole_segment_flow/nersc_petiole_segment_flow": nersc_petiole_segment, "nersc_moon_segment_flow/nersc_moon_segment_flow": nersc_moon_segment, @@ -168,6 +182,21 @@ async def dispatcher( alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) + if decision_settings.get("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow"): + alcf_forge_params = FlowParameterMapper.get_flow_parameters( + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", + available_params + ) + tasks.append(run_recon_flow_async("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", alcf_forge_params)) + + if decision_settings.get("alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow"): + alcf_forge_params = FlowParameterMapper.get_flow_parameters( + "alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow", + available_params + ) + tasks.append(run_recon_flow_async("alcf_forge_recon_multisegment_flow/alcf_forge_recon_multisegment_flow", + alcf_forge_params)) + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) @@ -208,21 +237,15 @@ async def dispatcher( # Setup decision settings based on input parameters setup_decision_settings( alcf_recon=True, + alcf_forge_recon_segment=False, + alcf_forge_recon_multisegment=False, nersc_recon=True, - nersc_recon_multinode=True, + nersc_petiole_segment=False, + nersc_moon_segment=False, + nersc_forge_recon_segment=False, + nersc_forge_recon_multisegment=False, new_file_832=True ) - # Run the main decision flow with the specified parameters - # asyncio.run(dispatcher( - # config={}, # PYTEST, ALCF, NERSC - # is_export_control=False, # ALCF & MOVE - # folder_name="folder", # ALCF - # file_name="file", # ALCF - # file_path="/path/to/file", # MOVE - # send_to_alcf=True, # ALCF - # send_to_nersc=True, # MOVE - # ) - # ) except Exception as e: logger = get_run_logger() logger.error(f"Failed to execute main flow: {e}")