diff --git a/examples/train/megatron/NEMOTRON_ULTRA_THROUGHPUT.md b/examples/train/megatron/NEMOTRON_ULTRA_THROUGHPUT.md new file mode 100644 index 0000000000..e7268602f1 --- /dev/null +++ b/examples/train/megatron/NEMOTRON_ULTRA_THROUGHPUT.md @@ -0,0 +1,167 @@ +# Nemotron-3-Ultra-550B — Megatron training throughput & memory sweep + +Goal: find, on **8× nodes of 8×H200-141GB (64 GPUs, EFA)**, (1) the maximum +`trainer.max_tokens_per_microbatch` (MTPM) that fits for training fwd+bwd, and +(2) the Megatron parallelism (TP/PP/CP/EP/DP) that maximizes training throughput +for full-finetuning GRPO of Nemotron-3-Ultra-550B. + +## Method + +Runs use a dedicated harness that executes the **real** Megatron fwd+bwd training +path on fabricated rollouts (no vLLM generation), so numbers reflect genuine +training cost while iterating fast: + +- Trainer: `examples/train_scripts/full_context/trainer_ultra_sweep.py` + (extends the dummy `FullCtxTrainer`; logs per-step wall time + peak CUDA + reserved/allocated memory across policy workers to a JSONL file). +- Launcher: `examples/train/megatron/run_ultra_sweep.sh` (all knobs are env vars). +- Analysis: `examples/train_scripts/full_context/analyze_sweep.py`. + +Colocated config matches the validated recipe (`run_megatron_nemotron_ultra.sh`): +optimizer CPU-offloaded, `recompute_granularity=full`, `remove_microbatch_padding=true`, +vLLM colocated but **asleep during training** (engines sleep at init in colocate +mode, and the harness never wakes them — exactly the memory state of the real +training step). Peak reserved ≈ caching-allocator high-water mark during the step. + +`max_tokens_per_microbatch` bin-packs each microbatch up to that many tokens **per +DP rank**; a single sequence longer than the budget gets its own microbatch. So the +MTPM memory ceiling is per-DP-rank and independent of DP size. + +## TL;DR + +- **Max `max_tokens_per_microbatch`** at the validated config (TP8/PP4/EP16/DP2): **~64k** tokens/microbatch + (per DP rank). 64k fits; 72k+ OOMs. The model + bf16 grads take ~38 GiB/GPU (optimizer CPU-offloaded) + and a sleeping vLLM holds ~5–8 GiB, leaving ~95 GiB for activations even with `recompute_granularity=full`. +- **Highest training throughput**: **TP8 / PP2 / EP32 / DP4** at MTPM≈32k → **~8540 tok/s**, vs **7720 tok/s** + for the PP4/DP2 baseline at MTPM=48k — a ~11% gain from doubling data parallelism (DP2→DP4). PP2 is what + frees the GPUs for DP4; it costs MTPM headroom (caps ~32–40k) but nets faster. +- **The config space is tightly pinned** by the model (108 layers, MoE with 512 experts) and 141 GiB H200s: + PP must divide 108 (**PP=8 is invalid**), EP8 doubles expert memory and **OOMs at load**, and TP4 doubles + activation memory (sequence-parallel shards activations by TP) and **OOMs**. Viable full-FT configs are + essentially {TP8/PP4/EP16/DP2, TP8/PP2/EP32/DP4}. +- **Long context is activation-bound by the single longest sequence**: with `remove_microbatch_padding`, any + sequence longer than MTPM gets its own microbatch that must fit alone. The *single-sequence* ceiling is + **~40–48k tokens** at CP1/PP4/DP2 (well below the ~64k *packed* ceiling — a long contiguous sequence has a + much larger per-microbatch footprint than an equal-token pack of short ones). +- **Context parallelism roughly doubles that ceiling to ~96k.** CP *composes* with EP (in Megatron-Core + `EP` divides `TP·CP·DP`, so CP does not steal from EP's budget): **`TP8/PP4/CP2/EP16/DP1` fits a single 96k + sequence** (128k OOMs) while keeping baseline expert memory. CP=4 via `TP8/PP2/CP4/EP32/DP1` is *valid* but + *worse* — it still OOMs at 128k because dropping to PP2 (needed to free GPUs for CP4) doubles the weights and + eats the budget CP frees. So the practical long-context recipe is **PP4 + CP2** (≤~96k/seq, at the cost of + DP→1), and the 60k±30k distribution becomes mostly trainable (clamp ~96k truncates only the ~10% tail vs + ~half at CP1's ~40k clamp). +- **Long sequences are *more* throughput-efficient per token** (~12k tok/s on a ~39k-mean distribution at + PP4/DP2, vs ~7.7k for uniform 10 240-token seqs): bigger microbatches use the GEMMs better and incur less + per-microbatch/pipeline overhead. +- **The throughput-optimal config is sequence-length-dependent**: PP2/EP32/DP4 for short/medium sequences + (≤~48k); PP4/EP16/DP2 for long sequences (higher single-sequence ceiling). + +## Cluster / model facts (measured) +- 64× H200-141 GiB (8 nodes), EFA. GPU usable ≈ 139.8 GiB; sleeping colocated vLLM holds ~5–8 GiB during training. +- Model: 108 layers (hybrid Mamba2 + attention + latent MoE, 512 experts). At TP8/PP4/EP16: **9.44B params/GPU** + → ~18.9 GiB bf16 weights + ~18.9 GiB bf16 grads (~38 GiB fixed; AdamW master/moments CPU-offloaded). +- MoE expert memory/GPU ∝ (108/PP)·(512/EP); **PP·EP is the invariant**. Baseline PP4·EP16 = 64. + PP2 needs EP32 to match (PP2·EP32=64); PP4·EP8=32 ⇒ 2× expert memory ⇒ OOM. + +## Stage 1 — max tokens per microbatch (TP8/PP4/EP16/DP2, uniform 10240-token seqs) + +| MTPM (setting) | largest microbatch | result | +|---:|---:|:--| +| 65536 (64k) | 61 440 tok | **FITS** — steady 77 s/step @ 327 680 tok | +| 73728 (72k) | 71 680 tok | FAIL (DistBackend; one rank OOM aborts NCCL — boundary) | +| 81920 (80k) | 81 920 tok | OOM (forward_backward) | +| 98304 (96k) | 92 160 tok | OOM | +| 131072 (128k) | 131 072 tok | OOM (needed +14.1 GiB; only 8.2 GiB free) | + +**Max safe MTPM ≈ 64k** for packed short/medium sequences. + +## Stage 2 — parallelism sweep for throughput (fixed 655 360-token workload, uniform 10240-token seqs) + +| Config (TP/PP/CP/EP, DP) | MTPM | step time | **throughput** | in-step peak | result | +|:--|---:|---:|---:|---:|:--| +| **TP8/PP2/EP32, DP4** | 32k | 76.8 s | **8 539 tok/s** | 109.9 GB | **OK — fastest** | +| TP8/PP4/EP16, DP2 (baseline) | 48k | 84.9 s | 7 719 tok/s | 108.3 GB | OK | +| TP8/PP2/EP32, DP4 | 48k | — | — | — | OOM (PP2 caps MTPM ~32–40k) | +| TP4/PP4/EP16, DP4 | 32k | — | — | — | OOM (TP4 ⇒ 2× activation via SP) | +| TP8/PP4/EP8, DP2 | 48k | — | — | — | OOM at model load (2× expert mem) | +| TP8/PP8/EP8, DP1 | — | — | — | — | INVALID (108 not divisible by 8) | +| TP4/PP8/EP8, DP2 | — | — | — | — | INVALID (108 not divisible by 8) | + +Doubling data parallelism (DP2→DP4) is the throughput lever; PP2 is the only way to free GPUs for DP4 +given the layer-count and EP constraints. TP must stay 8 (sequence parallelism shards activations by TP). + +## Stage 3 — long context (variable length) & context parallelism + +**CP composes with EP** (not with DP-budget-for-EP as first assumed): in Megatron-Core the expert group is +formed over `TP·CP·DP`, so with `ETP=1`, **`EP` must divide `TP·CP·DP`** — adding CP does *not* force EP down. +This makes CP genuinely useful here. Single-sequence ceiling (one sequence alone in its microbatch): + +| Config | world | single-seq ceiling | note | +|:--|:--|---:|:--| +| CP1 / PP4 / EP16 / DP2 | 8·4·1·2 | **~40–48k tok** | single 40 960 fits (peak 110 GB); 49 152 OOMs. Long contiguous seq footprint ≫ equal-token pack of short seqs, so single-seq ceiling < packed 64k. | +| **CP2 / PP4 / EP16 / DP1** | 8·4·2·1 | **~96k tok** | EP16 still valid (`EP \| TP·CP·DP = 16`); **single 98 304 FITS**, 131 072 OOMs. Keeps PP4's low weights (~38 GiB) and shards the seq 2×. Best long-context config. | +| CP4 / PP2 / EP32 / DP1 | 8·2·4·1 | <128k | *valid* (`EP32 \| 8·4·1=32`) and loads, but **131 072 OOMs**: dropping to PP2 to free GPUs for CP4 doubles weights (~76 GiB) and eats the budget CP frees — worse than PP4/CP2. | + +So **CP roughly doubles the usable single-sequence length** (~40–48k → ~96k) via **PP4 + CP2**, at the cost of +collapsing DP to 1 (≈ half the data-parallel throughput). The 60k±30k distribution is then **mostly trainable**: +clamping at ~96k truncates only the ~10% upper tail (vs truncating ~half at CP1's ~40k clamp). The extreme +131k tail still OOMs (the LM-head logits / non-CP-sharded buffers don't shrink enough); CP4 doesn't fix it +because of the PP2 weight penalty. + +### Throughput on a variable-length long-context distribution + +The requested distribution was 256 samples ~ N(60k, 30k) tokens. Because the single-sequence +ceiling is ~40–48k (above), the distribution must be **truncated** to a value that fits as a single +microbatch; the 60k mean is not trainable per-sequence on 64 GPUs. Measured on PP4/EP16/DP2 with +sequences ~ N(60k, 30k) **clamped to [1k, 40 960]** (realized mean ~39k, max 40 960), MTPM=40 960: + +| Config | distribution (realized) | MTPM | step time | **throughput** | peak | +|:--|:--|---:|---:|---:|---:| +| TP8/PP4/EP16/DP2 | 64 varlen seqs, mean ~39k, max 40 960 | 40 960 | 207 s | **~12 070 tok/s** | 112.9 GB | + +(Throughput is per-token and count-independent; 64 samples used for tractable wall-clock. Earlier +256-sample runs confirmed the packing/fit behaviour and the OOM on sequences >~48k.) + +**Long sequences are *more* throughput-efficient per token** than short ones (~12.1k tok/s here vs +~7.7k for uniform 10 240-token seqs at the same PP4/DP2): a microbatch of one ~40k sequence (or a few +medium ones) uses the GEMMs far better and incurs less per-microbatch / pipeline overhead than packing +many tiny 10 240-token sequences. So for long-context RL the throughput ceiling is set by **fitting the +longest single sequence** (clamp responses to ~40k), not by aggregate tokens. + +## Recommendations + +- **Short/medium sequences (≤~40k), max throughput:** `TP8 / PP2 / EP32 / ETP1 / DP4`, MTPM≈32k + (~8.5k tok/s on uniform short seqs; ~11% over the PP4/DP2 baseline). +- **Long context, no CP:** `TP8 / PP4 / EP16 / ETP1 / DP2`, MTPM≈40–64k (single-sequence ceiling ~40–48k; + ~12k tok/s on a ~39k-mean distribution — long seqs are more throughput-efficient per token). +- **Longest single sequences (up to ~96k):** `TP8 / PP4 / CP2 / EP16 / ETP1 / DP1` — CP2 ~doubles the + single-seq ceiling to ~96k (EP16 stays valid since `EP \| TP·CP·DP`). Costs DP→1 (≈ half the DP throughput), + so use it only when sequences actually exceed ~48k. Prefer **PP4+CP2** over PP2+CP4 (PP2's weights negate CP). +- **For the 60k±30k distribution:** clamp responses to ~96k with PP4/CP2 (≈10% of samples truncated), or to + ~40k with PP4/DP2 (≈half truncated, but full DP throughput). The full untruncated 131k tail is not trainable + on 64×H200. +- Keep `TP=8` (sequence parallelism shards activations by TP — TP4 doubles activation memory and OOMs), + optimizer CPU-offload on, and `recompute_granularity=full`. + + +## Reproducing + +```bash +# Stage 1 — MTPM ceiling at the baseline config (uniform 10k-token seqs). +TP=8 PP=4 CP=1 EP=16 ETP=1 MTPM=65536 MODE=uniform SEQ_LEN=10240 \ + TAG=s1_tp8pp4ep16_mtpm65536 SWEEP_RESULTS_FILE=/home/ray/ultra_sweep/results.jsonl \ + bash examples/train/megatron/run_ultra_sweep.sh + +# Stage 2 — highest-throughput config (DP4). +TP=8 PP=2 CP=1 EP=32 ETP=1 MTPM=32768 MODE=uniform SEQ_LEN=10240 NUM_SEQ=64 \ + TAG=s2_tp8pp2ep32_dp4 SWEEP_RESULTS_FILE=/home/ray/ultra_sweep/results.jsonl \ + bash examples/train/megatron/run_ultra_sweep.sh + +# Stage 3 — long-context varlen distribution (clamped to the ~40k single-seq ceiling). +TP=8 PP=4 CP=1 EP=16 ETP=1 MODE=varlen AVG_LEN=60000 STD_LEN=30000 MAX_LEN=40960 \ + MTPM=40960 NUM_SEQ=64 TAG=s3_varlen_clamp40k \ + SWEEP_RESULTS_FILE=/home/ray/ultra_sweep/results.jsonl \ + bash examples/train/megatron/run_ultra_sweep.sh + +python examples/train_scripts/full_context/analyze_sweep.py /home/ray/ultra_sweep/results.jsonl +``` diff --git a/examples/train/megatron/README_nemotron_ultra.md b/examples/train/megatron/README_nemotron_ultra.md new file mode 100644 index 0000000000..12ce1f2c90 --- /dev/null +++ b/examples/train/megatron/README_nemotron_ultra.md @@ -0,0 +1,115 @@ +# Nemotron-3-Ultra-550B GRPO RL on GSM8K (Megatron, multi-node) + +Full-finetuning GRPO RL of **NVIDIA-Nemotron-3-Ultra-550B-A55B-BF16** (NemotronH hybrid +Mamba2 + attention, latent MoE with 512 experts, reasoning model) colocated with vLLM on +**8× nodes of 8×H200-141GB (64 GPUs, EFA)**. + +Recipe: [`run_megatron_nemotron_ultra.sh`](./run_megatron_nemotron_ultra.sh). +Staging helper: [`stage_nemotron_ultra.py`](./stage_nemotron_ultra.py). + +**Validated:** trains end-to-end with this config — `avg_raw_reward ≈ 0.9`, GSM8K +`eval ≈ 0.94`, `grad_norm > 0` (genuinely learning). Megatron mesh TP8 / PP4 / EP16 / ETP1 +(DP2); vLLM TP8 × PP4 (2 engines). + +--- + +## Replicating on a fresh cluster + +The cluster needs: 8 nodes × 8×H200-141GB, EFA, a Ray cluster, a large **node-local** disk at +`/mnt/local_storage` (~28 TB), and a small shared `/home` (which the 1.1 TB model must NOT touch). + +### 1. Make sure this PR's code is present +The recipe depends on several fixes in this PR (see [Why these fixes](#why-these-fixes-are-needed)). +On stock SkyRL/vLLM without them you get coherent-looking **garbage** generations and `reward=0`. + +### 2. Stage the model + data on every GPU node +Everything lives on node-local `/mnt/local_storage` (the model is too big for `/home`, and every +rank needs its data locally). One command does both, on all nodes, via Ray: + +```bash +export HF_TOKEN=$(cat ~/.HF_TOKEN) # fast authenticated download; unauthenticated is throttled +uv run --isolated --with ray --with huggingface_hub --with hf_transfer --with datasets \ + python examples/train/megatron/stage_nemotron_ultra.py +``` + +This downloads the HF snapshot to `/mnt/local_storage/hf_cache` **including `chat_template.jinja`** +and writes the GSM8K parquets to `/mnt/local_storage/data/gsm8k` on each node. Re-run it if the +autoscaler churns in a fresh (un-staged) node. + +> The `*.jinja` is essential. The tokenizer ships **no** chat template inline; the official ChatML + +> reasoning template lives in `chat_template.jinja`. Without it the instruct model is prompted +> off-distribution and never produces a parseable answer (reward stays 0). + +### 3. Caches go to `/mnt/local_storage` +Handled by the script: it exports `HF_HOME`, `XDG_CACHE_HOME`, `UV_CACHE_DIR`, `TRITON_CACHE_DIR`, +`TORCHINDUCTOR_CACHE_DIR`, `VLLM_CACHE_ROOT` → `/mnt/local_storage/...`, and SkyRL's +`prepare_runtime_environment` (this PR) forwards them to the Ray worker actors. Otherwise workers +default to `~/.cache` on the small `/home`, fill it, and take the node down (looks like an OOM / +preemption, but it's disk). + +### 4. Launch +```bash +export WANDB_API_KEY= +export HF_TOKEN=$(cat ~/.HF_TOKEN) # for churn resilience +bash examples/train/megatron/run_megatron_nemotron_ultra.sh +``` +EFA + multi-node specifics (all set by the script): `LD_LIBRARY_PATH=/opt/amazon/efa/lib`, +`SKYRL_LD_LIBRARY_PATH_EXPORT=1`, `VLLM_USE_RAY_V2_EXECUTOR_BACKEND=1`, `NVTE_FLASH_ATTN=0`, and +raised placement-group / inference-server health timeouts (the 550B takes >600 s to come up). + +That's it — **stage model+data on every node, keep caches on `/mnt/local_storage`, and run.** + +--- + +## Why these fixes are needed + +The hard part was that vLLM generated **garbage** (multilingual token-salad / degenerate +repetition) after every weight sync → all rewards 0 → no learning. The root-cause chain and the +fixes (all in this PR): + +1. **CUDA-IPC weight sync used only rank-0's slicing metadata** (the core bug, general to MoE). + Each Megatron rank packs its *own* contiguous buffer (different params/order per rank — expert + chunks carry per-EP-rank names) and registers one IPC handle per physical GPU, but only rank 0's + `names`/`sizes`/`shapes` were sent. Each vLLM worker rebuilt *its own* GPU's buffer yet sliced it + with rank-0's metadata → correct bytes loaded under the wrong names → coherent-but-garbage, no + crash. Identical across PP ranks (so it worked at PP=2) but divergent at **PP>2 / EP>16**. + *Fix:* send per-GPU metadata; each worker slices its own buffer with its own + (`cuda_ipc_strategy.py`, `new_inference_worker_wrap.py`). Verified: EP16/PP4 post-sync logprob + diff `2.0 → 0.15`. + +2. **fp32 MoE router bias must not be down-cast.** `gate.e_score_correction_bias` is large + (~25–57) with tiny per-expert offsets (std ~7e-4) far below bf16 ULP at that scale; bf16 rounding + collapses the offsets and corrupts routing. *Fix:* transfer it in native fp32 through the sync + (`megatron_worker.py`). + +3. **vLLM layerwise-reload dropped Mamba `mixer.D`** (cf. vllm-project/vllm#44814). The reload + element-counter double-counts `composed_weight_loader` params (Mamba `A`), finalizing the layer + early and leaving `mixer.D` uninitialized → NaN. *Fix:* a guarded monkeypatch capping the count at + `param.numel()` (`layerwise_reload.py`), alongside SkyRL's existing `conv_weights` reload patch. + Remove once on a vLLM that includes #44814. + +4. **Chat template staging** (`*.jinja`) — see step 2 above. + +5. **Robust reasoning-aware GSM8K reward** — strip the `` trace, then score the answer with + strict `#### ` else last-number-with-normalization, so boxed/natural-language answers from a + reasoning model are scored correctly (`skyrl_gym/envs/gsm8k/env.py`). + +6. **Worker env forwarding** — `prepare_runtime_environment` (training) and the GPU-CI conftest + forward `HF_*` / cache dirs / `VLLM_USE_RAY_V2_EXECUTOR_BACKEND` / + `SKYRL_WAIT_UNTIL_INFERENCE_SERVER_HEALTHY_TIMEOUT_S` to the Ray worker actors. + +## Memory & parallelism notes +- Full-FT adds bf16 grads (~= weights) + the AdamW master. At EP16/**PP2** that's ~69+69 GiB/GPU → + OOMs the 141 GiB H200, so we use **PP=4** (halves per-GPU weights and grads → ~34+34). The + optimizer (fp32 master + Adam moments) is **CPU-offloaded** (host RAM, ~2 TB/node). +- With fix #1 in place, weight sync is correct at **any** EP/PP; EP is now just a memory/throughput + knob (e.g. EP=32 → 16 experts/rank for more headroom). EP must divide TP×DP. +- vLLM PP=4 keeps its weights ~34 GiB/GPU so both vLLM (woken) and the resident policy shard fit + during the colocated sync. + +## Known issues +- The model emits a `` reasoning block; `max_generate_length=2048` gives room to + finish reasoning AND emit the answer (batched mode can't toggle `enable_thinking`). +- Node churn on large pools: a vLLM worker dying ("Executor failed") kills the run; raise + `trainer.ckpt_interval` resilience and re-stage churned-in nodes. `HF_HUB_OFFLINE=0` lets an + un-staged node self-download to `/mnt` instead of erroring. diff --git a/examples/train/megatron/run_megatron_nemotron_ultra.sh b/examples/train/megatron/run_megatron_nemotron_ultra.sh new file mode 100644 index 0000000000..41e173ed0b --- /dev/null +++ b/examples/train/megatron/run_megatron_nemotron_ultra.sh @@ -0,0 +1,192 @@ +set -x + +# Colocated GRPO training+generation for NVIDIA-Nemotron-3-Ultra-550B-A55B on GSM8K with Megatron. +# Runs on 8 nodes of 8xH200-141GB (64 GPUs), EFA interconnect. +# +# This is *full-finetuning* RL (no LoRA, no ref/KL model). It builds on the configs proven by +# the logprob round-trip test (tests/.../gpu_ci/megatron/test_megatron_models.py::[nemotron3-ultra]) +# but the test was forward-only (inference_only_init), so it had no optimizer/grads. Training adds +# bf16 grads (~same size as the weights) + the AdamW master state, so to fit the 141 GiB H200 we: +# (a) shard depth with PP=4 (halves per-GPU weights AND grads vs the test's PP=2 -> ~34+34 GiB), +# (b) CPU-offload the optimizer (fp32 master + Adam moments live on host RAM, not GPU), +# (c) recompute activations, (d) bin-pack microbatches by token count, and +# (e) drop the KL/ref model (no second 550B copy). +# VALIDATED working: this exact config trains end-to-end (reward ~0.9, gsm8k eval ~0.94, grad_norm>0). +# +# NOTE on correctness: getting coherent generations required two SkyRL fixes that are now in-tree +# (not knobs here): (1) the CUDA-IPC weight-sync sends per-GPU slicing metadata so each vLLM worker +# slices its own packed buffer correctly -- without it, weight sync corrupts vLLM at PP>2 / EP>16 +# (the policy stays fine, but vLLM generates token-salad and reward stays 0); (2) a vLLM +# layerwise-reload patch (cf. vllm-project/vllm#44814) so the NemotronH Mamba `mixer.D` isn't +# dropped during reload. If you run on a SkyRL/vLLM without these, expect garbage generations. +# +# Prereqs: +# uv run examples/train/gsm8k/gsm8k_dataset.py --output_dir $HOME/data/gsm8k +# Stage the model on every node's local disk (1.1TB; /home is too small): +# see the staging helper used for the test (HF_HOME=/mnt/local_storage/hf_cache). +# IMPORTANT: stage chat_template.jinja too (include *.jinja in allow_patterns). It is +# the model's official ChatML+reasoning template; without it the tokenizer/vLLM have NO +# chat template, the instruct model is prompted off-distribution, and reward stays 0. +# export WANDB_API_KEY= +# bash examples/train/megatron/run_megatron_nemotron_ultra.sh + +# --------------------------------------------------------------------------- +# Environment (must reach the Ray workers). These mirror the test's run env. +# --------------------------------------------------------------------------- +# Model is staged on each node's large local disk (1.1TB won't fit /home/ray's 255GB). +export HF_HOME=${HF_HOME:-/mnt/local_storage/hf_cache} +# Redirect ALL caches off the small home disk (255GB) to the big local disk (28TB). +# Workers write uv build envs, Triton/Inductor/vLLM/FlashInfer JIT caches, etc. to +# ~/.cache by default; on a small home disk that fills up and takes the node down. +# These are forwarded to Ray workers by prepare_runtime_environment. +export XDG_CACHE_HOME=${XDG_CACHE_HOME:-/mnt/local_storage/.cache} +export UV_CACHE_DIR=${UV_CACHE_DIR:-/mnt/local_storage/.cache/uv} +export TRITON_CACHE_DIR=${TRITON_CACHE_DIR:-/mnt/local_storage/.cache/triton} +export TORCHINDUCTOR_CACHE_DIR=${TORCHINDUCTOR_CACHE_DIR:-/mnt/local_storage/.cache/inductor} +export VLLM_CACHE_ROOT=${VLLM_CACHE_ROOT:-/mnt/local_storage/.cache/vllm} +# Use the local cache only (avoids re-downloading / HF rate limits). Unset if you +# want to allow downloads on first run. +export HF_HUB_OFFLINE=${HF_HUB_OFFLINE:-1} +# EFA: NCCL must see the EFA libs, and SkyRL must forward LD_LIBRARY_PATH to Ray workers. +export LD_LIBRARY_PATH=/opt/amazon/efa/lib:${LD_LIBRARY_PATH:-} +export SKYRL_LD_LIBRARY_PATH_EXPORT=1 +# vLLM multi-node executor: the default Ray compiled-DAG (shm channel) crashes the raylet +# on the cross-node hop; the V2 (MultiprocExecutor/MessageQueue) backend avoids it. +export VLLM_USE_RAY_V2_EXECUTOR_BACKEND=1 +# Megatron attention backend (TE flash attn off; see .claude/docs/backends/megatron.md). +export NVTE_FLASH_ATTN=0 +# 8-node uv cache warmup + 550B load can exceed the default placement-group timeout. +export SKYRL_RAY_PG_TIMEOUT_IN_S=${SKYRL_RAY_PG_TIMEOUT_IN_S:-1800} +# The 550B vLLM engines take a while to come up; raise the health-wait timeout. +export SKYRL_WAIT_UNTIL_INFERENCE_SERVER_HEALTHY_TIMEOUT_S=${SKYRL_WAIT_UNTIL_INFERENCE_SERVER_HEALTHY_TIMEOUT_S:-2400} +# Set HF_TOKEN (e.g. `export HF_TOKEN=$(cat ~/.HF_TOKEN)`) for fast authenticated staging. +# HF_HUB_OFFLINE=0 (instead of 1) makes workers re-download a missing shard to the big +# disk if a node churns in un-staged, instead of erroring; with a stable staged pool, 1 is fine. +# Surface vLLM/worker logs to stdout (helpful while bringing this up; comment out later). +export SKYRL_DUMP_INFRA_LOG_TO_STDOUT=${SKYRL_DUMP_INFRA_LOG_TO_STDOUT:-1} +# export NCCL_DEBUG=WARN + +# Data must be present on ALL nodes (node-local) for multi-node training. gsm8k is tiny; +# stage it to each node's local disk (e.g. copy $HOME/data/gsm8k -> here on every node). +DATA_DIR="/mnt/local_storage/data/gsm8k" +LOGGER="wandb" # change to "console" to print to stdout +MODEL_NAME="nvidia/NVIDIA-Nemotron-3-Ultra-550B-A55B-BF16" + +INFERENCE_BACKEND="vllm" # currently only vllm is supported for megatron + +NUM_NODES=8 +NUM_GPUS=8 # per node + +### Megatron (policy) parallelism. world = TP*PP*DP = 64. +# TP within the NVLink domain; NemotronH Mamba requires TP | n_groups(=8), so TP in {1,2,4,8}. +MEGATRON_TP=8 +# PP=4 (vs the forward-only test's 2): training adds bf16 grads (~same size as weights), so at +# EP=16/PP=2 the ~69 GiB weights/GPU + ~69 GiB grads ~= 138 GiB doesn't fit the 141 GiB H200. +# PP=4 halves the layers (hence weights AND grads) per GPU to ~34+34 GiB, which fits. +MEGATRON_PP=4 +MEGATRON_CP=1 +# EP=16, ETP=1 -> EDP=1 (world = TP*PP*DP = 8*4*2 = 64; EP*ETP = 16 = TP*DP = 8*2). +# This is the validated config. Earlier runs at EP=32 produced garbage vLLM generations, but that +# was the CUDA-IPC weight-sync TRANSPORT bug (rank-0 slicing metadata reused for every GPU's +# divergent buffer), NOT the expert sharding itself -- the bridge's expert export is bit-correct at +# every EP. With the per-GPU-metadata fix now in-tree, any valid EP syncs correctly, so EP is purely +# a memory/throughput knob: e.g. EP=32 (16 experts/rank vs 32) further lowers per-GPU expert memory +# if you need more headroom. EP must divide TP*DP. +MEGATRON_EP=16 +MEGATRON_ETP=1 + +# Activation recompute (gated by trainer.gradient_checkpointing=true, which is the default). +RECOMPUTE_GRANULARITY="full" +RECOMPUTE_METHOD="uniform" +RECOMPUTE_NUM_LAYERS=1 + +# CPU-offload the optimizer (fp32 master + AdamW) so it doesn't sit on the GPU. +OPTIMIZER_OFFLOAD=true +OPTIMIZER_OFFLOAD_FRACTION=1.0 + +# Bin-pack microbatches by token count (with remove_microbatch_padding). When >0, +# micro_*_batch_size_per_gpu are ignored. Bounds activation memory; a single sequence +# longer than this still gets its own microbatch. longest seq here ~= 512+1024. +MAX_TOKENS_PER_MICROBATCH=4096 + +### Inference engine (vLLM), colocated over the same 64 GPUs. +# TP=8 (intra-node, NVLink) x PP=4 (cross-node, EFA) = 32 GPUs/engine, 2 engines -> 64 GPUs. +# vLLM TP must divide Mamba n_groups(=8); cross-node scale comes from PP. PP=4 (not 2) keeps +# vLLM weights ~34GB/GPU so during the colocated weight sync (vLLM woken alongside the resident +# policy shard) both fit on the 141 GiB H200 (PP=2 -> ~69+69 OOMs). +NUM_INFERENCE_ENGINES=2 +INFERENCE_ENGINE_TP=8 +INFERENCE_ENGINE_PP=4 +# Cap context: the model's native max is huge and vLLM sizes the KV pool for 1 max-len request. +INFERENCE_ENGINE_MAX_MODEL_LEN=4096 +# Nemotron-3-Ultra is a REASONING model: its official chat_template.jinja defaults to +# enable_thinking=true, so each rollout emits a ... block before the answer. +# In batched mode chat templating is done server-side by vLLM (chat_template_kwargs is not +# supported), so we cannot disable thinking from here -- instead we give generation enough +# budget to finish reasoning AND emit the final `#### ` the gsm8k reward parser wants. +# (Earlier runs got reward=0 because the chat template wasn't staged at all -> the instruct +# model was prompted off-distribution and never produced a parseable answer.) +GEN_MAX_LEN=2048 +# vLLM and the policy alternate on-GPU (sleep/wake); leave headroom for the policy shard. +GPU_MEMORY_UTILIZATION=0.6 + +uv run --isolated --extra megatron -m skyrl.train.entrypoints.main_base \ + data.train_data="['$DATA_DIR/train.parquet']" \ + data.val_data="['$DATA_DIR/validation.parquet']" \ + trainer.algorithm.advantage_estimator="grpo" \ + trainer.policy.model.path=$MODEL_NAME \ + trainer.placement.colocate_all=true \ + trainer.strategy=megatron \ + trainer.placement.policy_num_nodes=$NUM_NODES \ + trainer.placement.policy_num_gpus_per_node=$NUM_GPUS \ + generator.inference_engine.num_engines=$NUM_INFERENCE_ENGINES \ + generator.inference_engine.tensor_parallel_size=$INFERENCE_ENGINE_TP \ + generator.inference_engine.pipeline_parallel_size=$INFERENCE_ENGINE_PP \ + generator.inference_engine.distributed_executor_backend=ray \ + generator.inference_engine.use_expandable_segments=true \ + trainer.policy.megatron_config.tensor_model_parallel_size=$MEGATRON_TP \ + trainer.policy.megatron_config.pipeline_model_parallel_size=$MEGATRON_PP \ + trainer.policy.megatron_config.context_parallel_size=$MEGATRON_CP \ + trainer.policy.megatron_config.expert_model_parallel_size=$MEGATRON_EP \ + trainer.policy.megatron_config.expert_tensor_parallel_size=$MEGATRON_ETP \ + trainer.policy.megatron_config.transformer_config_kwargs.mtp_num_layers=0 \ + trainer.policy.megatron_config.transformer_config_kwargs.mtp_hybrid_override_pattern=null \ + trainer.policy.megatron_config.transformer_config_kwargs.recompute_granularity=$RECOMPUTE_GRANULARITY \ + trainer.policy.megatron_config.transformer_config_kwargs.recompute_method=$RECOMPUTE_METHOD \ + trainer.policy.megatron_config.transformer_config_kwargs.recompute_num_layers=$RECOMPUTE_NUM_LAYERS \ + trainer.policy.megatron_config.optimizer_config_kwargs.overlap_cpu_optimizer_d2h_h2d=$OPTIMIZER_OFFLOAD \ + trainer.policy.megatron_config.optimizer_config_kwargs.use_precision_aware_optimizer=$OPTIMIZER_OFFLOAD \ + trainer.policy.megatron_config.optimizer_config_kwargs.optimizer_cpu_offload=$OPTIMIZER_OFFLOAD \ + trainer.policy.megatron_config.optimizer_config_kwargs.optimizer_offload_fraction=$OPTIMIZER_OFFLOAD_FRACTION \ + trainer.remove_microbatch_padding=true \ + trainer.max_tokens_per_microbatch=$MAX_TOKENS_PER_MICROBATCH \ + trainer.epochs=20 \ + trainer.eval_batch_size=1024 \ + trainer.eval_before_train=false \ + trainer.eval_interval=5 \ + trainer.update_epochs_per_batch=1 \ + trainer.train_batch_size=64 \ + trainer.policy_mini_batch_size=32 \ + trainer.micro_forward_batch_size_per_gpu=1 \ + trainer.micro_train_batch_size_per_gpu=1 \ + trainer.max_prompt_length=512 \ + generator.sampling_params.max_generate_length=$GEN_MAX_LEN \ + trainer.policy.optimizer_config.lr=1.0e-6 \ + trainer.algorithm.use_kl_loss=false \ + generator.inference_engine.backend=$INFERENCE_BACKEND \ + generator.inference_engine.run_engines_locally=true \ + generator.inference_engine.weight_sync_backend=nccl \ + generator.inference_engine.async_engine=true \ + generator.inference_engine.engine_init_kwargs.max_model_len=$INFERENCE_ENGINE_MAX_MODEL_LEN \ + generator.inference_engine.gpu_memory_utilization=$GPU_MEMORY_UTILIZATION \ + generator.batched=true \ + environment.env_class=gsm8k \ + generator.n_samples_per_prompt=5 \ + trainer.logger="$LOGGER" \ + trainer.project_name="gsm8k_nemotron_ultra" \ + trainer.run_name="gsm8k_nemotron_ultra_tp${MEGATRON_TP}_pp${MEGATRON_PP}_ep${MEGATRON_EP}" \ + trainer.resume_mode=latest \ + trainer.max_ckpts_to_keep=3 \ + trainer.ckpt_interval=20 \ + trainer.ckpt_path="$HOME/ckpts/gsm8k_nemotron_ultra_ckpt" \ + $@ diff --git a/examples/train/megatron/run_ultra_sweep.sh b/examples/train/megatron/run_ultra_sweep.sh new file mode 100644 index 0000000000..e5de472379 --- /dev/null +++ b/examples/train/megatron/run_ultra_sweep.sh @@ -0,0 +1,164 @@ +#!/usr/bin/env bash +# Throughput / memory sweep harness for Nemotron-3-Ultra-550B (Megatron). +# +# Drives examples.train_scripts.full_context.main_ultra_sweep, which runs the real +# fwd+bwd training path on fabricated rollouts (no vLLM generation) and logs per-step +# peak CUDA memory + step time to $SWEEP_RESULTS_FILE. +# +# All knobs are env vars (with defaults). Example: +# TP=8 PP=4 CP=1 EP=16 ETP=1 MTPM=131072 MODE=uniform SEQ_LEN=10240 NUM_SEQ=64 \ +# TAG=tp8pp4ep16_mtpm128k SWEEP_RESULTS_FILE=/path/results.jsonl \ +# bash examples/train/megatron/run_ultra_sweep.sh +set -x + +# ---- Megatron parallelism (world = TP*PP*CP*DP = 64) ---- +TP=${TP:-8} +PP=${PP:-4} +CP=${CP:-1} +EP=${EP:-16} +ETP=${ETP:-1} + +# ---- Microbatch token budget (per DP rank) ---- +MTPM=${MTPM:-131072} + +# ---- Workload ---- +MODE=${MODE:-uniform} # uniform | varlen +SEQ_LEN=${SEQ_LEN:-10240} # uniform: total tokens/seq +PROMPT_LEN=${PROMPT_LEN:-512} +AVG_LEN=${AVG_LEN:-60000} # varlen +STD_LEN=${STD_LEN:-30000} +MIN_LEN=${MIN_LEN:-1024} +MAX_LEN=${MAX_LEN:-131072} +NUM_STEPS=${NUM_STEPS:-3} + +NUM_NODES=8 +NUM_GPUS=8 +WORLD=$((NUM_NODES * NUM_GPUS)) +DP=$((WORLD / (TP * PP * CP))) + +# ---- sequence count & batch sizes ---- +# Default: enough sequences so each DP rank forms ~2 full microbatches (uniform), +# or exactly 256 for varlen (the stage-3 target distribution). +if [ -z "${NUM_SEQ:-}" ]; then + if [ "$MODE" = "varlen" ]; then + NUM_SEQ=256 + else + # ceil(2 * MTPM * DP / SEQ_LEN), rounded up to a multiple of 8 + NUM_SEQ=$(( (2 * MTPM * DP + SEQ_LEN - 1) / SEQ_LEN )) + NUM_SEQ=$(( ((NUM_SEQ + 7) / 8) * 8 )) + if [ "$NUM_SEQ" -lt 8 ]; then NUM_SEQ=8; fi + fi +fi +# Round NUM_SEQ up to a multiple of (2*DP) so n_samples=2 and DP divisibility both hold. +LCM=$((2 * DP)) +NUM_SEQ=$(( ((NUM_SEQ + LCM - 1) / LCM) * LCM )) +N_SAMPLES=2 +TBS=$(( NUM_SEQ / N_SAMPLES )) +MINI=$TBS + +TAG=${TAG:-tp${TP}pp${PP}cp${CP}ep${EP}_mtpm${MTPM}_${MODE}} +SWEEP_RESULTS_FILE=${SWEEP_RESULTS_FILE:-/home/ray/ultra_sweep/results.jsonl} +mkdir -p "$(dirname "$SWEEP_RESULTS_FILE")" + +# ---- Environment (mirror the validated nemotron recipe) ---- +export HF_HOME=${HF_HOME:-/mnt/local_storage/hf_cache} +export XDG_CACHE_HOME=${XDG_CACHE_HOME:-/mnt/local_storage/.cache} +export UV_CACHE_DIR=${UV_CACHE_DIR:-/mnt/local_storage/.cache/uv} +export TRITON_CACHE_DIR=${TRITON_CACHE_DIR:-/mnt/local_storage/.cache/triton} +export TORCHINDUCTOR_CACHE_DIR=${TORCHINDUCTOR_CACHE_DIR:-/mnt/local_storage/.cache/inductor} +export VLLM_CACHE_ROOT=${VLLM_CACHE_ROOT:-/mnt/local_storage/.cache/vllm} +export HF_HUB_OFFLINE=${HF_HUB_OFFLINE:-1} +export LD_LIBRARY_PATH=/opt/amazon/efa/lib:${LD_LIBRARY_PATH:-} +export SKYRL_LD_LIBRARY_PATH_EXPORT=1 +export VLLM_USE_RAY_V2_EXECUTOR_BACKEND=1 +export NVTE_FLASH_ATTN=0 +export SKYRL_RAY_PG_TIMEOUT_IN_S=${SKYRL_RAY_PG_TIMEOUT_IN_S:-1800} +export SKYRL_WAIT_UNTIL_INFERENCE_SERVER_HEALTHY_TIMEOUT_S=${SKYRL_WAIT_UNTIL_INFERENCE_SERVER_HEALTHY_TIMEOUT_S:-2400} +export SKYRL_DUMP_INFRA_LOG_TO_STDOUT=${SKYRL_DUMP_INFRA_LOG_TO_STDOUT:-1} + +# ---- sweep trainer params (read by trainer_ultra_sweep.py) ---- +export SWEEP_RESULTS_FILE SWEEP_TAG="$TAG" SWEEP_MODE="$MODE" +export SWEEP_NUM_STEPS="$NUM_STEPS" SWEEP_NUM_SEQ="$NUM_SEQ" SWEEP_PROMPT_LEN="$PROMPT_LEN" +export SWEEP_SEQ_LEN="$SEQ_LEN" SWEEP_AVG_LEN="$AVG_LEN" SWEEP_STD_LEN="$STD_LEN" +export SWEEP_MIN_LEN="$MIN_LEN" SWEEP_MAX_LEN="$MAX_LEN" + +DATA_DIR="/mnt/local_storage/data/gsm8k" +MODEL_NAME="nvidia/NVIDIA-Nemotron-3-Ultra-550B-A55B-BF16" + +# vLLM colocated config (matches the recipe; vLLM sleeps during the training step). +NUM_INFERENCE_ENGINES=2 +INFERENCE_ENGINE_TP=8 +INFERENCE_ENGINE_PP=4 + +echo "[run_ultra_sweep] TAG=$TAG TP=$TP PP=$PP CP=$CP EP=$EP ETP=$ETP DP=$DP MTPM=$MTPM MODE=$MODE NUM_SEQ=$NUM_SEQ TBS=$TBS MINI=$MINI" + +uv run --isolated --extra megatron -m examples.train_scripts.full_context.main_ultra_sweep \ + data.train_data="['$DATA_DIR/train.parquet']" \ + data.val_data="['$DATA_DIR/validation.parquet']" \ + trainer.algorithm.advantage_estimator="grpo" \ + trainer.policy.model.path=$MODEL_NAME \ + trainer.placement.colocate_all=true \ + trainer.strategy=megatron \ + trainer.placement.policy_num_nodes=$NUM_NODES \ + trainer.placement.policy_num_gpus_per_node=$NUM_GPUS \ + generator.inference_engine.num_engines=$NUM_INFERENCE_ENGINES \ + generator.inference_engine.tensor_parallel_size=$INFERENCE_ENGINE_TP \ + generator.inference_engine.pipeline_parallel_size=$INFERENCE_ENGINE_PP \ + generator.inference_engine.distributed_executor_backend=ray \ + generator.inference_engine.use_expandable_segments=true \ + trainer.policy.megatron_config.tensor_model_parallel_size=$TP \ + trainer.policy.megatron_config.pipeline_model_parallel_size=$PP \ + trainer.policy.megatron_config.context_parallel_size=$CP \ + trainer.policy.megatron_config.expert_model_parallel_size=$EP \ + trainer.policy.megatron_config.expert_tensor_parallel_size=$ETP \ + trainer.policy.megatron_config.transformer_config_kwargs.mtp_num_layers=0 \ + trainer.policy.megatron_config.transformer_config_kwargs.mtp_hybrid_override_pattern=null \ + trainer.policy.megatron_config.transformer_config_kwargs.recompute_granularity=full \ + trainer.policy.megatron_config.transformer_config_kwargs.recompute_method=uniform \ + trainer.policy.megatron_config.transformer_config_kwargs.recompute_num_layers=1 \ + trainer.policy.megatron_config.optimizer_config_kwargs.overlap_cpu_optimizer_d2h_h2d=true \ + trainer.policy.megatron_config.optimizer_config_kwargs.use_precision_aware_optimizer=true \ + trainer.policy.megatron_config.optimizer_config_kwargs.optimizer_cpu_offload=true \ + trainer.policy.megatron_config.optimizer_config_kwargs.optimizer_offload_fraction=1.0 \ + trainer.remove_microbatch_padding=true \ + trainer.max_tokens_per_microbatch=$MTPM \ + trainer.epochs=1 \ + trainer.eval_before_train=false \ + trainer.eval_interval=100000 \ + trainer.update_epochs_per_batch=1 \ + trainer.train_batch_size=$TBS \ + trainer.policy_mini_batch_size=$MINI \ + trainer.micro_forward_batch_size_per_gpu=1 \ + trainer.micro_train_batch_size_per_gpu=1 \ + trainer.max_prompt_length=$PROMPT_LEN \ + generator.max_input_length=$PROMPT_LEN \ + generator.sampling_params.max_generate_length=$((SEQ_LEN - PROMPT_LEN)) \ + trainer.policy.optimizer_config.lr=1.0e-6 \ + trainer.algorithm.use_kl_loss=false \ + generator.inference_engine.backend=vllm \ + generator.inference_engine.run_engines_locally=true \ + generator.inference_engine.weight_sync_backend=nccl \ + generator.inference_engine.async_engine=true \ + generator.inference_engine.engine_init_kwargs.max_model_len=4096 \ + generator.inference_engine.gpu_memory_utilization=0.6 \ + generator.batched=true \ + environment.env_class=gsm8k \ + generator.n_samples_per_prompt=$N_SAMPLES \ + trainer.num_dummy_steps=$NUM_STEPS \ + trainer.sweep_results_file="$SWEEP_RESULTS_FILE" \ + trainer.sweep_tag="$TAG" \ + trainer.sweep_mode="$MODE" \ + trainer.sweep_num_seq=$NUM_SEQ \ + trainer.sweep_prompt_len=$PROMPT_LEN \ + trainer.sweep_seq_len=$SEQ_LEN \ + trainer.sweep_avg_len=$AVG_LEN \ + trainer.sweep_std_len=$STD_LEN \ + trainer.sweep_min_len=$MIN_LEN \ + trainer.sweep_max_len=$MAX_LEN \ + trainer.logger="console" \ + trainer.project_name="ultra_sweep" \ + trainer.run_name="$TAG" \ + trainer.resume_mode=none \ + trainer.ckpt_interval=100000 \ + trainer.ckpt_path="/mnt/local_storage/ultra_sweep_ckpt" \ + "$@" diff --git a/examples/train/megatron/stage_nemotron_ultra.py b/examples/train/megatron/stage_nemotron_ultra.py new file mode 100644 index 0000000000..3c40604e9c --- /dev/null +++ b/examples/train/megatron/stage_nemotron_ultra.py @@ -0,0 +1,116 @@ +"""Pre-stage NVIDIA-Nemotron-3-Ultra-550B + the GSM8K dataset onto every GPU node's +local disk, for the multi-node recipe in ``run_megatron_nemotron_ultra.sh``. + +Why this exists: the model is ~1.1 TB and the cluster's shared ``/home`` is small +(~255 GB), so the checkpoint must live on each node's large node-local disk +(``/mnt/local_storage``, ~28 TB). Multi-node training also needs the data present on +every node (each rank reads its data locally). This launches one Ray task per distinct +GPU node that (1) downloads the HF snapshot and (2) writes the GSM8K parquets, both under +``/mnt/local_storage``. + +IMPORTANT: ``allow_patterns`` includes ``*.jinja`` so the model's official +``chat_template.jinja`` is staged. Without it the tokenizer/vLLM have NO chat template, +the instruct/reasoning model is prompted off-distribution, and reward stays 0. + +Usage (from the head node, on a running Ray cluster): + export HF_TOKEN=$(cat ~/.HF_TOKEN) # fast authenticated download; unauth is throttled + uv run --isolated --with ray --with huggingface_hub --with hf_transfer --with datasets \ + python examples/train/megatron/stage_nemotron_ultra.py +""" + +import os + +import ray +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy +from ray.util.state import list_nodes + +REPO = "nvidia/NVIDIA-Nemotron-3-Ultra-550B-A55B-BF16" +HF_HOME = "/mnt/local_storage/hf_cache" +DATA_DIR = "/mnt/local_storage/data/gsm8k" + +ray.init(address="auto", ignore_reinit_error=True, log_to_driver=True) + +_ENV = {"HF_HOME": HF_HOME, "HF_HUB_ENABLE_HF_TRANSFER": "1"} +if os.environ.get("HF_TOKEN"): + _ENV["HF_TOKEN"] = os.environ["HF_TOKEN"] + + +@ray.remote( + num_cpus=8, + runtime_env={"pip": ["huggingface_hub", "hf_transfer", "datasets"], "env_vars": _ENV}, +) +def stage(node_ip): + import socket + import time + + from huggingface_hub import snapshot_download + + # 1) Model snapshot (weights + config + tokenizer + chat_template.jinja). + model_path = None + last_err = None + for _ in range(8): + try: + model_path = snapshot_download( + REPO, + cache_dir=f"{HF_HOME}/hub", + max_workers=16, + # NOTE: *.jinja is required (the chat template); *.py pulls any custom code. + allow_patterns=["*.safetensors", "*.json", "*.txt", "tokenizer*", "*.model", "*.jinja", "*.py"], + ) + break + except Exception as e: # noqa: BLE001 + last_err = f"{type(e).__name__}: {str(e)[:300]}" + time.sleep(15) + if model_path is None: + return (socket.gethostname(), node_ip, "MODEL_FAILED", last_err) + + # 2) GSM8K parquets (tiny; one copy per node so every rank reads locally). + try: + import re + + import datasets + + os.makedirs(DATA_DIR, exist_ok=True) + instruction = 'Let\'s think step by step and output the final answer after "####".' + + def to_row(example, idx, split): + q = example["question"] + sol = re.search(r"#### (\-?[0-9\.\,]+)", example["answer"]).group(0).split("#### ")[1].replace(",", "") + return { + "data_source": "openai/gsm8k", + "prompt": [{"role": "user", "content": q + " " + instruction}], + "env_class": "gsm8k", + "reward_spec": {"method": "rule", "ground_truth": sol}, + "extra_info": {"split": split, "index": idx}, + } + + ds = datasets.load_dataset("openai/gsm8k", "main") + ds["train"].map(lambda e, i: to_row(e, i, "train"), with_indices=True).to_parquet(f"{DATA_DIR}/train.parquet") + ds["test"].map(lambda e, i: to_row(e, i, "test"), with_indices=True).to_parquet( + f"{DATA_DIR}/validation.parquet" + ) + except Exception as e: # noqa: BLE001 + return (socket.gethostname(), node_ip, "DATA_FAILED", f"{type(e).__name__}: {str(e)[:300]}") + + return (socket.gethostname(), node_ip, "DONE", model_path) + + +nodes = [ + n + for n in list_nodes(detail=True) + if (n.get("resources_total") or {}).get("GPU", 0) > 0 and n.get("state") == "ALIVE" +] +print("staging on %d GPU nodes: %s" % (len(nodes), [n["node_ip"] for n in nodes]), flush=True) +results = ray.get( + [ + stage.options(scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=n["node_id"], soft=False)).remote( + n["node_ip"] + ) + for n in nodes + ] +) +print("\n===== RESULTS =====", flush=True) +ok = sum(1 for r in results if r[2] == "DONE") +for host, ip, status, detail in results: + print(f"[{ip} {host}] {status}: {detail}", flush=True) +print(f"\n{ok}/{len(results)} nodes staged OK", flush=True) diff --git a/examples/train_scripts/full_context/analyze_sweep.py b/examples/train_scripts/full_context/analyze_sweep.py new file mode 100644 index 0000000000..6471ce7221 --- /dev/null +++ b/examples/train_scripts/full_context/analyze_sweep.py @@ -0,0 +1,62 @@ +"""Summarize ultra-sweep results.jsonl into a throughput/memory table. + +Usage: python analyze_sweep.py /home/ray/ultra_sweep/results.jsonl +For each tag, reports the steady-state (non-warmup, non-error) median step time, +tokens/s, max peak reserved GB, min free GB, and OOM/error status. +""" + +import json +import sys +from collections import defaultdict +from statistics import median + + +def main(path): + by_tag = defaultdict(list) + order = [] + with open(path) as f: + for line in f: + line = line.strip() + if not line: + continue + r = json.loads(line) + if r["tag"] not in by_tag: + order.append(r["tag"]) + by_tag[r["tag"]].append(r) + + hdr = ( + f"{'tag':<34} {'tp':>2} {'pp':>2} {'cp':>2} {'ep':>3} {'dp':>2} " + f"{'mtpm':>8} {'mode':>7} {'maxseq':>7} {'tok/step':>9} " + f"{'step_s':>7} {'tok/s':>9} {'pkRsv':>6} {'minFree':>7} {'status':>8}" + ) + print(hdr) + print("-" * len(hdr)) + for tag in order: + recs = by_tag[tag] + steady = [r for r in recs if not r.get("warmup") and not r.get("error")] + oom = any(r.get("oom") for r in recs) + err = next((r["error"] for r in recs if r.get("error")), None) + any0 = recs[0] + if steady: + st = median([r["step_time_s"] for r in steady]) + tps = median([r["tokens_per_s"] for r in steady if r["tokens_per_s"]]) + pk = max([r["peak_reserved_gb"] for r in steady if r["peak_reserved_gb"]] or [0]) + mf = min([r["min_free_gb"] for r in steady if r["min_free_gb"]] or [0]) + status = "OK" + else: + st = tps = pk = mf = 0 + # use warmup peak if present + pk = max([r.get("peak_reserved_gb") or 0 for r in recs] or [0]) + mf = min([r.get("min_free_gb") or 99 for r in recs] or [0]) + status = "OOM" if oom else ("ERR" if err else "NONE") + print( + f"{tag:<34} {any0['tp']:>2} {any0['pp']:>2} {any0['cp']:>2} {any0['ep']:>3} {any0['dp']:>2} " + f"{any0['mtpm']:>8} {any0.get('mode',''):>7} {any0.get('max_seqlen',0):>7} " + f"{any0.get('total_tokens',0):>9} {st:>7.1f} {tps:>9.0f} {pk:>6.1f} {mf:>7.1f} {status:>8}" + ) + if err and status != "OK": + print(f" └─ {err[:160]}") + + +if __name__ == "__main__": + main(sys.argv[1] if len(sys.argv) > 1 else "/home/ray/ultra_sweep/results.jsonl") diff --git a/examples/train_scripts/full_context/main_ultra_sweep.py b/examples/train_scripts/full_context/main_ultra_sweep.py new file mode 100644 index 0000000000..477f70462b --- /dev/null +++ b/examples/train_scripts/full_context/main_ultra_sweep.py @@ -0,0 +1,77 @@ +"""Entrypoint for the Nemotron-Ultra throughput/memory sweep. + +uv run --isolated --extra megatron -m examples.train_scripts.full_context.main_ultra_sweep ... +""" + +import sys +from dataclasses import dataclass + +import ray + +from skyrl.train.config import TrainerConfig, make_config +from skyrl.train.entrypoints.main_base import BasePPOExp +from skyrl.train.utils import initialize_ray, validate_cfg + +from .trainer_ultra_sweep import UltraSweepTrainer + + +@dataclass +class UltraSweepTrainerConfig(TrainerConfig): + num_dummy_steps: int = 3 + # Sweep params passed via CLI (trainer.=...) so they reach the Ray + # worker through cfg (shell env vars do NOT propagate to the entrypoint actor). + sweep_results_file: str = "/home/ray/ultra_sweep/results.jsonl" + sweep_tag: str = "run" + sweep_mode: str = "uniform" # uniform | varlen + sweep_num_seq: int = -1 # -1 -> train_batch_size * n_samples_per_prompt + sweep_prompt_len: int = 512 + sweep_seq_len: int = 10240 # uniform: total tokens/seq + sweep_avg_len: int = 60000 # varlen + sweep_std_len: int = 30000 + sweep_min_len: int = 1024 + sweep_max_len: int = 131072 + sweep_seed: int = 1234 + + +UltraSweepConfig = make_config(trainer_cls=UltraSweepTrainerConfig) + + +class UltraSweepPPOExp(BasePPOExp): + def get_trainer( + self, + cfg, + tracker, + tokenizer, + train_dataset, + eval_dataset, + inference_engine_client, + generator, + colocate_pg, + ): + return UltraSweepTrainer( + cfg=cfg, + tracker=tracker, + tokenizer=tokenizer, + train_dataset=train_dataset, + eval_dataset=eval_dataset, + inference_engine_client=inference_engine_client, + generator=generator, + colocate_pg=colocate_pg, + ) + + +@ray.remote(num_cpus=1) +def skyrl_entrypoint(cfg): + exp = UltraSweepPPOExp(cfg) + exp.run() + + +def main() -> None: + cfg = UltraSweepConfig.from_cli_overrides(sys.argv[1:]) + validate_cfg(cfg) + initialize_ray(cfg) + ray.get(skyrl_entrypoint.remote(cfg)) + + +if __name__ == "__main__": + main() diff --git a/examples/train_scripts/full_context/trainer_ultra_sweep.py b/examples/train_scripts/full_context/trainer_ultra_sweep.py new file mode 100644 index 0000000000..eec3ed6d0d --- /dev/null +++ b/examples/train_scripts/full_context/trainer_ultra_sweep.py @@ -0,0 +1,210 @@ +"""Throughput / memory sweep trainer for Nemotron-Ultra-550B (Megatron). + +Extends the dummy ``FullCtxTrainer`` to (a) build either uniform-length or +variable-length synthetic batches, (b) record per-step wall time and the peak +CUDA reserved memory across policy workers, and (c) append one JSON record per +step to ``$SWEEP_RESULTS_FILE``. It runs the *real* fwd+bwd training path +(``fwd_logprobs_values_reward`` + ``train_critic_and_policy``) so the numbers +reflect genuine training cost, but fabricates the rollout so no vLLM generation +is needed. + +Driven entirely by env vars (so the same module serves every sweep config): + + SWEEP_RESULTS_FILE path to append JSONL results to (required) + SWEEP_TAG label for this run (e.g. "tp8pp4ep16_mtpm128k") + SWEEP_MODE "uniform" (default) or "varlen" + SWEEP_NUM_STEPS number of measured steps (default 3; first is warmup) + SWEEP_NUM_SEQ total sequences per step (default = train_batch_size*n_samples) + SWEEP_PROMPT_LEN prompt length per sequence (default 512) + # uniform mode: + SWEEP_SEQ_LEN total tokens per sequence (prompt+response), default 10240 + # varlen mode: + SWEEP_AVG_LEN mean total tokens/sequence (default 60000) + SWEEP_STD_LEN stddev of total tokens/sequence (default 30000) + SWEEP_MIN_LEN clamp floor for total length (default 1024) + SWEEP_MAX_LEN clamp ceiling for total length (default 131072) + SWEEP_SEED RNG seed for reproducible varlen draws (default 1234) +""" + +import json +import random +import time + +from loguru import logger + +from skyrl.train.utils.utils import Timer + +from .trainer_full_ctx import FullCtxTrainer + + +class UltraSweepTrainer(FullCtxTrainer): + def _build_lengths(self, num_seq): + """Return a list of (prompt_len, response_len) per sequence.""" + t = self.cfg.trainer + mode = t.sweep_mode + prompt_len = t.sweep_prompt_len + if mode == "uniform": + seq_len = t.sweep_seq_len + resp = max(1, seq_len - prompt_len) + return [(prompt_len, resp)] * num_seq, mode, seq_len, 0 + # varlen + rng = random.Random(t.sweep_seed) + out = [] + for _ in range(num_seq): + total = int(round(rng.gauss(t.sweep_avg_len, t.sweep_std_len))) + total = max(t.sweep_min_len, min(t.sweep_max_len, total)) + total = max(total, prompt_len + 1) + out.append((prompt_len, total - prompt_len)) + return out, mode, t.sweep_avg_len, t.sweep_std_len + + def _peak_reserved_gb(self): + """Peak CUDA high-water (reserved, allocated) and min free, GB, across policy workers. + + Uses max_reserved/max_allocated (high-water marks that survive empty_cache/offload) + so we capture the in-step fwd/bwd peak even though this is queried after the policy + has been offloaded back to CPU. Falls back to current reserved/allocated on older + workers that don't return the max_* keys. + """ + import ray + + try: + mems = ray.get( + self.policy_model.async_run_ray_method("pass_through", "get_cuda_memory"), + timeout=60, + ) + except Exception as e: # noqa: BLE001 + logger.warning(f"get_cuda_memory failed: {e}") + return None, None, None + reserved = [m.get("max_reserved", m["reserved"]) / 1e9 for m in mems] + allocated = [m.get("max_allocated", m["allocated"]) / 1e9 for m in mems] + free = [m["free"] / 1e9 for m in mems] + return max(reserved), max(allocated), min(free) + + def _record(self, rec): + path = self.cfg.trainer.sweep_results_file + if path: + try: + import os + + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "a") as f: + f.write(json.dumps(rec) + "\n") + except Exception as e: # noqa: BLE001 + logger.warning(f"could not write results file {path}: {e}") + logger.info("SWEEP_RESULT " + json.dumps(rec)) + + async def train(self): + tag = self.cfg.trainer.sweep_tag + num_steps = self.cfg.trainer.num_dummy_steps + n_samples = self.cfg.generator.n_samples_per_prompt + default_num_seq = self.cfg.trainer.train_batch_size * n_samples + cfg_num_seq = self.cfg.trainer.sweep_num_seq + num_seq = cfg_num_seq if cfg_num_seq and cfg_num_seq > 0 else default_num_seq + + mc = self.cfg.trainer.policy.megatron_config + world = self.cfg.trainer.placement.policy_num_nodes * self.cfg.trainer.placement.policy_num_gpus_per_node + dp = world // (mc.pipeline_model_parallel_size * mc.context_parallel_size * mc.tensor_model_parallel_size) + cfg_hdr = { + "tag": tag, + "tp": mc.tensor_model_parallel_size, + "pp": mc.pipeline_model_parallel_size, + "cp": mc.context_parallel_size, + "ep": mc.expert_model_parallel_size, + "etp": mc.expert_tensor_parallel_size, + "dp": dp, + "mtpm": self.cfg.trainer.max_tokens_per_microbatch, + "num_seq": num_seq, + } + logger.info(f"[ultra-sweep] starting: {json.dumps(cfg_hdr)}") + + self.global_step = 0 + with Timer("init_weight_sync_state", self.all_timings): + self.init_weight_sync_state() + + lengths, mode, p0, p1 = self._build_lengths(num_seq) + total_tokens = sum(p + r for p, r in lengths) + max_seqlen = max(p + r for p, r in lengths) + logger.info( + f"[ultra-sweep] mode={mode} num_seq={num_seq} total_tokens={total_tokens} " + f"max_seqlen={max_seqlen} dp={dp} per_dp_tokens={total_tokens // dp}" + ) + + # uids group sequences into n_samples_per_prompt groups (for grpo advantage std). + uids = [] + for i in range(num_seq): + uids.append(str(i // n_samples)) + + self.global_step += 1 + for step in range(num_steps): + oom = False + err = None + t0 = time.time() + try: + prompt_token_ids = [[random.randint(0, self.tokenizer.vocab_size - 1)] * p for (p, r) in lengths] + response_ids = [[random.randint(0, self.tokenizer.vocab_size - 1)] * r for (p, r) in lengths] + rewards = [[0.0] * (r - 1) + [float(random.randint(0, 1))] for (p, r) in lengths] + loss_masks = [[1] * r for (p, r) in lengths] + dummy = { + "prompt_token_ids": prompt_token_ids, + "response_ids": response_ids, + "rewards": rewards, + "loss_masks": loss_masks, + } + training_input = self.convert_to_training_input(dummy, uids) + with Timer("step", self.all_timings): + with Timer("fwd_logprobs_values_reward", self.all_timings): + training_input = self.fwd_logprobs_values_reward(training_input) + with Timer("compute_advantages_and_returns", self.all_timings): + training_input = self.compute_advantages_and_returns(training_input) + for key in ["rewards"]: + training_input.pop(key) + training_input.metadata.pop("uids") + with Timer("train_critic_and_policy", self.all_timings): + self.train_critic_and_policy(training_input) + except Exception as e: # noqa: BLE001 + err = f"{type(e).__name__}: {str(e)[:500]}" + if "out of memory" in str(e).lower() or "OutOfMemory" in str(e): + oom = True + logger.error(f"[ultra-sweep] step {step} FAILED: {err}") + + step_time = time.time() - t0 + if err: + # Workers can be wedged after a CUDA OOM; querying memory may hang. + peak_res, peak_alloc, min_free = None, None, None + else: + peak_res, peak_alloc, min_free = self._peak_reserved_gb() + rec = dict(cfg_hdr) + rec.update( + { + "step": step, + "warmup": step == 0, + "mode": mode, + "total_tokens": total_tokens, + "max_seqlen": max_seqlen, + "per_dp_tokens": total_tokens // dp, + "step_time_s": round(step_time, 2), + "tokens_per_s": round(total_tokens / step_time, 1) if step_time > 0 and not err else None, + "peak_reserved_gb": round(peak_res, 2) if peak_res else None, + "peak_alloc_gb": round(peak_alloc, 2) if peak_alloc else None, + "min_free_gb": round(min_free, 2) if min_free else None, + "oom": oom, + "error": err, + } + ) + self._record(rec) + try: + self.tracker.log({"timing/" + k: v for k, v in self.all_timings.items()}, step=self.global_step) + except Exception: # noqa: BLE001 + pass + self.all_timings = {} + self.all_metrics = {} + self.global_step += 1 + if err: + # No point continuing this config once it fails. + break + + try: + self.tracker.finish() + except Exception: # noqa: BLE001 + pass + logger.info(f"[ultra-sweep] done: {tag}") diff --git a/skyrl-gym/skyrl_gym/envs/gsm8k/env.py b/skyrl-gym/skyrl_gym/envs/gsm8k/env.py index d4fe5b3d88..d8266312d2 100644 --- a/skyrl-gym/skyrl_gym/envs/gsm8k/env.py +++ b/skyrl-gym/skyrl_gym/envs/gsm8k/env.py @@ -1,6 +1,15 @@ from skyrl_gym.envs.base_text_env import BaseTextEnv, BaseTextEnvStepOutput from skyrl_gym.envs.gsm8k import utils from typing import Dict, Any +import re as _re + +# Matches an integer/decimal (optionally signed, with thousands separators), e.g. "1,800", "-5", "72.0". +_NUM_RE = _re.compile(r"-?[0-9][0-9,]*(?:\.[0-9]+)?") + + +def _norm_num(s: str) -> str: + """Normalize a parsed number to compare against the (comma-free integer) ground truth.""" + return s.strip().rstrip(".").replace(",", "").replace("$", "") class GSM8kEnv(BaseTextEnv): @@ -16,7 +25,20 @@ def __init__(self, env_config: Any = None, extras: Dict[str, Any] = {}): self.ground_truth = extras["reward_spec"]["ground_truth"] def _get_reward(self, action: str) -> float: - return utils.compute_score(action, self.ground_truth) + # Reasoning models (e.g. Nemotron-3-Ultra) emit `` and may write + # the final answer in their own style (boxed / natural language) rather than the exact + # `#### ` the strict parser wants. Drop the `` trace (so its intermediate + # numbers can't be mistaken for the answer), then reward the exact `#### ` format if + # present (strict), else fall back to the last number in the answer (robust to boxed / + # natural-language answers and comma/$ formatting). Non-reasoning models (no ``) keep + # the original strict scoring on the full output, so this is a no-op for them. + answer_segment = action.split("")[-1] if "" in action else action + reward = utils.compute_score(answer_segment, self.ground_truth, method="strict") + if reward == 0.0: + nums = _NUM_RE.findall(answer_segment) + if nums and _norm_num(nums[-1]) == _norm_num(str(self.ground_truth)): + reward = 1.0 + return reward def step(self, action: str) -> BaseTextEnvStepOutput: done = True # always done after one step diff --git a/skyrl/backends/skyrl_train/inference_servers/layerwise_reload.py b/skyrl/backends/skyrl_train/inference_servers/layerwise_reload.py index d3aa2918d5..c2e0ca484c 100644 --- a/skyrl/backends/skyrl_train/inference_servers/layerwise_reload.py +++ b/skyrl/backends/skyrl_train/inference_servers/layerwise_reload.py @@ -42,6 +42,38 @@ except ImportError: pass +# Workaround for a SECOND vLLM layerwise-reload corruption affecting NemotronH/Mamba. +# `get_numel_loaded` counts elements copied by a weight loader to decide when a layer is +# fully loaded (so it can be finalized). A `composed_weight_loader` copies into its param +# twice (initial load + in-place transform), so the counter reports 2x the param's numel. +# That inflated count can reach the layer's total before all params arrive, finalizing the +# layer early and leaving trailing params (e.g. Mamba `mixer.D`) UNINITIALIZED -> NaN logits. +# Whether it trips depends on the order params arrive (i.e. the weight-sync chunk/bucket +# layout), which is why it corrupts at some Megatron EP/PP configs (PP>2, EP>16) but not +# others. Fix mirrors vLLM PR #44814: cap each loader call's counted elements at the +# destination param's numel(). Patch both the `meta` source and the `layerwise` import-site +# reference (layerwise.py does `from .meta import get_numel_loaded` at module load). +# Remove once the vLLM version includes https://github.com/vllm-project/vllm/pull/44814 . +try: + from vllm.model_executor.model_loader.reload import ( + layerwise as _vllm_reload_layerwise, + ) + from vllm.model_executor.model_loader.reload import meta as _vllm_reload_meta + + _orig_get_numel_loaded = _vllm_reload_meta.get_numel_loaded + + def _capped_get_numel_loaded(weight_loader, args): + numel, return_value = _orig_get_numel_loaded(weight_loader, args) + param = getattr(args, "arguments", {}).get("param", None) + if isinstance(param, torch.Tensor): + numel = min(numel, param.numel()) + return numel, return_value + + _vllm_reload_meta.get_numel_loaded = _capped_get_numel_loaded + _vllm_reload_layerwise.get_numel_loaded = _capped_get_numel_loaded +except (ImportError, AttributeError): + pass + class LayerwiseReloadWorkerMixin: """Bracket a multi-chunk weight sync with one vLLM layerwise-reload init/finalize. diff --git a/skyrl/backends/skyrl_train/inference_servers/new_inference_worker_wrap.py b/skyrl/backends/skyrl_train/inference_servers/new_inference_worker_wrap.py index 92f5906211..4d3831f536 100644 --- a/skyrl/backends/skyrl_train/inference_servers/new_inference_worker_wrap.py +++ b/skyrl/backends/skyrl_train/inference_servers/new_inference_worker_wrap.py @@ -91,6 +91,19 @@ def update_weights_ipc(self, update_info: dict) -> None: physical_gpu_id = str(torch.cuda.get_device_properties(device_index).uuid) if physical_gpu_id not in handles: raise ValueError(f"IPC handle not found for GPU UUID {physical_gpu_id}. " f"Available: {list(handles)}") + + # Each GPU's packed buffer was built from a DIFFERENT set/order of params, so slice + # it with THIS GPU's own metadata when available (falling back to the flat rank-0 + # metadata for older senders). Using rank 0's metadata for every GPU silently + # mis-slices buffers whose layout differs from rank 0's (EP>16 / PP>2 expert chunks). + per_gpu_pickled = update_info.get("per_gpu_meta_pickled") + if per_gpu_pickled is not None: + per_gpu = pickle.loads(base64.b64decode(per_gpu_pickled)) + meta = per_gpu.get(physical_gpu_id) + if meta is not None: + names = meta["names"] + shapes = meta["shapes"] + sizes = meta["sizes"] func, args = handles[physical_gpu_id] # Remap device index to the LOCAL current-device. list_args = list(args) diff --git a/skyrl/backends/skyrl_train/weight_sync/cuda_ipc_strategy.py b/skyrl/backends/skyrl_train/weight_sync/cuda_ipc_strategy.py index bdf2c7408e..fbf7e1337b 100644 --- a/skyrl/backends/skyrl_train/weight_sync/cuda_ipc_strategy.py +++ b/skyrl/backends/skyrl_train/weight_sync/cuda_ipc_strategy.py @@ -197,8 +197,6 @@ async def _send_chunks_vllm_native( world_size = torch.distributed.get_world_size() device = torch.cuda.current_device() gpu_uuid = str(torch.cuda.get_device_properties(device).uuid) - dtype = str_to_torch_dtype(self._init_info.model_dtype_str) - dtype_name = self._init_info.model_dtype_str.split(".")[-1] if rank == 0: await self._inference_client.start_weight_update(is_checkpoint_format=True) @@ -206,8 +204,13 @@ async def _send_chunks_vllm_native( for chunk in chunks: # --- pack all tensors in this chunk into one contiguous buffer --- - # Chunk tensors share a single dtype by construction (see - # weight_extractor_utils.py), so offsets in element units are safe. + # Chunk tensors share a single dtype by construction (the weight extractor + # emits one chunk per dtype), so offsets in element units are safe. Derive + # the packed dtype from the chunk itself rather than assuming model_dtype, + # so fp32 params (e.g. the MoE router bias) are transferred without bf16 + # down-casting; the receiver rebuilds at this dtype from the IPC handle. + chunk_dtype = chunk.tensors[0].dtype + dtype_name = str(chunk_dtype).split(".")[-1] names: List[str] = [] dtype_names: List[str] = [] shapes: List[List[int]] = [] @@ -217,7 +220,7 @@ async def _send_chunks_vllm_native( packed_tensor = torch.empty( total_numel, device=device, - dtype=dtype, + dtype=chunk_dtype, requires_grad=False, ) @@ -231,28 +234,49 @@ async def _send_chunks_vllm_native( shapes.append(list(shape) if not isinstance(shape, list) else shape) sizes.append(size) - # --- one IPC handle per rank for the packed buffer --- + # --- one IPC handle per rank for the packed buffer, plus THIS rank's own + # per-param metadata. Each rank packs a DIFFERENT set/order of params into its + # buffer (e.g. expert chunks carry per-EP-rank expert names), so the receiver + # must slice each GPU's buffer with THAT GPU's own names/sizes — not rank 0's. + # Using rank 0's metadata for every GPU mis-slices any buffer whose layout + # differs from rank 0's, which happens under EP>16 / PP>2 and silently loads + # the wrong tensor under each name (no crash; byte totals still match). See + # NEMOTRON_ULTRA_FINDINGS.md. ipc_handle: IpcHandle = reduce_tensor(packed_tensor) - local_handle_dict: Dict[str, IpcHandle] = {gpu_uuid: ipc_handle} - gathered: List[Optional[Dict[str, IpcHandle]]] = [None] * world_size - torch.distributed.all_gather_object(gathered, local_handle_dict) + local_entry: Dict[str, Any] = { + gpu_uuid: { + "handle": ipc_handle, + "names": names, + "dtype_names": dtype_names, + "shapes": shapes, + "sizes": sizes, + } + } + gathered: List[Optional[Dict[str, Any]]] = [None] * world_size + torch.distributed.all_gather_object(gathered, local_entry) torch.distributed.barrier() torch.cuda.synchronize() if rank == 0: - merged_handles: Dict[str, IpcHandle] = {} + merged: Dict[str, Any] = {} for d in gathered: if d is not None: - merged_handles.update(d) + merged.update(d) - pickled = base64.b64encode(pickle.dumps(merged_handles)).decode("utf-8") + # {gpu_uuid: handle} (back-compat) and {gpu_uuid: per-GPU metadata} (the fix). + merged_handles: Dict[str, IpcHandle] = {uuid: e["handle"] for uuid, e in merged.items()} + per_gpu_meta: Dict[str, Any] = { + uuid: {k: e[k] for k in ("names", "dtype_names", "shapes", "sizes")} for uuid, e in merged.items() + } chunk_update_info: Dict[str, Any] = { + # rank 0's flat metadata kept for receivers not yet using per-GPU metadata "names": names, "dtype_names": dtype_names, "shapes": shapes, "sizes": sizes, - "ipc_handles_pickled": pickled, + "ipc_handles_pickled": base64.b64encode(pickle.dumps(merged_handles)).decode("utf-8"), + "per_gpu_meta_pickled": base64.b64encode(pickle.dumps(per_gpu_meta)).decode("utf-8"), } await self._inference_client.update_weights_ipc(chunk_update_info) diff --git a/skyrl/backends/skyrl_train/workers/megatron/megatron_worker.py b/skyrl/backends/skyrl_train/workers/megatron/megatron_worker.py index 6e1f098653..61fe46f2a6 100644 --- a/skyrl/backends/skyrl_train/workers/megatron/megatron_worker.py +++ b/skyrl/backends/skyrl_train/workers/megatron/megatron_worker.py @@ -85,6 +85,19 @@ maybe_force_qwen35_text_bridge, ) +# Parameters that must be weight-synced in their native (fp32) precision rather than +# down-cast to the inference dtype (bf16). The MoE router score-correction bias +# (HF `*.gate.e_score_correction_bias`) has large magnitude (~25-57) with tiny +# per-expert offsets (std ~7e-4) far below bf16's ULP at that scale, so bf16 rounding +# collapses the offsets and corrupts expert routing after weight sync. vLLM keeps the +# router in fp32 (moe_router_dtype="fp32"), so we transfer this param in fp32. +_FP32_SYNC_SUFFIXES = ("e_score_correction_bias",) + + +def _sync_dtype_for(name: str, default_dtype: "torch.dtype", native_dtype: "torch.dtype") -> "torch.dtype": + """Dtype to weight-sync ``name`` in: native (fp32) for routing-critical params, else default.""" + return native_dtype if name.endswith(_FP32_SYNC_SUFFIXES) else default_dtype + class MegatronWeightExtractor(WeightExtractor): """Extracts weights from Megatron model-parallel models. @@ -216,7 +229,6 @@ def get_weight_metadata(self, dtype: torch.dtype) -> dict: names = [] dtype_names = [] shapes = [] - dtype_name = str(dtype).split(".")[-1] # Collect parameter metadata in the same order # as provided by `.extract_weights`. if not self.enable_bucketing: @@ -226,7 +238,7 @@ def get_weight_metadata(self, dtype: torch.dtype) -> dict: conversion_tasks=None, ): names.append(name) - dtype_names.append(dtype_name) + dtype_names.append(str(_sync_dtype_for(name, dtype, tensor.dtype)).split(".")[-1]) shapes.append(list(tensor.shape)) del tensor else: @@ -242,7 +254,7 @@ def get_weight_metadata(self, dtype: torch.dtype) -> dict: ): names.append(name) shapes.append(list(tensor.shape)) - dtype_names.append(dtype_name) + dtype_names.append(str(_sync_dtype_for(name, dtype, tensor.dtype)).split(".")[-1]) del tensor self._weight_metadata_cache = {"names": names, "dtype_names": dtype_names, "shapes": shapes} @@ -277,11 +289,12 @@ def extract_weights(self, dtype: torch.dtype): ) for name, tensor in hf_params_generator: - tensor = tensor.to(device=device, dtype=dtype, non_blocking=True) + out_dtype = _sync_dtype_for(name, dtype, tensor.dtype) + tensor = tensor.to(device=device, dtype=out_dtype, non_blocking=True) yield WeightChunk( names=[name], - dtypes=[str(dtype)], + dtypes=[str(out_dtype)], shapes=[list(tensor.shape)], tensors=[tensor], ) @@ -298,29 +311,30 @@ def extract_weights(self, dtype: torch.dtype): conversion_tasks=bucket_tasks, ) - # Collect all parameters in this bucket into one chunk - names = [] - dtypes_list = [] - shapes = [] - tensors = [] - + # Collect this bucket's params, grouped by output dtype. Each emitted + # chunk must be uniform-dtype because the CUDA-IPC sender packs a chunk + # into a single contiguous buffer of one dtype. Most params down-cast to + # `dtype` (bf16); routing-critical fp32 params (see _FP32_SYNC_SUFFIXES) + # keep their native dtype and ride a separate fp32 chunk. + groups: dict = {} # out_dtype -> (names, dtypes_list, shapes, tensors) for name, tensor in hf_params_generator: - # Move to device and convert dtype - tensor = tensor.to(device=device, dtype=dtype, non_blocking=True) - - names.append(name) - dtypes_list.append(str(dtype)) - shapes.append(list(tensor.shape)) - tensors.append(tensor) - - # Yield one chunk containing all parameters in this bucket - if tensors: - yield WeightChunk( - names=names, - dtypes=dtypes_list, - shapes=shapes, - tensors=tensors, - ) + out_dtype = _sync_dtype_for(name, dtype, tensor.dtype) + tensor = tensor.to(device=device, dtype=out_dtype, non_blocking=True) + g = groups.setdefault(out_dtype, ([], [], [], [])) + g[0].append(name) + g[1].append(str(out_dtype)) + g[2].append(list(tensor.shape)) + g[3].append(tensor) + + # Yield one chunk per dtype group in this bucket. + for names, dtypes_list, shapes, tensors in groups.values(): + if tensors: + yield WeightChunk( + names=names, + dtypes=dtypes_list, + shapes=shapes, + tensors=tensors, + ) class MegatronWorker: diff --git a/skyrl/backends/skyrl_train/workers/worker.py b/skyrl/backends/skyrl_train/workers/worker.py index 6bc218313b..2eabffd02c 100644 --- a/skyrl/backends/skyrl_train/workers/worker.py +++ b/skyrl/backends/skyrl_train/workers/worker.py @@ -337,6 +337,10 @@ def get_cuda_memory(self) -> Dict[str, Any]: return { "allocated": torch.cuda.memory_allocated(), "reserved": torch.cuda.memory_reserved(), + # High-water marks (persist across empty_cache / offload, until reset_peak_memory_stats), + # so they capture the in-step fwd/bwd peak even when queried after the policy is offloaded. + "max_allocated": torch.cuda.max_memory_allocated(), + "max_reserved": torch.cuda.max_memory_reserved(), "free": free, "total": total, } diff --git a/skyrl/train/utils/utils.py b/skyrl/train/utils/utils.py index f0b321e9ba..cf8c795e14 100644 --- a/skyrl/train/utils/utils.py +++ b/skyrl/train/utils/utils.py @@ -724,6 +724,43 @@ def prepare_runtime_environment(cfg: SkyRLTrainConfig) -> dict[str, str]: logger.info(f"Exporting `SKYRL_RAY_PG_TIMEOUT_IN_S` to ray runtime env: {pg_timeout}") env_vars["SKYRL_RAY_PG_TIMEOUT_IN_S"] = pg_timeout + # The inference-server health-wait timeout is read at import time inside the + # VLLMServerActor process (a Ray worker actor), so it must be forwarded to take + # effect. Large models (e.g. 550B) loaded across multi-node PP engines from disk + # can take well over the 600s default to become healthy. + if health_timeout := os.environ.get("SKYRL_WAIT_UNTIL_INFERENCE_SERVER_HEALTHY_TIMEOUT_S"): + logger.info( + f"Exporting `SKYRL_WAIT_UNTIL_INFERENCE_SERVER_HEALTHY_TIMEOUT_S` to ray runtime env: {health_timeout}" + ) + env_vars["SKYRL_WAIT_UNTIL_INFERENCE_SERVER_HEALTHY_TIMEOUT_S"] = health_timeout + + # Forward HuggingFace + cache-location env vars to the Ray workers. Without this, + # workers fall back to the default home dir (`~/.cache`) for the HF model cache and + # uv/Triton/Inductor/vLLM build caches. For large models on nodes with a small home + # disk, that means re-downloading the checkpoint to (and filling) the home disk, + # which can take the node down. Point these at a large disk (e.g. /mnt/local_storage) + # via the launch env so workers read the pre-staged model and write caches there. + for var_name in ( + "HF_HOME", + "HF_HUB_OFFLINE", + "HF_TOKEN", + "HF_HUB_ENABLE_HF_TRANSFER", + "HUGGING_FACE_HUB_TOKEN", + "XDG_CACHE_HOME", + "UV_CACHE_DIR", + "TRITON_CACHE_DIR", + "TORCHINDUCTOR_CACHE_DIR", + "VLLM_CACHE_ROOT", + "TMPDIR", + # vLLM executor selection: cross-node serving needs the V2 (MultiprocExecutor) + # backend; the default compiled-DAG executor both crashes the raylet on the + # cross-node hop and takes a KV-cache-init path that KeyErrors on NemotronH. + "VLLM_USE_RAY_V2_EXECUTOR_BACKEND", + "VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE", + ): + if value := os.environ.get(var_name): + env_vars.setdefault(var_name, value) + return env_vars diff --git a/tests/backends/skyrl_train/gpu/gpu_ci/conftest.py b/tests/backends/skyrl_train/gpu/gpu_ci/conftest.py index b3d154c134..a6e3b5d98b 100644 --- a/tests/backends/skyrl_train/gpu/gpu_ci/conftest.py +++ b/tests/backends/skyrl_train/gpu/gpu_ci/conftest.py @@ -6,7 +6,11 @@ import ray from loguru import logger -from skyrl.env_vars import _SKYRL_USE_NEW_INFERENCE, SKYRL_PYTHONPATH_EXPORT +from skyrl.env_vars import ( + _SKYRL_USE_NEW_INFERENCE, + SKYRL_LD_LIBRARY_PATH_EXPORT, + SKYRL_PYTHONPATH_EXPORT, +) from skyrl.train.utils.utils import peer_access_supported @@ -47,6 +51,38 @@ def _build_ray_env_vars(): raise RuntimeError("SKYRL_PYTHONPATH_EXPORT is set but PYTHONPATH is not defined in environment") env_vars["PYTHONPATH"] = pythonpath + # Mirror prepare_runtime_environment: for multi-node tests over EFA, the + # driver's LD_LIBRARY_PATH (e.g. /opt/amazon/efa/lib) must reach the Ray + # workers so NCCL picks up the EFA provider. Set SKYRL_LD_LIBRARY_PATH_EXPORT=1. + if SKYRL_LD_LIBRARY_PATH_EXPORT: + ld_library_path = os.environ.get("LD_LIBRARY_PATH") + if ld_library_path is None: + raise RuntimeError("SKYRL_LD_LIBRARY_PATH_EXPORT is set but LD_LIBRARY_PATH is not defined in environment") + env_vars["LD_LIBRARY_PATH"] = ld_library_path + + # Forward debugging/observability env vars to the Ray workers when set on the + # driver. Useful for multi-node bring-up: NCCL_DEBUG / FI_* surface + # NCCL+EFA init failures, and SKYRL_DUMP_INFRA_LOG_TO_STDOUT=1 stops the + # inference-server actors from redirecting their stdout to per-node log + # files (so crash tracebacks reach the driver). vLLM additionally copies + # NCCL_*/FI_* prefixed vars from the engine to its TP/PP workers. + # HF_* / HUGGING_FACE_* let the workers find a pre-staged model cache + # (e.g. HF_HOME on a large node-local disk for very big models) and use an + # HF token; vLLM also copies HF_-prefixed vars to its TP/PP workers. + # SKYRL_WAIT_UNTIL_INFERENCE_SERVER_HEALTHY_TIMEOUT_S is read at import time inside + # the VLLMServerActor worker process, so it must be forwarded to take effect. Very + # large models (e.g. 550B) loaded across multi-node PP engines can take well over + # the 600s default to become healthy. + _DEBUG_PASSTHROUGH = ( + "SKYRL_DUMP_INFRA_LOG_TO_STDOUT", + "PYTORCH_CUDA_ALLOC_CONF", + "SKYRL_WAIT_UNTIL_INFERENCE_SERVER_HEALTHY_TIMEOUT_S", + ) + _DEBUG_PREFIXES = ("NCCL_", "FI_", "HF_", "HUGGING_FACE_", "VLLM_") + for name, value in os.environ.items(): + if name in _DEBUG_PASSTHROUGH or name.startswith(_DEBUG_PREFIXES): + env_vars.setdefault(name, value) + return env_vars diff --git a/tests/backends/skyrl_train/gpu/gpu_ci/megatron/test_megatron_models.py b/tests/backends/skyrl_train/gpu/gpu_ci/megatron/test_megatron_models.py index b92ef1aa73..5982ef3942 100644 --- a/tests/backends/skyrl_train/gpu/gpu_ci/megatron/test_megatron_models.py +++ b/tests/backends/skyrl_train/gpu/gpu_ci/megatron/test_megatron_models.py @@ -25,6 +25,7 @@ from tests.backends.skyrl_train.gpu.utils import ( InferenceEngineState, Timer, + _ensure_chat_template, get_test_generator_input, init_worker_with_type, ) @@ -83,11 +84,25 @@ def get_test_actor_config(model_name) -> SkyRLTrainConfig: # the fp32 master + AdamW state on GPU at init (~6x model size), which # OOMs on 4xH100 before forward ever runs. These tests only forward + # weight-sync, so skip optimizer construction entirely. - is_large_moe = ("qwen3.5-35b" in model_name.lower() and "tiny" not in model_name.lower()) or ( - "nemotron-3-nano" in model_name.lower() + is_large_moe = ( + ("qwen3.5-35b" in model_name.lower() and "tiny" not in model_name.lower()) + or ("nemotron-3-nano" in model_name.lower()) + or ("nemotron-3-ultra" in model_name.lower()) ) if is_large_moe: cfg.trainer.policy.inference_only_init = True + if "nemotron-3-ultra" in model_name.lower(): + # Nemotron-Ultra ships MTP layers (num_nextn_predict_layers=1). Megatron-Bridge's + # Mamba provider builds an `mtp_hybrid_override_pattern` from that, and its + # finalize() does `[pattern] * mtp_num_layers`. SkyRL disables MTP for + # training by nulling mtp_num_layers, but for the Mamba provider that leaves + # the pattern set -> `[pattern] * None` -> TypeError. Clear both up front + # (transformer_config_kwargs are applied right before provider.finalize()). + if cfg.trainer.policy.megatron_config.transformer_config_kwargs is None: + cfg.trainer.policy.megatron_config.transformer_config_kwargs = {} + cfg.trainer.policy.megatron_config.transformer_config_kwargs.update( + {"mtp_num_layers": 0, "mtp_hybrid_override_pattern": None} + ) validate_cfg(cfg) return cfg @@ -108,6 +123,14 @@ def _engine_overrides_for_model(model_name: str) -> dict: # Megatron policy init also needs room alongside vLLM on the same # GPU, so lower vLLM's pool footprint. overrides["gpu_memory_utilization"] = 0.5 + if "Nemotron-3-Ultra" in model_name: + # 550B sharded 16-way (vLLM TP8 x PP2) is ~69 GB of weights per GPU, so + # the KV pool needs gmu well above 0.5 just to leave cache room. vLLM + # and the Megatron policy alternate on-GPU via sleep/wake (sleep_level=2), + # so vLLM can claim most of the H200 while loading. Cap context to 4096. + # Starting point -- tune alongside the parallelism. + overrides["engine_init_kwargs"]["max_model_len"] = 4096 + overrides["gpu_memory_utilization"] = 0.85 # Large MoE: Megatron policy init also needs room alongside vLLM on the # same GPU, so lower vLLM's pool footprint. if "qwen3.5-35b" in model_name.lower() and "tiny" not in model_name.lower(): @@ -195,10 +218,10 @@ async def construct_training_input_from_generator_output(generator_output, token @pytest.mark.asyncio @pytest.mark.megatron_models @pytest.mark.parametrize( - "tp,pp,cp,ep,etp,inference_tp,num_gpus,model_name,vllm_threshold,megatron_threshold", + "tp,pp,cp,ep,etp,inference_tp,inference_pp,num_gpus,num_nodes,model_name,vllm_threshold,megatron_threshold", [ - pytest.param(2, 1, 1, 2, 1, 2, 4, "eatang/qwen3-moe-tiny-random", 1e-1, 2e-1, id="qwen3-moe_tp2_ep2"), - pytest.param(1, 2, 2, 1, None, 2, 4, "eatang/qwen3-moe-tiny-random", 1e-1, 2e-1, id="qwen3-moe_pp2_cp2"), + pytest.param(2, 1, 1, 2, 1, 2, 1, 4, 1, "eatang/qwen3-moe-tiny-random", 1e-1, 2e-1, id="qwen3-moe_tp2_ep2"), + pytest.param(1, 2, 2, 1, None, 2, 1, 4, 1, "eatang/qwen3-moe-tiny-random", 1e-1, 2e-1, id="qwen3-moe_pp2_cp2"), pytest.param( 2, 1, @@ -206,7 +229,9 @@ async def construct_training_input_from_generator_output(generator_output, token 2, 1, 2, + 1, 4, + 1, "eatang/glm-4.7-flash-tiny-random", 1e-1, 2e-2, @@ -220,7 +245,9 @@ async def construct_training_input_from_generator_output(generator_output, token 2, 1, 4, + 1, 4, + 1, "eatang/qwen3.5-moe-tiny-random", 1e-1, 2e-1, @@ -237,7 +264,9 @@ async def construct_training_input_from_generator_output(generator_output, token 1, None, 2, + 1, 2, + 1, "Qwen/Qwen3.5-0.8B", 1e-1, 5e-2, @@ -257,7 +286,9 @@ async def construct_training_input_from_generator_output(generator_output, token 4, 1, 4, + 1, 4, + 1, "nvidia/NVIDIA-Nemotron-3-Nano-30B-A3B-BF16", 5e-1, 5e-2, @@ -275,26 +306,110 @@ async def construct_training_input_from_generator_output(generator_output, token 4, 1, 4, + 1, 4, + 1, "Qwen/Qwen3.5-35B-A3B", 3e-1, 5e-2, id="qwen3.5-35b-a3b_h100_tp4_ep4", marks=pytest.mark.h100, ), + # Nemotron-3-Ultra (550B MoE, ~55B activated, bf16) on 8 nodes x 8xH200 + # = 64 GPUs. Megatron mesh: TP=8 PP=2 CP=1 EP=16 ETP=1 -> DP=4 + # (8*2 = 16 GPUs per model replica, 64/16 = 4 DP, ~69 GiB weights/GPU). + # vLLM: TP=8 (intra-node, NVLink) x PP=4 (across 4 nodes, EFA) = 32 + # GPUs/engine, num_engines = 64/32 = 2 (colocated over the same 64 GPUs). + # vLLM TP must divide NemotronH's Mamba n_groups (=8), so TP=16 is invalid; + # the cross-node parallelism comes from PP instead. vLLM PP=4 (not 2) shards + # vLLM weights ~34 GiB/GPU so that during the colocated Megatron->vLLM weight + # broadcast the policy shard (~69 GiB) + woken vLLM weights (~34 GiB) fit + # alongside the broadcast buffers on the 141 GiB H200 (PP=2 -> ~69+69 OOMs). + # Needs VLLM_USE_RAY_V2_EXECUTOR_BACKEND=1 (compiled-DAG shm channel crashes + # the raylet cross-node) and the ray distributed executor backend. + pytest.param( + 8, + 2, + 1, + 16, + 1, + 8, + 4, + 64, + 8, + "nvidia/NVIDIA-Nemotron-3-Ultra-550B-A55B-BF16", + 3e-1, + # 550B hybrid-MoE diverges more between vLLM and Megatron than the + # smaller MoE cases (observed diff mean ~0.059, driven by a minority + # of high-divergence tokens: Megatron logprob std ~0.445 vs vLLM ~0.178). + 8e-2, + id="nemotron3-ultra_tp8pp4_mega_tp8pp2ep16_8node", + marks=pytest.mark.h100, + ), + # Same as above but EP=32 (full expert sharding, ETP=1 -> EDP=1). This is the + # parallelism the full-FT training recipe uses (EP=16 OOMs there once the + # optimizer is added). Diagnostic: training at EP=32 produced GARBAGE vLLM + # generations from step 1, while EP=16 (above) passes -> this isolates whether + # the Megatron->vLLM expert weight gather is correct at EP=32. A blown-up + # post-sync diff (vs pre-sync OK) would pin the bug to the EP=32 sync gather. + pytest.param( + 8, + 2, + 1, + 32, + 1, + 8, + 4, + 64, + 8, + "nvidia/NVIDIA-Nemotron-3-Ultra-550B-A55B-BF16", + 3e-1, + 8e-2, + id="nemotron3-ultra_tp8pp4_mega_tp8pp2ep32_8node", + marks=pytest.mark.h100, + ), + # Same as the validated EP=16 case but Megatron PP=4 (vs 2). DISCRIMINATOR: full-FT + # training at EP=16/PP=4 produced GARBAGE vLLM generations, while this inference-only + # path at EP=16/PP=2 passes (0.298). PP=4 is the only parallelism change from the + # validated case, so if this PASSES, PP=4 sync is fine and the training garbage comes + # from the full-FT optimizer init; if it FAILS, PP=4 itself breaks the weight-sync gather. + pytest.param( + 8, + 4, + 1, + 16, + 1, + 8, + 4, + 64, + 8, + "nvidia/NVIDIA-Nemotron-3-Ultra-550B-A55B-BF16", + 3e-1, + 8e-2, + id="nemotron3-ultra_tp8pp4_mega_tp8pp4ep16_8node", + marks=pytest.mark.h100, + ), ], ) async def test_logprobs_matching_roundtrip( - tp, pp, cp, ep, etp, inference_tp, num_gpus, model_name, vllm_threshold, megatron_threshold + tp, pp, cp, ep, etp, inference_tp, inference_pp, num_gpus, num_nodes, model_name, vllm_threshold, megatron_threshold ): """ Check that logprob diff matches acrosss vllm and megatron. """ + assert num_gpus % num_nodes == 0, f"num_gpus ({num_gpus}) must be divisible by num_nodes ({num_nodes})" + num_gpus_per_node = num_gpus // num_nodes with ray_init(extra_env_vars=_extra_env_vars_for_model(model_name)): cfg = get_test_actor_config(model_name=model_name) cfg.trainer.strategy = "megatron" cfg.generator.inference_engine.tensor_parallel_size = inference_tp - cfg.generator.inference_engine.num_engines = num_gpus // inference_tp + cfg.generator.inference_engine.pipeline_parallel_size = inference_pp + cfg.generator.inference_engine.num_engines = num_gpus // (inference_tp * inference_pp) + # Colocated weight sync keeps the Megatron policy shard and the woken vLLM + # weights on the same GPUs simultaneously. For large models this is tight; + # the expandable_segments allocator reclaims fragmentation so the weight + # broadcast's working buffers fit. + cfg.generator.inference_engine.use_expandable_segments = True cfg.generator.sampling_params = SamplingParams( max_generate_length=MAX_GENERATE_LENGTH, logprobs=1, @@ -305,6 +420,11 @@ async def test_logprobs_matching_roundtrip( tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True) tokenizer.pad_token = tokenizer.eos_token + # Some checkpoints (e.g. NemotronH-Ultra) ship no chat template; install a + # minimal one so the generator can format prompts. No-op if one exists. + # vLLM and Megatron both see the same formatted input, so the logprob + # comparison stays valid. + _ensure_chat_template(tokenizer) engine_overrides = _engine_overrides_for_model(model_name) async with InferenceEngineState.create( @@ -331,7 +451,8 @@ async def test_logprobs_matching_roundtrip( generator, client, model_name, tokenizer, return_training_input=True ) await client.sleep() - cfg.trainer.placement.policy_num_gpus_per_node = num_gpus + cfg.trainer.placement.policy_num_nodes = num_nodes + cfg.trainer.placement.policy_num_gpus_per_node = num_gpus_per_node cfg.trainer.policy.megatron_config.tensor_model_parallel_size = tp cfg.trainer.policy.megatron_config.pipeline_model_parallel_size = pp cfg.trainer.policy.megatron_config.context_parallel_size = cp @@ -344,7 +465,8 @@ async def test_logprobs_matching_roundtrip( "policy", shared_pg=pg, colocate_all=True, - num_gpus_per_node=num_gpus, + num_gpus_per_node=num_gpus_per_node, + num_nodes=num_nodes, cfg=cfg, ) ray.get( diff --git a/tests/backends/skyrl_train/gpu/utils.py b/tests/backends/skyrl_train/gpu/utils.py index b4410e4890..19a557350b 100644 --- a/tests/backends/skyrl_train/gpu/utils.py +++ b/tests/backends/skyrl_train/gpu/utils.py @@ -44,7 +44,11 @@ TrainingInputBatch, ) from skyrl.backends.skyrl_train.workers.worker import PPORayActorGroup -from skyrl.env_vars import _SKYRL_USE_NEW_INFERENCE, SKYRL_PYTHONPATH_EXPORT +from skyrl.env_vars import ( + _SKYRL_USE_NEW_INFERENCE, + SKYRL_PYTHONPATH_EXPORT, + SKYRL_RAY_PG_TIMEOUT_IN_S, +) from skyrl.train.config import SkyRLTrainConfig from skyrl.train.dataset import PromptDataset from skyrl.train.dataset.replay_buffer import Experience @@ -584,7 +588,7 @@ def create( [{"GPU": 1, "CPU": 1}] * total_gpu_slots, strategy="PACK", ) - get_ray_pg_ready_with_timeout(raw_pg, timeout=60) + get_ray_pg_ready_with_timeout(raw_pg, timeout=SKYRL_RAY_PG_TIMEOUT_IN_S) shared_pg = ResolvedPlacementGroup(raw_pg) sleep = True else: