# StreamMailbox race fix and new pulsing backend bench (#97)
Merged
## Motivation
The SlimeRPC bench was broken on HEAD:

- "[PeerAgent] refine peer_agent api" (PR #88) replaced `set_desired_topology([...])` / `wait_for_peers([...])` with a `connect_to(peer, ...).wait()` pair, but the bench scripts still called `driver.set_desired_topology(...)` and crashed with `AttributeError: 'PeerAgent' object has no attribute 'set_desired_topology'`.
- Porting the bench to the new API then surfaced a latent multi-process handshake race in `StreamMailbox.start()`: the worker connected, the driver hung forever on its `wait()`, and the worker's `serve()` timed out on `_wait_for_mr`.

This branch fixes both, and adds an opt-in Pulsing (`@pul.remote`) baseline to the bench so SlimeRPC can be compared against a second actor framework on the same payload sizes and metrics.

## Library fix: `StreamMailbox.start()` no longer deletes `stream:<alias>`

`dlslime/peer_agent/_mailbox.py` previously did an unconditional `redis.delete(stream_key)` as the first thing the listener did. In multi-process benches the worker boots first, its `connect_to` drives `_try_connect_peer_inner` through step B, and it `XADD`s `qp_ready` (with its QP info) to `stream:bench-driver` before the driver ever registers. The worker then sets `_notified_peers` for that `conn_id`, which the guard at `_mailbox.py:300` uses to suppress retry `XADD`s. When the driver finally registers, its `StreamMailbox.start()` deletes `stream:bench-driver`, wiping the worker's message. The driver's listener never sees a `qp_ready`, never calls `_mark_connection_connected`, and `PeerConnection.wait()` hangs indefinitely. Symptom: the worker prints `Connected. Serving…` (its own side gets the driver's later `qp_ready` via the fast path) and then `serve()` times out inside `_wait_for_mr` because the driver never reached `make_proxy`.

The fix is the minimum surgical change: drop the `DELETE` and process from `last_id = "0-0"`. The listen loop already catches exceptions from `_handle_message`, so any truly stale `qp_ready` from a prior crashed session fails at `endpoint.connect` and is logged, while the current peer session's later `qp_ready` still completes the handshake. Comments in `start()` and `_listen_loop()` capture the reasoning so the `DELETE` is not re-introduced by a future cleanup pass.
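The race and the fix can be sketched with an in-memory stand-in for the Redis stream. All names below are illustrative, not the real `_mailbox.py` code; only the delete-vs-replay behavior is taken from this PR:

```python
class FakeStream:
    """In-memory stand-in for the Redis stream `stream:<alias>` (illustrative)."""

    def __init__(self):
        self.entries = []  # (entry_id, payload) in arrival order

    def xadd(self, payload):
        # Simplified entry ids ("1-0", "2-0", ...); real Redis ids are ms-seq pairs.
        entry_id = f"{len(self.entries) + 1}-0"
        self.entries.append((entry_id, payload))
        return entry_id

    def xread_from(self, last_id):
        # "0-0" replays the whole stream (string compare suffices for this demo).
        return [e for e in self.entries if last_id == "0-0" or e[0] > last_id]


def buggy_driver_start(stream):
    # Old behavior: unconditional redis.delete(stream_key) before listening.
    # A qp_ready XADDed by a fast-booting worker is wiped, so wait() hangs.
    stream.entries.clear()
    return stream.xread_from("0-0")


def fixed_driver_start(stream):
    # New behavior: no delete; process from last_id = "0-0", so the worker's
    # early qp_ready is replayed to the driver's listener.
    return stream.xread_from("0-0")
```

With a `qp_ready` already in the stream, the buggy start returns nothing and the handshake stalls, while the fixed start replays it; a stale `qp_ready` from a crashed session would simply fail later at `endpoint.connect` and be logged.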
## SlimeRPC bench ported to the new PeerAgent API

`bench/python/rpc_bench_slime_driver.py` and `rpc_bench_slime_worker.py` now follow `examples/python/rpc_example.py`: `agent.connect_to(peer, ib_port=1, qp_num=1).wait(timeout=120)` replaces the old `set_desired_topology([peer])` + `wait_for_peers([peer], timeout_sec=120)` pair. No other behavior change.
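As a before/after sketch of the port in each bench script (`agent` and `peer` stand for the local `PeerAgent` and the remote alias):

```diff
- agent.set_desired_topology([peer])
- agent.wait_for_peers([peer], timeout_sec=120)
+ agent.connect_to(peer, ib_port=1, qp_num=1).wait(timeout=120)
```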
## Pulsing baseline

`bench/python/rpc_bench_pulsing.py` (new) mirrors `rpc_bench_ray.py`: a `@pul.remote` `EchoActor` echoing raw bytes across the same `SIZES` array, the same warmup/iteration formula (`max(50, min(500, 50_000_000 // size))`), and the same CSV schema (`size,avg_us,p50_us,p99_us,bw_gbps`). It runs under `asyncio.run(...)` with `await pul.init()` / `await pul.shutdown()` bracketing.

`bench/python/rpc_bench_compare.py` now treats `--pulsing` as optional. When present, the table adds three Pulsing columns (avg / p99 / BW) and an `S/Pul = Pulsing avg / SlimeRPC avg` speedup column. When absent, or when the CSV is missing, the table collapses to the original Slime-vs-Ray layout and only prints `S/Ray`. The separator width is computed from `len(header)` rather than a fixed literal, so the rule always matches.

`bench/python/run_rpc_bench.sh` defaults Pulsing off; opt in via `--with-pulsing` or `WITH_PULSING=1`. Stage numbering is computed from the flag (`[1/3]`…`[3/3]` by default; `[1/4]`…`[4/4]` when Pulsing is on). The compare invocation passes `--pulsing` only when enabled, so the existing default `bash bench/python/run_rpc_bench.sh` contract (two CSVs, one comparison table) is preserved for users who do not have `pulsing` installed.
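The bookkeeping the three benches share can be sketched as follows. `iters_for` and `csv_row` are hypothetical helper names, and the bandwidth unit convention is an assumption here; only the iteration formula and the CSV schema come from this PR:

```python
import statistics

def iters_for(size: int) -> int:
    # Shared warmup/iteration formula used across the three benches.
    return max(50, min(500, 50_000_000 // size))

def csv_row(size: int, latencies_us: list) -> str:
    # One line of the shared schema: size,avg_us,p50_us,p99_us,bw_gbps.
    avg = statistics.fmean(latencies_us)
    q = statistics.quantiles(latencies_us, n=100)  # q[49] ~ p50, q[98] ~ p99
    bw = size / (avg * 1e-6) / 1e9  # bytes per second in GB/s (assumed unit)
    return f"{size},{avg:.2f},{q[49]:.2f},{q[98]:.2f},{bw:.3f}"
```

For example, `iters_for(1024)` caps at 500 iterations while `iters_for(16 << 20)` floors at 50, so small payloads get many samples and large ones stay cheap.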
## Docs

Benchmark docs updated in three places:

- `docs/src/guide/benchmark-rpc.md` (mkdocs-canonical source referenced from `docs/mkdocs.yml:125`).
- `docs/benchmark-rpc.md` (duplicate, historically kept in sync).
- `bench/README.md` (directory-layout table + the "SlimeRPC vs Ray Benchmark" section).

New content covers: the opt-in Pulsing baseline, `--with-pulsing` / `WITH_PULSING=1`, the `S/Pul` column, the optional `bench/results/pulsing_rpc.csv` output, and the `pip install pulsing` prerequisite.
## Files touched

- `dlslime/peer_agent/_mailbox.py`: drop the `redis.delete(stream:<alias>)`; read from `0-0`, with a note on why the delete was unsafe.
- `bench/python/rpc_bench_slime_driver.py`: `connect_to(peer, ib_port=1, qp_num=1).wait(timeout=120)`.
- `bench/python/rpc_bench_slime_worker.py`: `connect_to(peer, ib_port=1, qp_num=1).wait(timeout=120)`.
- `bench/python/rpc_bench_pulsing.py`: new.
- `bench/python/rpc_bench_compare.py`: `--pulsing` optional; Pulsing columns + `S/Pul` rendered only when present; separator width from header.
- `bench/python/run_rpc_bench.sh`: `--with-pulsing` / `WITH_PULSING=1` gate; dynamic `[N/TOTAL]` stage numbering; Pulsing CSV path wired.
- `docs/src/guide/benchmark-rpc.md`: `S/Pul`, prerequisites, outputs.
- `docs/benchmark-rpc.md`
- `bench/README.md`
## Verification

- `bash bench/python/run_rpc_bench.sh` on a single host running NanoCtrl + Redis. On the previous HEAD, the driver hangs at `Waiting for worker…` until the worker's `serve()` times out after 30 s with `Timeout waiting for MR 'rpc:mailbox:bench-driver:bench-worker'`. After this branch, both sides handshake, two CSVs are written (`slime_rpc.csv`, `ray_rpc.csv`), and the comparison table prints with `S/Ray`.
- `bash bench/python/run_rpc_bench.sh --with-pulsing` (with `pip install pulsing`) writes the third CSV and prints the 3-way table with both `S/Pul` and `S/Ray`.
- `python bench/python/rpc_bench_compare.py --slime bench/results/slime_rpc.csv --ray bench/results/ray_rpc.csv` prints the 2-way table; adding `--pulsing bench/results/pulsing_rpc.csv` swaps in the 3-way table. Missing-CSV sanity check: `--pulsing does-not-exist.csv` prints `Missing Pulsing results file: …` and exits cleanly.
## Deferred

- Removing the `_notified_peers` guard. It is still load-bearing to prevent `qp_ready → qp_ready` ping-pong between peers before either has completed the handshake; dropping the startup `DELETE` already unblocks the bench, and the rare alias-reuse-after-crash edge case degrades gracefully (the listener's `except` in `_listen_loop` swallows the failed `endpoint.connect`).
- (`rpc_flatbuf_example.py`).