Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 136 additions & 1 deletion docker/patch/latest/sglang.patch
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ index 0ac1878..bbc94f7 100644
is_k_full=self.is_k_full,
routed_scaling_factor=self.moe_runner_config.routed_scaling_factor,
diff --git a/python/sglang/srt/managers/io_struct.py b/python/sglang/srt/managers/io_struct.py
index 293335f..51e5669 100644
index 293335f..a027c5f 100644
--- a/python/sglang/srt/managers/io_struct.py
+++ b/python/sglang/srt/managers/io_struct.py
@@ -1407,6 +1407,8 @@ class PauseContinueBroadcast:
Expand Down Expand Up @@ -1207,6 +1207,28 @@ index 293335f..51e5669 100644
@dataclass
class CheckWeightsReqInput(BaseReq):
action: str
@@ -2021,7 +2072,7 @@ class GetLoadsReqInput(BaseReq):
"""Request for /v1/loads endpoint."""

VALID_SECTIONS = frozenset(
- {"core", "memory", "spec", "lora", "disagg", "queues", "all"}
+ {"core", "memory", "spec", "lora", "disagg", "queues", "inflight", "all"}
)

include: List[str] = field(default_factory=lambda: ["all"])
@@ -2083,6 +2134,12 @@ class GetLoadsReqOutput(BaseReq):
lora: Optional[LoRAMetrics] = None
disaggregation: Optional[DisaggregationMetrics] = None
queues: Optional[QueueMetrics] = None
+ # Per-request breakdown of every queue (only populated when "inflight" or
+ # "all" is requested). Each entry: {name, num_reqs, reqs: [{rid,
+ # bootstrap_room, seqlen, age_s, stage, waiting_for_input?,
+ # timeout_cancel_issued?}, ...]}. Kept as plain dicts (not a dataclass) to
+ # mirror the existing queue_details shape and avoid asdict recursion.
+ inflight: Optional[List[Dict[str, Any]]] = None


@dataclass
diff --git a/python/sglang/srt/managers/schedule_batch.py b/python/sglang/srt/managers/schedule_batch.py
index feecc54..6fce256 100755
--- a/python/sglang/srt/managers/schedule_batch.py
Expand Down Expand Up @@ -2759,6 +2781,119 @@ index 326aace..b62804c 100644
return meta_data

def format_duration(self, duration: float) -> str:
diff --git a/python/sglang/srt/observability/scheduler_metrics_mixin.py b/python/sglang/srt/observability/scheduler_metrics_mixin.py
index 0508953..9131fee 100644
--- a/python/sglang/srt/observability/scheduler_metrics_mixin.py
+++ b/python/sglang/srt/observability/scheduler_metrics_mixin.py
@@ -1088,6 +1088,100 @@ class SchedulerMetricsMixin:
retracted=self.stats.num_retracted_reqs,
)

+ inflight = None
+ if include_all or "inflight" in include:
+ # Per-request breakdown of every queue: which rid is sitting where,
+ # for how long, and (for decode transfer) why it is stuck. All
+ # entry_time values are perf_counter() based (req_time_stats.py), so
+ # age MUST be computed here in the scheduler process; the client
+ # cannot subtract its own wall-clock from them.
+ now_perf = time.perf_counter()
+ # name -> (queue, entry_time field on req.time_stats)
+ inflight_queues = [("running", self.running_batch.reqs, None)]
+ if self.disaggregation_mode == DisaggregationMode.PREFILL:
+ inflight_queues += [
+ ("waiting", self.waiting_queue, "wait_queue_entry_time"),
+ (
+ "bootstrap",
+ self.disagg_prefill_bootstrap_queue.queue,
+ "prefill_bootstrap_queue_entry_time",
+ ),
+ (
+ "prefill_inflight",
+ self.disagg_prefill_inflight_queue,
+ "prefill_transfer_queue_entry_time",
+ ),
+ ]
+ elif self.disaggregation_mode == DisaggregationMode.DECODE:
+ inflight_queues += [
+ ("waiting", self.waiting_queue, "wait_queue_entry_time"),
+ (
+ "prealloc",
+ self.disagg_decode_prealloc_queue.queue,
+ "decode_prealloc_queue_entry_time",
+ ),
+ (
+ "transfer",
+ self.disagg_decode_transfer_queue.queue,
+ "decode_transfer_queue_entry_time",
+ ),
+ # retracted reqs were preallocated first; reuse that stamp.
+ (
+ "retracted",
+ self.disagg_decode_prealloc_queue.retracted_queue,
+ "decode_prealloc_queue_entry_time",
+ ),
+ ]
+ else:
+ inflight_queues.append(
+ ("waiting", self.waiting_queue, "wait_queue_entry_time")
+ )
+
+ def describe_req(entry, stage, entry_time_field):
+ # Queue elements are either Req (waiting/bootstrap/retracted/
+ # running) or DecodeRequest (prealloc/transfer). DecodeRequest
+ # wraps .req and carries the disagg-transfer state flags.
+ req = getattr(entry, "req", entry)
+ info = {
+ "rid": getattr(req, "rid", None),
+ "bootstrap_room": getattr(req, "bootstrap_room", None),
+ "seqlen": getattr(entry, "seqlen", None),
+ "stage": stage,
+ }
+ if entry_time_field is not None:
+ time_stats = getattr(req, "time_stats", None)
+ entry_time = (
+ getattr(time_stats, entry_time_field, 0.0)
+ if time_stats
+ else 0.0
+ )
+ info["age_s"] = (
+ round(now_perf - entry_time, 3) if entry_time else None
+ )
+ # Disagg-transfer state lives on DecodeRequest only. These flags
+ # are exactly what "stuck in transfer" means at the code level.
+ if entry is not req:
+ info["waiting_for_input"] = getattr(
+ entry, "waiting_for_input", None
+ )
+ info["timeout_cancel_issued"] = getattr(
+ entry, "timeout_cancel_issued", None
+ )
+ return info
+
+ inflight = []
+ for name, queue, entry_time_field in inflight_queues:
+ reqs_info = [
+ describe_req(entry, name, entry_time_field) for entry in queue
+ ]
+ inflight.append(
+ {
+ "name": name,
+ "num_reqs": len(queue),
+ "reqs": reqs_info,
+ }
+ )
+
return GetLoadsReqOutput(
dp_rank=self.dp_rank,
timestamp=time.time(),
@@ -1106,6 +1200,7 @@ class SchedulerMetricsMixin:
lora=lora,
disaggregation=disaggregation,
queues=queues,
+ inflight=inflight,
)

def update_device_timer(self: Scheduler):
diff --git a/python/sglang/srt/server_args.py b/python/sglang/srt/server_args.py
index 173bdbb..339791f 100644
--- a/python/sglang/srt/server_args.py
Expand Down
2 changes: 1 addition & 1 deletion docker/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nightly-dev-20260608b
nightly-dev-20260612a
Loading