Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2b59087
fast job script prototype
eva-1729 Mar 17, 2026
e4fc737
Formatting and linting commit
invalid-email-address Mar 17, 2026
d86699d
fast start tests
eva-1729 Mar 17, 2026
99be90a
Merge branch 'PEARL_fast_jobs' of https://github.com/fiaisis/FIA-API …
eva-1729 Mar 17, 2026
8781540
Formatting and linting commit
invalid-email-address Mar 17, 2026
f1d0155
update tests
eva-1729 Mar 17, 2026
a0e14aa
Merge branch 'PEARL_fast_jobs' of https://github.com/fiaisis/FIA-API …
eva-1729 Mar 17, 2026
09a43fb
Formatting and linting commit
invalid-email-address Mar 17, 2026
fc063bf
Merge branch 'main' into PEARL_fast_jobs
eva-1729 May 15, 2026
c48d9da
Formatting and linting commit
invalid-email-address May 15, 2026
88e5b47
Move pearl_fast_jobs into correct folder
eva-1729 May 15, 2026
19e533b
Merge branch 'PEARL_fast_jobs' of https://github.com/fiaisis/FIA-API …
eva-1729 May 15, 2026
c714dd0
Formatting and linting commit
invalid-email-address May 15, 2026
9f7647d
update script
eva-1729 May 15, 2026
a429f96
Some reformatting to integrate changes that helped pearl_automation w…
eva-1729 May 18, 2026
d9e5dd6
Formatting and linting commit
invalid-email-address May 18, 2026
b4562ed
Some further changes to pearl fast start jobs
eva-1729 May 18, 2026
21fbd6b
Formatting and linting commit
invalid-email-address May 18, 2026
1c401ce
Move and modify patch for tests
eva-1729 May 26, 2026
4b001df
Remove get runner image in tests as it's not needed in script
eva-1729 May 26, 2026
983ed49
remove runner from main()
eva-1729 May 26, 2026
4c9dd34
remove mock runner image
eva-1729 May 26, 2026
674b7e1
Add tests for missing lines
eva-1729 May 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 291 additions & 0 deletions examples/job_scripts/pearl_fast_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
import argparse
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any

import requests

from fia_api.core.models import State

PEARL_SCRIPT = """
from mantid.simpleapi import *
import numpy as np
import json

Cycles2Run=['15_2','25_3','25_4']
Path2Data = r'/archive/NDXPEARL/Instrument/data'

CycleDict = {
"start_15_2": 90482,
"end_15_2": 91528,
"start_25_3": 124935,
"end_25_3": 124946,
"start_25_4": 124987,
"end_25_4": 125000,
}

output = ""

for cycle in Cycles2Run:
reject=[]
peak_centres=[]
peak_centres_error=[]
peak_intensity=[]
peak_intensity_error=[]
uAmps=[]
RunNo=[]
index=0
start=CycleDict['start_'+cycle]
end=CycleDict['end_'+cycle]
for i in range(start,end+1):
if i == 95382:
continue
Load(Filename=Path2Data+'/cycle_'+cycle+'/PEARL00'+ str(i)+'.nxs', OutputWorkspace=str(i))
ws = mtd[str(i)]
run = ws.getRun()
pcharge = run.getProtonCharge()
if pcharge <1.0:
reject.append(str(i))
DeleteWorkspace(str(i))
continue
NormaliseByCurrent(InputWorkspace=str(i), OutputWorkspace=str(i))
ExtractSingleSpectrum(InputWorkspace=str(i),WorkspaceIndex=index,
OutputWorkspace=str(i)+ '_' + str(index))
CropWorkspace(InputWorkspace=str(i)+ '_' + str(index), Xmin=1100,
Xmax=19990, OutputWorkspace=str(i)+ '_' + str(index))
DeleteWorkspace(str(i))

fit_output = Fit(Function='name=Gaussian,Height=19.2327,\\
PeakCentre=4843.8,Sigma=1532.64,\\
constraints=(4600<PeakCentre<5200,1100<Sigma<1900);\\
name=FlatBackground,A0=16.6099,ties=(A0=16.6099)',
InputWorkspace=str(i)+ '_' + str(index),
MaxIterations=1000,
CreateOutput=True,
Output=str(i)+ '_' + str(index) + '_fit',
OutputCompositeMembers=True,
StartX=3800,
EndX=6850,
Normalise=True)
paramTable = fit_output.OutputParameters
# This catches some fits where the fit constraints are ignored,
# allowing the peak to fall far outside the nominal range
if paramTable.column(1)[1] < 4600.0 or paramTable.column(1)[1] > 5200.0:
DeleteWorkspace(str(i)+'_0_fit_Parameters')
DeleteWorkspace(str(i)+'_0_fit_Workspace')
DeleteWorkspace(str(i)+'_0')
DeleteWorkspace(str(i)+'_0_fit_NormalisedCovarianceMatrix')
reject.append(str(i))
continue
else:
uAmps.append(pcharge)
peak_centres.append(paramTable.column(1)[1])
peak_centres_error.append(paramTable.column(2)[1])
peak_intensity.append(paramTable.column(1)[0])
peak_intensity_error.append(paramTable.column(2)[0])
RunNo.append(str(i))
DeleteWorkspace(str(i)+'_0')
DeleteWorkspace(str(i)+'_0_fit_Parameters')
DeleteWorkspace(str(i)+'_0_fit_Workspace')
DeleteWorkspace(str(i)+'_0_fit_NormalisedCovarianceMatrix')

combined_data=np.column_stack(
(RunNo, uAmps, peak_intensity, peak_intensity_error, peak_centres, peak_centres_error)
)

output += f"peak_centres_{cycle}.csv, "
print(f"combined data for {cycle}: ")
print(combined_data)
np.savetxt('/output/peak_centres_'+cycle+'.csv', combined_data, delimiter=", ", fmt='% s',)

print("Outputting files")
print(json.dumps({"status": "Successful",
"status_message":"Simple job run successfully.",
"output_files": output, "stacktrace": ""}))
"""


logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


class PearlFastStart:
def __init__(
self,
fia_url: str,
auth_url: str,
username: str | None,
password: str | None,
output_dir: str | Path,
token_refresh_interval: int = 3600, # Default to 1 hour
) -> None:
self.fia_url = fia_url.rstrip("/")
self.auth_url = auth_url.rstrip("/")
self.username = username
self.password = password
self.output_dir = Path(output_dir)
self.token: str | None = None
self.token_refresh_interval = token_refresh_interval
self._token_acquired_at: float = 0.0

def authenticate(self) -> None:
logger.info(f"Authenticating user {self.username} at {self.auth_url}")
try:
response = requests.post(
f"{self.auth_url}/login", json={"username": self.username, "password": self.password}, timeout=30
)
response.raise_for_status()
body = response.json()
self.token = body if isinstance(body, str) else body.get("token")
if not self.token:
raise ValueError("No token found in login response")
self._token_acquired_at = time.monotonic()
logger.info("Authentication successful")
except Exception as e:
logger.error(f"Authentication failed: {e}")
raise

def _is_token_expiring(self) -> bool:
"""Check if the token is due for a proactive refresh."""
return (time.monotonic() - self._token_acquired_at) >= self.token_refresh_interval

def _refresh_token_if_needed(self) -> None:
"""Re-authenticate proactively if the token is close to expiring."""
if self._is_token_expiring():
logger.info("Token nearing expiry, refreshing authentication")
self.authenticate()

def get_headers(self) -> dict[str, str]:
return {"Authorization": f"Bearer {self.token}"}

def submit_job(self, script: str) -> int:
logger.info(f"Submitting fast-start job script to {self.fia_url}")
payload = {"script": script}
# post /job/fast-start
response = requests.post(f"{self.fia_url}/execute", json=payload, headers=self.get_headers(), timeout=30)
response.raise_for_status()
job_id = int(response.json())
logger.info(f"Job submitted successfully. Job ID: {job_id}")
return job_id

def _poll_job_status(self, job_id: int) -> dict[str, Any]:
"""Poll the job status endpoint, re-authenticating on auth-related HTTP errors."""
self._refresh_token_if_needed()
response = requests.get(f"{self.fia_url}/job/{job_id}", headers=self.get_headers(), timeout=30)

if response.status_code in (401, 403, 404):
logger.warning(
f"Received HTTP {response.status_code} while polling job {job_id}, re-authenticating and retrying"
)
self.authenticate()
response = requests.get(f"{self.fia_url}/job/{job_id}", headers=self.get_headers(), timeout=30)

response.raise_for_status()
return response.json()

def monitor_job(self, job_id: int, poll_interval: int = 5) -> dict[str, Any]:
logger.info(f"Monitoring job {job_id}")
while True:
# this won't work until FIA-API supports job status for fast-start jobs,
# but we want to include it here for when that is implemented
job_data: dict[str, Any] = self._poll_job_status(job_id)
state = job_data.get("state")

logger.info(f"Job {job_id} current state: {state}")

if state == State.SUCCESSFUL.value:
logger.info(f"Job {job_id} completed successfully")
return job_data
if state in [State.ERROR.value, State.UNSUCCESSFUL.value]:
error_msg = job_data.get("status_message", "No error message provided")
logger.error(f"Job {job_id} failed with state {state}: {error_msg}")
raise RuntimeError(f"Job {job_id} failed: {error_msg}")

time.sleep(poll_interval)

def download_results(self, job_id: int, outputs: str | list[str] | None) -> None:
if not outputs:
logger.warning(f"No outputs found for job {job_id}")
return

# Outputs is expected to be a string or list of filenames
filenames = outputs.split(",") if isinstance(outputs, str) else outputs

self.output_dir.mkdir(parents=True, exist_ok=True)

for file in filenames:
filename = file.strip()
if not filename:
continue

logger.info(f"Downloading {filename} for job {job_id}")
response = requests.get(
f"{self.fia_url}/job/{job_id}/filename/{filename}", headers=self.get_headers(), timeout=30, stream=True
)
response.raise_for_status()

file_path = self.output_dir / filename
with Path.open(file_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logger.info(f"Downloaded {filename} to {file_path}")

def run(self) -> None:
try:
self.authenticate()
job_id = self.submit_job(PEARL_SCRIPT)
job_data = self.monitor_job(job_id)
self.download_results(job_id, job_data.get("outputs"))
logger.info("PEARL automation completed successfully")
except Exception as e:
logger.error(f"PEARL automation failed: {e}")
sys.exit(1)


def main() -> None:
parser = argparse.ArgumentParser(description="Automate PEARL Mantid jobs via FIA API")
parser.add_argument("--fia-url", default=os.environ.get("FIA_API_URL", "http://localhost:8000"), help="FIA API URL")
parser.add_argument(
"--auth-url", default=os.environ.get("AUTH_API_URL", "http://localhost:8001"), help="Auth API URL"
)
parser.add_argument("--username", default=os.environ.get("PEARL_USERNAME"), help="Auth Username")
parser.add_argument("--password", default=os.environ.get("PEARL_PASSWORD"), help="Auth Password")
parser.add_argument(
"--output-dir", default=os.environ.get("OUTPUT_DIRECTORY", "./output"), help="Output directory for results"
)
parser.add_argument(
"--runner", default=os.environ.get("MANTID_RUNNER_IMAGE"), help="Specific Mantid runner image to use"
)
parser.add_argument(
"--token-refresh-interval",
type=int,
default=3600,
help="Interval (in seconds) to refresh the authentication token",
)

args = parser.parse_args()

if not args.username or not args.password:
err_msg = (
"Username and password must be provided via "
"arguments or environment variables (PEARL_USERNAME, PEARL_PASSWORD)"
)
logger.error(err_msg)
sys.exit(1)

automation = PearlFastStart(
args.fia_url,
args.auth_url,
args.username,
args.password,
args.output_dir,
args.token_refresh_interval,
)
automation.run()


if __name__ == "__main__":
main()
Loading