Merge CommManager into Shell and IOPub #1031
Closed
lionel- wants to merge 11 commits into task/iopub-comm-unification from
Conversation

Author (Contributor): Wrong branch!
Branched from #1027 (DAP integration tests)
This PR merges the comm manager into Shell and IOPub:

- `comm_msg` emission
- `comm_msg` dispatching and outgoing `comm_open`/`comm_close` emission

The main goal is for outgoing `comm_msg` and other IOPub messages to go through the same crossbeam channel. Previously, these took separate paths to the frontend:

- `iopub_tx` > IOPub thread > ZMQ socket
- `outgoing_tx` on `CommSocket` > CommManager thread > `iopub_tx` > IOPub thread > ZMQ socket

The comm manager intermediary caused non-deterministic message ordering between comm and IOPub messages. When R code on the same thread emits a stream output followed by a comm message, or a comm message followed by an Idle boundary, the frontend could receive them in either order. After this PR, all outgoing comm messages go directly through `iopub_tx`, the same channel used by all other IOPub messages. Ordering is now deterministic: messages sent sequentially from the same thread arrive at the frontend in that order.

Motivation
While working on #1027 I observed non-deterministic ordering between `busy`/`idle` status messages (sent via IOPub) and `stop_debug`/`start_debug` comm messages (sent via the DAP comm through CommManager). The frontend handled this fine since these messages are consumed by different components, but it caused significant problems writing robust and deterministic protocol tests: tests could not make sequential assertions about message ordering without resorting to fuzzy accumulator-based matching that was fragile and hid bugs.

More generally, any code that depends on the relative ordering of comm messages and other IOPub messages has a potential race. With the unified path, ordering matches programmer expectations by construction, eliminating this class of issues.
The change also reduces overall complexity. The CommManager was a dedicated thread that existed solely to bridge comm channels to IOPub, using `crossbeam::Select` over a dynamic set of receivers and maintaining a lookup table for RPC headers. Removing it eliminates a thread and ~260 lines. The responsibilities it held are absorbed by Shell and IOPub, where they naturally belong: Shell already handles `comm_open`, `comm_msg`, and `comm_close` from the frontend, so it's the right place to manage the set of open comms. IOPub already serializes all messages to the frontend, so it's the right place to handle outgoing comm messages.

The direct payoff is the follow-up PR (task/dap-protocol-tests-sync-update), which leverages the ordering guarantee to replace fuzzy message matching with strict sequential assertions.
Architecture changes
Delete CommManager
The `CommManager` thread is removed entirely (~260 lines). It previously:

- ran `crossbeam::Select` over a dynamic set of comm `outgoing_rx` channels
- maintained a `pending_rpcs` HashMap to match RPC replies to their original Jupyter headers

All of these responsibilities are absorbed by Shell and IOPub.
CommOutgoingTx wrapper

A new `CommOutgoingTx` type wraps `Sender<IOPubMessage>` with a `comm_id`. When a comm handler calls `outgoing_tx.send(msg)`, it sends `IOPubMessage::CommOutgoing(comm_id, msg)` through the IOPub channel. This is the key mechanism allowing message determinism.

The `outgoing_rx` side of `CommSocket` is removed. Comm handlers no longer have a dedicated receive channel for outgoing messages since there's no CommManager polling them.

CommMsg::Rpc carries the parent header

Previously, when the frontend sent an RPC to a comm, the Shell thread would store the Jupyter header in CommManager's `pending_rpcs` map. When the comm replied, CommManager would look up the header to create a properly parented IOPub message.

Now `CommMsg::Rpc` is a named struct that carries the `parent_header` directly. The header travels with the message through the comm handler and back, so a lookup table is no longer needed.
Shell manages open comms
Shell now holds `open_comms: Vec<CommSocket>` and handles comm lifecycle directly:

- Frontend-initiated messages (`comm_open`, `comm_msg`, `comm_close` on the Shell socket) are handled in-place
- Backend-initiated events (via `comm_manager_rx`) arrive when R code opens a comm (data explorer, variables pane, connections, etc.)

The challenge is that Shell is already in a `zmq_poll()` loop waiting on its ZMQ socket, but backend comm events arrive on a crossbeam channel. You can't mix ZMQ `poll()` and crossbeam `Select` in a single wait. To solve this problem, I generalised the solution implemented for StdIn in #58. The bridge mechanism was cleaned up and generalised to allow management of several sockets/entities. As part of this work, the "notifier" thread is now called the "channel bridge" thread, and the "forwarding" thread is called the "socket bridge" thread.
IOPub processes CommOutgoing

A new `IOPubMessage::CommOutgoing(String, CommMsg)` variant is handled in the IOPub thread. It flushes any buffered stream output before forwarding the comm message, preserving the ordering from the sender's perspective. This unconditional flush on every comm message is the key to the ordering guarantee. For the expected message rates it has negligible overhead (stream flushing is a no-op when the buffer is empty).

What's mechanical
About 70% of the diff is mechanical plumbing:

- threading `iopub_tx` through `CommSocket::new()` and every `start()` function that creates comms (data explorer, variables, connections, reticulate, plots, help)
- removing `outgoing_rx`
- changing `CommMsg::Rpc(id, data)` to `CommMsg::Rpc { id, parent_header, data }` at all call sites

The core logic changes are in:

- `crates/amalthea/src/kernel.rs` (channel bridge thread, Forwarder)
- `crates/amalthea/src/socket/shell.rs` (comm management, RefCell removal)
- `crates/amalthea/src/socket/iopub.rs` (CommOutgoing handling)
- `crates/amalthea/src/socket/comm.rs` (CommOutgoingTx)

Testing
All existing tests pass and are adapted to the new routing. The test helper `socket_rpc_request` now receives responses from the IOPub channel. A new `IOPubReceiverExt` trait provides `recv_comm_msg()` for ergonomic comm message extraction in tests.