diff --git a/config.yml b/config.yml index 4e3c6ed2..34d63014 100644 --- a/config.yml +++ b/config.yml @@ -139,6 +139,12 @@ globus: uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3 name: nersc832 + bl832-beegfs-raw: + root_path: /beamline_staging/bl832/raw/ + uri: beegfs.als.lbl.gov + uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a + name: bl832-beegfs-raw + globus_apps: als_transfer: client_id: ${GLOBUS_CLIENT_ID} diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index c75317e1..352c09cb 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -150,6 +150,7 @@ def __init__(self) -> None: MockSecret.for_endpoint("nersc832_alsdev_scratch")), "alcf832_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_raw")), "alcf832_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_scratch")), + "beegfs_raw": MockEndpoint("mock_beegfs_raw_path", MockSecret.for_endpoint("beegfs_raw")) } # Mock apps @@ -169,6 +170,7 @@ def __init__(self) -> None: self.data832_raw = self.endpoints["data832_raw"] self.data832_scratch = self.endpoints["data832_scratch"] self.nersc832_alsdev_scratch = self.endpoints["nersc832_alsdev_scratch"] + self.beegfs_raw = self.endpoints["beegfs_raw"] self.scicat = config["scicat"] @@ -250,6 +252,7 @@ def test_alcf_recon_flow(mocker: MockFixture): "nersc832_alsdev_recon_scripts": mocker.MagicMock(), "alcf832_raw": mocker.MagicMock(), "alcf832_scratch": mocker.MagicMock(), + "bl832-beegfs-raw": mocker.MagicMock(), } ) mocker.patch( diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 16b03629..8c3be5ac 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -27,6 +27,7 @@ def _beam_specific_config(self) -> None: self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"] self.alcf832_raw = self.endpoints["alcf832_raw"] 
self.alcf832_scratch = self.endpoints["alcf832_scratch"] + self.beegfs_raw = self.endpoints["bl832-beegfs-raw"] # SciCat self.scicat = self.config["scicat"] # NERSC HPC submission settings diff --git a/orchestration/flows/bl832/move.py b/orchestration/flows/bl832/move.py index e5e10b02..de5d30f9 100644 --- a/orchestration/flows/bl832/move.py +++ b/orchestration/flows/bl832/move.py @@ -12,6 +12,7 @@ from orchestration.globus.transfer import GlobusEndpoint, start_transfer from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.prometheus_utils import PrometheusMetrics +from orchestration.transfer_controller import CopyMethod, get_transfer_controller API_KEY = os.getenv("API_KEY") @@ -156,6 +157,21 @@ def process_new_832_file_task( except Exception as e: logger.error(f"SciCat ingest failed with {e}") + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config, + prometheus_metrics=None + ) + + transfer_controller.copy( + file_path=relative_path, + source=config.data832, + destination=config.beegfs_raw + ) + logger.info(f"File successfully transferred from data832 to beegfs {file_path}") + + # TODO: we should trigger the tiled ingestion flow in orchestration.tiled, but that flow will be set up on Ride/beegfs + logger.info("Initializing prune controller") prune_controller = get_prune_controller( prune_type=PruneMethod.GLOBUS, diff --git a/orchestration/tiled.py b/orchestration/tiled.py new file mode 100644 index 00000000..849d567a --- /dev/null +++ b/orchestration/tiled.py @@ -0,0 +1,76 @@ +"""Register data files and Zarr/HDF5 stores to the Tiled catalog. + +Intended to run on a Ride worker. The file path must be accessible +to the Tiled server's filesystem. 
import os
from pathlib import Path

from dotenv import load_dotenv
from prefect import flow, get_run_logger, task
from tiled.client import from_uri
from tiled.client.register import register

# Environment variables this module reads to reach the Tiled server.
# They may be provided by the process environment or by a .env file
# (load_dotenv() is called before they are read).
_REQUIRED_ENV_VARS = ("TILED_URI", "TILED_SINGLE_USER_API_KEY")


def _tiled_connection_settings() -> tuple[str, str]:
    """Return ``(tiled_uri, api_key)`` read from the environment.

    Raises:
        KeyError: If ``TILED_URI`` or ``TILED_SINGLE_USER_API_KEY`` is not
            set. The message names every missing variable, instead of the
            bare key that a plain ``os.environ[...]`` lookup would report.
    """
    load_dotenv()
    missing = [name for name in _REQUIRED_ENV_VARS if name not in os.environ]
    if missing:
        raise KeyError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )
    return os.environ["TILED_URI"], os.environ["TILED_SINGLE_USER_API_KEY"]


@task(name="register-file-to-tiled", task_run_name="register-{path}")
async def register_file_to_tiled(
    path: Path,
    prefix: str | None = None,
    overwrite: bool = False,
) -> None:
    """Register a file or Zarr store to the Tiled catalog.

    Args:
        path: Path of the file or store to register. It is passed directly
            to ``tiled.client.register.register`` and also appears in the
            log and error messages.
        prefix: Optional sub-path within the Tiled catalog. ``None`` (or an
            empty string) registers at the catalog root ``"/"``.
        overwrite: Whether to overwrite existing entries in the Tiled catalog.

    Raises:
        KeyError: If a required Tiled environment variable is not set.
        RuntimeError: If the ``register`` call fails; the original exception
            is chained as ``__cause__``.
    """
    logger = get_run_logger()
    tiled_uri, api_key = _tiled_connection_settings()

    client = from_uri(tiled_uri, api_key=api_key)

    logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}")
    try:
        await register(
            node=client,
            path=path,
            prefix=prefix or "/",
            overwrite=overwrite,
        )
    except Exception as e:
        # Re-raise with the path/URI/prefix attached so the failing
        # registration can be identified from the task-run error alone.
        raise RuntimeError(
            f"Failed to register {path} to Tiled catalog at {tiled_uri} "
            f"(prefix={prefix!r}): {e}"
        ) from e
    logger.info(f"Registered {path} to Tiled catalog")


@flow(name="register-to-tiled", flow_run_name="register-{path}")
async def register_to_tiled(
    path: Path | str,
    prefix: str | None = None,
    overwrite: bool = False,
) -> None:
    """Register a file or Zarr store to the Tiled server.

    Thin flow wrapper: normalizes ``path`` to a ``Path`` and awaits the
    ``register_file_to_tiled`` task.

    Args:
        path: Path to the file or Zarr store (``str`` or ``Path``).
        prefix: Optional sub-path within the Tiled catalog.
        overwrite: Whether to overwrite existing entries in the Tiled catalog.
    """
    logger = get_run_logger()
    path = Path(path)
    logger.info(f"Submitting task: register {path} to Tiled (prefix={prefix!r})")
    await register_file_to_tiled(path, prefix=prefix, overwrite=overwrite)


if __name__ == "__main__":
    # Script entry point: paths come from the command line rather than
    # being hard-coded to one developer's machine.
    import argparse
    import asyncio

    parser = argparse.ArgumentParser(
        description="Register files or Zarr/HDF5 stores to the Tiled catalog."
    )
    parser.add_argument(
        "paths",
        nargs="+",
        type=Path,
        help="One or more files or stores to register.",
    )
    parser.add_argument(
        "--prefix",
        default=None,
        help="Optional sub-path within the Tiled catalog (default: catalog root).",
    )
    parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing entries in the Tiled catalog.",
    )
    cli_args = parser.parse_args()

    async def _register_all() -> None:
        """Run the flow once per requested path, sequentially."""
        for target in cli_args.paths:
            await register_to_tiled(
                target,
                prefix=cli_args.prefix,
                overwrite=cli_args.overwrite,
            )

    # A single event loop for all registrations instead of one per path.
    asyncio.run(_register_all())