
refactor(jobs): change /tasks and /results endpoints for ML jobs#1197

Merged
mihow merged 12 commits into main from refactor/tasks-endpoint-post on Apr 4, 2026

Conversation

@mihow mihow (Collaborator) commented Apr 1, 2026

Summary

Standardizes the HTTP contracts for the two endpoints that Processing Services use to interact with Antenna jobs. Both endpoints now use typed DRF serializers with Pydantic SchemaField for domain validation, and both accept an optional client_info field for future PS identity tracking.

Extracted from #1194 to keep the endpoint contract change separate from the auth/identity work.

What changed

| Endpoint | Before | After |
| --- | --- | --- |
| /jobs/{id}/tasks/ | GET with ?batch=N query param | POST with {"batch_size": N} body via MLJobTasksRequestSerializer |
| /jobs/{id}/result/ | POST accepting 3 formats (bare list, wrapped dict, single item) | POST with {"results": [...]} body via MLJobResultsRequestSerializer |

New serializers

All in ami/jobs/serializers.py. Naming pattern: MLJob{Action}{Direction}Serializer

| Serializer | Endpoint | Direction | Fields |
| --- | --- | --- | --- |
| MLJobTasksRequestSerializer | /tasks/ | request | batch_size (int), client_info (optional) |
| MLJobTasksResponseSerializer | /tasks/ | response | tasks (list) |
| MLJobResultsRequestSerializer | /result/ | request | results (list of PipelineTaskResult), client_info (optional) |
| MLJobResultsResponseSerializer | /result/ | response | status, job_id, results_queued, tasks |

New Pydantic model in ami/ml/schemas.py:

| Model | Purpose |
| --- | --- |
| ProcessingServiceClientInfo | Identity metadata for a specific PS worker/pod. Uses extra = "allow" so processing services can send any key-value pairs useful for distinguishing machines running the same parent ProcessingService. |

Endpoint contracts

POST /jobs/{id}/tasks/?project_id={id} — Fetch tasks from the job queue

// Request
{"batch_size": 10}

// Response (200)
{"tasks": [{"id": "...", "image_id": "...", "image_url": "...", "reply_subject": "..."}, ...]}

POST /jobs/{id}/result/?project_id={id} — Submit pipeline results

// Request
{"results": [{"reply_subject": "...", "result": {...}}, ...]}

// Response (200)
{"status": "accepted", "job_id": 1, "results_queued": 3, "tasks": [...]}
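The contracts above can be sketched as stdlib-only client helpers (the helper names and base URL are hypothetical; only the URL shapes and body keys come from the contracts):

```python
def build_tasks_request(base_url: str, job_id: int, project_id: int, batch_size: int):
    """POST /jobs/{id}/tasks/ -- note the required trailing slash before the query string."""
    url = f"{base_url.rstrip('/')}/jobs/{job_id}/tasks/?project_id={project_id}"
    return url, {"batch_size": batch_size}


def build_results_request(base_url: str, job_id: int, project_id: int, results: list):
    """POST /jobs/{id}/result/ -- results must be wrapped in an object, never a bare list."""
    url = f"{base_url.rstrip('/')}/jobs/{job_id}/result/?project_id={project_id}"
    return url, {"results": results}
```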

Error responses

Both endpoints return structured error responses from two validation layers:

DRF serializer validation (400) — missing or invalid fields in the HTTP envelope:

// Missing required field
{"batch_size": ["This field is required."]}

// Wrong type
{"batch_size": ["A valid integer is required."]}

Pydantic SchemaField validation (400) — invalid domain objects inside results:

// Missing required fields in PipelineTaskResult
{"results": ["2 validation errors for list[PipelineTaskResult]\n  ..."]}

// Bare list (no longer accepted)
{"results": ["This field is required."]}

Task queue unavailable (503) — Celery broker unreachable when queueing results:

{"status": "error", "job_id": 1, "detail": "Task queue temporarily unavailable"}

Both layers use DRF's standard exception handler, so error shapes are consistent with the rest of the API.
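The division of labor between the two layers can be mimicked with a plain function to show which layer produces which error shape (a simulation only; the real endpoint uses DRF serializers and Pydantic, not this code):

```python
def validate_results_envelope(payload) -> dict:
    """Return {} if valid, else an error dict shaped like the responses above."""
    # Layer 1: DRF-style envelope validation -- the wrapper key must exist,
    # so a bare list produces a "field is required" error.
    if not isinstance(payload, dict) or "results" not in payload:
        return {"results": ["This field is required."]}
    items = payload["results"]
    if not isinstance(items, list):
        return {"results": ["Expected a list of items."]}
    # Layer 2: Pydantic-style item validation -- each item must be an object.
    bad = sum(1 for item in items if not isinstance(item, dict))
    if bad:
        return {"results": [f"{bad} validation errors for list[PipelineTaskResult]"]}
    return {}
```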

Related Issues

Relates to #1194

How to Test

Unit tests:

docker compose -f docker-compose.ci.yml run --rm django python manage.py test ami.jobs --keepdb

E2E test with ami-data-companion:

Apply the required ADC changes listed below, then run the integration test script:

# From the antenna repo root
bash scripts/psv2_integration_test.sh 10

This creates a collection, dispatches an async job, starts the ADC worker, and polls until the job completes or times out. Requires the local Antenna stack running (docker compose up -d) and the ami-py311 conda env with ami-data-companion installed.

Deployment Notes

Breaking change to both /jobs/{id}/tasks/ (GET to POST) and /jobs/{id}/result/ (bare list no longer accepted). Deploy Antenna and ami-data-companion together.

Required changes in ami-data-companion

1. Task fetching (trapdata/antenna/datasets.py:163-186) — change GET to POST with JSON body:

# Before
url = f"{self.base_url.rstrip('/')}/jobs/{self.job_id}/tasks"
params = {
    "batch": self.batch_size,
    "processing_service_name": self.processing_service_name,
}
response = self._api_session.get(url, params=params, timeout=30)

# After — note the trailing slash (required for POST with APPEND_SLASH)
url = f"{self.base_url.rstrip('/')}/jobs/{self.job_id}/tasks/"
body = {"batch_size": self.batch_size}
response = self._api_session.post(url, json=body, timeout=30)

batch (query param) becomes batch_size (JSON body field, required int >= 1). The processing_service_name query param is removed — use client_info in the request body instead if needed.

Important: The URL must end with a trailing slash. Django's APPEND_SLASH setting can redirect GETs but not POSTs, so a missing slash will return a 500 error.

2. Result submission (trapdata/antenna/client.py:75-107) — wrap bare list in {"results": [...]}:

# Before
payload = [r.model_dump(mode="json") for r in results]
params = {"processing_service_name": processing_service_name}
response = session.post(url, json=payload, params=params, timeout=60)

# After — use the existing AntennaTaskResults wrapper model
payload = AntennaTaskResults(results=results).model_dump(mode="json")
response = session.post(url, json=payload, timeout=60)

The processing_service_name query param is removed from this endpoint too.
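The wrapped payload reduces to a one-key envelope; a stdlib approximation of what the AntennaTaskResults dump produces (the real model is a Pydantic class in ami-data-companion, not this function):

```python
import json


def wrap_results(results: list) -> str:
    # Approximates AntennaTaskResults(results=...).model_dump(mode="json")
    # followed by JSON encoding: a {"results": [...]} envelope.
    return json.dumps({"results": results})
```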

3. Mock test server (trapdata/antenna/tests/antenna_api_server.py) — update to match new contract:

  • Line 65: change @app.get(...) to @app.post(...), and read batch_size from the request body instead of a query param
  • Lines 86–87: change payload: list[dict] to accept {"results": [...]}

4. Schema docstring (trapdata/antenna/schemas.py:38) — update comment from GET to POST

5. Optional: add typed schemas for the new request/response shapes:

  • AntennaTasksRequest(batch_size: int) — request body for POST /tasks
  • AntennaResultPostResponse(status, job_id, results_queued, tasks) — response from POST /result
  • Use these in client code and mock server FastAPI handlers for contract validation

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Copilot AI review requested due to automatic review settings April 1, 2026 17:39

netlify bot commented Apr 1, 2026

Deploy Preview for antenna-ssec canceled.

🔨 Latest commit: e799871
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-ssec/deploys/69d0458b9ebbc4000853039f


netlify bot commented Apr 1, 2026

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: e799871
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/69d0458b9df29f0009003c9d


coderabbitai bot commented Apr 1, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Replaces the GET /jobs/{id}/tasks with POST and moves batch from query to TasksRequestSerializer body; adds DRF serializers for task/result requests and responses; introduces ProcessingServiceClientInfo Pydantic model; updates /jobs/{id}/result to accept wrapped {"results": [...]} and adjusts logging/queue counting; tests updated accordingly.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Serializers / Schemas | ami/jobs/schemas.py | Added TasksRequestSerializer, TasksResponseSerializer, PipelineResultsRequestSerializer, PipelineResultsResponseSerializer; removed batch_param. |
| API Views | ami/jobs/views.py | Changed /jobs/{id}/tasks from GET→POST, validate request body with TasksRequestSerializer, use batch_size for reserve_tasks; /jobs/{id}/result now validates wrapped {"results": [...]} via PipelineResultsRequestSerializer; removed per-item Pydantic validation loop; adjusted results_queued counting and switched to parameterized logging. |
| Pydantic Models | ami/ml/schemas.py | Added exported ProcessingServiceClientInfo Pydantic model with fields hostname, software, version, platform, pod_name. |
| Tests | ami/jobs/tests/test_jobs.py | Updated tests to POST {"batch_size": ...} to tasks endpoint and to POST wrapped {"results": [...]} for results; added assertion that a bare-list result payload is rejected; updated non-ASYNC_API rejection test to use POST body. |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant API as "Django API\n(views)"
    participant Serializer as "DRF Serializer\n(TasksRequestSerializer / PipelineResultsRequestSerializer)"
    participant Worker as "Task Reservation / Queue"

    Client->>API: POST /jobs/{id}/tasks { "batch_size": N, "client_info": {...} }
    API->>Serializer: validate request body
    Serializer-->>API: validated_data (batch_size, client_info)
    API->>Worker: reserve_tasks(count=batch_size)
    Worker-->>API: reserved tasks -> respond with tasks

    Client->>API: POST /jobs/{id}/result { "results": [...], "client_info": {...} }
    API->>Serializer: validate results wrapper
    Serializer-->>API: validated_data (results, client_info)
    API->>Worker: enqueue/process results
    Worker-->>API: ack (results_queued, tasks info)
    API-->>Client: 202 Accepted { status, job_id, results_queued, tasks }

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related issues

Possibly related PRs

Suggested labels

backend, ml

Poem

🐇 I swapped query for JSON crumbs today,
Validators nibble, and tasks hop away,
Results in a bundle, queued neat and tight,
Logs hum softly through day and night,
Hooray — the pipeline bounces bright!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 50.00%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'refactor(jobs): change /tasks and /results endpoints for ML jobs' clearly and concisely summarizes the main change: converting two API endpoints to new POST-based contracts with DRF serializers. |
| Description check | ✅ Passed | The pull request description is comprehensive and well-structured, covering summary, changes, new serializers, endpoint contracts, error responses, testing instructions, and deployment notes. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Contributor

Copilot AI left a comment


Pull request overview

Refactors worker-facing job endpoints to move /jobs/{id}/tasks/ from a GET-with-query-params contract to a POST-with-body contract, and introduces structured client_info payload handling intended to accompany worker requests.

Changes:

  • Switched /jobs/{id}/tasks/ from GET (?batch=N) to POST with a validated JSON body via TasksRequestSerializer.
  • Added ClientInfoSerializer (and helper utilities) for structured worker/client metadata.
  • Updated /jobs/{id}/result/ to accept a wrapped request format with {"results": [...], "client_info": {...}} while keeping a legacy bare-list fallback, plus logging format tweaks.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
ami/jobs/views.py Changes /tasks to POST + body validation; extends /result request parsing for wrapped payloads; updates logging.
ami/jobs/schemas.py Adds TasksRequestSerializer and pulls in ClientInfoSerializer for request-body validation.
ami/jobs/tests/test_jobs.py Updates tasks endpoint tests to POST with JSON body instead of GET query params.
ami/ml/serializers_client_info.py Introduces ClientInfoSerializer and helper functions for extracting/augmenting client metadata.
ami/ml/tests.py Adds unit tests for ClientInfoSerializer validation behavior.
Comments suppressed due to low confidence (1)

ami/jobs/schemas.py:31

  • Now that /tasks uses a request-body serializer, batch_param appears unused in this module (no references found). Consider removing it (or moving to a shared location if still needed) to avoid stale/duplicated schema definitions.
batch_param = OpenApiParameter(
    name="batch",
    description="Number of tasks to retrieve",
    required=False,
    type=int,
)


class TasksRequestSerializer(serializers.Serializer):
    batch = serializers.IntegerField(min_value=1, required=True)
    client_info = ClientInfoSerializer(required=False)



Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (5)
ami/jobs/views.py (2)

254-256: client_info is validated but not used.

The TasksRequestSerializer validates client_info from the request body, but the validated data is never used in this endpoint. Consider logging it or passing it to _mark_pipeline_pull_services_seen() for tracking worker activity.

♻️ Suggested usage
 serializer = TasksRequestSerializer(data=request.data)
 serializer.is_valid(raise_exception=True)
 batch = serializer.validated_data["batch"]
+client_info = serializer.validated_data.get("client_info", {})
 
 job: Job = self.get_object()
 
 # ...
 
-# Record heartbeat for async processing services on this pipeline
-_mark_pipeline_pull_services_seen(job)
+# Record heartbeat for async processing services on this pipeline
+_mark_pipeline_pull_services_seen(job)
+if client_info:
+    logger.debug("Task fetch from client: %s", client_info.get("hostname", "unknown"))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/views.py` around lines 254 - 256, The serializer validates
client_info but the endpoint never uses it; after serializer.is_valid(...)
retrieve serializer.validated_data["client_info"] and pass it into
_mark_pipeline_pull_services_seen(...) (or at minimum log it) so worker activity
is tracked; update the call site that currently only uses batch (variable batch
from serializer.validated_data["batch"]) to also accept and forward client_info
to the _mark_pipeline_pull_services_seen function and adjust that function
signature/invocations if needed to accept the new client_info parameter.

360-367: Consider narrowing the exception scope or adding more context to error response.

The broad except Exception catch (flagged by static analysis) may hide unexpected errors. While it provides resilience, consider:

  1. Narrowing to expected exceptions (e.g., Celery task failures, serialization errors)
  2. Adding the error type/message to the response for debugging
  3. Re-raising unexpected exceptions after logging
♻️ Suggested improvement
         except Exception as e:
-            logger.error("Failed to queue pipeline results for job %s: %s", job.pk, e)
+            logger.exception("Failed to queue pipeline results for job %s", job.pk)
             return Response(
                 {
                     "status": "error",
                     "job_id": job.pk,
+                    "error": str(e),
                 },
                 status=500,
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/views.py` around lines 360 - 367, The broad except Exception block
that logs "Failed to queue pipeline results for job %s" should be narrowed and
made more informative: replace the bare except in the view handling the queueing
operation (the block that references logger, job.pk and returns a Response) with
catches for expected errors (e.g., CeleryError,
kombu.exceptions.SerializationError, ValueError) and handle them by logging
logger.error with the exception type and message, returning a 500 Response that
includes an "error": "<ExceptionType>: <message>" field alongside status and
job_id; for truly unexpected exceptions re-raise after logging so they surface
to error monitoring. Ensure references to logger, job.pk, and the Response
payload are preserved.
ami/ml/serializers_client_info.py (1)

35-38: Silent validation failure may hide client errors.

When ClientInfoSerializer validation fails, the code silently falls back to an empty dict without logging. This could make debugging client issues difficult. Consider logging a warning when validation fails.

♻️ Suggested improvement
+import logging
+
+logger = logging.getLogger(__name__)
+
+
 def get_client_info(request) -> dict:
     # ...
     serializer = ClientInfoSerializer(data=raw)
     if serializer.is_valid():
         info = serializer.validated_data
     else:
+        logger.warning("Invalid client_info in request: %s", serializer.errors)
         info = {}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/serializers_client_info.py` around lines 35 - 38, The current block
silently drops validation errors for ClientInfoSerializer by returning an empty
dict when serializer.is_valid() is False; update the code around the
serializer.is_valid() check to log a warning (e.g., logger.warning) including
serializer.errors and any identifying request/client context before falling back
to info = {}, so you can trace client-side validation failures; ensure the
module has a logger (e.g., logging.getLogger(__name__)) and include the
serializer name (ClientInfoSerializer) in the message.
ami/ml/tests.py (1)

1371-1408: Consider consolidating imports and adding test coverage for get_client_info().

The test class imports ClientInfoSerializer inside each test method. Consider moving to a single import at the top of the test class or file for cleaner code.

Additionally, while ClientInfoSerializer is tested, the get_client_info() helper function (which adds server-observed fields like ip and user_agent) is not tested.

♻️ Suggested refactor
+from ami.ml.serializers_client_info import ClientInfoSerializer, get_client_info
+
+
 class TestClientInfoSerializer(TestCase):
     def test_valid_client_info(self):
-        from ami.ml.serializers_client_info import ClientInfoSerializer
-
         data = {
             # ...

And consider adding a test for get_client_info():

def test_get_client_info_extracts_server_fields(self):
    from unittest.mock import Mock
    
    request = Mock()
    request.data = {"client_info": {"hostname": "test-host"}}
    request.headers = {"user-agent": "TestAgent/1.0"}
    request.META = {"REMOTE_ADDR": "192.168.1.1"}
    
    info = get_client_info(request)
    self.assertEqual(info["hostname"], "test-host")
    self.assertEqual(info["ip"], "192.168.1.1")
    self.assertEqual(info["user_agent"], "TestAgent/1.0")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/tests.py` around lines 1371 - 1408, Move the repeated in-test imports
of ClientInfoSerializer to a single import at the top of the test file or the
TestClientInfoSerializer class to reduce duplication; also add a new unit test
for get_client_info that constructs a mocked request (use unittest.mock.Mock),
sets request.data to include a client_info dict, sets
request.headers["user-agent"], and request.META["REMOTE_ADDR"], then calls
get_client_info(request) and asserts the returned dict contains the original
client_info fields plus server-observed "ip" and "user_agent" entries; reference
ClientInfoSerializer, get_client_info, and TestClientInfoSerializer when making
these changes.
ami/jobs/schemas.py (1)

20-25: Remove unused batch_param OpenAPI parameter.

The /tasks endpoint now uses POST with a request body serializer (TasksRequestSerializer), not query parameters. The batch_param definition is dead code left from an earlier implementation and is never imported or used.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/schemas.py` around lines 20 - 25, The OpenApiParameter object
batch_param in ami/jobs/schemas.py is dead code (unused by the current /tasks
POST that uses TasksRequestSerializer); remove the batch_param variable
declaration and any unused imports tied to it so the module no longer defines or
exports that unused OpenAPI parameter (search for batch_param in
ami/jobs/schemas.py to locate and delete it).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4d8d07a9-2d2c-423a-8731-6736bcd55d66

📥 Commits

Reviewing files that changed from the base of the PR and between ed8f857 and 15cedc6.

📒 Files selected for processing (5)
  • ami/jobs/schemas.py
  • ami/jobs/tests/test_jobs.py
  • ami/jobs/views.py
  • ami/ml/serializers_client_info.py
  • ami/ml/tests.py

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
ami/jobs/views.py (1)

333-370: ⚠️ Potential issue | 🟠 Major

Don't report full failure after partial enqueue.

Once Line 333 starts dispatching, a later exception can leave earlier results already accepted, but Line 365 still tells the client the whole batch failed. That makes a retry path duplicate already-enqueued results. Return the accepted subset on the error path, or make enqueueing idempotent by reply subject before clients retry.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/views.py` around lines 333 - 370, The current except Exception path
reports the entire batch as failed even after some results were already enqueued
by process_nats_pipeline_result, leading to unsafe client retries; change the
error handling around the loop that builds queued_tasks so that on exception you
return a Response containing the accepted subset (job_id,
results_queued=len(queued_tasks), tasks=queued_tasks) and a distinct status like
"partial_error" (and still log the exception via logger.error referencing job.pk
and the exception) instead of reporting the whole batch as failed; update the
except block that currently logs "Failed to queue pipeline results for job %s:
%s" to return the partial acceptance payload so clients can retry only the
remaining items or de-duplicate safely.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/views.py`:
- Around line 321-323: The loop building PipelineTaskResult from raw_results can
raise TypeError when an item is not a mapping (e.g., int or None) before
Pydantic validation runs; update the code that iterates raw_results (where
PipelineTaskResult(**item) is called) to guard each item by checking
isinstance(item, dict) and if not, raise a fast HTTP 400 (bad request) or append
a validation error for that item; alternatively, catch TypeError alongside
pydantic.ValidationError where PipelineTaskResult(...) is instantiated and
convert it into a 400 response instead of falling through to the generic 500
handler so invalid client input returns 400.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 00cfd037-eb54-415e-84ef-0174cf1e9af1

📥 Commits

Reviewing files that changed from the base of the PR and between 15cedc6 and b987f2d.

📒 Files selected for processing (2)
  • ami/jobs/schemas.py
  • ami/jobs/views.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • ami/jobs/schemas.py

@mihow
Collaborator Author

mihow commented Apr 1, 2026

@copilot add instructions for deployment and suggest changes that need to be made in the Processing Services (https://github.com/RolnickLab/ami-data-companion specifically for this to work)

Contributor

Copilot AI commented Apr 1, 2026

@copilot add instructions for deployment and suggest changes that need to be made in the Processing Services (https://github.com/RolnickLab/ami-data-companion specifically for this to work)

The PR description now includes deployment sequencing and the exact code change needed in ami-data-companion.

Required change: trapdata/antenna/datasets.py — RESTDataset._fetch_tasks

# Before (GET with batch as query param)
response = self._api_session.get(url, params={"batch": self.batch_size, ...}, timeout=30)

# After (POST with batch in JSON body)
body = {"batch": self.batch_size}
params = {"processing_service_name": self.processing_service_name}
response = self._api_session.post(url, json=body, params=params, timeout=30)

Also add a trailing slash to the URL: .../tasks/.

No other files in ami-data-companion need changes. result_posting.py already uses POST and is unaffected.

Deployment order: Deploy this Antenna backend change first, then update the worker. During the gap, workers receive 405 Method Not Allowed but handle it gracefully (tasks stay in the NATS queue), so no data is lost.

mihow and others added 3 commits April 1, 2026 17:20
…erializer

Convert the /jobs/{id}/tasks/ endpoint from GET with query params to POST with request body.
This allows for validated request data including optional client_info field, better for
processing service workers that need to report their identity.

Changes:
- Add ami/ml/serializers_client_info.py with ClientInfoSerializer and get_client_info() helper
- Add TasksRequestSerializer to ami/jobs/schemas.py for validating batch and client_info
- Update /tasks action from GET to POST with serializer-based validation
- Support wrapped format in /result endpoint: {"client_info": {...}, "results": [...]}
- Update logger calls to use %s format (more efficient)
- Update all tests to use POST instead of GET
- Add TestClientInfoSerializer test class with validation tests

No changes to _mark_pipeline_pull_services_seen() or permission_classes.

Co-Authored-By: Claude <noreply@anthropic.com>
- Remove ClientInfoSerializer module and tests (belongs in backend
  auth/identity branch)
- Remove client_info field from TasksRequestSerializer
- Remove unused batch_param OpenAPI parameter (dead code)
- Add project_id_doc_param to /tasks/ extend_schema
- Validate 'results' is a list in wrapped /result/ format
- Clean up docstrings

Co-Authored-By: Claude <noreply@anthropic.com>
Replace manual loop with pydantic.parse_obj_as() to validate the
results list. This catches non-dict items (e.g. int, null) with a
proper 400 ValidationError instead of a TypeError falling through
to the generic 500 handler.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow force-pushed the refactor/tasks-endpoint-post branch from b987f2d to 4cb1b7f Compare April 2, 2026 00:52
@mihow
Collaborator Author

mihow commented Apr 2, 2026

Fixed the TypeError → 500 issue raised by @coderabbitai in 4cb1b7f — replaced the manual loop (calling PipelineTaskResult(**item) for each item in raw_results) with pydantic.parse_obj_as(list[PipelineTaskResult], raw_results).

This catches non-dict items (int, None, etc.) natively via Pydantic validation, returning a proper 400 instead of falling through to the generic 500 handler.

@coderabbitai
Contributor

coderabbitai bot commented Apr 2, 2026


🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check the current state of the result endpoint in views.py
rg -n "parse_obj_as\|PipelineTaskResult\|raw_results\|pydantic" ami/jobs/views.py -A3 -B3

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Look at the result action in views.py
rg -n "def result\|raw_results\|PipelineTaskResult\|parse_obj\|pydantic\|ValidationError\|queued_tasks" ami/jobs/views.py -A5 -B2

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check git log to find the commit
git log --oneline -10

# Also dump the result action from views.py
cat ami/jobs/views.py | grep -n "result\|pydantic\|parse_obj\|PipelineTask\|raw_result\|queued" | head -60

Length of output: 3190


@mihow: Confirmed — the fix looks great. The updated code at line 320 correctly uses pydantic.parse_obj_as(list[PipelineTaskResult], raw_results), and the pydantic.ValidationError is properly caught at line 357 and re-raised as a DRF ValidationError, guaranteeing a clean 400 response for malformed items like bare integers or None values.

One minor heads-up for the future: pydantic.parse_obj_as is the Pydantic v1 API. If the project ever migrates to Pydantic v2, the equivalent call would be:

pydantic.TypeAdapter(list[PipelineTaskResult]).validate_python(raw_results)

Not an issue now, but worth keeping in mind when upgrading.
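The failure mode this exchange fixes is easy to reproduce without Pydantic: unpacking a non-mapping with ** raises TypeError, which a handler that only catches ValidationError never sees. A stdlib-only illustration (manual_parse and validated_parse are hypothetical stand-ins, not the project's code):

```python
def manual_parse(raw_results: list) -> list:
    # Old approach: construct each item directly. A bare int or None in the
    # list raises TypeError ("argument after ** must be a mapping"), which
    # falls through to the generic 500 handler.
    return [dict(**item) for item in raw_results]

def validated_parse(raw_results: list) -> list:
    # What parse_obj_as gives you, in spirit: shape-check first and raise a
    # catchable error type, so the view can translate it into a clean 400.
    bad = [i for i, item in enumerate(raw_results) if not isinstance(item, dict)]
    if bad:
        raise ValueError(f"items at positions {bad} are not JSON objects")
    return [dict(**item) for item in raw_results]
```

The same distinction carries over to Pydantic v2's TypeAdapter: validation errors stay in one exception family the view layer can translate uniformly.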



Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
ami/jobs/views.py (1)

360-367: Consider narrowing the exception catch (optional).

Static analysis flags the broad Exception catch. While this serves as a safety net for unexpected Celery queueing failures, catching all exceptions can mask programming errors.

If desired, you could narrow this to specific expected exceptions:

♻️ Optional: Narrow exception types
         except pydantic.ValidationError as e:
             raise ValidationError(f"Invalid result data: {e}") from e
 
-        except Exception as e:
+        except (OSError, ConnectionError, RuntimeError) as e:
+            # Celery/Redis connection or task queueing failures
             logger.error("Failed to queue pipeline results for job %s: %s", job.pk, e)
             return Response(
                 {
                     "status": "error",
                     "job_id": job.pk,
                 },
                 status=500,
             )

Alternatively, you can suppress the lint with # noqa: BLE001 if the broad catch is intentional for resilience.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/views.py` around lines 360 - 367, The current except Exception as e
block around the pipeline queuing (catching all exceptions and logging via
logger.error and returning Response) is too broad; either restrict it to the
expected exception types (e.g., Celery-related exceptions such as CeleryError or
kombu.exceptions.* or whatever queueing-specific exceptions your project uses)
by replacing "except Exception" with those specific exceptions around the
queueing logic, or if the broad catch is intentionally required for resilience,
add a lint suppression comment (e.g., # noqa: BLE001) adjacent to the except
line to silence the static analysis; update logger.error and the Response return
behavior only if needed after narrowing the except.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ddcdb8fe-d13e-4252-9d2f-fdc4d0090f14

📥 Commits

Reviewing files that changed from the base of the PR and between b987f2d and 4cb1b7f.

📒 Files selected for processing (3)
  • ami/jobs/schemas.py
  • ami/jobs/tests/test_jobs.py
  • ami/jobs/views.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • ami/jobs/tests/test_jobs.py
  • ami/jobs/schemas.py

@mihow
Collaborator Author

mihow commented Apr 2, 2026

/jobs/{id}/tasks/ POST payload

Current (this PR):

POST /api/v2/jobs/{id}/tasks/?project_id={project_id}
Content-Type: application/json

{
  "batch_size": 10
}

batch_size (required, integer >= 1) controls how many tasks to fetch from the NATS queue. project_id stays as a query parameter (used by ProjectMixin).

After client_info lands (#1194):

POST /api/v2/jobs/{id}/tasks/?project_id={project_id}
Content-Type: application/json
Authorization: Api-Key <key>

{
  "batch_size": 10,
  "client_info": {
    "hostname": "worker-gpu-01",
    "software": "ami-data-companion",
    "version": "0.5.2",
    "platform": "linux-x86_64"
  }
}

client_info will be optional — omitting it works the same as today. When present, the endpoint can call mark_seen() on the associated ProcessingService, making the health status update a side effect of normal task-fetching activity rather than requiring a separate polling mechanism.
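A worker-side sketch of building that body (build_tasks_body is a hypothetical helper; it uses the final batch_size field name, and the client_info keys are free-form examples since the schema allows extra fields):

```python
import json
import platform
import socket

def build_tasks_body(batch_size: int, with_client_info: bool = False) -> str:
    """Build the POST body for /jobs/{id}/tasks/ (sketch, not ADC code)."""
    body: dict = {"batch_size": batch_size}
    if with_client_info:  # optional until #1194 lands; omitting it is fine
        body["client_info"] = {
            "hostname": socket.gethostname(),
            "software": "ami-data-companion",
            "platform": platform.platform(),
        }
    return json.dumps(body)
```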

mihow and others added 3 commits April 1, 2026 22:19
Identity metadata for specific processing service instances.
Allows tracking which worker/pod is making requests when a single
ProcessingService has multiple workers.

Co-Authored-By: Claude <noreply@anthropic.com>
Add typed request/response serializers for both job endpoints:
- TasksRequestSerializer (existing, now with client_info)
- TasksResponseSerializer
- PipelineResultsRequestSerializer (replaces manual isinstance checks)
- PipelineResultsResponseSerializer

Uses SchemaField to delegate item validation to Pydantic models,
keeping DRF for the HTTP envelope and Pydantic for domain schemas.

Co-Authored-By: Claude <noreply@anthropic.com>
Replace manual isinstance branching and parse_obj_as with
PipelineResultsRequestSerializer. One canonical request shape:
{"results": [...]}. Bare list and single-item formats removed.

Tests updated to use wrapped format and assert bare lists are rejected.

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
ami/jobs/views.py (1)

320-359: ⚠️ Potential issue | 🟠 Major

Don't report a full failure after partially queuing the batch.

The loop publishes results one by one, but any later exception returns a generic 500 for the whole request. A caller retrying after that will resend earlier reply_subjects that were already accepted. Either enqueue the batch atomically, or make process_nats_pipeline_result idempotent on reply_subject and return the queued subset on failure.
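The per-item variant of that suggestion can be sketched as follows (queue_results and enqueue are hypothetical stand-ins for the view's loop and process_nats_pipeline_result.delay, not Antenna code):

```python
def queue_results(results: list[dict], enqueue) -> tuple[list, list]:
    """Enqueue each result independently, reporting partial success (sketch).

    Instead of letting one broker failure return a 500 for the whole batch,
    collect per-item failures so the response can report the accepted subset
    and the caller only retries what actually failed.
    """
    queued, failed = [], []
    for result in results:
        try:
            enqueue(result)
            queued.append(result.get("reply_subject"))
        except Exception as exc:  # noqa: BLE001 - collect, don't abort the batch
            failed.append({"reply_subject": result.get("reply_subject"),
                           "error": str(exc)})
    return queued, failed
```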

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/views.py` around lines 320 - 359, The current loop over
validated_results calls process_nats_pipeline_result.delay and if any exception
occurs later the except block returns a 500 for the whole request even though
some tasks were already queued; update the handler to avoid reporting full
failure after partial success by making the enqueue operation fault-tolerant:
perform per-item try/except inside the loop (wrap the call to
process_nats_pipeline_result.delay and the queued_tasks append in a try) so you
collect both queued_tasks and failed_tasks (include reply_subject, error info,
and skip/continue on failures), log each per-item failure, and finally return a
response that reports accepted/queued subset and failed items instead of raising
a generic error. Alternatively, ensure process_nats_pipeline_result is
idempotent on reply_subject so retries are safe and then respond with the queued
subset if any exceptions still occur.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/views.py`:
- Around line 316-318: Request handling currently rejects legacy bare-list
payloads because PipelineResultsRequestSerializer expects {"results": [...]};
detect when request.data is a list (or tuple) and wrap it into {"results":
request.data} before creating/validating the serializer so existing workers
posting [...] continue to work. Update the code around
PipelineResultsRequestSerializer / serializer / validated_results to normalize
request.data into the wrapped shape when necessary, then call
serializer.is_valid(raise_exception=True) as before.

In `@ami/ml/schemas.py`:
- Around line 265-276: ProcessingServiceClientInfo is being used as optional
client_info fields in the job schemas; confirm whether ami/jobs/schemas.py was
changed in this PR or already referenced client_info prior to it, and then
either (a) keep the ProcessingServiceClientInfo class if ami.jobs.schemas
legitimately needs it, (b) remove the class and related imports if
ami/jobs/schemas.py shouldn’t reference client_info in this PR, or (c) leave the
optional fields but add a clarifying comment and update the PR/deployment notes
to state that client_info is placeholder/deferred; check and act on usages of
ProcessingServiceClientInfo and the client_info fields in the Job schema
definitions to align code and PR notes.


ℹ️ Review info

Run ID: a40158ad-3a5b-4714-b1fe-b413034117e6

📥 Commits

Reviewing files that changed from the base of the PR and between 4cb1b7f and a0e3afa.

📒 Files selected for processing (4)
  • ami/jobs/schemas.py
  • ami/jobs/tests/test_jobs.py
  • ami/jobs/views.py
  • ami/ml/schemas.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • ami/jobs/schemas.py

"batch" sounds like content, "batch_size" clearly communicates
a requested quantity. Matches the ADC worker's field name.

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
ami/jobs/views.py (1)

357-364: Consider narrowing the exception catch or adding explicit exception types.

The static analysis flags except Exception as overly broad. While this serves as a reasonable catch-all for task queueing failures (Celery/Redis issues), it could mask unexpected bugs like AttributeError from code changes.

Consider catching specific exceptions that represent queueing failures:

♻️ Optional: Narrow exception types
-        except Exception as e:
+        except (ConnectionError, TimeoutError, RuntimeError) as e:
             logger.error("Failed to queue pipeline results for job %s: %s", job.pk, e)

Or alternatively, re-raise unexpected exceptions after logging:

         except Exception as e:
-            logger.error("Failed to queue pipeline results for job %s: %s", job.pk, e)
+            logger.error("Failed to queue pipeline results for job %s: %s", job.pk, e, exc_info=True)
             return Response(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/views.py` around lines 357 - 364, The catch-all "except Exception as
e" around the queueing logic should be narrowed: replace it with explicit
handlers for queue-related errors (e.g., CeleryError,
kombu.exceptions.OperationalError, redis.exceptions.RedisError or your project's
QueueError) and log those with logger.error("Failed to queue pipeline results
for job %s: %s", job.pk, e); for any other unexpected exceptions keep a final
bare "except Exception" only to log and re-raise (or simply omit broad catching)
so AttributeError/TypeError won't be swallowed; add the necessary imports for
the specific exception classes and update the except blocks around the queueing
code that references job.pk and logger.error accordingly.

ℹ️ Review info

Run ID: 442baafa-f395-42c9-ae11-8e205d7df8c5

📥 Commits

Reviewing files that changed from the base of the PR and between a0e3afa and a4e1961.

📒 Files selected for processing (3)
  • ami/jobs/schemas.py
  • ami/jobs/tests/test_jobs.py
  • ami/jobs/views.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • ami/jobs/schemas.py

@mihow
Collaborator Author

mihow commented Apr 2, 2026

Naming discussion: schema layers & conventions

Researched naming patterns across Celery, Temporal, NATS, SQS, Ray, BentoML, and Spring Batch to validate our naming choices.

How names map across layers

HTTP serializers (ami/jobs/schemas.py — DRF, Antenna API contract):

Proposed name Role Analogy
MLJobTasksRequestSerializer Fetch tasks request body
MLJobTasksResponseSerializer Tasks returned to worker
MLJobResultsRequestSerializer Batch result submission SQS SendMessageBatch
MLJobResultsResponseSerializer Acknowledgment of receipt NATS PubAck

Pattern: MLJob{Tasks,Results}{Request,Response}Serializer

Domain schemas (ami/ml/schemas.py — Pydantic, shared contract with processing services):

Current name Role Framework analogy
PipelineProcessingTask Work item dispatched to worker Celery Task, Temporal ActivityTask
PipelineTaskResult Envelope: reply_subject + success OR error Celery AsyncResult, Temporal ActivityResult
PipelineResultsResponse Success payload (detections, classifications, timing) Celery .result, Temporal Success
PipelineResultsError Failure payload (error + image_id) Temporal Failure
ProcessingServiceClientInfo Worker/pod identity metadata

Key insight

"TaskResult" as the envelope name is the dominant pattern across distributed frameworks (Celery, Temporal, Ray all use it). The original confusion wasn't the domain names — it was the collision between PipelineResultsResponse (domain) and PipelineResultsResponseSerializer (HTTP). The MLJob* prefix on DRF serializers resolves this.

Future cleanup (separate PR)

Shorter domain schema names for clarity:

  • PipelineProcessingTask → PipelineTask
  • PipelineResultsResponse → PipelineResults
  • PipelineResultsError → PipelineError

These touch 30+ files across the codebase so they belong in a dedicated rename PR.

Rename DRF serializers to MLJob{Action}{Direction}Serializer pattern:
- TasksRequestSerializer → MLJobTasksRequestSerializer
- TasksResponseSerializer → MLJobTasksResponseSerializer
- PipelineResultsRequestSerializer → MLJobResultsRequestSerializer
- PipelineResultsResponseSerializer → MLJobResultsResponseSerializer

ProcessingServiceClientInfo: remove specific fields, use extra="allow"
so processing services can send any useful identity key-value pairs.

Add detailed docstrings clarifying each serializer's role in the
processing service ↔ Antenna flow.

Fix test_result_endpoint_with_error_result to use wrapped payload format.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow added a commit that referenced this pull request Apr 2, 2026
Add djangorestframework-api-key based authentication for processing services.
Each ProcessingService can have API keys managed via Django admin or API.
Authenticated requests identify the specific service, enabling per-PS
heartbeat tracking and client_info collection.

Changes:
- ProcessingServiceAPIKey model (AbstractAPIKey) with FK to ProcessingService
- DRF auth backend (ProcessingServiceAPIKeyAuthentication) + permission class
- HasProcessingServiceAPIKey permission on /tasks and /result endpoints
- Per-PS heartbeat with client_info (ip, user_agent, hostname, software, etc.)
- generate_key API action and Django admin action
- Dual-auth pipeline registration (API key or legacy token)
- Serializers refactored into package with client_info extraction
- Minimal processing service self-registration example

Jobs endpoint HTTP semantics unchanged (GET /tasks, bare list /result).
The GET→POST refactor and request serializers are in PR #1197.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow changed the title refactor(jobs): change /tasks endpoint from GET to POST refactor(jobs): change /tasks and /results endpoints for ML jobs Apr 2, 2026
mihow and others added 4 commits April 2, 2026 11:47
Replace broad `except Exception` with specific `(OSError, KombuError)`
to catch only Celery broker connection failures. Returns 503 with a
descriptive message instead of a generic 500 that would swallow bugs.

Co-Authored-By: Claude <noreply@anthropic.com>
Follows the codebase convention: schemas.py holds OpenApiParameter
definitions and Pydantic models, serializers.py holds DRF serializers.

Co-Authored-By: Claude <noreply@anthropic.com>
Replace ListField(child=DictField()) with SchemaField using real Pydantic
models for proper OpenAPI schema generation and validation:
- MLJobTasksResponseSerializer.tasks → list[PipelineProcessingTask]
- MLJobResultsResponseSerializer.tasks → list[QueuedTaskAcknowledgment]

Co-Authored-By: Claude <noreply@anthropic.com>
Pydantic model belongs in the schema file, not serializers.

Co-Authored-By: Claude <noreply@anthropic.com>
mihow added a commit to RolnickLab/ami-data-companion that referenced this pull request Apr 3, 2026
Adapt to breaking API changes in RolnickLab/antenna#1197:

- /jobs/{id}/tasks/ changed from GET with ?batch=N to POST with
  {"batch_size": N} in the request body
- /jobs/{id}/result/ now expects {"results": [...]} wrapper instead of
  a bare list
- Trailing slash is required on POST URLs (Django APPEND_SLASH cannot
  redirect POSTs)
- processing_service_name query param removed from both endpoints

Also adds typed Pydantic schemas for the new request/response shapes
(AntennaTasksRequest, AntennaResultPostResponse) and updates the mock
test server to use them as FastAPI body parameters.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow
Collaborator Author

mihow commented Apr 4, 2026

Re: bare-list backward compatibility concern from CodeRabbit — the ADC PR (RolnickLab/ami-data-companion#134) already sends the wrapped {"results": [...]} format. These two PRs are deployed together, and the only consumer of this endpoint is the ADC worker. No compatibility shim is needed.

@mihow mihow merged commit 67d8b4c into main Apr 4, 2026
7 checks passed
@mihow mihow deleted the refactor/tasks-endpoint-post branch April 4, 2026 06:36
mihow added a commit to RolnickLab/ami-data-companion that referenced this pull request Apr 4, 2026
The upstream Antenna API (RolnickLab/antenna#1197) no longer accepts
processing_service_name on GET /jobs. Remove the parameter from the
client, worker call site, mock server, and test assertions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
mihow added a commit to RolnickLab/ami-data-companion that referenced this pull request Apr 4, 2026
…lt (#134)

* refactor: update Antenna API client for POST /tasks and wrapped /result

Adapt to breaking API changes in RolnickLab/antenna#1197:

- /jobs/{id}/tasks/ changed from GET with ?batch=N to POST with
  {"batch_size": N} in the request body
- /jobs/{id}/result/ now expects {"results": [...]} wrapper instead of
  a bare list
- Trailing slash is required on POST URLs (Django APPEND_SLASH cannot
  redirect POSTs)
- processing_service_name query param removed from both endpoints

Also adds typed Pydantic schemas for the new request/response shapes
(AntennaTasksRequest, AntennaResultPostResponse) and updates the mock
test server to use them as FastAPI body parameters.

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: remove unused processing_service_name from result posting and dataset APIs

The processing_service_name parameter was threaded through post_batch_results,
ResultPoster.post_async, RESTDataset, get_rest_dataloader, and _process_job but
was never sent to the Antenna API in any of these code paths. Removes the dead
parameter from all affected signatures and call sites.

Also adds gt=0 validation to AntennaTasksRequest.batch_size.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(tests): remove stale processing_service_name arg from _process_job calls

The previous commit removed processing_service_name from _process_job but
missed updating the test call sites, causing "got multiple values for
argument 'device'" errors.

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: remove processing_service_name from GET /jobs endpoint

The upstream Antenna API (RolnickLab/antenna#1197) no longer accepts
processing_service_name on GET /jobs. Remove the parameter from the
client, worker call site, mock server, and test assertions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(tests): strict URL matching in mock server and TestClient

- Add trailing slash to POST /tasks/ route (matches client and /result/)
- Set redirect_slashes=False on mock FastAPI app so slash mismatches
  cause 404 instead of silent redirects
- Set follow_redirects=False on all TestClient instances so redirects
  surface as test failures

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor: clean up stale docstrings and vestigial benchmark service_name

- Remove service_name from result_posting.py docstring examples (param
  was removed in 588c186 but examples not updated)
- Remove vestigial service_name param from benchmark run_benchmark()
  and CLI arg (no longer passed to any API call)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
mihow added a commit that referenced this pull request Apr 4, 2026