def _build_connection_info() -> dict[str, Any]:
    """Build the ``connection_info`` payload from fal runtime env vars.

    The result is meant to mirror the fal wrapper's ``connection_info``
    shape so the runner's Kafka events (stream_*, pipeline_*) carry the
    same fal-region / runner-id / log-labels as the wrapper's
    ``websocket_connected`` event and downstream consumers can correlate
    them.

    A variable that is unset *or set to an empty string* is treated as
    missing and replaced by its default. The env allowlist forwards
    variables verbatim, so an exported-but-empty ``FAL_JOB_ID`` must not
    hide a usable ``FAL_RUNNER_ID`` or be published as a blank id.

    Returns:
        A dict with the keys ``gpu_type``, ``fal_region``,
        ``fal_runner_id`` and ``fal_log_labels``. ``fal_log_labels`` is the
        JSON-decoded value of ``FAL_LOG_LABELS`` when that variable holds
        valid JSON, otherwise the raw string (``"unknown"`` when missing).
    """
    raw_labels = os.getenv("FAL_LOG_LABELS") or "unknown"
    try:
        log_labels = json.loads(raw_labels)
    except json.JSONDecodeError:
        # Not JSON (including the "unknown" placeholder): keep the raw text.
        log_labels = raw_labels

    # Prefer the job id; fall back to the runner id, then to a placeholder.
    runner_id = (
        os.getenv("FAL_JOB_ID") or os.getenv("FAL_RUNNER_ID") or "unknown"
    )

    return {
        "gpu_type": os.getenv("FAL_MACHINE_TYPE") or "GPU-H100",
        "fal_region": os.getenv("NOMAD_DC") or "unknown",
        "fal_runner_id": runner_id,
        "fal_log_labels": log_labels,
    }
+ if session.session_id is not None: + publish_event( + event_type="session_closed", + session_id=session.session_id, + connection_id=session.manifest_id, + user_id=session.user_id, + connection_info=session.connection_info, + ) + session.session_id = None if session.active_channels: channel_urls = [ch["url"] for ch in session.active_channels] @@ -704,7 +738,10 @@ async def _handle_api_request( "error": f"Failed restart websocket handshake: {exc}", } - # Pass through validated user_id for pipeline load requests. + # Pass through validated user_id and the orchestrator-provided + # manifest_id for pipeline load requests. Using manifest_id (not the + # runner's internal connection_id) so pipeline_loaded events correlate + # with the fal wrapper's websocket_connected.connection_id. if ( method == "POST" and normalized_path == "/api/v1/pipeline/load" @@ -712,7 +749,9 @@ async def _handle_api_request( and session.user_id ): body["user_id"] = session.user_id - body["connection_id"] = session.connection_id + body["connection_id"] = session.manifest_id or session.connection_id + if session.connection_info is not None: + body["connection_info"] = session.connection_info client = scope_client if client is None: @@ -1004,8 +1043,25 @@ async def _handle_control_message( "produces_video": produces_video, "produces_audio": produces_audio, }, + session_id=session.session_id, + user_id=session.user_id, + connection_id=session.manifest_id, + connection_info=session.connection_info, ) session.frame_processor.start() + # Emit session_created so livepeer-mode event streams match the + # cloud-relay shape (webrtc.py:731 fires this in relay mode; we + # mirror it here because livepeer doesn't go through the WebRTC + # offer handler). 
+ publish_event( + event_type="session_created", + session_id=session.session_id, + connection_id=session.manifest_id, + pipeline_ids=pipeline_ids if pipeline_ids else None, + user_id=session.user_id, + metadata={"mode": "livepeer"}, + connection_info=session.connection_info, + ) session.media_stop_event.clear() session.active_channels = active_channels session.input_subscribe_urls = input_subscribe_urls @@ -1303,6 +1359,13 @@ async def websocket_endpoint(ws: WebSocket) -> None: # TODO move this into the top level request params.pop("daydream_user_id", None) session.user_id = user_id + # Persist fields needed downstream so Kafka events emitted by the + # inner pipeline_manager / FrameProcessor correlate with the fal + # wrapper's websocket_connected (which uses manifest_id as + # connection_id). + session.manifest_id = job_info.manifest_id + session.session_id = str(uuid.uuid4()) + session.connection_info = _build_connection_info() if not job_info.control_url: await ws.send_text( diff --git a/src/scope/cloud/livepeer_fal_app.py b/src/scope/cloud/livepeer_fal_app.py index d0a57ee4b..6134b0ec1 100644 --- a/src/scope/cloud/livepeer_fal_app.py +++ b/src/scope/cloud/livepeer_fal_app.py @@ -390,6 +390,13 @@ def setup(self): "KAFKA_TOPIC", "KAFKA_SASL_USERNAME", "KAFKA_SASL_PASSWORD", + # fal runtime metadata — used by the runner to build + # connection_info on Kafka events so they match the wrapper. + "NOMAD_DC", + "FAL_JOB_ID", + "FAL_RUNNER_ID", + "FAL_LOG_LABELS", + "FAL_MACHINE_TYPE", ] runner_env = {k: os.environ[k] for k in env_allowlist if k in os.environ} runner_env.setdefault("UV_CACHE_DIR", "/tmp/uv-cache")