
Conversation

@clumsy clumsy commented Nov 10, 2025

What does this PR do?

Enables vLLM DP+EP in async engine by using Ray-level data parallelism to avoid NCCL ALLREDUCE deadlocks.

Issues

Closes #1101

Usage

  • Tested async DP+EP:
    NRL_FORCE_REBUILD_VENVS=true uv run python examples/run_eval.py --config examples/configs/evals/eval.yaml generation.model_name=Qwen/Qwen3-30B-A3B-Instruct-2507 generation.vllm_cfg.async_engine=true generation.vllm_cfg.expert_parallel_size=2 data.num_samples=10 cluster.gpus_per_node=8
============================================================
model_name='Qwen3-30B-A3B-Instruct-2507' dataset_name='aime2024'
max_new_tokens=2048 temperature=0.0 top_p=1.0 top_k=-1 seed=42

metric=pass@1 num_tests_per_prompt=1

score=0.2333 (7.0/30)
============================================================
  • Tested sync DP+EP:
    NRL_FORCE_REBUILD_VENVS=true uv run python examples/run_eval.py --config examples/configs/evals/eval.yaml generation.model_name=Qwen/Qwen3-30B-A3B-Instruct-2507 generation.vllm_cfg.async_engine=false generation.vllm_cfg.expert_parallel_size=2 data.num_samples=10 cluster.gpus_per_node=8
============================================================
model_name='Qwen3-30B-A3B-Instruct-2507' dataset_name='aime2024'
max_new_tokens=2048 temperature=0.0 top_p=1.0 top_k=-1 seed=42

metric=pass@1 num_tests_per_prompt=1

score=0.1667 (5.0/30)
============================================================

Before your PR is "Ready for review"

Pre checks:

  • Make sure you read and followed Contributor guidelines
  • Did you write any new necessary tests?
  • Did you run the unit tests and functional tests locally? Visit our Testing Guide for how to run tests
  • Did you add or update any necessary documentation? Visit our Document Development Guide for how to write, build and test the docs.

Additional Information

Ray-level DP instead of vLLM internal DP for async engine with DP+EP

When async_engine=true with both dp_size>1 and ep_size>1:
• Create multiple independent Ray actors (one per DP replica)
• Each actor runs a single vLLM engine with VLLM_DP_SIZE=1 (no internal DP)
• Avoids NCCL ALLREDUCE collectives that cause deadlocks in async generation
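
For illustration, a minimal sketch of this gating and actor fan-out (names like VllmReplica and the inline config values are assumptions for the sketch; the real wiring lives in vllm_generation.py and NeMo-RL's worker groups):

import ray

# Illustrative values mirroring the dp8/ep2 usage case above.
dp_size, ep_size, async_engine = 8, 2, True

@ray.remote(num_gpus=1)  # pragma: no cover
class VllmReplica:  # hypothetical stand-in for the async vLLM worker actor
    def __init__(self) -> None:
        import os

        # Each actor hosts a single engine with no internal DP, so async
        # generation never enters a cross-replica NCCL collective.
        os.environ["VLLM_DP_SIZE"] = "1"

use_ray_level_dp = dp_size > 1 and ep_size > 1 and async_engine
vllm_dp_size = 1 if use_ray_level_dp else dp_size
if use_ray_level_dp:
    # One independent actor per DP replica; Ray supplies the data parallelism.
    replicas = [VllmReplica.remote() for _ in range(dp_size)]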

Summary by CodeRabbit

  • Bug Fixes

    • Removed execution parallelism configuration constraints and improved async generation compatibility
    • Enhanced distributed device allocation and communication setup for multi-worker scenarios
    • Improved environment variable handling for distributed parallelism modes
  • Tests

    • Added unit tests validating distributed parallelism configurations, device allocation, and communication setup

@clumsy clumsy requested review from a team as code owners November 10, 2025 18:32
@coderabbitai coderabbitai bot commented Nov 10, 2025

📝 Walkthrough

Refactored vLLM distributed parallelism handling to enable Ray-level DP mode when async_engine is active with multi-DP and multi-EP configurations. Replaced EP/TP assertion with Ray-level DP gating logic. Added device allocation and stats address patch methods to worker to handle multi-DP setup corrections.

Changes

Cohort / File(s) | Summary

  • vLLM Ray-level DP Configuration — nemo_rl/models/generation/vllm/vllm_generation.py: Removed the conditional assertion for EP > 1; introduced a Ray-level DP mode gate (enabled when dp_size > 1, ep_size > 1, and async_engine is active); set vllm_dp_size to 1 when Ray-level DP is active; ensured the VLLM_DP_SIZE environment variable is always set.
  • vLLM Worker Patch Methods — nemo_rl/models/generation/vllm/vllm_worker_async.py: Added a _patch_vllm_device_allocation method to handle device index resolution for DP+EP and single-device environments; added a _patch_vllm_stats_address method to patch stats_update_address for multi-DP setups; both patches are invoked from _create_engine prior to engine creation.
  • Unit Tests for DP+EP Patches — tests/unit/models/generation/test_vllm_async_dp_ep.py: New test module covering device allocation and stats-address patch behavior with mocked vLLM internals; test cases include single-device allocation, missing environment variables, and DP-size conditional branching.
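
A rough sketch of how these pieces fit together in the worker (the class shape, elided method bodies, and the AsyncLLM import path are assumptions, not the exact PR code):

import os

class VllmAsyncGenerationWorker:  # simplified stand-in for the real worker
    def _patch_vllm_device_allocation(self) -> None:
        # Monkey-patch vLLM's device-index resolution; see the review diff
        # further below for the actual fallback logic.
        ...

    def _patch_vllm_stats_address(self) -> None:
        # Only multi-DP engines need the stats_update_address fix.
        if int(os.environ.get("VLLM_DP_SIZE", "1")) > 1:
            ...  # inject a local TCP address into vLLM's stats plumbing

    def _create_engine(self, engine_args):
        # Both patches must land before AsyncLLM.from_engine_args() so the
        # corrected internals are in effect during engine initialization.
        self._patch_vllm_device_allocation()
        self._patch_vllm_stats_address()
        from vllm.v1.engine.async_llm import AsyncLLM  # path may vary by version
        return AsyncLLM.from_engine_args(engine_args)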

Sequence Diagram(s)

sequenceDiagram
    participant Config as vllm_generation.py
    participant Worker as VllmAsyncGenerationWorker
    participant vLLM as vLLM Internals
    
    Config->>Config: Evaluate Ray-level DP condition<br/>(dp_size > 1 AND ep_size > 1<br/>AND async_engine enabled)
    
    alt Ray-level DP Mode Active
        Config->>Config: Set vllm_dp_size = 1
        Config->>Config: Log: Using Ray-level DP
    else Ray-level DP Mode Inactive
        Config->>Config: Keep vllm_dp_size as-is
    end
    
    Config->>Config: Set VLLM_DP_SIZE env var
    Config->>Worker: Initialize worker with config
    
    Worker->>Worker: Call _create_engine()
    Worker->>Worker: Call _patch_vllm_device_allocation()
    Worker->>vLLM: Patch get_device_indices()
    
    Worker->>Worker: Call _patch_vllm_stats_address()
    alt VLLM_DP_SIZE > 1
        Worker->>vLLM: Patch stats_update_address init
        Worker->>vLLM: Inject local TCP address
    end
    
    Worker->>vLLM: Create engine with patched internals

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • vllm_generation.py: Logic changes around Ray-level DP gating require understanding the distributed configuration constraints and environment variable handling. The removed assertion and new conditional logic need verification.
  • vllm_worker_async.py: Two new patch methods introduce runtime monkey-patching of vLLM internals. Review requires understanding vLLM's device allocation and stats addressing mechanisms to ensure patches are robust and don't break with vLLM version changes.
  • test_vllm_async_dp_ep.py: Comprehensive test coverage, but mocking-heavy nature requires careful validation that mocks accurately represent actual vLLM behavior.

Possibly related PRs

Suggested labels

r0.4.0

Suggested reviewers

  • terrykong

Pre-merge checks

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: Docstring coverage is 63.64%, below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (3 passed)

  • Description Check — ✅ Passed: Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: The title clearly and concisely summarizes the main change (enabling vLLM DP+EP in the async engine via Ray-level data parallelism), matching the primary objective of the PR.
  • Test Results For Major Changes — ✅ Passed: The PR includes documented test results (pass@1 scores: async DP+EP 0.2333, sync DP+EP 0.1667) and unit tests for the major vLLM async generation changes with DP+EP support.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
nemo_rl/models/generation/vllm/vllm_worker_async.py (1)

129-154: Consider more specific exception handling in the fallback logic.

While the broad Exception catch at line 143 serves as a defensive fallback when vLLM's device index parser fails, catching more specific exceptions (e.g., ValueError, KeyError) would improve clarity and avoid masking unexpected errors.

Apply this diff to narrow the exception scope:

             def patched_get_device_indices(
                 device_control_env_var, local_dp_rank, world_size
             ):
                 try:
                     return original_fn(
                         device_control_env_var, local_dp_rank, world_size
                     )
-                except Exception:
+                except (ValueError, KeyError, IndexError):
                     import os
 
                     value = os.environ.get(device_control_env_var, "")
                     # Return string for single device, list for multiple
                     if value and "," not in value:
                         return value  # Return as string, not list
                     return [local_dp_rank * world_size + i for i in range(world_size)]
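
For illustration, a standalone pytest sketch of that fallback behavior (the real tests in test_vllm_async_dp_ep.py mock vLLM internals instead; fallback_device_indices is a hypothetical extraction of the closure above):

import os

def fallback_device_indices(device_control_env_var: str, local_dp_rank: int, world_size: int):
    """Mirror of the fallback branch in patched_get_device_indices."""
    value = os.environ.get(device_control_env_var, "")
    # A single visible device is returned as a string, not a list.
    if value and "," not in value:
        return value
    return [local_dp_rank * world_size + i for i in range(world_size)]

def test_single_device_returns_string(monkeypatch):
    monkeypatch.setenv("CUDA_VISIBLE_DEVICES", "3")
    assert fallback_device_indices("CUDA_VISIBLE_DEVICES", 0, 1) == "3"

def test_missing_env_var_computes_indices(monkeypatch):
    monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False)
    # DP rank 1 with world_size 2 owns global device indices [2, 3].
    assert fallback_device_indices("CUDA_VISIBLE_DEVICES", 1, 2) == [2, 3]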
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2951ce3 and 0b1ec1d.

📒 Files selected for processing (3)
  • nemo_rl/models/generation/vllm/vllm_generation.py (1 hunks)
  • nemo_rl/models/generation/vllm/vllm_worker_async.py (3 hunks)
  • tests/unit/models/generation/test_vllm_async_dp_ep.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (CODING_GUIDELINES.md)

**/*.py: Follow the Google Python Style Guide for all Python code
Target Python 3.12+ for all Python code in NeMo-RL
Indent Python code with 4 spaces; do not use tabs
Python filenames should be snake_case (e.g., some_file.py)
Class names should be PascalCase
Function and method names should be snake_case
Local variable names should be snake_case; if starting with a number, prefix with k (e.g., k_99th_percentile)
Global variables should be UPPER_SNAKE_CASE and prefixed with G_ (e.g., G_MY_GLOBAL)
Constants should be UPPER_SNAKE_CASE
Avoid shadowing variables declared in an outer scope
Initialize all externally visible members of a class in the constructor
For public interfaces used outside a file, prefer docstrings over comments
Use comments mainly for code within a function or interfaces local to a file
Commented-out code must include a nearby comment explaining usage and why it is commented out; otherwise remove before merging
Use Google-style docstrings for classes and functions (Sphinx-parseable)
Avoid using reflection when functionality can be easily achieved without it
Limit except clauses to the smallest specific set of exceptions possible
For duck-typing via try/except, keep the try body minimal and use else for main logic
Add the NVIDIA copyright header (with current year) at the top of all Python files, excluding tests/ and test-only scripts

Files:

  • nemo_rl/models/generation/vllm/vllm_worker_async.py
  • tests/unit/models/generation/test_vllm_async_dp_ep.py
  • nemo_rl/models/generation/vllm/vllm_generation.py
nemo_rl/**/*.py

📄 CodeRabbit inference engine (CODING_GUIDELINES.md)

nemo_rl/**/*.py: Do not set non-None configuration defaults in code; YAML is the single source of truth for defaults
Access required config attributes directly (e.g., policy_cfg["precision"]) and assume presence; do not introduce hidden defaults
Express configuration optionality via TypedDict using typing.NotRequired
When adding a new config key to a TypedDict subclass, document the key’s purpose, valid values/types, and recommended default in code
For any class or function decorated with @ray.remote, add '# pragma: no cover' on the class/def line (and on remote functions)

Files:

  • nemo_rl/models/generation/vllm/vllm_worker_async.py
  • nemo_rl/models/generation/vllm/vllm_generation.py
🧠 Learnings (3)
📓 Common learnings
Learnt from: adil-a
Repo: NVIDIA-NeMo/RL PR: 1440
File: examples/configs/sft_automodel.yaml:48-58
Timestamp: 2025-10-30T20:50:44.126Z
Learning: In DTensor configurations for MoE (Mixture of Experts) models, expert_parallel_size and data_parallel_size can be applied together without multiplying the GPU requirements. Expert Parallelism (EP) only applies to MoE layers, while Data Parallelism/FSDP applies to non-MoE layers. Therefore, configurations like expert_parallel_size: 8 and data_parallel_size: 8 are valid on an 8-GPU cluster for MoE models.
📚 Learning: 2025-09-10T05:35:59.840Z
Learnt from: bxyu-nvidia
Repo: NVIDIA-NeMo/RL PR: 1110
File: nemo_rl/models/generation/vllm/vllm_worker_async.py:363-369
Timestamp: 2025-09-10T05:35:59.840Z
Learning: In nemo_rl/models/generation/vllm/vllm_worker_async.py, the HTTP server should explicitly bind to "0.0.0.0" (all interfaces) rather than a specific interface, as confirmed by bxyu-nvidia. This is an intentional design decision for the vLLM HTTP server functionality.

Applied to files:

  • nemo_rl/models/generation/vllm/vllm_worker_async.py
📚 Learning: 2025-10-30T20:50:44.126Z (the same DTensor EP/DP learning from adil-a quoted above)

Applied to files:

  • nemo_rl/models/generation/vllm/vllm_generation.py
🧬 Code graph analysis (2)
tests/unit/models/generation/test_vllm_async_dp_ep.py (1)
nemo_rl/models/generation/vllm/vllm_worker_async.py (2)
  • _patch_vllm_device_allocation (129-154)
  • _patch_vllm_stats_address (156-213)
nemo_rl/models/generation/vllm/vllm_generation.py (1)
nemo_rl/distributed/worker_groups.py (1)
  • dp_size (600-602)
🪛 Ruff (0.14.3)
nemo_rl/models/generation/vllm/vllm_worker_async.py

143-143: Do not catch blind exception: Exception

(BLE001)


188-188: Local variable original_init is assigned to but never used

Remove assignment to unused variable original_init

(F841)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Post automodel integration comment / Comment on PR
  • GitHub Check: Post submodule check comment / Comment on PR
🔇 Additional comments (4)
nemo_rl/models/generation/vllm/vllm_generation.py (2)

172-185: LGTM: Ray-level DP gating logic addresses NCCL deadlock issue.

The conditional logic correctly enables Ray-level DP when async_engine is active with multi-DP and multi-EP configurations. Setting vllm_dp_size=1 effectively disables vLLM's internal DP, delegating data parallelism to Ray actors to avoid NCCL collective deadlocks during asynchronous generation.


186-186: LGTM: Environment variable correctly reflects DP mode.

The VLLM_DP_SIZE environment variable is now consistently set for all workers, with its value correctly reflecting whether Ray-level DP (value=1) or vLLM internal DP is being used.

nemo_rl/models/generation/vllm/vllm_worker_async.py (2)

226-227: LGTM: Patches correctly applied before engine creation.

The patch methods are invoked at the right point - before AsyncLLM.from_engine_args() - ensuring that device allocation and stats address corrections are active during engine initialization.


17-17: LGTM: Import required for environment variable access.

The os module is correctly imported for accessing VLLM_DP_SIZE environment variable in _patch_vllm_stats_address.

@terrykong terrykong left a comment

tysm for the contribution! @parthchadha @yuki-97 could you help review?

@clumsy clumsy (Author) commented Nov 10, 2025

Sure, hope it helps @terrykong. I'll address the review issues in the meantime.

@clumsy clumsy force-pushed the fix/async_engine_dp+ep branch from 0b1ec1d to 385027b on November 10, 2025 19:25
print(
    f"INFO: Using Ray-level DP with {self.dp_size} independent workers (async engine with DP={self.dp_size}, EP={self.ep_size})"
)
self.vllm_dp_size = 1
@yuki-97 left a comment
thanks so much for the contribution!

Maybe I've missed something: if we use dp8ep2 with 8 GPUs like in the usage case, use_ray_level_dp will be true and vllm_dp_size will be set to 1.
In this case, how will vLLM apply ep2? Or will it become dp8ep1?

@clumsy (Author) left a comment

This forces DP1 w/ async_engine=true and DP+EP so we get dp1ep2 for each of 8 workers, @yuki-97.

Otherwise, if we have one vLLM engine with dp8, it deadlocks in NCCL collectives, e.g. get_dp_padding:

...
    at::_ops::to_dtype_layout::call (libtorch_cpu.so)
    at::Tensor::to (libtorch_python.so)
    torch::autograd::dispatch_to (libtorch_python.so)
    torch::autograd::THPVariable_cpu (libtorch_python.so)
    num_tokens_across_dp (vllm/forward_context.py:114)
    get_dp_padding (vllm/v1/worker/gpu_model_runner.py:1841)
    _preprocess (vllm/v1/worker/gpu_model_runner.py:1963)
    execute_model (vllm/v1/worker/gpu_model_runner.py:2268)
    decorate_context (torch/utils/_contextlib.py:120)
    execute_model (vllm/v1/worker/gpu_worker.py:447)
    decorate_context (torch/utils/_contextlib.py:120)
    run_method (vllm/utils/__init__.py:3122)
    collective_rpc (vllm/executor/uniproc_executor.py:83)
    execute_model (vllm/v1/executor/abstract.py:103)
    execute_model_with_error_logging (vllm/v1/engine/core.py:261)
    step (vllm/v1/engine/core.py:284)
    _process_engine_step (vllm/v1/engine/core.py:754)
    run_busy_loop (vllm/v1/engine/core.py:1045)
    run_engine_core (vllm/v1/engine/core.py:701)
    run (multiprocessing/process.py:108)
    _bootstrap (multiprocessing/process.py:314)
    _main (multiprocessing/spawn.py:135)
    spawn_main (multiprocessing/spawn.py:122)
    <module> (<string>:1)
    0x7ff9a1574d90 (libc.so.6)
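
The trace bottoms out in num_tokens_across_dp, which runs a collective across the DP group. A minimal sketch of why such a collective hangs under async generation (assumed shapes, not vLLM's actual code):

import torch
import torch.distributed as dist

def get_dp_padding_sketch(num_tokens: int, dp_group) -> int:
    """Every DP rank must enter this collective for it to complete.

    With async generation, one rank may be idle (no requests) while another
    is mid-step; the idle rank never calls all_reduce, so the busy rank
    blocks forever inside NCCL. Running each replica as its own Ray actor
    with VLLM_DP_SIZE=1 removes the collective entirely.
    """
    num_tokens_tensor = torch.tensor([num_tokens], dtype=torch.int64)
    # Blocking collective: returns only after *all* DP ranks contribute.
    dist.all_reduce(num_tokens_tensor, op=dist.ReduceOp.MAX, group=dp_group)
    max_tokens = int(num_tokens_tensor.item())
    return max_tokens - num_tokens  # padding needed to match the largest rank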

I believe verl does not even use vLLM async_engine...

Contributor left a comment

When I use this PR to test with the following script, it seems like we don't actually use EP.

Script:

NRL_FORCE_REBUILD_VENVS=true \
uv run python examples/run_eval.py \
    --config examples/configs/evals/eval.yaml \
    generation.model_name=Qwen/Qwen3-30B-A3B-Instruct-2507 \
    generation.vllm_cfg.async_engine=true/false \
    generation.vllm_cfg.expert_parallel_size=2 \
    data.num_samples=10 \
    cluster.gpus_per_node=2

Results:

# sync engine, which is using EP; you can see the EP ranks are 0 and 1, respectively
(VllmGenerationWorker pid=387092) INFO 11-12 07:48:41 [parallel_state.py:1208] rank 0 in world size 2 is assigned as DP rank 0, PP rank 0, TP rank 0, EP rank 0
(VllmGenerationWorker pid=387091) INFO 11-12 07:48:41 [parallel_state.py:1208] rank 1 in world size 2 is assigned as DP rank 1, PP rank 0, TP rank 0, EP rank 1

# async engine, which does not appear to be using EP
(VllmAsyncGenerationWorker pid=217853) (EngineCore_DP0 pid=219624) INFO 11-12 07:07:32 [parallel_state.py:1208] rank 0 in world size 1 is assigned as DP rank 0, PP rank 0, TP rank 0, EP rank 0
(VllmAsyncGenerationWorker pid=217854) (EngineCore_DP0 pid=219628) INFO 11-12 07:07:32 [parallel_state.py:1208] rank 0 in world size 1 is assigned as DP rank 0, PP rank 0, TP rank 0, EP rank 0

As far as I know, vLLM does not support an ep-size option; it only supports enable_expert_parallel and will use all DP * TP ranks as its EP group, which is why we need to use vLLM's DP for EP.

FYI, we write the vLLM DP logic here:

if self.expert_parallel_size > self.tensor_parallel_size:
    # set vLLM DP rank
    world_size = int(os.environ["VLLM_DP_SIZE"]) * model_parallel_size
    rank = int(os.environ["RANK"]) % world_size
    os.environ["VLLM_DP_RANK"] = str(rank // model_parallel_size)
    os.environ["VLLM_DP_RANK_LOCAL"] = str((rank % 8) // model_parallel_size)
    # set vLLM DP address and port
    leader_rank = int(os.environ["RANK"]) // world_size * world_size
    addr_list = eval(os.environ["AVAILABLE_ADDR_LIST"])
    port_list = eval(os.environ["AVAILABLE_PORT_LIST"])
    os.environ["VLLM_DP_MASTER_IP"] = addr_list[leader_rank]
    os.environ["VLLM_DP_MASTER_PORT"] = str(port_list[leader_rank])

and set enable_expert_parallel here:
enable_expert_parallel=self.enable_expert_parallel,
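
For context, a hedged sketch of how that boolean reaches the engine; AsyncEngineArgs and enable_expert_parallel are vLLM names, while the concrete values here are illustrative:

from vllm.engine.arg_utils import AsyncEngineArgs

# EP is toggled by a boolean; the EP group then spans all DP * TP ranks.
engine_args = AsyncEngineArgs(
    model="Qwen/Qwen3-30B-A3B-Instruct-2507",
    tensor_parallel_size=1,
    enable_expert_parallel=True,
)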
