Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ globus:
uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3
name: nersc832

bl832-beegfs-raw:
root_path: /beamline_staging/bl832/raw/
uri: beegfs.als.lbl.gov
uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a
name: bl832-beegfs-raw

globus_apps:
als_transfer:
client_id: ${GLOBUS_CLIENT_ID}
Expand Down
3 changes: 3 additions & 0 deletions orchestration/_tests/test_globus_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def __init__(self) -> None:
MockSecret.for_endpoint("nersc832_alsdev_scratch")),
"alcf832_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_raw")),
"alcf832_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_scratch")),
"beegfs_raw": MockEndpoint("mock_beegfs_raw_path", MockSecret.for_endpoint("beegfs_raw"))
}

# Mock apps
Expand All @@ -169,6 +170,7 @@ def __init__(self) -> None:
self.data832_raw = self.endpoints["data832_raw"]
self.data832_scratch = self.endpoints["data832_scratch"]
self.nersc832_alsdev_scratch = self.endpoints["nersc832_alsdev_scratch"]
self.beegfs_raw = self.endpoints["beegfs_raw"]
self.scicat = config["scicat"]


Expand Down Expand Up @@ -250,6 +252,7 @@ def test_alcf_recon_flow(mocker: MockFixture):
"nersc832_alsdev_recon_scripts": mocker.MagicMock(),
"alcf832_raw": mocker.MagicMock(),
"alcf832_scratch": mocker.MagicMock(),
"bl832-beegfs-raw": mocker.MagicMock(),
}
)
mocker.patch(
Expand Down
1 change: 1 addition & 0 deletions orchestration/flows/bl832/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def _beam_specific_config(self) -> None:
self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"]
self.alcf832_raw = self.endpoints["alcf832_raw"]
self.alcf832_scratch = self.endpoints["alcf832_scratch"]
self.beegfs_raw = self.endpoints["bl832-beegfs-raw"]
# SciCat
self.scicat = self.config["scicat"]
# NERSC HPC submission settings
Expand Down
16 changes: 16 additions & 0 deletions orchestration/flows/bl832/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from orchestration.globus.transfer import GlobusEndpoint, start_transfer
from orchestration.prune_controller import get_prune_controller, PruneMethod
from orchestration.prometheus_utils import PrometheusMetrics
from orchestration.transfer_controller import CopyMethod, get_transfer_controller


API_KEY = os.getenv("API_KEY")
Expand Down Expand Up @@ -156,6 +157,21 @@ def process_new_832_file_task(
except Exception as e:
logger.error(f"SciCat ingest failed with {e}")

transfer_controller = get_transfer_controller(
transfer_type=CopyMethod.GLOBUS,
config=config,
prometheus_metrics=None
)

transfer_controller.copy(
file_path=relative_path,
source=config.data832,
destination=config.beegfs_raw
)
logger.info(f"File successfully transferred from data832 to beegfs {file_path}")

# TODO: we should trigger the tiled ingestion flow in orchestration.tiled, but that flow will be set up on Ride/beegfs

logger.info("Initializing prune controller")
prune_controller = get_prune_controller(
prune_type=PruneMethod.GLOBUS,
Expand Down
76 changes: 76 additions & 0 deletions orchestration/tiled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Register data files and Zarr/HDF5 stores to the Tiled catalog.

Intended to run on a Ride worker. The file path must be accessible
to the Tiled server's filesystem.
"""
from dotenv import load_dotenv
import os
from pathlib import Path

from prefect import flow, get_run_logger, task
from tiled.client import from_uri
from tiled.client.register import register


@task(name="register-file-to-tiled", task_run_name="register-{path}")
async def register_file_to_tiled(
path: Path,
prefix: str | None = None,
overwrite: bool = False,
) -> None:
"""Register a file or Zarr store to the Tiled catalog.

Args:
path: Absolute path on the client filesystem (used for logging).
prefix: Optional sub-path within the Tiled catalog.
overwrite: Whether to overwrite existing entries in the Tiled catalog.
"""
logger = get_run_logger()
load_dotenv()
tiled_uri = os.environ["TILED_URI"]
api_key = os.environ["TILED_SINGLE_USER_API_KEY"]

client = from_uri(tiled_uri, api_key=api_key)

logger.info(f"Registering {path} to Tiled catalog at {tiled_uri} with prefix {prefix!r}")
try:
await register(
node=client,
path=path,
prefix=prefix or "/",
overwrite=overwrite
)
except Exception as e:
raise RuntimeError(
f"Failed to register {path} to Tiled catalog at {tiled_uri} "
f"(prefix={prefix!r}): {e}"
) from e
logger.info(f"Registered {path} to Tiled catalog")


@flow(name="register-to-tiled", flow_run_name="register-{path}")
async def register_to_tiled(
path: Path | str,
prefix: str | None = None,
overwrite: bool = False,
) -> None:
"""Register a file or Zarr store to the Tiled server.

Args:
path: Path to the file or Zarr store (client filesystem).
prefix: Optional sub-path within the Tiled catalog.
overwrite: Whether to overwrite existing entries in the Tiled catalog.
"""
logger = get_run_logger()
path = Path(path)
logger.info(f"Submitting task: register {path} to Tiled (prefix={prefix!r})")
await register_file_to_tiled(path, prefix=prefix, overwrite=overwrite)


if __name__ == "__main__":
import asyncio

zarr = Path("/Users/david/Documents/data/tomo/scratch/rec20230606_152011_jong-seto_fungal-mycelia.zarr")
h5 = Path("/Users/david/Documents/data/tomo/raw/20241216_154449_ddd.h5")
asyncio.run(register_to_tiled(zarr, prefix="scratch"))
asyncio.run(register_to_tiled(h5, prefix="raw"))
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ dependencies = [
"python-dotenv",
"pyyaml",
"scicat_beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79",
"sfapi_client"
"sfapi_client",
"tiled[client]",
"watchfiles"
]

[build-system]
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ python-dotenv
pyyaml
scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79
sfapi_client
tiled[client]
watchfiles
Loading