Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions src/scope/cloud/livepeer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from scope.server.app import app as scope_app
from scope.server.app import lifespan as scope_lifespan
from scope.server.frame_processor import FrameProcessor
from scope.server.kafka_publisher import publish_event
from scope.server.media_packets import ensure_video_packet

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -113,6 +114,29 @@ class LivepeerSession:
media_publishes: list[MediaPublish | None] = field(default_factory=list)
user_id: str | None = None
connection_id: str | None = None
manifest_id: str | None = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure manifest_id is the same as the connection_id above

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... oh dear it's not. Well that's a simple change then

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the manifest ID the same as the connection ID, and the old connection ID should be gone now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the manifest_id does not need to be passed in anymore if that's the case? Would simplify some of the ad hoc checks around here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's true. I'll clean this up in a follow-up and file a ticket so I don't forget.

session_id: str | None = None
connection_info: dict[str, Any] | None = None


def _build_connection_info() -> dict[str, Any]:
"""Mirror the fal-wrapper's connection_info shape from env vars.

Keeps the runner's events (stream_*, pipeline_*) carrying the same
fal-region / runner-id / log-labels the wrapper's websocket_connected
uses, so downstream consumers can correlate them.
"""
fal_log_labels_raw = os.getenv("FAL_LOG_LABELS", "unknown")
try:
fal_log_labels = json.loads(fal_log_labels_raw)
except (json.JSONDecodeError, TypeError):
fal_log_labels = fal_log_labels_raw
return {
"gpu_type": os.getenv("FAL_MACHINE_TYPE", "GPU-H100"),
"fal_region": os.getenv("NOMAD_DC", "unknown"),
"fal_runner_id": os.getenv("FAL_JOB_ID", os.getenv("FAL_RUNNER_ID", "unknown")),
"fal_log_labels": fal_log_labels,
}


async def _shutdown_task(
Expand Down Expand Up @@ -327,6 +351,16 @@ async def _stop_stream(session: LivepeerSession) -> None:
if session.frame_processor is not None:
session.frame_processor.stop()
session.frame_processor = None
# Pair with the session_created emitted at stream start.
if session.session_id is not None:
publish_event(
event_type="session_closed",
session_id=session.session_id,
connection_id=session.manifest_id,
user_id=session.user_id,
connection_info=session.connection_info,
)
session.session_id = None

if session.active_channels:
channel_urls = [ch["url"] for ch in session.active_channels]
Expand Down Expand Up @@ -704,15 +738,20 @@ async def _handle_api_request(
"error": f"Failed restart websocket handshake: {exc}",
}

# Pass through validated user_id for pipeline load requests.
# Pass through validated user_id and the orchestrator-provided
# manifest_id for pipeline load requests. Using manifest_id (not the
# runner's internal connection_id) so pipeline_loaded events correlate
# with the fal wrapper's websocket_connected.connection_id.
if (
method == "POST"
and normalized_path == "/api/v1/pipeline/load"
and isinstance(body, dict)
and session.user_id
):
body["user_id"] = session.user_id
body["connection_id"] = session.connection_id
body["connection_id"] = session.manifest_id or session.connection_id
if session.connection_info is not None:
body["connection_info"] = session.connection_info

client = scope_client
if client is None:
Expand Down Expand Up @@ -1004,8 +1043,25 @@ async def _handle_control_message(
"produces_video": produces_video,
"produces_audio": produces_audio,
},
session_id=session.session_id,
user_id=session.user_id,
connection_id=session.manifest_id,
connection_info=session.connection_info,
)
session.frame_processor.start()
# Emit session_created so livepeer-mode event streams match the
# cloud-relay shape (webrtc.py:731 fires this in relay mode; we
# mirror it here because livepeer doesn't go through the WebRTC
# offer handler).
publish_event(
event_type="session_created",
session_id=session.session_id,
connection_id=session.manifest_id,
pipeline_ids=pipeline_ids if pipeline_ids else None,
user_id=session.user_id,
metadata={"mode": "livepeer"},
connection_info=session.connection_info,
)
session.media_stop_event.clear()
session.active_channels = active_channels
session.input_subscribe_urls = input_subscribe_urls
Expand Down Expand Up @@ -1303,6 +1359,13 @@ async def websocket_endpoint(ws: WebSocket) -> None:
# TODO move this into the top level request
params.pop("daydream_user_id", None)
session.user_id = user_id
# Persist fields needed downstream so Kafka events emitted by the
# inner pipeline_manager / FrameProcessor correlate with the fal
# wrapper's websocket_connected (which uses manifest_id as
# connection_id).
session.manifest_id = job_info.manifest_id
session.session_id = str(uuid.uuid4())
session.connection_info = _build_connection_info()

if not job_info.control_url:
await ws.send_text(
Expand Down
7 changes: 7 additions & 0 deletions src/scope/cloud/livepeer_fal_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,13 @@ def setup(self):
"KAFKA_TOPIC",
"KAFKA_SASL_USERNAME",
"KAFKA_SASL_PASSWORD",
# fal runtime metadata — used by the runner to build
# connection_info on Kafka events so they match the wrapper.
"NOMAD_DC",
"FAL_JOB_ID",
"FAL_RUNNER_ID",
"FAL_LOG_LABELS",
"FAL_MACHINE_TYPE",
]
runner_env = {k: os.environ[k] for k in env_allowlist if k in os.environ}
runner_env.setdefault("UV_CACHE_DIR", "/tmp/uv-cache")
Expand Down
Loading