Skip to content

Commit 78782b4

Browse files
BoykoNeov and claude committed
Add a regression test for the shell-channel reply-send wedge
Deterministically reproduces the strand precondition (a request queued on the dual-use shell ROUTER with its edge-triggered ZMQ_FD read edge already consumed), then performs the out-of-band reply send through the real SubshellManager._send_on_shell_channel path and asserts the queued request is still delivered to on_recv. Behavioural, not implementation-coupled: it passes whether the reply send re-arms the stream explicitly or is routed through it, and fails (times out) against a raw-send body. Deterministic (no timing race); verified on Windows, though the edge-trigger behaviour it relies on is general.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 2784c12 commit 78782b4

1 file changed

Lines changed: 166 additions & 0 deletions

File tree

tests/test_subshell_wedge.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
"""Regression test for the dual-use shell ROUTER wedge (gh-1529).
2+
3+
ipykernel 7's shell ROUTER is read by a ``ZMQStream`` on the shell-channel thread while
4+
replies are sent back over the *same* socket out-of-band by
5+
``SubshellManager._send_on_shell_channel``. A raw ``send_multipart`` on that socket drains
6+
its edge-triggered ``ZMQ_FD`` read edge; because the stream never sees the send it is never
7+
re-armed, so a request that arrived concurrently can strand unread on a registered-but-
8+
non-readable fd. The kernel then goes idle and never replies -- an intermittent dropped
9+
``execute_request``, most visible on Windows but a generic libzmq edge-trigger behaviour.
10+
11+
This test reproduces the strand *precondition* deterministically -- a request queued on the
12+
ROUTER whose read edge has already been consumed, with the stream not yet having delivered
13+
it -- then performs the out-of-band reply send through the real code path and asserts the
14+
queued request is still delivered to ``on_recv``.
15+
16+
It is deliberately *behavioural*: it checks delivery, not how the fix is implemented, so it
17+
holds whether the reply send re-arms the stream explicitly or is routed through the stream.
18+
Without the fix the queued request is never delivered and the test fails (times out). The
19+
strand precondition is created with documented raw-``zmq`` operations rather than a timing
20+
race, so the test is deterministic. It relies on the libzmq ``ZMQ_FD`` edge-trigger
21+
behaviour, which is documented as general (not Windows-specific); it has been verified here
22+
on Windows, and CI confirms the other platforms.
23+
"""
24+
25+
from __future__ import annotations
26+
27+
import asyncio
28+
import threading
29+
import time
30+
31+
import zmq
32+
from tornado.ioloop import IOLoop
33+
from zmq.eventloop.zmqstream import ZMQStream
34+
35+
from ipykernel.subshell_manager import SubshellManager
36+
from ipykernel.thread import SHELL_CHANNEL_THREAD_NAME
37+
38+
# Upper bound, in seconds, applied to every blocking wait in this module (event waits,
# the ROUTER polling deadline and the loop-thread join), so a regression fails the test
# instead of hanging it.
TIMEOUT = 10.0
39+
40+
41+
def _run_on_loop(loop, func):
    """Run ``func()`` on the loop thread, block until it finishes, return/raise its result.

    Parameters
    ----------
    loop
        Object exposing ``add_callback(callable)``; the callable is handed to it for
        execution.
    func
        Zero-argument callable to execute.

    Returns
    -------
    Whatever ``func()`` returned.

    Raises
    ------
    TimeoutError
        If the callback has not completed within ``TIMEOUT`` seconds.
    BaseException
        Any exception raised by ``func`` is captured and re-raised on the calling thread.
    """
    box: dict = {}
    done = threading.Event()

    def runner():
        try:
            box["result"] = func()
        except BaseException as exc:  # noqa: BLE001 - re-raised on the calling thread
            box["error"] = exc
        finally:
            # Always release the waiter, even when ``func`` failed.
            done.set()

    loop.add_callback(runner)
    if not done.wait(TIMEOUT):
        msg = "callback did not complete on the shell-channel loop"
        raise TimeoutError(msg)
    if "error" in box:
        raise box["error"]
    return box.get("result")
61+
62+
63+
def test_concurrent_request_not_stranded_by_reply_send():
    """A request queued behind a consumed read edge must survive an out-of-band reply send.

    Steps:

    1. Bind a ROUTER, attach a ``ZMQStream`` to it on a dedicated loop thread and build a
       ``SubshellManager`` around the same socket and stream.
    2. Send a warmup message so the ROUTER learns the client's routing id.
    3. On the loop thread: send a second request, poll ``shell_socket.events`` until it is
       queued, then call ``manager._send_on_shell_channel``.
    4. Assert the second request still reaches ``on_recv``.

    Teardown closes every socket with ``linger=0`` so ``context.term()`` cannot block on
    undelivered frames, and closes the helper thread's event loop once it has stopped.
    """
    context = zmq.Context()

    # Shell ROUTER, read by a ZMQStream on the shell-channel loop -- exactly like the kernel.
    shell_socket = context.socket(zmq.ROUTER)
    port = shell_socket.bind_to_random_port("tcp://127.0.0.1")

    client = context.socket(zmq.DEALER)
    client.setsockopt(zmq.IDENTITY, b"client-1")
    client.connect(f"tcp://127.0.0.1:{port}")

    # An IOLoop in a thread named like the kernel's shell-channel thread: the
    # _send_on_shell_channel assert requires this exact thread name.
    loop_box: dict = {}
    loop_ready = threading.Event()

    def run_loop():
        asyncio.set_event_loop(asyncio.new_event_loop())
        loop = IOLoop.current()
        loop_box["loop"] = loop
        loop.add_callback(loop_ready.set)
        try:
            loop.start()
        finally:
            # Release the event loop created above once it has been stopped, so it is
            # not left open when the thread exits.
            loop.close()

    thread = threading.Thread(target=run_loop, name=SHELL_CHANNEL_THREAD_NAME, daemon=True)
    thread.start()
    assert loop_ready.wait(TIMEOUT), "shell-channel loop did not start"
    loop = loop_box["loop"]

    received: list[list[bytes]] = []
    got_message = threading.Event()
    stream = manager = None

    try:
        # Build the shell stream and manager on the loop thread (add_handler must run there).
        def setup():
            _stream = ZMQStream(shell_socket, loop)

            def on_recv(frames):
                received.append(frames)
                got_message.set()

            _stream.on_recv(on_recv, copy=True)
            _manager = SubshellManager(context, loop, shell_socket, _stream)
            return _stream, _manager

        stream, manager = _run_on_loop(loop, setup)

        # Warmup: teach the ROUTER the client's route and let the stream drain to idle.
        client.send_multipart([b"warmup"])
        assert got_message.wait(TIMEOUT), "warmup request never delivered"
        routing_id = received[0][0]
        assert routing_id == b"client-1"

        received.clear()
        got_message.clear()

        def strand_then_reply():
            # Runs on the loop thread, so the stream's fd handler cannot interleave while
            # this callback executes -- that is what makes the strand deterministic.
            client.send_multipart([b"req-1"])

            # Wait until the request is actually queued on the ROUTER. Reading EVENTS here
            # also consumes the edge-triggered read edge (libzmq ZMQ_FD corollary), so by
            # the time we exit this loop the request is queued and unread while the fd is
            # no longer readable -- the coalesced-edge strand precondition.
            deadline = time.monotonic() + TIMEOUT
            while not (shell_socket.events & zmq.POLLIN):
                if time.monotonic() > deadline:
                    msg = "request never queued on the ROUTER"
                    raise TimeoutError(msg)

            assert not got_message.is_set(), "request delivered before the reply send"

            # Out-of-band reply send through the real code path. With the fix this re-arms /
            # routes through the stream so the queued request is serviced; without it the
            # request stays stranded on the registered-but-non-readable fd.
            manager._send_on_shell_channel([routing_id, b"reply"])

        _run_on_loop(loop, strand_then_reply)

        assert got_message.wait(TIMEOUT), (
            "the concurrently-queued request was stranded by the out-of-band reply send "
            "and never delivered to on_recv -- the shell-channel wedge has regressed"
        )
        assert received and received[-1][-1] == b"req-1"
    finally:

        def teardown():
            if manager is not None:
                try:
                    manager.close()
                except Exception:  # noqa: BLE001 - best-effort teardown
                    pass
            if stream is not None:
                # linger=0: discard unsent frames instead of letting context.term() wait.
                stream.close(linger=0)

        try:
            _run_on_loop(loop, teardown)
        except Exception:  # noqa: BLE001 - best-effort teardown
            pass
        loop.add_callback(loop.stop)
        thread.join(timeout=TIMEOUT)
        # Closing with linger=0 guarantees context.term() below cannot block on frames
        # that will never be delivered (e.g. after a failed assertion above).
        client.close(linger=0)
        shell_socket.close(linger=0)
        context.term()

0 commit comments

Comments
 (0)