refactor(jobs): change /tasks and /results endpoints for ML jobs#1197
Conversation
📝 Walkthrough: Replaces the GET contract for `/jobs/{id}/tasks/` with a POST request-body contract and adds structured `client_info` handling.
Sequence diagram:

```mermaid
sequenceDiagram
    participant Client
    participant API as "Django API\n(views)"
    participant Serializer as "DRF Serializer\n(TasksRequestSerializer / PipelineResultsRequestSerializer)"
    participant Worker as "Task Reservation / Queue"

    Client->>API: POST /jobs/{id}/tasks { "batch_size": N, "client_info": {...} }
    API->>Serializer: validate request body
    Serializer-->>API: validated_data (batch_size, client_info)
    API->>Worker: reserve_tasks(count=batch_size)
    Worker-->>API: reserved tasks -> respond with tasks

    Client->>API: POST /jobs/{id}/result { "results": [...], "client_info": {...} }
    API->>Serializer: validate results wrapper
    Serializer-->>API: validated_data (results, client_info)
    API->>Worker: enqueue/process results
    Worker-->>API: ack (results_queued, tasks info)
    API-->>Client: 202 Accepted { status, job_id, results_queued, tasks }
```
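To illustrate the request bodies in the diagram, here is a minimal, dependency-free sketch of how a worker might assemble them. The helper names are hypothetical and not part of this PR; the real worker code lives in ami-data-companion.

```python
import json

def build_tasks_request(batch_size, client_info=None):
    # Body for POST /jobs/{id}/tasks/: batch_size is required, client_info optional
    body = {"batch_size": batch_size}
    if client_info:
        body["client_info"] = client_info
    return body

def build_results_request(results, client_info=None):
    # Wrapped body for POST /jobs/{id}/result/: one canonical shape, {"results": [...]}
    body = {"results": results}
    if client_info:
        body["client_info"] = client_info
    return body

tasks_body = build_tasks_request(5, {"hostname": "worker-1"})
results_body = build_results_request([{"reply_subject": "abc", "status": "done"}])
print(json.dumps(tasks_body))  # {"batch_size": 5, "client_info": {"hostname": "worker-1"}}
```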
Pull request overview
Refactors worker-facing job endpoints to move /jobs/{id}/tasks/ from a GET-with-query-params contract to a POST-with-body contract, and introduces structured client_info payload handling intended to accompany worker requests.
Changes:
- Switched `/jobs/{id}/tasks/` from GET (`?batch=N`) to POST with a validated JSON body via `TasksRequestSerializer`.
- Added `ClientInfoSerializer` (and helper utilities) for structured worker/client metadata.
- Updated `/jobs/{id}/result/` to accept a wrapped request format with `{"results": [...], "client_info": {...}}` while keeping a legacy bare-list fallback, plus logging format tweaks.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| ami/jobs/views.py | Changes /tasks to POST + body validation; extends /result request parsing for wrapped payloads; updates logging. |
| ami/jobs/schemas.py | Adds TasksRequestSerializer and pulls in ClientInfoSerializer for request-body validation. |
| ami/jobs/tests/test_jobs.py | Updates tasks endpoint tests to POST with JSON body instead of GET query params. |
| ami/ml/serializers_client_info.py | Introduces ClientInfoSerializer and helper functions for extracting/augmenting client metadata. |
| ami/ml/tests.py | Adds unit tests for ClientInfoSerializer validation behavior. |
Comments suppressed due to low confidence (1)
ami/jobs/schemas.py:31
- Now that `/tasks` uses a request-body serializer, `batch_param` appears unused in this module (no references found). Consider removing it (or moving it to a shared location if still needed) to avoid stale/duplicated schema definitions.
```python
batch_param = OpenApiParameter(
    name="batch",
    description="Number of tasks to retrieve",
    required=False,
    type=int,
)

class TasksRequestSerializer(serializers.Serializer):
    batch = serializers.IntegerField(min_value=1, required=True)
    client_info = ClientInfoSerializer(required=False)
```
🧹 Nitpick comments (5)
ami/jobs/views.py (2)
254-256: `client_info` is validated but not used. The `TasksRequestSerializer` validates `client_info` from the request body, but the validated data is never used in this endpoint. Consider logging it or passing it to `_mark_pipeline_pull_services_seen()` for tracking worker activity.

♻️ Suggested usage

```diff
 serializer = TasksRequestSerializer(data=request.data)
 serializer.is_valid(raise_exception=True)
 batch = serializer.validated_data["batch"]
+client_info = serializer.validated_data.get("client_info", {})

 job: Job = self.get_object()
 # ...

 # Record heartbeat for async processing services on this pipeline
 _mark_pipeline_pull_services_seen(job)
+if client_info:
+    logger.debug("Task fetch from client: %s", client_info.get("hostname", "unknown"))
```
360-367: Consider narrowing the exception scope or adding more context to the error response. The broad `except Exception` catch (flagged by static analysis) may hide unexpected errors. While it provides resilience, consider:

- Narrowing to expected exceptions (e.g., Celery task failures, serialization errors)
- Adding the error type/message to the response for debugging
- Re-raising unexpected exceptions after logging

♻️ Suggested improvement

```diff
 except Exception as e:
-    logger.error("Failed to queue pipeline results for job %s: %s", job.pk, e)
+    logger.exception("Failed to queue pipeline results for job %s", job.pk)
     return Response(
         {
             "status": "error",
             "job_id": job.pk,
+            "error": str(e),
         },
         status=500,
     )
```

ami/ml/serializers_client_info.py (1)
35-38: Silent validation failure may hide client errors. When `ClientInfoSerializer` validation fails, the code silently falls back to an empty dict without logging. This could make debugging client issues difficult. Consider logging a warning when validation fails.

♻️ Suggested improvement

```diff
+import logging
+
+logger = logging.getLogger(__name__)
+
+
 def get_client_info(request) -> dict:
     # ...
     serializer = ClientInfoSerializer(data=raw)
     if serializer.is_valid():
         info = serializer.validated_data
     else:
+        logger.warning("Invalid client_info in request: %s", serializer.errors)
         info = {}
```

ami/ml/tests.py (1)
1371-1408: Consider consolidating imports and adding test coverage for `get_client_info()`. The test class imports `ClientInfoSerializer` inside each test method. Consider moving to a single import at the top of the test class or file for cleaner code.

Additionally, while `ClientInfoSerializer` is tested, the `get_client_info()` helper function (which adds server-observed fields like `ip` and `user_agent`) is not tested.

♻️ Suggested refactor

```diff
+from ami.ml.serializers_client_info import ClientInfoSerializer, get_client_info
+
+
 class TestClientInfoSerializer(TestCase):
     def test_valid_client_info(self):
-        from ami.ml.serializers_client_info import ClientInfoSerializer
-
         data = {
             # ...
```

And consider adding a test for `get_client_info()`:

```python
def test_get_client_info_extracts_server_fields(self):
    from unittest.mock import Mock

    request = Mock()
    request.data = {"client_info": {"hostname": "test-host"}}
    request.headers = {"user-agent": "TestAgent/1.0"}
    request.META = {"REMOTE_ADDR": "192.168.1.1"}

    info = get_client_info(request)
    self.assertEqual(info["hostname"], "test-host")
    self.assertEqual(info["ip"], "192.168.1.1")
    self.assertEqual(info["user_agent"], "TestAgent/1.0")
```

ami/jobs/schemas.py (1)
20-25: Remove unused `batch_param` OpenAPI parameter. The `/tasks` endpoint now uses POST with a request body serializer (`TasksRequestSerializer`), not query parameters. The `batch_param` definition is dead code left from an earlier implementation and is never imported or used.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ami/jobs/views.py (1)
333-370 (⚠️ Potential issue | 🟠 Major): Don't report full failure after partial enqueue.

Once line 333 starts dispatching, a later exception can leave earlier results already accepted, but line 365 still tells the client the whole batch failed. That makes a retry path duplicate already-enqueued results. Return the accepted subset on the error path, or make enqueueing idempotent by `reply_subject` before clients retry.
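One way to realize the "idempotent by reply_subject" suggestion is to track dispatched subjects and collect per-item outcomes, so the response can report partial success. This is a minimal stdlib sketch, not the view's actual code: `enqueue` stands in for `process_nats_pipeline_result.delay`, and `already_seen` would need to be a durable store in practice.

```python
def enqueue_results(results, enqueue, already_seen):
    """Dispatch each result at most once, keyed by reply_subject.

    Returns (queued, skipped, failed) so the caller can report the accepted
    subset instead of a blanket 500 after a partial enqueue.
    """
    queued, skipped, failed = [], [], []
    for item in results:
        subject = item["reply_subject"]
        if subject in already_seen:
            skipped.append(subject)  # retry-safe: duplicate delivery is ignored
            continue
        try:
            enqueue(item)
            already_seen.add(subject)
            queued.append(subject)
        except Exception as exc:  # collect per-item failures and keep going
            failed.append((subject, str(exc)))
    return queued, skipped, failed

seen = set()
results = [{"reply_subject": "a"}, {"reply_subject": "b"}, {"reply_subject": "a"}]
queued, skipped, failed = enqueue_results(results, enqueue=lambda item: None, already_seen=seen)
print(queued, skipped, failed)  # ['a', 'b'] ['a'] []
```

A retried batch then only re-dispatches subjects that were never accepted.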
Inline comment (ami/jobs/views.py, lines 321-323): The loop building `PipelineTaskResult` from `raw_results` can raise `TypeError` when an item is not a mapping (e.g., `int` or `None`) before Pydantic validation runs. Guard each item with an `isinstance(item, dict)` check and fail fast with HTTP 400 (or collect a validation error for that item), or catch `TypeError` alongside `pydantic.ValidationError` where `PipelineTaskResult(...)` is instantiated, so invalid client input returns 400 instead of falling through to the generic 500 handler.
@copilot add instructions for deployment and suggest changes that need to be made in the Processing Services (https://github.com/RolnickLab/ami-data-companion specifically for this to work)
The PR description now includes deployment sequencing and the exact code change needed in `ami-data-companion`.

Required change:

```python
# Before (GET with batch as query param)
response = self._api_session.get(url, params={"batch": self.batch_size, ...}, timeout=30)

# After (POST with batch in JSON body)
body = {"batch": self.batch_size}
params = {"processing_service_name": self.processing_service_name}
response = self._api_session.post(url, json=body, params=params, timeout=30)
```

Also add a trailing slash to the URL. No other files in the worker repo need changes.

Deployment order: deploy this Antenna backend change first, then update the worker. During the gap, workers receive an error response (the old GET is no longer accepted) and retry.
…erializer
Convert the /jobs/{id}/tasks/ endpoint from GET with query params to POST with request body.
This allows for validated request data including optional client_info field, better for
processing service workers that need to report their identity.
Changes:
- Add ami/ml/serializers_client_info.py with ClientInfoSerializer and get_client_info() helper
- Add TasksRequestSerializer to ami/jobs/schemas.py for validating batch and client_info
- Update /tasks action from GET to POST with serializer-based validation
- Support wrapped format in /result endpoint: {"client_info": {...}, "results": [...]}
- Update logger calls to use %s format (more efficient)
- Update all tests to use POST instead of GET
- Add TestClientInfoSerializer test class with validation tests
No changes to _mark_pipeline_pull_services_seen() or permission_classes.
Co-Authored-By: Claude <noreply@anthropic.com>
- Remove ClientInfoSerializer module and tests (belongs in backend auth/identity branch)
- Remove client_info field from TasksRequestSerializer
- Remove unused batch_param OpenAPI parameter (dead code)
- Add project_id_doc_param to /tasks/ extend_schema
- Validate 'results' is a list in wrapped /result/ format
- Clean up docstrings

Co-Authored-By: Claude <noreply@anthropic.com>
Replace manual loop with pydantic.parse_obj_as() to validate the results list. This catches non-dict items (e.g. int, null) with a proper 400 ValidationError instead of a TypeError falling through to the generic 500 handler. Co-Authored-By: Claude <noreply@anthropic.com>
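The commit's point, that list-level validation turns non-dict items into a clean 400 rather than a stray TypeError, can be illustrated without Pydantic or Django. This stand-in mirrors the shape check that `parse_obj_as(list[PipelineTaskResult], raw_results)` performs first; the class and function names here are hypothetical:

```python
class ValidationError(Exception):
    """Stand-in for the framework's 400-producing validation error."""

def parse_results(raw_results):
    # Reject non-list payloads and non-mapping items up front, mirroring
    # what list-level Pydantic validation enforces before field validation.
    if not isinstance(raw_results, list):
        raise ValidationError("results must be a list")
    for i, item in enumerate(raw_results):
        if not isinstance(item, dict):
            raise ValidationError(f"results[{i}] must be an object, got {type(item).__name__}")
    return raw_results

try:
    parse_results([{"ok": True}, 42])
except ValidationError as e:
    print(e)  # results[1] must be an object, got int
```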
Force-pushed: b987f2d → 4cb1b7f
Fixed in the latest commit: non-dict items (int, None, etc.) are now caught natively via Pydantic validation, returning a proper 400 instead of falling through to the generic 500 handler.
One minor heads-up for the future: `parse_obj_as` is deprecated in Pydantic v2 in favor of `pydantic.TypeAdapter(list[PipelineTaskResult]).validate_python(raw_results)`. Not an issue now, but worth keeping in mind when upgrading.
🧹 Nitpick comments (1)
ami/jobs/views.py (1)
360-367: Consider narrowing the exception catch (optional). Static analysis flags the broad `Exception` catch. While this serves as a safety net for unexpected Celery queueing failures, catching all exceptions can mask programming errors.

If desired, you could narrow this to specific expected exceptions:

♻️ Optional: Narrow exception types

```diff
 except pydantic.ValidationError as e:
     raise ValidationError(f"Invalid result data: {e}") from e
-except Exception as e:
+except (OSError, ConnectionError, RuntimeError) as e:
+    # Celery/Redis connection or task queueing failures
     logger.error("Failed to queue pipeline results for job %s: %s", job.pk, e)
     return Response(
         {
             "status": "error",
             "job_id": job.pk,
         },
         status=500,
     )
```

Alternatively, you can suppress the lint with `# noqa: BLE001` if the broad catch is intentional for resilience.
Identity metadata for specific processing service instances. Allows tracking which worker/pod is making requests when a single ProcessingService has multiple workers. Co-Authored-By: Claude <noreply@anthropic.com>
Add typed request/response serializers for both job endpoints:
- TasksRequestSerializer (existing, now with client_info)
- TasksResponseSerializer
- PipelineResultsRequestSerializer (replaces manual isinstance checks)
- PipelineResultsResponseSerializer

Uses SchemaField to delegate item validation to Pydantic models, keeping DRF for the HTTP envelope and Pydantic for domain schemas.

Co-Authored-By: Claude <noreply@anthropic.com>
Replace manual isinstance branching and parse_obj_as with
PipelineResultsRequestSerializer. One canonical request shape:
{"results": [...]}. Bare list and single-item formats removed.
Tests updated to use wrapped format and assert bare lists are rejected.
Co-Authored-By: Claude <noreply@anthropic.com>
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ami/jobs/views.py (1)
320-359 (⚠️ Potential issue | 🟠 Major): Don't report a full failure after partially queuing the batch.

The loop publishes results one by one, but any later exception returns a generic 500 for the whole request. A caller retrying after that will resend earlier `reply_subject`s that were already accepted. Either enqueue the batch atomically, or make `process_nats_pipeline_result` idempotent on `reply_subject` and return the queued subset on failure.
Inline comments:

In ami/jobs/views.py (lines 316-318): Request handling currently rejects legacy bare-list payloads because `PipelineResultsRequestSerializer` expects `{"results": [...]}`. Detect when `request.data` is a list (or tuple) and wrap it into `{"results": request.data}` before constructing the serializer, so existing workers posting a bare list continue to work; then call `serializer.is_valid(raise_exception=True)` as before.
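The backward-compatible normalization suggested above can be a small pure function applied to `request.data` before the serializer runs. A hedged sketch; the helper name is hypothetical:

```python
def normalize_results_payload(data):
    # Wrap legacy bare-list bodies into the canonical {"results": [...]} shape;
    # dict payloads (already wrapped) pass through unchanged.
    if isinstance(data, (list, tuple)):
        return {"results": list(data)}
    return data

print(normalize_results_payload([{"a": 1}]))   # {'results': [{'a': 1}]}
print(normalize_results_payload({"results": []}))  # {'results': []}
```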
In ami/ml/schemas.py (lines 265-276): `ProcessingServiceClientInfo` is used as optional `client_info` fields in the job schemas. Confirm whether ami/jobs/schemas.py was changed in this PR or already referenced `client_info` before it, then either (a) keep the `ProcessingServiceClientInfo` class if ami.jobs.schemas legitimately needs it, (b) remove the class and related imports if ami/jobs/schemas.py shouldn't reference `client_info` in this PR, or (c) leave the optional fields but add a clarifying comment and update the PR/deployment notes to state that `client_info` is a deferred placeholder.
"batch" sounds like content, "batch_size" clearly communicates a requested quantity. Matches the ADC worker's field name. Co-Authored-By: Claude <noreply@anthropic.com>
🧹 Nitpick comments (1)
ami/jobs/views.py (1)
357-364: Consider narrowing the exception catch or adding explicit exception types. The static analysis flags `except Exception` as overly broad. While this serves as a reasonable catch-all for task queueing failures (Celery/Redis issues), it could mask unexpected bugs like `AttributeError` from code changes.

Consider catching specific exceptions that represent queueing failures:

♻️ Optional: Narrow exception types

```diff
-except Exception as e:
+except (ConnectionError, TimeoutError, RuntimeError) as e:
     logger.error("Failed to queue pipeline results for job %s: %s", job.pk, e)
```

Or alternatively, re-raise unexpected exceptions after logging:

```diff
 except Exception as e:
-    logger.error("Failed to queue pipeline results for job %s: %s", job.pk, e)
+    logger.error("Failed to queue pipeline results for job %s: %s", job.pk, e, exc_info=True)
     return Response(
```
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 442baafa-f395-42c9-ae11-8e205d7df8c5
📒 Files selected for processing (3)
- ami/jobs/schemas.py
- ami/jobs/tests/test_jobs.py
- ami/jobs/views.py
Naming discussion: schema layers & conventions

Researched naming patterns across Celery, Temporal, NATS, SQS, Ray, BentoML, and Spring Batch to validate our naming choices.

How names map across layers

HTTP serializers (

Pattern: Domain schemas (

Key insight

"TaskResult" as the envelope name is the dominant pattern across distributed frameworks (Celery, Temporal, Ray all use it). The original confusion wasn't the domain names — it was the collision between

Future cleanup (separate PR)

Shorter domain schema names for clarity:

These touch 30+ files across the codebase so they belong in a dedicated rename PR.
Rename DRF serializers to MLJob{Action}{Direction}Serializer pattern:
- TasksRequestSerializer → MLJobTasksRequestSerializer
- TasksResponseSerializer → MLJobTasksResponseSerializer
- PipelineResultsRequestSerializer → MLJobResultsRequestSerializer
- PipelineResultsResponseSerializer → MLJobResultsResponseSerializer
ProcessingServiceClientInfo: remove specific fields, use extra="allow"
so processing services can send any useful identity key-value pairs.
Add detailed docstrings clarifying each serializer's role in the
processing service ↔ Antenna flow.
Fix test_result_endpoint_with_error_result to use wrapped payload format.
Co-Authored-By: Claude <noreply@anthropic.com>
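The `extra="allow"` behavior described above can be sketched as a minimal Pydantic v2 model. This is a hedged illustration — the real `ProcessingServiceClientInfo` lives in `ami/ml/schemas.py` and may differ, and the payload keys shown are made up:

```python
from pydantic import BaseModel, ConfigDict


class ProcessingServiceClientInfo(BaseModel):
    """Free-form identity info sent by a processing service worker."""

    # No declared fields: any key-value pairs the service sends are
    # accepted and preserved thanks to extra="allow".
    model_config = ConfigDict(extra="allow")


info = ProcessingServiceClientInfo(hostname="gpu-node-3", software="adc/0.9")
print(info.model_dump())
```

The design choice here is deliberate looseness: because different machines running the same parent ProcessingService may want to report different identity keys, the model validates nothing and records everything.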
Add djangorestframework-api-key based authentication for processing services.

Each ProcessingService can have API keys managed via Django admin or API. Authenticated requests identify the specific service, enabling per-PS heartbeat tracking and client_info collection.

Changes:
- ProcessingServiceAPIKey model (AbstractAPIKey) with FK to ProcessingService
- DRF auth backend (ProcessingServiceAPIKeyAuthentication) + permission class
- HasProcessingServiceAPIKey permission on /tasks and /result endpoints
- Per-PS heartbeat with client_info (ip, user_agent, hostname, software, etc.)
- generate_key API action and Django admin action
- Dual-auth pipeline registration (API key or legacy token)
- Serializers refactored into package with client_info extraction
- Minimal processing service self-registration example

Jobs endpoint HTTP semantics unchanged (GET /tasks, bare list /result). The GET→POST refactor and request serializers are in PR #1197.

Co-Authored-By: Claude <noreply@anthropic.com>
Replace broad `except Exception` with specific `(OSError, KombuError)` to catch only Celery broker connection failures. Returns 503 with a descriptive message instead of a generic 500 that would swallow bugs.

Co-Authored-By: Claude <noreply@anthropic.com>
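The shape of this handler can be sketched framework-free. In this sketch, `submit_results` and `enqueue` are hypothetical stand-ins for the DRF view and the Celery call, and `BrokerError` stands in for kombu's transport errors (which subclass `OSError`) so the example stays stdlib-only:

```python
import logging

logger = logging.getLogger(__name__)


class BrokerError(OSError):
    """Stand-in for kombu transport errors, which subclass OSError."""


def submit_results(job_pk: int, results: list, enqueue) -> tuple[int, dict]:
    """Queue results, mapping broker failures to a 503 instead of a generic 500."""
    try:
        enqueue(job_pk, results)
    except OSError as e:  # kombu.exceptions.KombuError would join this clause
        logger.error("Failed to queue pipeline results for job %s: %s", job_pk, e)
        return 503, {"status": "error", "job_id": job_pk,
                     "detail": "Task queue temporarily unavailable"}
    return 202, {"status": "accepted", "job_id": job_pk,
                 "results_queued": len(results)}
```

With the catch narrowed this way, unexpected exceptions such as `AttributeError` or `TypeError` propagate to the framework's standard exception handler instead of being swallowed.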
Follows the codebase convention: schemas.py holds OpenApiParameter definitions and Pydantic models, serializers.py holds DRF serializers. Co-Authored-By: Claude <noreply@anthropic.com>
Replace ListField(child=DictField()) with SchemaField using real Pydantic models for proper OpenAPI schema generation and validation:

- MLJobTasksResponseSerializer.tasks → list[PipelineProcessingTask]
- MLJobResultsResponseSerializer.tasks → list[QueuedTaskAcknowledgment]

Co-Authored-By: Claude <noreply@anthropic.com>
Pydantic model belongs in the schema file, not serializers. Co-Authored-By: Claude <noreply@anthropic.com>
Adapt to breaking API changes in RolnickLab/antenna#1197:

- /jobs/{id}/tasks/ changed from GET with ?batch=N to POST with {"batch_size": N} in the request body
- /jobs/{id}/result/ now expects {"results": [...]} wrapper instead of a bare list
- Trailing slash is required on POST URLs (Django APPEND_SLASH cannot redirect POSTs)
- processing_service_name query param removed from both endpoints

Also adds typed Pydantic schemas for the new request/response shapes (AntennaTasksRequest, AntennaResultPostResponse) and updates the mock test server to use them as FastAPI body parameters.

Co-Authored-By: Claude <noreply@anthropic.com>
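The new request shapes described in this commit can be sketched as two small helpers. These `build_*` functions are hypothetical illustrations, not ADC's actual client code:

```python
def build_tasks_request(base_url: str, job_id: int, batch_size: int) -> tuple[str, dict]:
    """URL and JSON body for POST /jobs/{id}/tasks/ (trailing slash required)."""
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    # Django's APPEND_SLASH can redirect GETs but not POSTs, so the
    # trailing slash must be present in the URL itself.
    return f"{base_url}/jobs/{job_id}/tasks/", {"batch_size": batch_size}


def build_results_request(base_url: str, job_id: int, results: list) -> tuple[str, dict]:
    """URL and JSON body for POST /jobs/{id}/result/ (wrapped, not a bare list)."""
    return f"{base_url}/jobs/{job_id}/result/", {"results": results}
```

Either tuple can then be passed to whatever HTTP client the worker uses (e.g. `requests.post(url, json=body)`).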
Re: bare-list backward compatibility concern from CodeRabbit — the ADC PR (RolnickLab/ami-data-companion#134) already sends the wrapped payload format.
The upstream Antenna API (RolnickLab/antenna#1197) no longer accepts processing_service_name on GET /jobs. Remove the parameter from the client, worker call site, mock server, and test assertions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…lt (#134)

* refactor: update Antenna API client for POST /tasks and wrapped /result

Adapt to breaking API changes in RolnickLab/antenna#1197:
- /jobs/{id}/tasks/ changed from GET with ?batch=N to POST with {"batch_size": N} in the request body
- /jobs/{id}/result/ now expects {"results": [...]} wrapper instead of a bare list
- Trailing slash is required on POST URLs (Django APPEND_SLASH cannot redirect POSTs)
- processing_service_name query param removed from both endpoints

Also adds typed Pydantic schemas for the new request/response shapes (AntennaTasksRequest, AntennaResultPostResponse) and updates the mock test server to use them as FastAPI body parameters.

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: remove unused processing_service_name from result posting and dataset APIs

The processing_service_name parameter was threaded through post_batch_results, ResultPoster.post_async, RESTDataset, get_rest_dataloader, and _process_job but was never sent to the Antenna API in any of these code paths. Removes the dead parameter from all affected signatures and call sites. Also adds gt=0 validation to AntennaTasksRequest.batch_size.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(tests): remove stale processing_service_name arg from _process_job calls

The previous commit removed processing_service_name from _process_job but missed updating the test call sites, causing "got multiple values for argument 'device'" errors.

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: remove processing_service_name from GET /jobs endpoint

The upstream Antenna API (RolnickLab/antenna#1197) no longer accepts processing_service_name on GET /jobs. Remove the parameter from the client, worker call site, mock server, and test assertions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(tests): strict URL matching in mock server and TestClient

- Add trailing slash to POST /tasks/ route (matches client and /result/)
- Set redirect_slashes=False on mock FastAPI app so slash mismatches cause 404 instead of silent redirects
- Set follow_redirects=False on all TestClient instances so redirects surface as test failures

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor: clean up stale docstrings and vestigial benchmark service_name

- Remove service_name from result_posting.py docstring examples (param was removed in 588c186 but examples not updated)
- Remove vestigial service_name param from benchmark run_benchmark() and CLI arg (no longer passed to any API call)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
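Why `redirect_slashes=False` matters in the mock server can be shown with a tiny dispatcher. This is an illustrative stand-in, not FastAPI's actual routing code: with strict matching, `/jobs/7/tasks` and `/jobs/7/tasks/` are distinct, so a client missing the trailing slash fails loudly instead of being silently redirected.

```python
# A toy route table: only slash-terminated POST paths are registered,
# mirroring the mock server's /tasks/ and /result/ routes.
ROUTES = {("POST", "/jobs/7/tasks/"), ("POST", "/jobs/7/result/")}


def dispatch(method: str, path: str, redirect_slashes: bool = False) -> int:
    """Return the status a strict (or lenient) router would produce."""
    if (method, path) in ROUTES:
        return 200
    if redirect_slashes and (method, path + "/") in ROUTES:
        # A lenient framework would 307-redirect here, hiding the
        # client's missing trailing slash from the test suite.
        return 307
    return 404
```

Combined with `follow_redirects=False` on the test client, any slash mismatch surfaces as a 404/307 test failure rather than a silently repaired request.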
Summary
Standardizes the HTTP contracts for the two endpoints that Processing Services use to interact with Antenna jobs. Both endpoints now use typed DRF serializers with Pydantic `SchemaField` for domain validation, and both accept an optional `client_info` field for future PS identity tracking.

Extracted from #1194 to keep the endpoint contract change separate from the auth/identity work.
What changed
| Endpoint | Before | After |
|---|---|---|
| `/jobs/{id}/tasks/` | `?batch=N` query param | `{"batch_size": N}` body via `MLJobTasksRequestSerializer` |
| `/jobs/{id}/result/` | bare list | `{"results": [...]}` body via `MLJobResultsRequestSerializer` |

New serializers
All in `ami/jobs/serializers.py`. Naming pattern: `MLJob{Action}{Direction}Serializer`

| Serializer | Endpoint | Fields |
|---|---|---|
| `MLJobTasksRequestSerializer` | `/tasks/` | `batch_size` (int), `client_info` (optional) |
| `MLJobTasksResponseSerializer` | `/tasks/` | `tasks` (list) |
| `MLJobResultsRequestSerializer` | `/result/` | `results` (list of `PipelineTaskResult`), `client_info` (optional) |
| `MLJobResultsResponseSerializer` | `/result/` | `status`, `job_id`, `results_queued`, `tasks` |

New Pydantic model in `ami/ml/schemas.py`: `ProcessingServiceClientInfo` with `extra = "allow"` so processing services can send any key-value pairs useful for distinguishing machines running the same parent ProcessingService.

Endpoint contracts
- `POST /jobs/{id}/tasks/?project_id={id}` — Fetch tasks from the job queue
- `POST /jobs/{id}/result/?project_id={id}` — Submit pipeline results
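The envelope-level checks the request serializers perform on the `/tasks/` body can be sketched as a plain function. `validate_tasks_request` is a hypothetical helper for illustration, not Antenna code; the real validation is done by `MLJobTasksRequestSerializer`:

```python
def validate_tasks_request(body: dict) -> list[str]:
    """Collect envelope-level errors for a POST /tasks request body."""
    errors = []
    batch_size = body.get("batch_size")
    # bool is a subclass of int in Python, so reject it explicitly.
    if not isinstance(batch_size, int) or isinstance(batch_size, bool) or batch_size < 1:
        errors.append("batch_size: a positive integer is required")
    if "client_info" in body and not isinstance(body["client_info"], dict):
        errors.append("client_info: must be an object")
    return errors
```

An empty error list means the envelope is well-formed; domain validation of task contents happens separately (the Pydantic `SchemaField` layer).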
Both endpoints return structured error responses from two validation layers:

1. DRF serializer validation (400) — missing or invalid fields in the HTTP envelope.
2. Pydantic `SchemaField` validation (400) — invalid domain objects inside `results`.

Task queue unavailable (503) — Celery broker unreachable when queueing results:

```json
{"status": "error", "job_id": 1, "detail": "Task queue temporarily unavailable"}
```

Both layers use DRF's standard exception handler, so error shapes are consistent with the rest of the API.
Related Issues
Relates to #1194
How to Test
Unit tests:

```shell
docker compose -f docker-compose.ci.yml run --rm django python manage.py test ami.jobs --keepdb
```
Apply the required ADC changes listed below, then run the integration test script:
```shell
# From the antenna repo root
bash scripts/psv2_integration_test.sh 10
```

This creates a collection, dispatches an async job, starts the ADC worker, and polls until the job completes or times out. Requires the local Antenna stack running (`docker compose up -d`) and the `ami-py311` conda env with ami-data-companion installed.
Breaking change to both `/jobs/{id}/tasks/` (GET to POST) and `/jobs/{id}/result/` (bare list no longer accepted). Deploy Antenna and `ami-data-companion` together.

Required changes in `ami-data-companion`

1. Task fetching (`trapdata/antenna/datasets.py:163-186`) — change GET to POST with JSON body: `batch` (query param) becomes `batch_size` (JSON body field, required int >= 1). The `processing_service_name` query param is removed — use `client_info` in the request body instead if needed.

   Important: The URL must end with a trailing slash. Django's `APPEND_SLASH` setting can redirect GETs but not POSTs, so a missing slash will return a 500 error.

2. Result submission (`trapdata/antenna/client.py:75-107`) — wrap bare list in `{"results": [...]}`. The `processing_service_name` query param is removed from this endpoint too.

3. Mock test server (`trapdata/antenna/tests/antenna_api_server.py`) — update to match new contract: `@app.get(...)` → `@app.post(...)`, read `batch_size` from request body instead of query param, change `payload: list[dict]` to accept `{"results": [...]}`.

4. Schema docstring (`trapdata/antenna/schemas.py:38`) — update comment from GET to POST.

5. Optional: add typed schemas for the new request/response shapes:
   - `AntennaTasksRequest` (`batch_size: int`) — request body for POST /tasks
   - `AntennaResultPostResponse` (`status`, `job_id`, `results_queued`, `tasks`) — response from POST /result

Checklist