
Merge CommManager into Shell and IOPub #1031

Closed
lionel- wants to merge 11 commits into task/iopub-comm-unification from task/dap-protocol-tests-sync-update


Conversation

@lionel- (Contributor) commented Feb 9, 2026

Branched from #1027 (DAP integration tests)

This PR merges the comm manager into:

  • The IOPub thread, for outgoing comm_msg emission
  • The Shell thread, for incoming comm_msg dispatching and outgoing comm_open/comm_close emission

The main goal is for outgoing comm_msg and other IOPub messages to go through the same crossbeam channel. Previously, these took separate paths to the frontend:

  • IOPub messages: sender > iopub_tx > IOPub thread > ZMQ socket
  • Comm messages: sender > outgoing_tx on CommSocket > CommManager thread > iopub_tx > IOPub thread > ZMQ socket

The comm manager intermediary caused non-deterministic message ordering between comm and IOPub messages. When R code on the same thread emits stream output followed by a comm message, or a comm message followed by an Idle boundary, the frontend could receive them in either order. After this PR, all outgoing comm messages go directly through iopub_tx, the same channel used by every other IOPub message. Ordering is now deterministic: messages sent sequentially from the same thread arrive at the frontend in that order.
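
Purely as an illustration of the property the unified path relies on, here is a toy, self-contained crossbeam program (plain strings stand in for the real IOPubMessage variants): two sends from one thread through a single channel are always received in send order by the single consumer.

use crossbeam::channel::unbounded;
use std::thread;

fn main() {
    let (iopub_tx, iopub_rx) = unbounded::<&'static str>();

    // Simulates the R/Shell-side thread: stream output followed by a comm
    // message, both sent through the same channel.
    let sender = thread::spawn(move || {
        iopub_tx.send("stream: busy output").unwrap();
        iopub_tx.send("comm_msg: start_debug").unwrap();
    });
    sender.join().unwrap();

    // Simulates the single IOPub thread draining the channel: messages
    // arrive in exactly the order they were sent.
    for msg in iopub_rx.iter() {
        println!("{msg}");
    }
}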

Motivation

While working on #1027 I observed non-deterministic ordering between busy/idle status messages (sent via IOPub) and stop_debug/start_debug comm messages (sent via the DAP comm through CommManager). The frontend handled this fine, since these messages are consumed by different components, but it made robust, deterministic protocol tests hard to write: tests could not make sequential assertions about message ordering without resorting to fuzzy accumulator-based matching that was fragile and hid bugs.

More generally, any code that depends on the relative ordering of comm messages and other IOPub messages has a potential race. With the unified path, ordering matches programmer expectations by construction, eliminating this class of issues.

The change also reduces overall complexity. The CommManager was a dedicated thread that existed solely to bridge comm channels to IOPub, using crossbeam::Select over a dynamic set of receivers and maintaining a lookup table for RPC headers. Removing it eliminates a thread and ~260 lines. The responsibilities it held are absorbed by Shell and IOPub, where they naturally belong: Shell already handles comm_open, comm_msg, and comm_close from the frontend, so it's the right place to manage the set of open comms. IOPub already serializes all messages to the frontend, so it's the right place to handle comm outgoing messages.

The direct payoff is the follow-up PR (task/dap-protocol-tests-sync-update), which leverages the ordering guarantee to replace fuzzy message matching with strict sequential assertions.

Architecture changes

Delete CommManager

The CommManager thread is removed entirely (~260 lines). It previously:

  • Held the list of open comms
  • Used crossbeam::Select over a dynamic set of comm outgoing_rx channels
  • Maintained a pending_rpcs HashMap to match RPC replies to their original Jupyter headers
  • Relayed comm messages to IOPub

All of these responsibilities are absorbed by Shell and IOPub.

CommOutgoingTx wrapper

A new CommOutgoingTx type wraps Sender<IOPubMessage> with a comm_id. When a comm handler calls outgoing_tx.send(msg), it sends IOPubMessage::CommOutgoing(comm_id, msg) through the IOPub channel. This is the key mechanism enabling deterministic message ordering.
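
A minimal sketch of that wrapper, using the names given above; the exact fields, return type, and derives are assumptions, not the real implementation:

use crossbeam::channel::{SendError, Sender};

// Hypothetical stand-ins for the real amalthea types.
enum CommMsg { Data(String) }
enum IOPubMessage {
    CommOutgoing(String, CommMsg),
    // ... other IOPub variants elided
}

/// Wraps the shared IOPub sender together with the comm's id, so comm
/// handlers keep a simple `outgoing_tx.send(msg)` API.
#[derive(Clone)]
struct CommOutgoingTx {
    comm_id: String,
    iopub_tx: Sender<IOPubMessage>,
}

impl CommOutgoingTx {
    fn send(&self, msg: CommMsg) -> Result<(), SendError<IOPubMessage>> {
        // Every outgoing comm message travels through the same channel as
        // all other IOPub traffic, which is what makes ordering deterministic.
        self.iopub_tx
            .send(IOPubMessage::CommOutgoing(self.comm_id.clone(), msg))
    }
}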

The outgoing_rx side of CommSocket is removed. Comm handlers no longer have a dedicated receive channel for outgoing messages since there's no CommManager polling them.

CommMsg::Rpc carries the parent header

Previously, when the frontend sent an RPC to a comm, the Shell thread would store the Jupyter header in CommManager's pending_rpcs map. When the comm replied, CommManager would look up the header to create a properly parented IOPub message.

Now CommMsg::Rpc is a named struct that carries the parent_header directly:

CommMsg::Rpc {
    id: String,
    parent_header: JupyterHeader,
    data: Value,
}

The header travels with the message through the comm handler and back, so a lookup table is no longer needed.
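
To make the round trip concrete, here is a hedged sketch of a handler replying to an RPC: it echoes id and parent_header back unchanged so IOPub can parent the reply. JupyterHeader is simplified, handle_comm_msg is a hypothetical name, and a plain crossbeam sender stands in for CommOutgoingTx.

use crossbeam::channel::Sender;
use serde_json::{json, Value};

// Simplified stand-in for the real JupyterHeader.
#[derive(Clone)]
struct JupyterHeader {
    msg_id: String,
}

enum CommMsg {
    Rpc {
        id: String,
        parent_header: JupyterHeader,
        data: Value,
    },
    // ... other variants elided
}

fn handle_comm_msg(outgoing_tx: &Sender<CommMsg>, msg: CommMsg) {
    match msg {
        CommMsg::Rpc { id, parent_header, data } => {
            // Compute the reply; in a real comm this is handler-specific.
            let reply = json!({ "result": data });
            // Echo `id` and `parent_header` unchanged: the header travels
            // with the message instead of living in a pending_rpcs map.
            let _ = outgoing_tx.send(CommMsg::Rpc {
                id,
                parent_header,
                data: reply,
            });
        }
    }
}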

Shell manages open comms

Shell now holds open_comms: Vec<CommSocket> and handles comm lifecycle directly:

  • Frontend-initiated comms (comm_open, comm_msg, comm_close on the Shell socket) are handled in-place
  • Backend-initiated comms (from comm_manager_rx) arrive when R code opens a comm (data explorer, variables pane, connections, etc.)

The challenge is that Shell is already in a zmq_poll() loop waiting on its ZMQ socket, but backend comm events arrive on a crossbeam channel, and you can't mix ZMQ poll() and crossbeam Select in a single wait. To solve this, I generalised the solution implemented for StdIn in #58.

The bridge mechanism was cleaned up and generalised to allow management of several sockets/entities. As part of this work, the "notifier" thread is now called the "channel bridge" thread, and the "forwarding" thread is called the "socket bridge" thread.
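
The PR does not reproduce the bridge code here, so the following is only a rough sketch of the general pattern under assumed names (channel_bridge, shell_loop, the inproc endpoint): a bridge thread turns crossbeam channel activity into bytes on an inproc PAIR socket, so zmq_poll() can wait on both kinds of events. The real implementation lives in crates/amalthea/src/kernel.rs.

use crossbeam::channel::Receiver;

/// Channel-bridge side: forwards wake-up notifications from a crossbeam
/// channel onto an inproc PAIR socket.
fn channel_bridge(ctx: &zmq::Context, rx: Receiver<()>) -> zmq::Result<()> {
    let notifier = ctx.socket(zmq::PAIR)?;
    notifier.connect("inproc://shell-bridge")?;
    while rx.recv().is_ok() {
        // One byte is enough: the poller only needs to wake up, then it
        // drains the crossbeam channel itself.
        notifier.send(&[0u8][..], 0)?;
    }
    Ok(())
}

/// Shell side: the inproc peer (bound to the same endpoint) is polled
/// alongside the Shell ZMQ socket, so both kinds of events wake the loop.
fn shell_loop(shell: &zmq::Socket, wake: &zmq::Socket) -> zmq::Result<()> {
    loop {
        let mut items = [
            shell.as_poll_item(zmq::POLLIN),
            wake.as_poll_item(zmq::POLLIN),
        ];
        zmq::poll(&mut items, -1)?;
        if items[0].is_readable() {
            // Handle comm_open / comm_msg / comm_close and other Shell requests
        }
        if items[1].is_readable() {
            let _ = wake.recv_bytes(0)?; // discard the wake-up byte
            // Drain comm_manager_rx and register backend-initiated comms
        }
    }
}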

IOPub processes CommOutgoing

A new IOPubMessage::CommOutgoing(String, CommMsg) variant is handled in the IOPub thread. It flushes any buffered stream output before forwarding the comm message, preserving the ordering from the sender's perspective. This unconditional flush on every comm message is the key to the ordering guarantee. For the expected message rates it has negligible overhead (stream flushing is a no-op when the buffer is empty).
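
A minimal, self-contained sketch of what that dispatch could look like, assuming hypothetical helper names (flush_streams, forward_comm) and a simplified buffer; only the flush-before-forward order comes from the description above.

struct IOPub {
    stream_buffer: Vec<u8>, // buffered stdout/stderr waiting to be sent
}

enum CommMsg { Data(String) }

enum IOPubMessage {
    CommOutgoing(String, CommMsg),
    // ... other variants elided
}

impl IOPub {
    fn process(&mut self, message: IOPubMessage) {
        match message {
            IOPubMessage::CommOutgoing(comm_id, msg) => {
                // Flush any buffered stream output first, so output emitted
                // before the comm message also reaches the frontend first.
                self.flush_streams();
                self.forward_comm(&comm_id, msg);
            }
        }
    }

    fn flush_streams(&mut self) {
        if self.stream_buffer.is_empty() {
            return; // no-op when nothing is buffered
        }
        // ... emit a stream message over the IOPub ZMQ socket
        self.stream_buffer.clear();
    }

    fn forward_comm(&mut self, _comm_id: &str, _msg: CommMsg) {
        // ... serialize and send comm_msg over the IOPub ZMQ socket
    }
}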

What's mechanical

About 70% of the diff is mechanical plumbing:

  • Threading iopub_tx through CommSocket::new() and every start() function that creates comms (data explorer, variables, connections, reticulate, plots, help)
  • Adapting tests to receive comm responses from an IOPub channel instead of outgoing_rx
  • Updating CommMsg::Rpc(id, data) to CommMsg::Rpc { id, parent_header, data } at all call sites

The core logic changes are in:

  • crates/amalthea/src/kernel.rs (channel bridge thread, Forwarder)
  • crates/amalthea/src/socket/shell.rs (comm management, RefCell removal)
  • crates/amalthea/src/socket/iopub.rs (CommOutgoing handling)
  • crates/amalthea/src/socket/comm.rs (CommOutgoingTx)

Testing

All existing tests pass and are adapted to the new routing. The test helper socket_rpc_request now receives responses from the IOPub channel. A new IOPubReceiverExt trait provides recv_comm_msg() for ergonomic comm message extraction in tests.
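
Only the trait and method names below come from the PR text; the rest is an assumed sketch of how such a test helper could be written against the IOPub channel.

use crossbeam::channel::Receiver;

#[derive(Debug)]
enum CommMsg {
    Data(String),
}

#[derive(Debug)]
enum IOPubMessage {
    CommOutgoing(String, CommMsg),
    Status(String),
}

/// Test-side convenience for pulling the next comm message off the IOPub
/// channel.
trait IOPubReceiverExt {
    fn recv_comm_msg(&self) -> CommMsg;
}

impl IOPubReceiverExt for Receiver<IOPubMessage> {
    fn recv_comm_msg(&self) -> CommMsg {
        match self.recv().expect("IOPub channel closed") {
            IOPubMessage::CommOutgoing(_, msg) => msg,
            other => panic!("expected a comm message, got {other:?}"),
        }
    }
}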

@lionel- (Contributor, Author) commented Feb 9, 2026

Wrong branch!

@lionel- lionel- closed this Feb 9, 2026
@github-actions github-actions bot locked and limited conversation to collaborators Feb 9, 2026
