RFC: Turn-boundary message batching
Status: Draft
Author: @brettchien
Date: 2026-04-26
Related: #78 (Session Management), #58 (per-connection locking) — as preconditions, not as design space
Summary
Within a single thread, openab today processes one user message per ACP turn. Messages that arrive while a turn is running become independent tokio::spawn tasks racing for the per-connection mutex; they end up dispatched as separate, sequential ACP turns.
This RFC proposes one structural change: introduce a per-thread message buffer, batched at turn boundaries. Messages arriving during an in-flight turn accumulate in the buffer; when the turn finishes, the buffer is dispatched as a single ACP turn carrying all accumulated messages as multiple ContentBlocks. The ACP agent — which has full session context and is paid for anyway — decides how to handle them (split, merge, ignore, ask back).
The broker does structural buffering only; semantic decisions are deferred to ACP. Time ≠ semantics, so the broker does not attempt rule-based or LLM-based topic detection.
Problem
Within one thread, humans don't message in turn-shaped units. Cases the current 1-msg-per-turn model handles poorly:
- Stream-of-thought split — alice types three messages in 5 seconds:
  - "can you check the build"
  - "actually wait"
  - "check the build and run the e2e tests"

  M1 fires immediately; M2 and M3 queue at the mutex. Three sequential turns: turn 1 wastes work checking the build, turn 2 reacts to "wait" with no idea what's coming, turn 3 finally has the full intent — but turns 1 and 2 are already wasted output.
- Late attachment / clarification — text question, then 8 seconds later the screenshot or link. Today: two turns, the first answers blind to the screenshot.
- Independent topics interleaved — two genuinely unrelated asks back-to-back. The proposal merges them into one ACP turn; the agent answers both in one response. Acceptable — agents handle multi-intent prompts well.
The first case is the bulk of the problem. The second is the same shape (fragmented intent across messages). The third is a constraint: don't pretend to split topics with rules — let ACP decide.
Current state (within one thread)
After RFC #78 §2b, each thread has its own Arc<Mutex<AcpConnection>> (pool.rs:15). Inter-thread isolation is solved. This RFC zooms inside one thread.
One lock cycle = one user message
adapter.rs:181 → pool.with_connection(thread_key, |conn| { ... }) (pool.rs:223) calls conn.session_prompt(content_blocks).await (adapter.rs:240) exactly once per call. content_blocks is built from one user message — its prompt text plus that message's own image / transcript blocks (adapter.rs:131-152). Multiple ContentBlocks in one turn means "one message with multiple media parts," never "multiple messages."
Where pending messages live today
There is no buffer and no queue in the codebase. When M1, M2, M3 arrive on the same thread while M0's turn is streaming, they become N independent tokio::spawn tasks (discord.rs:600-608), each parked on the per-connection mutex.
Tokio's Mutex keeps a fair-ish FIFO LinkedList<Waker>, but docs do not guarantee strict FIFO. The mutex sees only "someone is waiting" — wakers are opaque, so it cannot enumerate pending messages, inspect content, or batch them. Batching therefore can't be retrofitted onto the mutex; it requires an explicit queue at a layer that owns the message data.
So the question this RFC answers is:
Within one thread, when messages arrive during an in-flight turn, how should they be mapped to ACP turns?
Today's implicit answer: "spawn N independent futures, let the mutex serialize them, run them as N sequential turns." This RFC's answer: "send them into a per-thread bounded mpsc::channel; one consumer task drains and dispatches as one batch when the turn completes."
Goals & non-goals
Goals
- Replace 1 message → 1 turn with N messages-arrived-during-turn → 1 next turn within a single thread.
- Introduce the data structure required (an explicit per-thread bounded mpsc::channel).
- Achieve deterministic message ordering as a side benefit.
Non-goals
| Concern | Why not this RFC |
| --- | --- |
| Inter-thread isolation | Already solved by per-connection mutex (RFC #78 §2b / #58). Precondition. |
| Cross-session blocking (#307) | Different layer — about a new thread's session unable to start. |
| Pre-turn debouncing | First message responds immediately; no artificial latency floor. |
| Topic detection / semantic grouping | Deferred to ACP agent (it has the context and inference budget). |
| Cancelling / restarting in-flight turns | Existing /cancel semantics unchanged. Mid-turn corrections are deferred to a future RFC. |
| Persisting buffer across pod restarts | Buffer only exists during in-flight turn — restart loses the turn anyway. |
| Replacing the per-connection mutex | The mutex stays exactly as RFC #78 §2b describes it. |
Proposed solution
Mechanism
| state | event | action |
| --- | --- | --- |
| thread idle | M1 arrives | fire turn 1 with M1 immediately |
| turn 1 in flight | M2 arrives | send M2 into channel |
| turn 1 in flight | M3 arrives | send M3 into channel |
| turn 1 completes | (consumer recv wakes) | drain channel → fire turn 2 with [M2, M3] |
| turn 2 in flight | M4 arrives | send M4 into channel |
| turn 2 completes | (consumer recv wakes) | drain channel → fire turn 3 with [M4] |
| turn 3 completes | (channel empty) | consumer parks on recv → awaits next message |
Two invariants:
- First message after idle has zero added latency — it fires immediately, just like today's per-message dispatch.
- At most one in-flight turn per thread — all buffering happens between turns, never within.
State
A new Dispatcher sits above SessionPool in the call graph. Per-thread keying matches the existing thread_id keying from pool.rs:15. Each active thread owns a bounded mpsc::channel (capacity = max_buffered_messages from config, default 10) and a long-lived consumer task that drains it.
struct ThreadHandle {
tx: mpsc::Sender<BufferedMessage>,
// consumer JoinHandle held for cleanup on idle eviction
_consumer: tokio::task::JoinHandle<()>,
}
pub struct Dispatcher {
per_thread: Mutex<HashMap<String, ThreadHandle>>,
router: Arc<Router>, // existing — calls handle_message
max_buffered_messages: usize, // from config; channel capacity per thread
}
BufferedMessage carries prompt, extra_blocks, sender_json, trigger_msg, arrived_at, sender_name — i.e. everything handle_message already takes, captured at ingest time. sender_name is a display name used for inline labelling inside merged batched prompts (the <message from="..."> tag — see Packing a batch below); it is not a stable user ID. Full struct in Appendix A.
Algorithm (sketch)
submit(thread_key, thread_channel, adapter, msg, other_bot_present):
1. Acquire dispatcher mutex briefly to look up the per-thread ThreadHandle. If absent, create the channel and spawn a consumer_loop task (capturing thread_channel, adapter, and other_bot_present for the consumer's lifetime); insert the handle.
2. Drop the dispatcher mutex.
3. tx.send(msg).await — returns immediately if the channel has space; parks the calling task if the channel (capacity = max_buffered_messages) is full. Only this submit future blocks; the platform event loop is unaffected because submit runs inside its own tokio::spawn'd task per inbound message.
The extra parameters (over a minimal submit(thread_id, msg)) are needed because the Discord adapter is constructed lazily inside serenity's ready callback — see the rationale comment on Dispatcher::submit in Appendix A.
consumer_loop(thread_id) (one per active thread, lives until the channel closes):
1. rx.recv().await — blocks until at least one message arrives. (First message after idle has zero added latency: recv unblocks immediately when send completes.)
2. try_recv greedily until the channel is empty or batch.len() == max_buffered_messages.
3. Dispatch the batch as one ACP turn via pool.with_connection + session_prompt.
4. Loop back to step 1.
Idle threads: when cleanup_idle evicts a thread, the dispatcher drops its ThreadHandle → Sender drops → channel closes → recv() returns None → consumer exits cleanly. No leader-election race; there is always exactly one consumer per active thread. Full reference implementation in Appendix A.
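For illustration, eviction reduces to a map removal on the dispatcher (evict_thread is a hypothetical name; the real hook lives wherever cleanup_idle already runs):

impl Dispatcher {
    /// Hypothetical eviction hook; a sketch, not a committed API.
    /// Removing the ThreadHandle drops its Sender, which closes the channel;
    /// the consumer's rx.recv() then returns None and the task exits cleanly.
    pub async fn evict_thread(&self, thread_key: &str) {
        self.per_thread.lock().await.remove(thread_key);
    }
}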
The new wiring at discord.rs:600 (Phase 1 wires Discord only; slack.rs:909 is a follow-up):
// Before:
tokio::spawn(async move { router.handle_message(...).await });
// After (per-message mode unchanged; batched mode goes through dispatcher):
tokio::spawn(async move {
dispatcher.submit(
thread_key, thread_channel, adapter, buf, other_bot_present,
).await;
});
Packing a batch into one ACP turn
adapter.rs:131-152 already builds Vec<ContentBlock> from one message. For a batch, the consumer merges all sub-messages into one combined Text block with a leading banner and per-sub-message <message index="N" from="..."> tags, then concatenates each sub-message's non-text extra_blocks in order:
Vec<ContentBlock>:
Text { "[Batched: 3 messages received during the previous turn — handle as one logical unit]\n\n
<message index=\"1\" from=\"alice\">\ncan you check the build\n</message>\n\n
<message index=\"2\" from=\"alice\">\nactually wait\n</message>\n\n
<message index=\"3\" from=\"alice\">\ncheck the build *and* run e2e tests\n</message>\n" }
// ...followed by each sub-message's extra_blocks (images, transcripts) in order
Single-message batches (batch.len() == 1) skip the banner entirely and dispatch verbatim, so Batched mode is byte-identical to PerMessage for isolated input.
Only the leading message's sender_json is forwarded to handle_message as the batch's SenderContext wrapper; per-sub-message attribution is carried by the <message from="..."> tags inside the combined text. The trailing message's trigger_msg is used to anchor reactions (Decision #3).
ACP session_prompt accepts Vec<ContentBlock> (connection.rs:426). No protocol change.
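A sketch of the merge step (pack_batch is a hypothetical helper name, and the ContentBlock::Text { text } shape is assumed from the example above; the real ACP type may differ):

// Hypothetical helper; the real merge lives in dispatch_batch.
fn pack_batch(batch: Vec<BufferedMessage>) -> Vec<ContentBlock> {
    if batch.len() == 1 {
        // Verbatim dispatch: byte-identical to PerMessage for isolated input.
        let m = batch.into_iter().next().unwrap();
        let mut blocks = vec![ContentBlock::Text { text: m.prompt }];
        blocks.extend(m.extra_blocks);
        return blocks;
    }
    let mut text = format!(
        "[Batched: {} messages received during the previous turn \
         — handle as one logical unit]\n",
        batch.len()
    );
    let mut extra = Vec::new();
    for (i, m) in batch.into_iter().enumerate() {
        text.push_str(&format!(
            "\n<message index=\"{}\" from=\"{}\">\n{}\n</message>\n",
            i + 1,
            m.sender_name,
            m.prompt
        ));
        // Non-text blocks (images, transcripts) concatenated in arrival order.
        extra.extend(m.extra_blocks);
    }
    let mut blocks = vec![ContentBlock::Text { text }];
    blocks.append(&mut extra);
    blocks
}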
Why this design
Why doesn't the broker try to group messages semantically?
Any rule the broker could apply to "group" messages — same user, time window, reply chain, attachment proximity — observes only structural signals. Real grouping is semantic: was message N+1 a continuation of N's intent, an unrelated topic, a correction?
The broker has no way to answer that without an LLM. Spawning a small classifier LLM just for this adds cost, latency, and coupling — and the user's ACP session already has the full context and is the right place to make semantic decisions:
| Layer | Right responsibility |
| --- | --- |
| Broker | Structural buffering (collect during turn, drain at boundary) |
| ACP agent | Semantic processing (merge, split, prioritize, ask back) |
So: the broker does not attempt grouping. It just collects messages that arrive while a turn is running and hands them all to the agent in the next turn. The agent — Claude Code, Cursor, Codex, etc. — handles "user said X, then said Y, then sent a screenshot" naturally. Time ≠ semantics.
Why turn-boundary, not debounce?
Debounce imposes a debounce_ms floor on every message's response latency, even isolated ones. Turn-boundary imposes nothing on isolated messages — the buffer only fills during an active turn, when the user is already waiting on the agent. The agent's turn duration is itself a natural "user is waiting" window, used for free.
Why at the broker layer, not the ACP client?
ACP coding CLIs — Claude Code, Cursor, Codex — consume one turn at a time: each session_prompt is one input → one response. They do not inspect incoming chat traffic and do not batch messages themselves. If multi-message buffering happens anywhere, it has to be at the broker.
Why per-thread, not per-channel or global?
Conversation scope in openab = thread. Per-thread keying matches the existing Arc<Mutex<AcpConnection>> keying (adapter.rs:154-161). Cross-thread merging would conflate independent conversations.
Benefits
These fall out of "N messages → 1 turn" — not the primary motivation, but real.
Token cost. Each ACP turn re-sends system + tools + accumulated history + new input. Three sequential turns:
T1 input = sys + tools + M1
T1 output = R1 ← may be wasted (e.g. "check build" before "wait")
T2 input = sys + tools + M1 + R1 + M2 ← R1 re-fed
T2 output = R2
T3 input = sys + tools + M1 + R1 + M2 + R2 + M3 ← R1, R2 re-fed
vs one batched turn: input = sys + tools + [M1, M2, M3], output = single response.
Saved tokens come from (in descending impact; a worked illustration follows the list):
- Wasted intermediate output — turn 1's full output + tool execution before turn 3 supersedes it never gets generated.
- Redundant tool invocations — tools triggered by stale intent don't run.
- Intermediate responses re-fed — R1, R2 never become input for a later turn.
- Prompt cache — one prefix invalidation instead of three.
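A back-of-envelope illustration with purely hypothetical sizes (a 2,000-token sys+tools prefix, 50-token messages, 500-token responses; not measured figures), assuming the batched response is comparable in length to a single turn's output:

T1 in = 2,000 + 50                       = 2,050
T2 in = 2,000 + 50 + 500 + 50            = 2,600   ← R1 re-fed
T3 in = 2,000 + 50 + 500 + 50 + 500 + 50 = 3,150   ← R1, R2 re-fed
sequential total ≈ 7.8k input + 1.5k output
batched: one turn, in = 2,000 + 3×50 = 2,150 ≈ 2.2k input + 0.5k output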
Latency. Three sequential turns ≈ T1 + T2 + T3 wall-clock; the batched path ≈ T1 + T_batch (M1 fires alone immediately; M2 and M3 merge into one follow-up turn). Leading-message latency is unchanged (M1 still fires immediately); subsequent-message latency drops because no per-message round-trip is paid.
These benefits scale with input fragmentation — exactly the workload this RFC targets. They do not apply to isolated messages (buffer never fills) or to truly unrelated overlapping messages from different users (no "wasted work" component, only prompt-prefix amortization).
Deterministic ordering. Today's same-thread ordering is approximate (tokio::spawn race + Tokio Mutex's not-strictly-FIFO waiter list). With a per-thread mpsc::channel:
| Property | Today | After RFC |
| --- | --- | --- |
| Same-thread message ordering | Approximate (μs-to-ms scheduler-dependent) | Strict (mpsc FIFO) |
| Ordering-determining sync point | Per-connection mutex's waker list (held during 30s+ ACP turn) | Per-thread mpsc enqueue (μs handle lookup on dispatcher mutex). The per-connection mutex still wraps each ACP turn but no longer determines message order. |
| Contention window | Seconds | Microseconds |
If paranoid ordering is required, the consumer can sort_by_key(|m| m.arrived_at) after try_recv-greedy-drain. In practice Discord/Slack deliver per-channel events in order; this is overkill but cheap.
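The paranoid variant is one extra line on the consumer's drain (a sketch; first, rx, and max_batch as in Appendix A):

// Greedy drain (as in Appendix A), then the optional re-sort.
// sort_by_key is stable, so equal timestamps keep channel (FIFO) order.
let mut batch = vec![first];
while batch.len() < max_batch {
    match rx.try_recv() {
        Ok(more) => batch.push(more),
        Err(_) => break,
    }
}
batch.sort_by_key(|m| m.arrived_at); // a no-op in practice: mpsc is already FIFO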
Tradeoffs
| Tradeoff | Cost | Mitigation |
| --- | --- | --- |
| Turn 1 may run with stale info if user sends correction immediately after | Real, by design | Cost is one wasted turn — acceptable vs. paying debounce_ms on every message. |
| Multi-message batches lower the ACP-turn count visible to ops | Real semantic shift | bot_turns ingest counter (slack.rs:672-696) runs before the dispatcher, so per-message loop limits still fire correctly. The downstream "ACP turns" metric shifts to count batches; document it. |
| /cancel cancels one ACP turn that may now contain multiple user messages | Real | Document: "cancel current ACP work; buffered messages fire next." Add /cancel-all in a follow-up if drop-on-cancel is wanted. |
| cleanup_idle (pool.rs:295) uses last_active updated only inside session_prompt (connection.rs:430) — buffered-only sessions look idle | Real | submit must touch(last_active) on enqueue. |
| Buffer fills (max_buffered_messages reached during a long turn) | Real | New submit futures park inside their per-message tokio::spawn'd task until the consumer drains. Per-thread backpressure only — platform event loop and other threads are unaffected. Memory cost = parked-task overhead × stuck-thread count, naturally bounded by human typing rate. |
| Consumer task panics or exits unexpectedly | Real, low-prob | Stale ThreadHandle.tx survives a dead consumer; the next submit returns SendError. The Appendix A sketch logs and drops the message; Phase 1 production code instead detects SendError, evicts the stale entry, and retries — or_insert_with then re-spawns a fresh consumer. In-flight batch is lost (same blast radius as a pod restart mid-turn). |
| Behavior change observable to every user of an opted-in channel | Real | Opt-in via mode flag for v1; revisit defaulting after validation. |
Multi-party interaction & bot ingest strategy
Buffer invariant: by the time a message reaches the buffer, it has already been determined to be intended for our agent. The buffer makes no addressing decisions and needs no awareness of mention rules. All multi-party complexity lives upstream.
Producer-side gates (already in place)
The Slack/Discord adapters apply a chain of filters before any message reaches submit:
| Gate | Source | Multi-party role |
| --- | --- | --- |
| allow_bot_messages (off / mentions / all) | slack.rs:710-765 | Decides whether bot messages enter at all. mentions = bot messages enter only when they explicitly @ us. |
| allow_user_messages (involved / mentions / multibot-mentions) | slack.rs:768-810 | Decides which human messages we should respond to. multibot-mentions requires @us whenever another openab bot is in the thread. |
| trusted_bot_ids | config | Whitelist for mentions / all modes. |
| bot_turns consecutive-bot limit | slack.rs:672-696 | Loop guard. Per-message at ingest, not per batch. |
| Eager multi-bot detection | slack.rs:649-657 | Sets other_bot_present → triggers multibot-mentions semantics. |
Scenarios
(A) Multi-human, single bot — allow_user_messages = "involved". alice and bob both type at us; both pass the gate, both end up in the buffer, drained together. The merged prompt's <message index="N" from="..."> tags distinguish their sub-messages (see Packing a batch into one ACP turn); the agent decides whether to address them jointly or separately.
(B) Multi-bot — allow_user_messages = "multibot-mentions", allow_bot_messages = "mentions". A peer bot and us share a thread. Only messages explicitly @-ing us pass the gate; everything else is dropped before reaching the dispatcher. The buffer sees exactly what was directed at us.
(C) Bot-to-bot coordination — allow_bot_messages = "mentions" + trusted_bot_ids = [peer-bot]. peer-bot's @us messages enter; bot_turns increments at ingest. Batching cannot inflate the bot-turn count — limits fire before submit.
Implications for design
- other_bot_present is a per-thread fact set upstream; the dispatcher does not need to compute it.
- MAX_CONSECUTIVE_BOT_TURNS runs before submit; batching is downstream and cannot bypass it.
- Per-sub-message attribution in a merged batch is carried by the <message from="..."> tag (the leading message's <sender_context>, from adapter.rs:131-152, still wraps the batch as a whole).
- No new mode is required. The producer gates already produce the right input:
[platform event] → [gates: addressee + loop decisions] → [Dispatcher.submit] → [batched ACP turn]
Decisions (defaults proposed)
These have a clear default; reviewers can push back but the design works as stated. No open questions remain.
1. Bot messages go through the same buffer as human messages. Producer gates already gatekeep what enters; once admitted, a bot message is just another ContentBlock with sender attribution. Bypassing for bots would re-introduce the very fragmentation problem this RFC solves.
2. Bot-turn-limit counts batches as turns (one ACP invocation = one logical turn). The per-message ingest counter is unchanged.
3. Reactions track the trailing (most-recently-enqueued) message of a batch. The leading message may have arrived 30+ seconds ago and already looks "read" to the user; anchoring reactions on the newest message in the batch gives feedback on what the user just typed, which matches user expectation. Older messages in a batch don't get their own reaction stream until a later turn.
4. /cancel cancels the current ACP turn only; buffer is preserved. User can re-/cancel next turn if needed. A /cancel-all flag is a follow-up.
5. Buffer cap configurable, default = 10, full = block (await space). Per-thread bounded mpsc::channel(max_buffered_messages). When full, additional submit futures park inside their own tokio::spawn'd task until the consumer drains. Per-thread backpressure only — the platform event loop and other threads are unaffected. Operators can lower the cap (smaller batches, tighter memory) or raise it (tolerate longer agent stalls before backpressure kicks in).
6. Default mode = per-message for v1. Opt-in to batched per adapter. batched is strictly better for fragmented input and identical for isolated input, but the conservative default keeps the rollout safe; flipping the default is left to Phase 3 after a validation period.
Configuration & rollout
Conservative rollout — opt-in mode for v1:
[discord]
message_processing_mode = "per-message" # default — no behavior change
# Or:
message_processing_mode = "batched" # turn-boundary batching
max_buffered_messages = 10 # batched mode only; per-thread channel cap
# default 10; ignored in per-message mode
After a validation period (e.g. one minor release), revisit whether batched should become the default. The only tunable in batched mode is max_buffered_messages — no debounce, no grouper config. The mechanism is one channel + one consumer task per thread.
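Modelled in config code, the flag might look like the following (a sketch; serde attribute spelling and struct layout are assumptions, not the committed config plumbing):

use serde::Deserialize;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum MessageProcessingMode {
    /// "per-message": existing behavior; the v1 default.
    #[default]
    PerMessage,
    /// "batched": turn-boundary batching via the Dispatcher.
    Batched,
}

fn default_max_buffered() -> usize {
    10
}

#[derive(Debug, Deserialize)]
pub struct DiscordConfig {
    #[serde(default)]
    pub message_processing_mode: MessageProcessingMode,
    /// Per-thread channel capacity; ignored in per-message mode.
    #[serde(default = "default_max_buffered")]
    pub max_buffered_messages: usize,
}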
Sizing max_buffered_messages
The default of 10 covers human-only fragmentation comfortably (typical human typing rate caps at ~3 messages per 30 seconds). For multi-bot collaboration channels, however, peer bots can push burst rates well past that.
In one early production instance, the opt-in batched mode was deployed on a multi-bot channel where two peer bots (a code reviewer and a test engineer) replied to each other and to the primary agent. Sampling the three most-active threads (~300–1000 messages each, accumulated over several days):
| Thread | Human msgs (max in 30s / 60s) | Peer-bot msgs (max in 30s / 60s) | All incoming (max in 30s / 60s) |
| --- | --- | --- | --- |
| Active project thread (~1000 msgs) | 3 / 3 | 12 / 16 | 12 / 16 |
| Status report thread (~360 msgs) | 2 / 3 | 11 / 20 | 11 / 20 |
| Task triage thread (~300 msgs) | 2 / 2 | 24 / 24 | 24 / 24 |
Humans alone never exceeded 3 messages in 30s; peer bots drove all of the burstiness. With max_buffered_messages = 10, channel-full backpressure would have engaged repeatedly on these threads (one had ten separate 60s windows with ≥10 incoming messages), splitting bursts into 2–3 turns and partially defeating the batching benefit.
After this sampling, the same deployment raised the cap to 30, sized for ~25% headroom over the largest observed 60s burst (24). At full cap the merged prompt is roughly 3k tokens of batched payload — within reasonable ACP-input range.
Guidance:
- Human-only adapters: 10 is fine.
- Multi-bot adapters: size to the observed peer-bot burst rate, with headroom. In practice 20–50 covers most multi-agent topologies. The trade-off is merged-prompt size at full cap (≈ cap × per-message-tokens).
- Backpressure ≠ data loss: when full, submit parks the calling task per-thread; nothing is dropped. Undersizing only produces "more, smaller batches", not lost messages — so it's safe to start at the default and tune up after observing real burst patterns in dispatch debug logs (batch_size=N distribution).
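The corresponding instrumentation is a single debug event at dispatch time (a sketch; the tracing field names are assumptions):

// Inside dispatch_batch, before the ACP call: the source of the
// batch_size=N distribution referenced above.
tracing::debug!(thread_key = %thread_key, batch_size = batch.len(), "dispatching batched ACP turn");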
Implementation phases
Phase 1 — Mechanism (single PR)
- New module: src/dispatch/mod.rs with Dispatcher + ThreadHandle + consumer_loop
- Add MessageProcessingMode enum to config (default = PerMessage)
- Hook discord.rs:600: branch on mode
  - PerMessage → existing tokio::spawn { handle_message } path (unchanged)
  - Batched → dispatcher.submit(thread_key, thread_channel, adapter, BufferedMessage, other_bot_present)

  Slack hookup (slack.rs:909) is a deliberate follow-up — Discord covers the multi-bot validation target; Slack inherits unchanged per-message behavior in the meantime.
- dispatch_batch packs multiple messages into one Vec<ContentBlock>
- last_active touch on enqueue (fix cleanup_idle interaction)
- SendError detection at submit: evict stale ThreadHandle + retry to re-spawn consumer (consumer-panic recovery)
- Reactions strategy: trailing message anchors batch progress (see Decision #3)
- Tests:
- single-message turn (batched mode == per-message for batch_size=1)
- 3-message fragmentation merges into one batch (see the sketch after this list)
- new message arrives mid-turn, joins next batch
- /cancel during batched turn does not drop buffer
- last_active touched on enqueue (cleanup_idle does not evict)
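The fragmentation test reduces to the consumer's drain discipline and can be exercised at the channel level without a real ACP connection (a sketch; message texts taken from the Problem section):

use tokio::sync::mpsc;

/// Sketch of the "3-message fragmentation merges into one batch" case,
/// mirroring consumer_loop's drain: one blocking recv, then try_recv
/// until the channel is empty.
#[tokio::test]
async fn greedy_drain_batches_fragments() {
    let (tx, mut rx) = mpsc::channel::<&str>(10);
    for m in [
        "can you check the build",
        "actually wait",
        "check the build and run the e2e tests",
    ] {
        tx.send(m).await.unwrap();
    }

    let first = rx.recv().await.unwrap();
    let mut batch = vec![first];
    while let Ok(more) = rx.try_recv() {
        batch.push(more); // cap check (max_batch) elided for brevity
    }

    assert_eq!(batch.len(), 3); // one ACP turn, not three
}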
Phase 2 — Validation
- Roll out to a single channel (e.g. dev sandbox)
- Compare turn counts, latency distributions, user-reported quality
Phase 3 — Default flip (separate RFC if needed)
- Make `batched` the default mode
- Or: remove the mode flag entirely if no real-world reason to keep `per-message`
Prior art
Two adjacent systems solve "user typed multiple times in quick succession" with different trade-offs. Both are personal AI agent runtimes (single-user, agent loop bundled into the gateway process) — different shape from openab's multi-tenant broker, but the in-flight buffering problem is the same.
| Aspect | Hermes Agent | OpenClaw | Current openab | This RFC |
| --- | --- | --- | --- | --- |
| Shape | Single-user runtime, gateway = agent | Single-user runtime | Multi-tenant broker → external ACP CLI | Same as current |
| First-message latency | ~2s (Discord adapter debounce) | n/a observed | 0 (immediate dispatch) | 0 (preserved) |
| Adapter-level batching | _pending_text_batches, _text_batch_split_delay_seconds (default 2s) — gateway/platforms/discord.py | n/a observed | None | None (deliberate) |
| In-flight new message | Single-slot _pending_messages[key] — overwrites prior + interrupt_event.set() cancels in-flight | n/a observed | N independent tokio::spawn tasks each park on per-thread mutex | Send to per-thread bounded mpsc; consumer batches at turn boundary |
| Buffer data structure | Dict[str, MessageEvent] (1 slot) | — | None (mutex waker list, opaque) | bounded mpsc::channel (FIFO, default cap 10) |
| 3 fast messages → ACP turns | 1 turn, middle message dropped by overwrite | — | 3 turns, intermediate output wasted | 2 turns (M1, then batch [M2, M3]) — no message lost |
| Mid-turn interrupt | Yes (asyncio interrupt event) — agent loop is in-process | — | No | No |
| Same-thread message ordering | (1-slot makes this moot) | — | Approximate (Tokio Mutex not strictly FIFO) | Strict (mpsc FIFO) |
| Per-key serialization | asyncio.Event + _active_sessions dict | stall-watchdog.ts (different concern: stall detection) | KeyedAsyncQueue (per-key Semaphore, Slack) + Arc<Mutex<AcpConnection>> | Inherited |
| Bot-message gating | DISCORD_ALLOW_BOTS (off / mentions / all) | n/a observed | allow_bot_messages (3-value, borrowed from Hermes) | Inherited |
| Stall UX feedback | — | stall-watchdog.ts | reactions.rs stall_soft / stall_hard (borrowed from OpenClaw) | Inherited |
Three trade-off axes
- Drop vs preserve. Hermes' single-slot overwrite drops middle messages in fast bursts; openab (current and RFC) preserves all.
- Interrupt vs wait for boundary. Hermes can interrupt the in-flight LLM call because the agent loop is in-process. openab cannot — the agent is an external ACP CLI that yields control only at turn end. This is an architectural constraint, not a design choice. The RFC turns it into a feature: the existing turn duration is the natural buffering window, with no added latency for isolated messages.
- Debounce vs piggyback. Hermes' Discord adapter pays ~2s per message regardless. The RFC pays 0 for isolated messages — buffering only fills during an active turn, when the user is already waiting on the agent.
Other prior art
HTTP request coalescing in proxies (Varnish, nginx) — same shape ("while one request is in flight, batch / dedupe others") in a different domain.
Appendix A: Reference implementation
This sketch matches the MVP shape in src/dispatch/mod.rs. A few signatures
carry extra parameters not present in earlier drafts of this RFC; the
rationale is inline below.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{mpsc, Mutex};
use tracing::warn;
struct BufferedMessage {
prompt: String,
extra_blocks: Vec<ContentBlock>,
sender_json: String,
trigger_msg: MessageRef,
arrived_at: Instant,
/// Display name for inline batch labelling (the `<message from="...">` tag
/// inside merged prompts). Not a stable user ID.
sender_name: String,
}
struct ThreadHandle {
tx: mpsc::Sender<BufferedMessage>,
_consumer: tokio::task::JoinHandle<()>,
}
pub struct Dispatcher {
per_thread: Mutex<HashMap<String, ThreadHandle>>,
router: Arc<Router>,
max_buffered_messages: usize, // from config (default: 10)
}
impl Dispatcher {
/// `adapter` and `other_bot_present` are passed per-call (rather than stored
/// on the Dispatcher) because the Discord adapter is constructed inside
/// serenity's `ready` callback via `OnceLock` — well after the Dispatcher
/// itself is built in `main.rs`. Per-call passing avoids that chicken-and-
/// egg without a second `OnceLock` here. In practice all submits for a
/// given thread share the same adapter Arc; only the first one (which
/// spawns the consumer) is captured for that thread's lifetime.
pub async fn submit(
&self,
thread_key: String,
thread_channel: ChannelRef,
adapter: Arc<dyn ChatAdapter>,
msg: BufferedMessage,
other_bot_present: bool,
) {
// Pull these out of `self` before locking so the or_insert_with closure
// doesn't try to re-borrow `self` while `self.per_thread` is locked.
let cap = self.max_buffered_messages;
let router = Arc::clone(&self.router);
let tx = {
let mut map = self.per_thread.lock().await;
map.entry(thread_key.clone())
.or_insert_with(|| {
let (tx, rx) = mpsc::channel(cap);
let consumer = tokio::spawn(consumer_loop(
thread_key.clone(), thread_channel, rx, router,
adapter, cap, other_bot_present,
));
ThreadHandle { tx, _consumer: consumer }
})
.tx
.clone()
};
// dispatcher mutex released — held only to look up/insert the handle
// send.await parks the calling task if the channel (cap = max_buffered_messages) is full.
// Only this submit future blocks; the platform event loop is unaffected
// because submit runs inside its own per-message tokio::spawn'd task.
if tx.send(msg).await.is_err() {
// Consumer has exited — see Tradeoffs row "Consumer task panics".
// Phase 1 production code evicts the stale ThreadHandle and retries
// so or_insert_with re-spawns a fresh consumer.
warn!(%thread_key, "consumer task has exited; message dropped");
}
}
}
async fn consumer_loop(
thread_key: String,
thread_channel: ChannelRef,
mut rx: mpsc::Receiver<BufferedMessage>,
router: Arc<Router>,
adapter: Arc<dyn ChatAdapter>,
max_batch: usize,
other_bot_present: bool,
) {
while let Some(first) = rx.recv().await {
// Greedy drain up to max_batch messages already enqueued.
let mut batch = vec![first];
while batch.len() < max_batch {
match rx.try_recv() {
Ok(more) => batch.push(more),
Err(_) => break,
}
}
// Process batch as ONE ACP turn
// - if batch.len() == 1: identical to today's single-message dispatch
// - if batch.len() > 1: merge into one combined Text block with
// [Batched: N messages...] banner +
// <message index="N" from="..."> tags
// (see "Packing a batch into one ACP turn")
dispatch_batch(&router, &adapter, &thread_channel, batch, other_bot_present).await;
}
// recv() returned None → all senders dropped → cleanup_idle evicted us. Exit cleanly.
}