Skip to content

StreamMailbox race fix and new pulsing backend bench#97

Merged
JimyMa merged 3 commits into
mainfrom
fix_rpc_race
May 11, 2026
Merged

StreamMailbox race fix and new pulsing backend bench#97
JimyMa merged 3 commits into
mainfrom
fix_rpc_race

Conversation

@JimyMa
Copy link
Copy Markdown
Contributor

@JimyMa JimyMa commented May 11, 2026

Motivation

The SlimeRPC bench was broken on HEAD: [PeerAgent] refine peer_agent api (PR #88) replaced set_desired_topology([...]) / wait_for_peers([...]) with a connect_to(peer, ...).wait() pair, but the bench scripts still called driver.set_desired_topology(...) and crashed with AttributeError: 'PeerAgent' object has no attribute 'set_desired_topology'. Porting the bench to the new API then surfaced a latent multi-process handshake race in StreamMailbox.start(): worker connected, driver hung forever on its wait(), and worker's serve() timed out on _wait_for_mr.

This branch fixes both, and adds an opt-in Pulsing (@pul.remote) baseline to the bench so SlimeRPC can be compared against a second actor framework on the same payload sizes and metrics.

Library fix — StreamMailbox.start() no longer deletes stream:<alias>

dlslime/peer_agent/_mailbox.py previously did an unconditional redis.delete(stream_key) as the first thing the listener did. In multi-process benches the worker boots first, its connect_to drives _try_connect_peer_inner through step B, and it XADDs qp_ready (with its QP info) to stream:bench-driver before the driver ever registers. Worker then sets _notified_peers for that conn_id, which the guard at _mailbox.py:300 uses to suppress retry XADDs. When the driver finally registers, its StreamMailbox.start() deletes stream:bench-driver — wiping the worker's message. The driver's listener never sees a qp_ready, never calls _mark_connection_connected, and PeerConnection.wait() hangs indefinitely. Symptom: worker prints Connected. Serving… (its own side gets driver's later qp_ready via the fast path) and then serve() times out inside _wait_for_mr because the driver never reached make_proxy.

Fix is the minimum surgical change: drop the DELETE and process from last_id = "0-0". The listen-loop already catches exceptions from _handle_message, so any truly stale qp_ready from a prior crashed session fails at endpoint.connect and is logged, while the current peer session's later qp_ready still completes the handshake. Comments in start() and _listen_loop() capture the reasoning so the DELETE is not re-introduced by a future cleanup pass.

SlimeRPC bench ported to the new PeerAgent API

bench/python/rpc_bench_slime_driver.py and rpc_bench_slime_worker.py now follow examples/python/rpc_example.py: agent.connect_to(peer, ib_port=1, qp_num=1).wait(timeout=120) replaces the old set_desired_topology([peer]) + wait_for_peers([peer], timeout_sec=120) pair. No other behavior change.

Pulsing baseline

bench/python/rpc_bench_pulsing.py (new) mirrors rpc_bench_ray.py: a @pul.remote EchoActor echoing raw bytes across the same SIZES array, the same warmup/iteration formula (max(50, min(500, 50_000_000 // size))), and the same CSV schema (size,avg_us,p50_us,p99_us,bw_gbps). Runs under asyncio.run(...) with await pul.init() / await pul.shutdown() bracketing.

bench/python/rpc_bench_compare.py now treats --pulsing as optional. When present, the table adds three Pulsing columns (avg / p99 / BW) and an S/Pul = Pulsing avg / SlimeRPC avg speedup column. When absent or the CSV is missing, the table collapses to the original Slime-vs-Ray layout and only prints S/Ray. The separator width is computed from len(header) rather than a fixed literal, so the rule always matches.

bench/python/run_rpc_bench.sh defaults Pulsing off. Opt-in via --with-pulsing or WITH_PULSING=1. Stage numbering is computed from the flag ([1/3]…[3/3] by default; [1/4]…[4/4] when Pulsing is on). The compare invocation passes --pulsing only when enabled, so the existing default bash bench/python/run_rpc_bench.sh contract — two CSVs, one comparison table — is preserved for users who do not have pulsing installed.

Docs

Benchmark docs updated in three places:

  • docs/src/guide/benchmark-rpc.md (mkdocs-canonical source referenced from docs/mkdocs.yml:125).
  • docs/benchmark-rpc.md (duplicate, historically kept in sync).
  • bench/README.md (directory-layout table + the "SlimeRPC vs Ray Benchmark" section).

New content covers: opt-in Pulsing baseline, --with-pulsing / WITH_PULSING=1, the S/Pul column, the optional bench/results/pulsing_rpc.csv output, and the pip install pulsing prerequisite.

Files touched

Area File Purpose
Library dlslime/peer_agent/_mailbox.py drop startup redis.delete(stream:<alias>); read from 0-0 with a note on why the delete was unsafe
Bench bench/python/rpc_bench_slime_driver.py port to connect_to(peer, ib_port=1, qp_num=1).wait(timeout=120)
Bench bench/python/rpc_bench_slime_worker.py port to connect_to(peer, ib_port=1, qp_num=1).wait(timeout=120)
Bench bench/python/rpc_bench_pulsing.py (new) Pulsing actor echo baseline; same SIZES/metrics/CSV schema as Ray
Bench bench/python/rpc_bench_compare.py --pulsing optional; Puls columns + S/Pul rendered only when present; separator width from header
Bench bench/python/run_rpc_bench.sh --with-pulsing / WITH_PULSING=1 gate; dynamic [N/TOTAL] stage numbering; Pulsing CSV path wired
Docs docs/src/guide/benchmark-rpc.md Pulsing opt-in, S/Pul, prerequisites, outputs
Docs docs/benchmark-rpc.md mirror of the mkdocs source
Docs bench/README.md directory-layout table + RPC bench section mention Pulsing

Verification

bash bench/python/run_rpc_bench.sh on a single host running NanoCtrl + Redis. Previous HEAD: driver hangs at Waiting for worker… until worker's serve() times out after 30 s with Timeout waiting for MR 'rpc:mailbox:bench-driver:bench-worker'. After this branch: both sides hand-shake, two CSVs written (slime_rpc.csv, ray_rpc.csv), comparison table prints with S/Ray.

bash bench/python/run_rpc_bench.sh --with-pulsing with pip install pulsing writes the 3rd CSV and prints the 3-way table with both S/Pul and S/Ray.

python bench/python/rpc_bench_compare.py --slime bench/results/slime_rpc.csv --ray bench/results/ray_rpc.csv prints the 2-way table; adding --pulsing bench/results/pulsing_rpc.csv swaps in the 3-way table. Missing-CSV sanity: --pulsing does-not-exist.csv prints Missing Pulsing results file: … and exits cleanly.

Deferred

  • Removing or event-gating the _notified_peers guard. It is still load-bearing to prevent qp_ready → qp_ready ping-pong between peers before either has completed handshake; dropping the startup DELETE already unblocks the bench, and the rare alias-reuse-after-crash edge case degrades gracefully (the listener's except in _listen_loop swallows the failed endpoint.connect).
  • XTRIM-by-minid cleanup of truly stale stream messages. Follow-up if pinned-alias reuse becomes a first-class operational mode.
  • Extending the Pulsing baseline beyond echo (multi-method / streaming / flatbuf variants, matching rpc_flatbuf_example.py).

@JimyMa JimyMa deployed to self-hosted-rdma May 11, 2026 03:36 — with GitHub Actions Active
@JimyMa JimyMa changed the title Fix rpc race Pulsing bench + StreamMailbox race fix May 11, 2026
@JimyMa JimyMa changed the title Pulsing bench + StreamMailbox race fix StreamMailbox race fix and new pulsing backend bench May 11, 2026
@JimyMa JimyMa merged commit 6e440d2 into main May 11, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant