
RFC: Turn-boundary message batching #580


Status: Draft
Author: @brettchien
Date: 2026-04-26
Related: #78 (Session Management), #58 (per-connection locking) — as preconditions, not as design space


Summary

Within a single thread, openab today processes one user message per ACP turn. Messages that arrive while a turn is running become independent tokio::spawn tasks racing for the per-connection mutex; they end up dispatched as separate, sequential ACP turns.

This RFC proposes one structural change: introduce a per-thread message buffer, batched at turn boundaries. Messages arriving during an in-flight turn accumulate in the buffer; when the turn finishes, the buffer is dispatched as a single ACP turn carrying all accumulated messages as multiple ContentBlocks. The ACP agent — which has full session context and is paid for anyway — decides how to handle them (split, merge, ignore, ask back).

The broker does structural buffering only; semantic decisions are deferred to ACP. Time ≠ semantics, so the broker does not attempt rule-based or LLM-based topic detection.


Problem

Within one thread, humans don't message in turn-shaped units. Cases the current 1-msg-per-turn model handles poorly:

  1. Stream-of-thought split — alice types three messages in 5 seconds:

    • "can you check the build"
    • "actually wait"
    • "check the build and run the e2e tests"

    M1 fires immediately; M2 and M3 queue at the mutex. Three sequential turns: turn 1 wastes work checking the build, turn 2 reacts to "wait" with no idea what's coming, turn 3 finally has the full intent — but turns 1 and 2 are already wasted output.

  2. Late attachment / clarification — text question, then 8 seconds later the screenshot or link. Today: two turns, the first answers blind to the screenshot.

  3. Independent topics interleaved — two genuinely unrelated asks back-to-back. The proposal merges them into one ACP turn; the agent answers both in one response. Acceptable — agents handle multi-intent prompts well.

The first case is the bulk of the problem. The second is the same shape (fragmented intent across messages). The third is a constraint: don't pretend to split topics with rules — let ACP decide.


Current state (within one thread)

After RFC #78 §2b, each thread has its own Arc<Mutex<AcpConnection>> (pool.rs:15). Inter-thread isolation is solved. This RFC zooms inside one thread.

One lock cycle = one user message

adapter.rs:181 calls pool.with_connection(thread_key, |conn| { ... }) (pool.rs:223), which invokes conn.session_prompt(content_blocks).await (adapter.rs:240) exactly once per call. content_blocks is built from one user message — its prompt text plus that message's own image / transcript blocks (adapter.rs:131-152). Multiple ContentBlocks in one turn means "one message with multiple media parts," never "multiple messages."

Where pending messages live today

There is no buffer and no queue in the codebase. When M1, M2, M3 arrive on the same thread while M0's turn is streaming, they become N independent tokio::spawn tasks (discord.rs:600-608), each parked on the per-connection mutex.

Tokio's Mutex keeps a fair-ish FIFO LinkedList<Waker>, but docs do not guarantee strict FIFO. The mutex sees only "someone is waiting" — wakers are opaque, so it cannot enumerate pending messages, inspect content, or batch them. Batching therefore can't be retrofitted onto the mutex; it requires an explicit queue at a layer that owns the message data.

So the question this RFC answers is:

Within one thread, when messages arrive during an in-flight turn, how should they be mapped to ACP turns?

Today's implicit answer: "spawn N independent futures, let the mutex serialize them, run them as N sequential turns." This RFC's answer: "send them into a per-thread bounded mpsc::channel; one consumer task drains and dispatches as one batch when the turn completes."


Goals & non-goals

Goals

  • Replace 1 message → 1 turn with N messages-arrived-during-turn → 1 next turn within a single thread.
  • Introduce the data structure required (an explicit per-thread bounded mpsc::channel).
  • Achieve deterministic message ordering as a side benefit.

Non-goals

| Concern | Why not this RFC |
| --- | --- |
| Inter-thread isolation | Already solved by per-connection mutex (RFC #78 §2b / #58). Precondition. |
| Cross-session blocking (#307) | Different layer — about a new thread's session being unable to start. |
| Pre-turn debouncing | First message responds immediately; no artificial latency floor. |
| Topic detection / semantic grouping | Deferred to ACP agent (it has the context and inference budget). |
| Cancelling / restarting in-flight turns | Existing /cancel semantics unchanged. Mid-turn corrections are deferred to a future RFC. |
| Persisting buffer across pod restarts | Buffer only exists during an in-flight turn — a restart loses the turn anyway. |
| Replacing the per-connection mutex | The mutex stays exactly as RFC #78 §2b describes it. |

Proposed solution

Mechanism

state                    event                              action
────────────────────────────────────────────────────────────────────────
thread idle              M1 arrives                          fire turn 1 with M1 immediately
turn 1 in flight         M2 arrives                          send M2 into channel
turn 1 in flight         M3 arrives                          send M3 into channel
turn 1 completes         (consumer recv wakes)               drain channel → fire turn 2 with [M2, M3]
turn 2 in flight         M4 arrives                          send M4 into channel
turn 2 completes         (consumer recv wakes)               drain channel → fire turn 3 with [M4]
turn 3 completes         (channel empty)                     consumer parks on recv → awaits next message

Two invariants:

  1. First message after idle has zero added latency — it fires immediately, just like today's per-message dispatch.
  2. At most one in-flight turn per thread — all buffering happens between turns, never within.
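The drain-at-boundary behavior can be illustrated with a toy stdlib analogue — std::sync::mpsc in place of tokio's channel, and a plain function call in place of the async consumer. This is a sketch of the mechanism only, not the proposed implementation:

```rust
use std::sync::mpsc;

// Toy turn-boundary batcher: block for the first message, then greedily
// drain whatever else is already queued — mirroring "recv, then try_recv
// until empty or cap" from the consumer-loop sketch.
fn next_batch(rx: &mpsc::Receiver<String>, cap: usize) -> Option<Vec<String>> {
    let first = rx.recv().ok()?; // parks until a message arrives (zero added latency)
    let mut batch = vec![first];
    while batch.len() < cap {
        match rx.try_recv() {
            Ok(msg) => batch.push(msg),
            Err(_) => break, // channel momentarily empty → batch is done
        }
    }
    Some(batch)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send("M1".to_string()).unwrap();
    // M1 fires alone: it was the only message queued when the "turn" started.
    assert_eq!(next_batch(&rx, 10), Some(vec!["M1".to_string()]));

    // M2 and M3 arrive while the turn is "in flight"...
    tx.send("M2".to_string()).unwrap();
    tx.send("M3".to_string()).unwrap();
    // ...and drain together as one batch at the next turn boundary.
    assert_eq!(
        next_batch(&rx, 10),
        Some(vec!["M2".to_string(), "M3".to_string()])
    );
}
```

The same two invariants hold in the toy: the first recv after idle returns immediately, and batching only happens between calls, never within one.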

State

A new Dispatcher sits above SessionPool in the call graph. Per-thread keying matches the existing thread_id keying from pool.rs:15. Each active thread owns a bounded mpsc::channel (capacity = max_buffered_messages from config, default 10) and a long-lived consumer task that drains it.

struct ThreadHandle {
    tx: mpsc::Sender<BufferedMessage>,
    // consumer JoinHandle held for cleanup on idle eviction
    _consumer: tokio::task::JoinHandle<()>,
}

pub struct Dispatcher {
    per_thread: Mutex<HashMap<String, ThreadHandle>>,
    router: Arc<Router>,           // existing — calls handle_message
    max_buffered_messages: usize,  // from config; channel capacity per thread
}

BufferedMessage carries prompt, extra_blocks, sender_json, trigger_msg, arrived_at, sender_name — i.e. everything handle_message already takes, captured at ingest time. sender_name is a display name used for inline labelling inside merged batched prompts (the <message from="..."> tag — see Packing a batch below); it is not a stable user ID. Full struct in Appendix A.

Algorithm (sketch)

submit(thread_key, thread_channel, adapter, msg, other_bot_present):

  1. Acquire dispatcher mutex briefly to look up the per-thread ThreadHandle. If absent, create the channel and spawn a consumer_loop task (capturing thread_channel, adapter, and other_bot_present for the consumer's lifetime); insert the handle.
  2. Drop the dispatcher mutex.
  3. tx.send(msg).await — returns immediately if the channel has space; parks the calling task if the channel (capacity = max_buffered_messages) is full. Only this submit future blocks; the platform event loop is unaffected because submit runs inside its own tokio::spawn'd task per inbound message.

The extra parameters (over a minimal submit(thread_id, msg)) are needed because the Discord adapter is constructed lazily inside serenity's ready callback — see the rationale comment on Dispatcher::submit in Appendix A.

consumer_loop(thread_id) (one per active thread, lives until the channel closes):

  1. rx.recv().await — blocks until at least one message arrives. (First message after idle has zero added latency: recv unblocks immediately when send completes.)
  2. try_recv greedily until the channel is empty or batch.len() == max_buffered_messages.
  3. Dispatch the batch as one ACP turn via pool.with_connection + session_prompt.
  4. Loop back to step 1.

Idle threads: when cleanup_idle evicts a thread, the dispatcher drops its ThreadHandle → the Sender drops → the channel closes → recv() returns None → the consumer exits cleanly. No leader-election race; there is always exactly one consumer per active thread. Full reference implementation in Appendix A.
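The close-on-drop shutdown path can also be sketched with stdlib primitives — std::sync::mpsc plus an OS thread standing in for the tokio consumer task, and a HashMap eviction standing in for cleanup_idle. The ToyHandle type and the eviction flow here are illustrative, not the actual dispatcher code:

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Stand-in for ThreadHandle: dropping it drops `tx`, which closes the
// channel and lets the consumer observe shutdown.
struct ToyHandle {
    tx: mpsc::Sender<String>,
    consumer: thread::JoinHandle<usize>,
}

fn main() {
    let mut per_thread: HashMap<String, ToyHandle> = HashMap::new();

    let (tx, rx) = mpsc::channel::<String>();
    let consumer = thread::spawn(move || {
        let mut seen = 0;
        // Mirrors `while let Some(first) = rx.recv().await`: the loop ends
        // once every Sender is gone, i.e. recv() starts returning Err.
        while rx.recv().is_ok() {
            seen += 1;
        }
        seen // consumer exits cleanly when the channel closes
    });
    per_thread.insert("thread-1".into(), ToyHandle { tx, consumer });

    per_thread["thread-1"].tx.send("M1".into()).unwrap();

    // cleanup_idle analogue: evict the handle. Dropping tx closes the
    // channel; the consumer unwinds on its own, no signal needed.
    let ToyHandle { tx, consumer } = per_thread.remove("thread-1").unwrap();
    drop(tx);
    assert_eq!(consumer.join().unwrap(), 1);
}
```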

The new wiring at discord.rs:600 (Phase 1 wires Discord only; slack.rs:909 is a follow-up):

// Before:
tokio::spawn(async move { router.handle_message(...).await });

// After (per-message mode unchanged; batched mode goes through dispatcher):
tokio::spawn(async move {
    dispatcher.submit(
        thread_key, thread_channel, adapter, buf, other_bot_present,
    ).await;
});

Packing a batch into one ACP turn

adapter.rs:131-152 already builds Vec<ContentBlock> from one message. For a batch, the consumer merges all sub-messages into one combined Text block with a leading banner and per-sub-message <message index="N" from="..."> tags, then concatenates each sub-message's non-text extra_blocks in order:

Vec<ContentBlock>:
  Text { "[Batched: 3 messages received during the previous turn — handle as one logical unit]\n\n
         <message index=\"1\" from=\"alice\">\ncan you check the build\n</message>\n\n
         <message index=\"2\" from=\"alice\">\nactually wait\n</message>\n\n
         <message index=\"3\" from=\"alice\">\ncheck the build *and* run e2e tests\n</message>\n" }
  // ...followed by each sub-message's extra_blocks (images, transcripts) in order

Single-message batches (batch.len() == 1) skip the banner entirely and dispatch verbatim, so Batched mode is byte-identical to PerMessage for isolated input.

Only the leading message's sender_json is forwarded to handle_message as the batch's SenderContext wrapper; per-sub-message attribution is carried by the <message from="..."> tags inside the combined text. The trailing message's trigger_msg is used to anchor reactions (Decision #3).

ACP session_prompt accepts Vec<ContentBlock> (connection.rs:426). No protocol change.
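A stdlib-only sketch of the packing step described above. pack_batch is a hypothetical helper (the real work happens inside dispatch_batch); only the banner and <message> tag format come from this RFC, and the (sender_name, prompt) pair representation is a simplification of BufferedMessage:

```rust
// Hypothetical packing helper: merge a batch's texts into the single
// combined Text block described above. Each sub-message's non-text
// extra_blocks would then be appended after this block, in order.
fn pack_batch(msgs: &[(String, String)]) -> String {
    // msgs: (sender_name, prompt) pairs, in arrival order.
    if msgs.len() == 1 {
        // Single-message batch: no banner — byte-identical to per-message mode.
        return msgs[0].1.clone();
    }
    let mut out = format!(
        "[Batched: {} messages received during the previous turn — handle as one logical unit]\n",
        msgs.len()
    );
    for (i, (from, text)) in msgs.iter().enumerate() {
        out.push_str(&format!(
            "\n<message index=\"{}\" from=\"{}\">\n{}\n</message>\n",
            i + 1,
            from,
            text
        ));
    }
    out
}

fn main() {
    let batch = vec![
        ("alice".to_string(), "can you check the build".to_string()),
        ("alice".to_string(), "actually wait".to_string()),
    ];
    let packed = pack_batch(&batch);
    assert!(packed.starts_with("[Batched: 2 messages"));
    assert!(packed.contains("<message index=\"2\" from=\"alice\">\nactually wait\n</message>"));

    // Single-message batch passes through verbatim.
    assert_eq!(pack_batch(&batch[..1]), "can you check the build");
}
```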


Why this design

Why doesn't the broker try to group messages semantically?

Any rule the broker could apply to "group" messages — same user, time window, reply chain, attachment proximity — observes only structural signals. Real grouping is semantic: was message N+1 a continuation of N's intent, an unrelated topic, a correction?

The broker has no way to answer that without an LLM. Spawning a small classifier LLM just for this adds cost, latency, and coupling — and the user's ACP session already has the full context and is the right place to make semantic decisions:

| Layer | Right responsibility |
| --- | --- |
| Broker | Structural buffering (collect during turn, drain at boundary) |
| ACP agent | Semantic processing (merge, split, prioritize, ask back) |

So: the broker does not attempt grouping. It just collects messages that arrive while a turn is running and hands them all to the agent in the next turn. The agent — Claude Code, Cursor, Codex, etc. — handles "user said X, then said Y, then sent a screenshot" naturally. Time ≠ semantics.

Why turn-boundary, not debounce?

Debounce imposes a debounce_ms floor on every message's response latency, even isolated ones. Turn-boundary imposes nothing on isolated messages — the buffer only fills during an active turn, when the user is already waiting on the agent. The agent's turn duration is itself a natural "user is waiting" window, used for free.

Why at the broker layer, not the ACP client?

ACP coding CLIs — Claude Code, Cursor, Codex — consume one turn at a time: each session_prompt is one input → one response. They do not inspect incoming chat traffic and do not batch messages themselves. If multi-message buffering happens anywhere, it has to be at the broker.

Why per-thread, not per-channel or global?

Conversation scope in openab = thread. Per-thread keying matches the existing Arc<Mutex<AcpConnection>> keying (adapter.rs:154-161). Cross-thread merging would conflate independent conversations.


Benefits

These fall out of "N messages → 1 turn" — not the primary motivation, but real.

Token cost. Each ACP turn re-sends system + tools + accumulated history + new input. Three sequential turns:

T1 input  = sys + tools + M1
T1 output = R1                       ← may be wasted (e.g. "check build" before "wait")
T2 input  = sys + tools + M1 + R1 + M2     ← R1 re-fed
T2 output = R2
T3 input  = sys + tools + M1 + R1 + M2 + R2 + M3   ← R1, R2 re-fed

vs one batched turn: input = sys + tools + [M1, M2, M3], output = single response.

Saved tokens come from (in descending impact):

  1. Wasted intermediate output — turn 1's full output + tool execution before turn 3 supersedes it never gets generated.
  2. Redundant tool invocations — tools triggered by stale intent don't run.
  3. Intermediate responses re-fed — R1, R2 never become input for a later turn.
  4. Prompt cache — one prefix invalidation instead of three.

Latency. Three sequential turns ≈ T1 + T2 + T3 wall-clock; the batched path ≈ T1 + T_batch (M1 fires alone immediately; M2 and M3 merge into one follow-up turn). Leading-message latency is unchanged (M1 still fires immediately); subsequent-message latency drops because no per-message round-trip is paid.

These benefits scale with input fragmentation — exactly the workload this RFC targets. They do not apply to isolated messages (buffer never fills) or to truly unrelated overlapping messages from different users (no "wasted work" component, only prompt-prefix amortization).
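Plugging illustrative numbers into the input accounting above makes the savings concrete. The token counts here (2,000-token prefix, 50-token messages, 500-token responses) are made up for the arithmetic; only the formula shape comes from the RFC:

```rust
// Input tokens for three sequential turns: each later turn re-feeds
// every earlier message and response on top of the fixed prefix.
fn sequential_input(prefix: u32, m: u32, r: u32) -> u32 {
    let t1 = prefix + m;                 // sys + tools + M1
    let t2 = prefix + m + r + m;         // ... + R1 + M2
    let t3 = prefix + m + r + m + r + m; // ... + R2 + M3
    t1 + t2 + t3
}

// Input tokens for one batched turn: the prefix is paid once.
fn batched_input(prefix: u32, m: u32) -> u32 {
    prefix + 3 * m
}

fn main() {
    let (prefix, m, r) = (2_000, 50, 500); // illustrative sizes, not measured

    assert_eq!(sequential_input(prefix, m, r), 7_800);
    assert_eq!(batched_input(prefix, m), 2_150); // ~3.6x less input

    // Output side: R1 and R2 are never generated in the batched path.
    let sequential_out = 3 * r;
    let batched_out = r;
    assert_eq!(sequential_out - batched_out, 1_000);
}
```

Even before counting wasted tool invocations (item 1 above, usually the largest term), the re-fed prefix and intermediate responses dominate the input-side difference.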

Deterministic ordering. Today's same-thread ordering is approximate (tokio::spawn race + Tokio Mutex's not-strictly-FIFO waiter list). With a per-thread mpsc::channel:

| Property | Today | After RFC |
| --- | --- | --- |
| Same-thread message ordering | Approximate (μs-to-ms, scheduler-dependent) | Strict (mpsc FIFO) |
| Ordering-determining sync point | Per-connection mutex's waker list (held during 30s+ ACP turns) | Per-thread mpsc enqueue (μs handle lookup on the dispatcher mutex). The per-connection mutex still wraps each ACP turn but no longer determines message order. |
| Contention window | Seconds | Microseconds |

If strict timestamp ordering is ever required, the consumer can sort_by_key(|m| m.arrived_at) after the greedy try_recv drain. In practice Discord/Slack deliver per-channel events in order; the sort is overkill but cheap.


Tradeoffs

| Tradeoff | Cost | Mitigation |
| --- | --- | --- |
| Turn 1 may run with stale info if the user sends a correction immediately after | Real, by design | Cost is one wasted turn — acceptable vs. paying debounce_ms on every message. |
| Multi-message batches lower the ACP-turn count visible to ops | Real semantic shift | The bot_turns ingest counter (slack.rs:672-696) runs before the dispatcher, so per-message loop limits still fire correctly. The downstream "ACP turns" metric shifts to counting batches; document it. |
| /cancel cancels one ACP turn that may now contain multiple user messages | Real | Document: "cancel current ACP work; buffered messages fire next." Add /cancel-all in a follow-up if drop-on-cancel is wanted. |
| cleanup_idle (pool.rs:295) uses last_active, which is updated only inside session_prompt (connection.rs:430) — buffered-only sessions look idle | Real | submit must touch(last_active) on enqueue. |
| Buffer fills (max_buffered_messages reached during a long turn) | Real | New submit futures park inside their per-message tokio::spawn'd tasks until the consumer drains. Per-thread backpressure only — the platform event loop and other threads are unaffected. Memory cost = parked-task overhead × stuck-thread count, naturally bounded by human typing rate. |
| Consumer task panics or exits unexpectedly | Real, low-prob. | The stale ThreadHandle.tx survives a dead consumer; the next submit returns SendError. The Appendix A sketch logs and drops the message; Phase 1 production code instead detects SendError, evicts the stale entry, and retries — or_insert_with then re-spawns a fresh consumer. The in-flight batch is lost (same blast radius as a pod restart mid-turn). |
| Behavior change observable to every user of an opted-in channel | Real | Opt-in via mode flag for v1; revisit defaulting after validation. |

Multi-party interaction & bot ingest strategy

Buffer invariant: by the time a message reaches the buffer, it has already been determined to be intended for our agent. The buffer makes no addressing decisions and needs no awareness of mention rules. All multi-party complexity lives upstream.

Producer-side gates (already in place)

The Slack/Discord adapters apply a chain of filters before any message reaches submit:

| Gate | Source | Multi-party role |
| --- | --- | --- |
| allow_bot_messages (off / mentions / all) | slack.rs:710-765 | Decides whether bot messages enter at all. mentions = bot messages enter only when they explicitly @ us. |
| allow_user_messages (involved / mentions / multibot-mentions) | slack.rs:768-810 | Decides which human messages we should respond to. multibot-mentions requires @us whenever another openab bot is in the thread. |
| trusted_bot_ids | config | Whitelist for mentions / all modes. |
| bot_turns consecutive-bot limit | slack.rs:672-696 | Loop guard. Per-message at ingest, not per batch. |
| Eager multi-bot detection | slack.rs:649-657 | Sets other_bot_present → triggers multibot-mentions semantics. |

Scenarios

(A) Multi-human, single bot — allow_user_messages = "involved". alice and bob both type at us; both pass the gate, both end up in the buffer, drained together. The merged prompt's <message index="N" from="..."> tags distinguish their sub-messages (see Packing a batch into one ACP turn); the agent decides whether to address them jointly or separately.

(B) Multi-bot — allow_user_messages = "multibot-mentions", allow_bot_messages = "mentions". A peer bot and us share a thread. Only messages explicitly @-ing us pass the gate; everything else is dropped before reaching the dispatcher. The buffer sees exactly what was directed at us.

(C) Bot-to-bot coordination — allow_bot_messages = "mentions" + trusted_bot_ids = [peer-bot]. peer-bot's @us messages enter; bot_turns increments at ingest. Batching cannot inflate the bot-turn count — limits fire before submit.

Implications for design

  • other_bot_present is a per-thread fact set upstream; the dispatcher does not need to compute it.
  • MAX_CONSECUTIVE_BOT_TURNS runs before submit; batching is downstream and cannot bypass it.
  • Per-sub-message attribution in a merged batch is carried by the <message from="..."> tag (the leading message's <sender_context>, from adapter.rs:131-152, still wraps the batch as a whole).

No new mode is required. The producer gates already produce the right input:

[platform event] → [gates: addressee + loop decisions] → [Dispatcher.submit] → [batched ACP turn]

Decisions (defaults proposed)

These have a clear default; reviewers can push back but the design works as stated. No open questions remain.

  1. Bot messages go through the same buffer as human messages. Producer gates already gatekeep what enters; once admitted, a bot message is just another ContentBlock with sender attribution. Bypassing for bots would re-introduce the very fragmentation problem this RFC solves.
  2. Bot-turn-limit counts batches as turns (one ACP invocation = one logical turn). The per-message ingest counter is unchanged.
  3. Reactions track the trailing (most-recently-enqueued) message of a batch. The leading message may have arrived 30+ seconds ago and already looks "read" to the user; anchoring reactions on the newest message in the batch gives feedback on what the user just typed, which matches user expectation. Older messages in a batch don't get their own reaction stream until a later turn.
  4. /cancel cancels the current ACP turn only; buffer is preserved. User can re-/cancel next turn if needed. A /cancel-all flag is a follow-up.
  5. Buffer cap configurable, default = 10, full = block (await space). Per-thread bounded mpsc::channel(max_buffered_messages). When full, additional submit futures park inside their own tokio::spawn'd task until the consumer drains. Per-thread backpressure only — the platform event loop and other threads are unaffected. Operators can lower the cap (smaller batches, tighter memory) or raise it (tolerate longer agent stalls before backpressure kicks in).
  6. Default mode = per-message for v1. Opt-in to batched per adapter. batched is strictly better for fragmented input and identical for isolated input, but the conservative default keeps the rollout safe; flipping the default is left to Phase 3 after a validation period.

Configuration & rollout

Conservative rollout — opt-in mode for v1:

[discord]
message_processing_mode = "per-message"  # default — no behavior change

# Or:
message_processing_mode = "batched"      # turn-boundary batching
max_buffered_messages   = 10             # batched mode only; per-thread channel cap
                                          # default 10; ignored in per-message mode

After a validation period (e.g. one minor release), revisit whether batched should become the default. The only tunable in batched mode is max_buffered_messages — no debounce, no grouper config. The mechanism is one channel + one consumer task per thread.
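One way the mode flag could map into the MessageProcessingMode enum that Phase 1 introduces. The enum name and the two string values come from this RFC; the FromStr shape and error message are an illustrative sketch, not the actual config code:

```rust
use std::str::FromStr;

#[derive(Debug, PartialEq, Clone, Copy)]
enum MessageProcessingMode {
    PerMessage, // default — today's 1-msg-per-turn dispatch
    Batched,    // turn-boundary batching via the Dispatcher
}

impl Default for MessageProcessingMode {
    fn default() -> Self {
        MessageProcessingMode::PerMessage // conservative v1 default
    }
}

impl FromStr for MessageProcessingMode {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "per-message" => Ok(MessageProcessingMode::PerMessage),
            "batched" => Ok(MessageProcessingMode::Batched),
            other => Err(format!("unknown message_processing_mode: {other}")),
        }
    }
}

fn main() {
    assert_eq!(
        "batched".parse::<MessageProcessingMode>(),
        Ok(MessageProcessingMode::Batched)
    );
    assert_eq!(
        MessageProcessingMode::default(),
        MessageProcessingMode::PerMessage
    );
    assert!("debounced".parse::<MessageProcessingMode>().is_err());
}
```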

Sizing max_buffered_messages

The default of 10 covers human-only fragmentation comfortably (typical human typing rate caps at ~3 messages per 30 seconds). For multi-bot collaboration channels, however, peer bots can push burst rates well past that.

In one early production instance, the opt-in batched mode was deployed on a multi-bot channel where two peer bots (a code reviewer and a test engineer) replied to each other and to the primary agent. Sampling the three most-active threads (~300–1000 messages each, accumulated over several days):

| Thread | Human msgs (max in 30s / 60s) | Peer-bot msgs (max in 30s / 60s) | All incoming (max in 30s / 60s) |
| --- | --- | --- | --- |
| Active project thread (~1000 msgs) | 3 / 3 | 12 / 16 | 12 / 16 |
| Status report thread (~360 msgs) | 2 / 3 | 11 / 20 | 11 / 20 |
| Task triage thread (~300 msgs) | 2 / 2 | 24 / 24 | 24 / 24 |

Humans alone never exceeded 3 messages in 30s; peer bots drove all of the burstiness. With max_buffered_messages = 10, channel-full backpressure would have engaged repeatedly on these threads (one had ten separate 60s windows with ≥10 incoming messages), splitting bursts into 2–3 turns and partially defeating the batching benefit.

After this sampling, the same deployment raised the cap to 30, sized for ~25% headroom over the largest observed 60s burst (24). At full cap the merged prompt is roughly 3k tokens of batched payload — within reasonable ACP-input range.

Guidance:

  • Human-only adapters: 10 is fine.
  • Multi-bot adapters: size to the observed peer-bot burst rate, with headroom. In practice 20–50 covers most multi-agent topologies. The trade-off is merged-prompt size at full cap (≈ cap × per-message-tokens).
  • Backpressure ≠ data loss: when full, submit parks the calling task per-thread; nothing is dropped. Undersizing only produces "more, smaller batches", not lost messages — so it's safe to start at the default and tune up after observing real burst patterns in dispatch debug logs (batch_size=N distribution).
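The sizing rule the deployment above used (~25% headroom over the largest observed 60s burst, floored at the human-only default) can be written down directly. suggest_cap is a hypothetical helper for the arithmetic, not a proposed API:

```rust
// Hypothetical sizing helper: cap = observed 60s burst + ~25% headroom,
// never below the human-only default of 10.
fn suggest_cap(max_burst_60s: usize) -> usize {
    let with_headroom = (max_burst_60s * 125 + 99) / 100; // ceil(burst * 1.25)
    with_headroom.max(10)
}

fn main() {
    // The task-triage thread's observed burst of 24 → cap 30,
    // matching the value the deployment chose.
    assert_eq!(suggest_cap(24), 30);
    // Human-only threads (burst ≤ 3) stay at the default.
    assert_eq!(suggest_cap(3), 10);

    // Merged-prompt size at full cap ≈ cap × per-message tokens
    // (100 tokens/message is an illustrative figure).
    let per_msg_tokens = 100;
    assert_eq!(30 * per_msg_tokens, 3_000); // ≈ the "roughly 3k tokens" above
}
```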

Implementation phases

Phase 1 — Mechanism (single PR)
  - New module: src/dispatch/mod.rs with Dispatcher + ThreadHandle + consumer_loop
  - Add MessageProcessingMode enum to config (default = PerMessage)
  - Hook discord.rs:600: branch on mode
      PerMessage → existing tokio::spawn { handle_message } path (unchanged)
      Batched    → dispatcher.submit(thread_key, thread_channel, adapter,
                                     BufferedMessage, other_bot_present)
    Slack hookup (slack.rs:909) is a deliberate follow-up — Discord covers
    the multi-bot validation target; Slack inherits unchanged per-message
    behavior in the meantime.
  - dispatch_batch packs multiple messages into one Vec<ContentBlock>
  - last_active touch on enqueue (fix cleanup_idle interaction)
  - SendError detection at submit: evict stale ThreadHandle + retry to re-spawn consumer (consumer-panic recovery)
  - Reactions strategy: trailing message anchors batch progress (see Decision #3)
  - Tests:
    - single-message turn (batched mode == per-message for batch_size=1)
    - 3-message fragmentation merges into one batch
    - new message arrives mid-turn, joins next batch
    - /cancel during batched turn does not drop buffer
    - last_active touched on enqueue (cleanup_idle does not evict)

Phase 2 — Validation
  - Roll out to a single channel (e.g. dev sandbox)
  - Compare turn counts, latency distributions, user-reported quality

Phase 3 — Default flip (separate RFC if needed)
  - Make `batched` the default mode
  - Or: remove the mode flag entirely if no real-world reason to keep `per-message`

Prior art

Two adjacent systems solve "user typed multiple times in quick succession" with different trade-offs. Both are personal AI agent runtimes (single-user, agent loop bundled into the gateway process) — different shape from openab's multi-tenant broker, but the in-flight buffering problem is the same.

Hermes Agent / OpenClaw / current openab / this RFC

| Aspect | Hermes Agent | OpenClaw | Current openab | This RFC |
| --- | --- | --- | --- | --- |
| Shape | Single-user runtime, gateway = agent | Single-user runtime | Multi-tenant broker → external ACP CLI | Same as current |
| First-message latency | ~2s (Discord adapter debounce) | n/a observed | 0 (immediate dispatch) | 0 (preserved) |
| Adapter-level batching | _pending_text_batches, _text_batch_split_delay_seconds (default 2s) — gateway/platforms/discord.py | n/a observed | None | None (deliberate) |
| In-flight new message | Single-slot _pending_messages[key] overwrites prior + interrupt_event.set() cancels in-flight | n/a observed | N independent tokio::spawn tasks, each parked on the per-thread mutex | Send to per-thread bounded mpsc; consumer batches at turn boundary |
| Buffer data structure | Dict[str, MessageEvent] (1 slot) | n/a observed | None (mutex waker list, opaque) | Bounded mpsc::channel (FIFO, default cap 10) |
| 3 fast messages → ACP turns | 1 turn, middle message dropped by overwrite | n/a observed | 3 turns, intermediate output wasted | 2 turns (M1, then batch [M2, M3]) — no message lost |
| Mid-turn interrupt | Yes (asyncio interrupt event) — agent loop is in-process | n/a observed | No | No |
| Same-thread message ordering | (1-slot makes this moot) | n/a observed | Approximate (Tokio Mutex not strictly FIFO) | Strict (mpsc FIFO) |
| Per-key serialization | asyncio.Event + _active_sessions dict | stall-watchdog.ts (different concern: stall detection) | KeyedAsyncQueue (per-key Semaphore, Slack) + Arc<Mutex<AcpConnection>> | Inherited |
| Bot-message gating | DISCORD_ALLOW_BOTS (off / mentions / all) | n/a observed | allow_bot_messages (3-value, borrowed from Hermes) | Inherited |
| Stall UX feedback | n/a observed | stall-watchdog.ts | reactions.rs stall_soft / stall_hard (borrowed from OpenClaw) | Inherited |

Three trade-off axes

  1. Drop vs preserve. Hermes' single-slot overwrite drops middle messages in fast bursts; openab (current and RFC) preserves all.
  2. Interrupt vs wait for boundary. Hermes can interrupt the in-flight LLM call because the agent loop is in-process. openab cannot — the agent is an external ACP CLI that yields control only at turn end. This is an architectural constraint, not a design choice. The RFC turns it into a feature: the existing turn duration is the natural buffering window, with no added latency for isolated messages.
  3. Debounce vs piggyback. Hermes' Discord adapter pays ~2s per message regardless. The RFC pays 0 for isolated messages — buffering only fills during an active turn, when the user is already waiting on the agent.

Other prior art

HTTP request coalescing in proxies (Varnish, nginx) — same shape ("while one request is in flight, batch / dedupe others") in a different domain.


Appendix A: Reference implementation

This sketch matches the MVP shape in src/dispatch/mod.rs. A few signatures
carry extra parameters not present in earlier drafts of this RFC; the
rationale is inline below.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{mpsc, Mutex};
use tracing::warn;

struct BufferedMessage {
    prompt: String,
    extra_blocks: Vec<ContentBlock>,
    sender_json: String,
    trigger_msg: MessageRef,
    arrived_at: Instant,
    /// Display name for inline batch labelling (the `<message from="...">` tag
    /// inside merged prompts). Not a stable user ID.
    sender_name: String,
}

struct ThreadHandle {
    tx: mpsc::Sender<BufferedMessage>,
    _consumer: tokio::task::JoinHandle<()>,
}

pub struct Dispatcher {
    per_thread: Mutex<HashMap<String, ThreadHandle>>,
    router: Arc<Router>,
    max_buffered_messages: usize,  // from config (default: 10)
}

impl Dispatcher {
    /// `adapter` and `other_bot_present` are passed per-call (rather than stored
    /// on the Dispatcher) because the Discord adapter is constructed inside
    /// serenity's `ready` callback via `OnceLock` — well after the Dispatcher
    /// itself is built in `main.rs`. Per-call passing avoids that chicken-and-
    /// egg without a second `OnceLock` here. In practice all submits for a
    /// given thread share the same adapter Arc; only the first one (which
    /// spawns the consumer) is captured for that thread's lifetime.
    pub async fn submit(
        &self,
        thread_key: String,
        thread_channel: ChannelRef,
        adapter: Arc<dyn ChatAdapter>,
        msg: BufferedMessage,
        other_bot_present: bool,
    ) {
        // Pull these out of `self` before locking so the or_insert_with closure
        // doesn't try to re-borrow `self` while `self.per_thread` is locked.
        let cap = self.max_buffered_messages;
        let router = Arc::clone(&self.router);

        let tx = {
            let mut map = self.per_thread.lock().await;
            map.entry(thread_key.clone())
                .or_insert_with(|| {
                    let (tx, rx) = mpsc::channel(cap);
                    let consumer = tokio::spawn(consumer_loop(
                        thread_key.clone(), thread_channel, rx, router,
                        adapter, cap, other_bot_present,
                    ));
                    ThreadHandle { tx, _consumer: consumer }
                })
                .tx
                .clone()
        };
        // dispatcher mutex released — held only to look up/insert the handle

        // send.await parks the calling task if the channel (cap = max_buffered_messages) is full.
        // Only this submit future blocks; the platform event loop is unaffected
        // because submit runs inside its own per-message tokio::spawn'd task.
        if tx.send(msg).await.is_err() {
            // Consumer has exited — see Tradeoffs row "Consumer task panics".
            // Phase 1 production code evicts the stale ThreadHandle and retries
            // so or_insert_with re-spawns a fresh consumer.
            warn!(thread_key, "consumer task has exited; message dropped");
        }
    }
}

async fn consumer_loop(
    thread_key: String,
    thread_channel: ChannelRef,
    mut rx: mpsc::Receiver<BufferedMessage>,
    router: Arc<Router>,
    adapter: Arc<dyn ChatAdapter>,
    max_batch: usize,
    other_bot_present: bool,
) {
    while let Some(first) = rx.recv().await {
        // Greedy drain up to max_batch messages already enqueued.
        let mut batch = vec![first];
        while batch.len() < max_batch {
            match rx.try_recv() {
                Ok(more) => batch.push(more),
                Err(_) => break,
            }
        }

        // Process batch as ONE ACP turn
        // - if batch.len() == 1: identical to today's single-message dispatch
        // - if batch.len() > 1:  merge into one combined Text block with
        //                        [Batched: N messages...] banner +
        //                        <message index="N" from="..."> tags
        //                        (see "Packing a batch into one ACP turn")
        dispatch_batch(&router, &adapter, &thread_channel, batch, other_bot_present).await;
    }
    // recv() returned None → all senders dropped → cleanup_idle evicted us. Exit cleanly.
}
