Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<img src="docs/imgs/assets/logo.png" alt="DLSlime logo" width="44%">
</div>
<p align="center">
<a href="https://deeplink-org.github.io/DLSlime">Docs</a> |
<a href="https://deeplink-org.github.io/DLSlime"><img src="docs/imgs/assets/docs.png" width="16" height="16" style="vertical-align: middle;"> Docs </a> |
<a href="docs/roadmap.md"><img src="docs/imgs/assets/roadmap.svg" width="16" height="16" style="vertical-align: middle;"> Roadmap </a> |
<a href="https://join.slack.com/t/dlslime/shared_invite/zt-3e9zvercw-a89KI_Ig8N1UTaol_q6MXg"><img src="docs/imgs/assets/slack.svg" width="16" height="16" style="vertical-align: middle;"> Slack </a> |
<a href="docs/imgs/assets/wechat_qrcode.jpg"><img src="docs/imgs/assets/wechat.svg" width="16" height="16" style="vertical-align: middle;"> WeChat Group </a> |
Expand Down
2 changes: 1 addition & 1 deletion README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<img src="docs/imgs/assets/logo.png" alt="DLSlime logo" width="44%">
</div>
<p align="center">
<a href="https://deeplink-org.github.io/DLSlime">文档</a> |
<a href="https://deeplink-org.github.io/DLSlime"><img src="docs/imgs/assets/docs.png" width="16" height="16" style="vertical-align: middle;"> 文档 </a> |
<a href="docs/roadmap.md"><img src="docs/imgs/assets/roadmap.svg" width="16" height="16" style="vertical-align: middle;"> 路线图 </a> |
<a href="https://join.slack.com/t/dlslime/shared_invite/zt-3e9zvercw-a89KI_Ig8N1UTaol_q6MXg"><img src="docs/imgs/assets/slack.svg" width="16" height="16" style="vertical-align: middle;"> Slack </a> |
<a href="docs/imgs/assets/wechat_qrcode.jpg"><img src="docs/imgs/assets/wechat.svg" width="16" height="16" style="vertical-align: middle;"> 微信群 </a> |
Expand Down
16 changes: 13 additions & 3 deletions bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ so benchmark commands, hardware notes, and result files can evolve together.
| `python/endpoint_io_bench.py` | Endpoint-level I/O benchmark |
| `python/endpoint_sendrecv_bench.py` | Endpoint send/recv benchmark |
| `python/cache_bench.py` | DLSlimeCache benchmark |
| `python/run_rpc_bench.sh` | SlimeRPC vs Ray benchmark wrapper |
| `python/rpc_bench_*.py` | SlimeRPC and Ray benchmark implementations |
| `python/run_rpc_bench.sh` | SlimeRPC vs Ray (optionally + Pulsing) benchmark wrapper |
| `python/rpc_bench_*.py` | SlimeRPC, Ray, and Pulsing benchmark implementations |
| `results/` | CSV outputs and captured worker logs |

## Prerequisites
Expand Down Expand Up @@ -112,12 +112,21 @@ dlslime-cache stop
## SlimeRPC vs Ray Benchmark

The RPC benchmark compares SlimeRPC round-trip latency and bandwidth with a Ray
actor baseline.
actor baseline. A Pulsing (`@pul.remote`) actor baseline is available as an
opt-in third comparator.

```bash
bash bench/python/run_rpc_bench.sh
```

Include the Pulsing baseline (requires `pip install pulsing`):

```bash
bash bench/python/run_rpc_bench.sh --with-pulsing
# or
WITH_PULSING=1 bash bench/python/run_rpc_bench.sh
```

With explicit parameters:

```bash
Expand All @@ -132,6 +141,7 @@ The script writes:
```text
bench/results/slime_rpc.csv
bench/results/ray_rpc.csv
bench/results/pulsing_rpc.csv # only when --with-pulsing is passed
```

See [../docs/benchmark-rpc.md](../docs/benchmark-rpc.md) for the full RPC
Expand Down
91 changes: 68 additions & 23 deletions bench/python/rpc_bench_compare.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#!/usr/bin/env python3
"""Print a side-by-side comparison of SlimeRPC vs Ray benchmark CSVs.
"""Print a side-by-side comparison of SlimeRPC, Ray (and optionally Pulsing) benchmark CSVs.

Usage:
python rpc_bench_compare.py [--slime results/slime_rpc.csv] [--ray results/ray_rpc.csv]
python rpc_bench_compare.py \\
[--slime results/slime_rpc.csv] \\
[--ray results/ray_rpc.csv] \\
[--pulsing results/pulsing_rpc.csv] # optional
"""

import argparse
Expand All @@ -22,10 +25,15 @@ def label(size: int) -> str:
def main():
default_dir = os.path.join(os.path.dirname(__file__), "..", "results")
parser = argparse.ArgumentParser(
description="Compare SlimeRPC vs Ray benchmark results"
description="Compare SlimeRPC vs Ray (and optionally Pulsing) benchmark results"
)
parser.add_argument("--slime", default=os.path.join(default_dir, "slime_rpc.csv"))
parser.add_argument("--ray", default=os.path.join(default_dir, "ray_rpc.csv"))
parser.add_argument(
"--pulsing",
default=None,
help="Optional Pulsing CSV; if omitted, Pulsing columns are skipped.",
)
args = parser.parse_args()

for path in (args.slime, args.ray):
Expand All @@ -37,25 +45,43 @@ def main():
slime = load(args.slime)
ray_r = load(args.ray)

common = sorted(set(slime) & set(ray_r))
pulsing = None
if args.pulsing:
if not os.path.exists(args.pulsing):
print(f"Missing Pulsing results file: {args.pulsing}")
print("Run rpc_bench_pulsing.py first, or omit --pulsing.")
return
pulsing = load(args.pulsing)

common = set(slime) & set(ray_r)
if pulsing is not None:
common &= set(pulsing)
common = sorted(common)
if not common:
print("No overlapping sizes between the two result files.")
print("No overlapping sizes between the result files.")
return

col = 12
sep = "-" * (
10 + 1 + col + 1 + col + 1 + col + 1 + col + 1 + col + 1 + col + 1 + 12
)

print(
"\n┌─ Avg Latency (µs) ─────────────────────────────────────────────────────────┐"
)
header = (
f"{'Size':<10} | "
f"{'Slime avg':>{col}} | {'Slime p99':>{col}} | {'Slime BW':>{col}} | "
f"{'Ray avg':>{col}} | {'Ray p99':>{col}} | {'Ray BW':>{col}} | "
f"{'Speedup':>10}"
)

if pulsing is not None:
header = (
f"{'Size':<10} | "
f"{'Slime avg':>{col}} | {'Slime p99':>{col}} | {'Slime BW':>{col}} | "
f"{'Puls avg':>{col}} | {'Puls p99':>{col}} | {'Puls BW':>{col}} | "
f"{'Ray avg':>{col}} | {'Ray p99':>{col}} | {'Ray BW':>{col}} | "
f"{'S/Pul':>10} | {'S/Ray':>10}"
)
else:
header = (
f"{'Size':<10} | "
f"{'Slime avg':>{col}} | {'Slime p99':>{col}} | {'Slime BW':>{col}} | "
f"{'Ray avg':>{col}} | {'Ray p99':>{col}} | {'Ray BW':>{col}} | "
f"{'S/Ray':>10}"
)
sep = "-" * len(header)
print(header)
print(sep)

Expand All @@ -66,17 +92,36 @@ def main():
ray_avg = float(ray_r[size]["avg_us"])
ray_p99 = float(ray_r[size]["p99_us"])
ray_bw = float(ray_r[size]["bw_gbps"])
speedup = ray_avg / sl_avg # >1 means SlimeRPC is faster
sp_ray = ray_avg / sl_avg # >1 means SlimeRPC is faster than Ray

if pulsing is not None:
pu_avg = float(pulsing[size]["avg_us"])
pu_p99 = float(pulsing[size]["p99_us"])
pu_bw = float(pulsing[size]["bw_gbps"])
sp_pul = pu_avg / sl_avg # >1 means SlimeRPC is faster than Pulsing
print(
f"{label(size):<10} | "
f"{sl_avg:>{col}.1f} | {sl_p99:>{col}.1f} | {sl_bw:>{col}.3f} | "
f"{pu_avg:>{col}.1f} | {pu_p99:>{col}.1f} | {pu_bw:>{col}.3f} | "
f"{ray_avg:>{col}.1f} | {ray_p99:>{col}.1f} | {ray_bw:>{col}.3f} | "
f"{'%.2fx' % sp_pul:>10} | {'%.2fx' % sp_ray:>10}"
)
else:
print(
f"{label(size):<10} | "
f"{sl_avg:>{col}.1f} | {sl_p99:>{col}.1f} | {sl_bw:>{col}.3f} | "
f"{ray_avg:>{col}.1f} | {ray_p99:>{col}.1f} | {ray_bw:>{col}.3f} | "
f"{'%.2fx' % sp_ray:>10}"
)

print(sep)
if pulsing is not None:
print(
f"{label(size):<10} | "
f"{sl_avg:>{col}.1f} | {sl_p99:>{col}.1f} | {sl_bw:>{col}.3f} | "
f"{ray_avg:>{col}.1f} | {ray_p99:>{col}.1f} | {ray_bw:>{col}.3f} | "
f"{'%.2fx' % speedup:>10}"
"S/Pul = Pulsing avg latency / SlimeRPC avg latency (>1 means SlimeRPC wins)"
)

print(sep)
print("Speedup = Ray avg latency / SlimeRPC avg latency (>1 means SlimeRPC wins)")
print(
"S/Ray = Ray avg latency / SlimeRPC avg latency (>1 means SlimeRPC wins)"
)
print()


Expand Down
113 changes: 113 additions & 0 deletions bench/python/rpc_bench_pulsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""Pulsing RPC benchmark — same-machine CPU bytes echo.

Measures round-trip latency for bytes echo via a Pulsing actor across a range
of payload sizes, using identical metrics as rpc_bench_slime_driver.py and
rpc_bench_ray.py so the three CSVs can be compared directly.

Usage:
python rpc_bench_pulsing.py [--out results/pulsing_rpc.csv]
"""

import argparse
import asyncio
import csv
import os
import time

import pulsing as pul


@pul.remote
class EchoActor:
def echo(self, data: bytes) -> bytes:
return data


SIZES = [
1 * 1024,
4 * 1024,
16 * 1024,
64 * 1024,
256 * 1024,
1 * 1024 * 1024,
4 * 1024 * 1024,
16 * 1024 * 1024,
64 * 1024 * 1024,
]


async def _benchmark(actor, data: bytes, warmup: int, iterations: int) -> dict:
for _ in range(warmup):
await actor.echo(data)

latencies_us = []
for _ in range(iterations):
t0 = time.perf_counter()
await actor.echo(data)
latencies_us.append((time.perf_counter() - t0) * 1e6)

latencies_us.sort()
n = len(latencies_us)
avg = sum(latencies_us) / n
return {
"avg_us": avg,
"p50_us": latencies_us[n // 2],
"p99_us": latencies_us[int(n * 0.99)],
"bw_gbps": (2 * len(data)) / 1e9 / (avg / 1e6),
}


def _label(size: int) -> str:
return f"{size // 1024}KB" if size < 1024 * 1024 else f"{size // (1024 * 1024)}MB"


async def _run(out_path: str):
await pul.init()
try:
actor = await EchoActor.spawn()

header = (
f"{'Size':<10} | {'Avg (µs)':<12} | {'P50 (µs)':<12} "
f"| {'P99 (µs)':<12} | {'BW (GB/s)':<10}"
)
print(header)
print("-" * len(header))

records = []
for size in SIZES:
data = bytes(size)
iters = max(50, min(500, 50_000_000 // size))
r = await _benchmark(actor, data, warmup=20, iterations=iters)
records.append({"size": size, **r})
print(
f"{_label(size):<10} | {r['avg_us']:<12.1f} | {r['p50_us']:<12.1f} "
f"| {r['p99_us']:<12.1f} | {r['bw_gbps']:<10.3f}"
)

os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True)
with open(out_path, "w", newline="") as f:
writer = csv.DictWriter(
f, fieldnames=["size", "avg_us", "p50_us", "p99_us", "bw_gbps"]
)
writer.writeheader()
writer.writerows(records)
print(f"\nResults saved → {out_path}")
finally:
await pul.shutdown()


def main():
parser = argparse.ArgumentParser(description="Pulsing RPC benchmark")
parser.add_argument(
"--out",
default=os.path.join(
os.path.dirname(__file__), "..", "results", "pulsing_rpc.csv"
),
)
args = parser.parse_args()
asyncio.run(_run(args.out))


if __name__ == "__main__":
main()
3 changes: 1 addition & 2 deletions bench/python/rpc_bench_slime_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ def main():
driver._rpc_buffer_size = buf_bytes
driver._rpc_max_inflight = max(1, int(getattr(args, "max_inflight", 4)))

driver.set_desired_topology(["bench-worker"])
print("Waiting for worker…")
driver.wait_for_peers(["bench-worker"], timeout_sec=120)
driver.connect_to("bench-worker", ib_port=1, qp_num=1).wait(timeout=120)
print("Connected. Starting benchmark.\n")

w = make_proxy(driver, "bench-worker", EchoService)
Expand Down
3 changes: 1 addition & 2 deletions bench/python/rpc_bench_slime_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ def main():
# buf_mb * SLIME_RPC_MAX_INFLIGHT bytes of CPU memory needlessly.
worker._rpc_max_inflight = max(1, int(getattr(args, "max_inflight", 4)))

worker.set_desired_topology(["bench-driver"])
print("Worker ready, waiting for driver to connect…")
worker.wait_for_peers(["bench-driver"], timeout_sec=120)
worker.connect_to("bench-driver", ib_port=1, qp_num=1).wait(timeout=120)
print("Connected. Serving (Ctrl-C to stop).")

try:
Expand Down
Loading