feat: replace BundlePoller polling with SSE streaming #269
Evalir wants to merge 4 commits into graphite-base/269
Warning: This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite.
b4f7193 to 8c472ae
Mirrors the TxPoller SSE change: subscribe to /bundles/feed via BuilderTxCache::subscribe_bundles for real-time delivery of new bundles, with an initial full_fetch on startup/block-env change and exponential backoff reconnect on error or stream end. Drops the 1s polling loop.
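The exponential backoff mentioned above can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: the constants `INITIAL_BACKOFF` and `MAX_BACKOFF` and the helper names `next_backoff` and `reset_backoff` are hypothetical, since the PR does not state the actual bounds.

```rust
use std::time::Duration;

/// Hypothetical bounds; the PR does not state the actual values.
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Returns the delay to wait before the next reconnect attempt and
/// doubles the stored backoff, capped at `MAX_BACKOFF`.
fn next_backoff(backoff: &mut Duration) -> Duration {
    let current = *backoff;
    *backoff = current.saturating_mul(2).min(MAX_BACKOFF);
    current
}

/// Resets the backoff once a connection has been established again.
fn reset_backoff(backoff: &mut Duration) {
    *backoff = INITIAL_BACKOFF;
}
```

A caller would sleep for the value returned by `next_backoff` before each reconnect attempt and call `reset_backoff` after a successful subscribe.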
Mirrors PR #259's 9bc0ff4 split applied to BundlePoller. Restore check_bundle_cache as a private pure-fetch helper returning Result<Vec<CachedBundle>, _>, and rename full_fetch to fetch_and_forward — its name now matches what it does (fetch + forward to the outbound channel). Use let-else over the fetch result to drop a level of indentation.
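The fetch/forward split with let-else described above might look roughly like the following. It is a synchronous sketch under stated assumptions: `CachedBundle` here is a stand-in struct, the `fail` flag replaces the real network call, and `std::sync::mpsc` substitutes for the async channel the PR uses.

```rust
use std::sync::mpsc::Sender;

/// Stand-in for the PR's CachedBundle; the real type comes from the SDK.
#[derive(Debug, Clone, PartialEq)]
struct CachedBundle {
    id: u64,
}

/// Pure fetch helper: returns the bundles or an error, with no side effects.
/// The `fail` flag is a hypothetical switch that simulates a failed request.
fn check_bundle_cache(fail: bool) -> Result<Vec<CachedBundle>, String> {
    if fail {
        return Err("cache unreachable".to_string());
    }
    Ok(vec![CachedBundle { id: 1 }, CachedBundle { id: 2 }])
}

/// Fetches, then forwards each bundle to the outbound channel.
/// Returns how many bundles were forwarded.
fn fetch_and_forward(fail: bool, outbound: &Sender<CachedBundle>) -> usize {
    // let-else keeps the happy path at the top indentation level.
    let Ok(bundles) = check_bundle_cache(fail) else {
        return 0;
    };
    let mut forwarded = 0;
    for bundle in bundles {
        // A send error means the receiver was dropped; stop forwarding.
        if outbound.send(bundle).is_err() {
            break;
        }
        forwarded += 1;
    }
    forwarded
}
```

Keeping `check_bundle_cache` free of side effects makes it easy to test on its own, while `fetch_and_forward` owns the channel interaction.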
8c472ae to 58c25a1
dylanlott left a comment
We should test these changes in Parmigiana asap before taking to mainnet.
time::sleep(self.poll_duration()).await;
continue;
};
async fn task_future(mut self, outbound: mpsc::UnboundedSender<CachedBundle>) {
should span this and pass the span to the lifecycle methods as necessary
and as I think about it more, we probably want to add at least the block number to span's log fields.
[Claude Code]
Done in 13b25a4 — task_future now carries an #[instrument] span with url and block_number fields. block_number is recorded on startup and re-recorded on every env-change branch so the field tracks the current block. The lifecycle methods (fetch_and_forward, subscribe, reconnect, handle_sse_item) inherit this span automatically via async context propagation, so I dropped the redundant per-method trace_span!s.
// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
This check is removed from this arm, but it's still called in the tx cache arm. Need to check outbound.is_closed somewhere in this arm. In its current state, the simulator could drop its receiver while the SSE is in the error or none branch and would repeat its reconnection loop forever.
[Claude Code]
Done in 13b25a4 — added an if outbound.is_closed() { break; } guard at the top of task_future's select loop, plus an early outbound.is_closed() check at the top of reconnect's loop body. Together those break the forever-reconnect cycle: if the simulator drops its receiver while we're stuck in a stream error/None loop, the next iteration of either loop exits cleanly.
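The effect of a closed-consumer guard at the top of the loop can be illustrated with a small synchronous model. Everything here is hypothetical scaffolding: `StreamEvent`, `run_loop`, and the `is_closed` closure stand in for the SSE stream, the select loop, and `outbound.is_closed()` respectively.

```rust
/// Stand-in for what the SSE stream yields on each iteration.
enum StreamEvent {
    Item(u64),
    Error,
    Ended,
}

/// Processes events until they run out or the consumer goes away.
/// Returns the number of reconnect attempts that were triggered.
fn run_loop(
    events: impl IntoIterator<Item = StreamEvent>,
    is_closed: impl Fn() -> bool,
    mut forward: impl FnMut(u64),
) -> u32 {
    let mut reconnects = 0;
    for event in events {
        // Guard first: do no work for a consumer that has dropped its receiver.
        if is_closed() {
            break;
        }
        match event {
            StreamEvent::Item(id) => forward(id),
            // Without the guard above, these arms could repeat indefinitely.
            StreamEvent::Error | StreamEvent::Ended => reconnects += 1,
        }
    }
    reconnects
}
```

In this model, once `is_closed` reports true, later error or end-of-stream events no longer trigger reconnects.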
BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => {
trace!("Not our slot to subscribe to bundles");
}
_ => warn!(%error, "Failed to open SSE bundle subscription"),
should add a metric for SSE failures in the subscribe loop to match the inc_tx_poll_errors() in the fetch and dispatch loop.
[Claude Code]
Done in 13b25a4 — added inc_sse_subscribe_errors and called it from subscribe's inspect_err. Used a generic name (no bundle_ prefix) to match the existing inc_sse_reconnect_attempts pattern; tx.rs can adopt the same helper later without renaming.
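As a rough picture of counting subscribe failures from an `inspect_err` hook, consider the sketch below. It is an assumption-laden model: a plain `AtomicU64` replaces whatever metrics backend the project actually uses, and `subscribe` takes a hypothetical `open` closure instead of opening a real SSE connection.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Stand-in counter; the real project presumably registers this with its
/// metrics backend rather than using a bare atomic.
static SSE_SUBSCRIBE_ERRORS: AtomicU64 = AtomicU64::new(0);

/// Increments the shared, poller-agnostic subscribe error counter.
fn inc_sse_subscribe_errors() {
    SSE_SUBSCRIBE_ERRORS.fetch_add(1, Ordering::Relaxed);
}

/// Tries to open a stream via the hypothetical `open` closure.
/// On failure the error is counted and the result becomes `None`.
fn subscribe(open: impl FnOnce() -> Result<u32, String>) -> Option<u32> {
    open().inspect_err(|_| inc_sse_subscribe_errors()).ok()
}
```

Because the counter name carries no bundle-specific prefix, a second poller could call the same helper without a rename.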
outbound: &mpsc::UnboundedSender<CachedBundle>,
backoff: &mut Duration,
) -> SseStream {
tokio::select! {
should add a metric like inc_tx_sse_reconnects() as well
[Claude Code]
Done in 13b25a4 — BundlePoller::reconnect now increments the existing inc_sse_reconnect_attempts counter once per attempt, matching how TxPoller::reconnect uses it. Also generalized the metric's description from "SSE transaction stream reconnect attempts" to "SSE stream reconnect attempts" since both pollers share it.
Fraser999 left a comment
We could use actual integration tests for this stream and the tx stream (I think the existing one is more like a connectivity smoke test). Not blocking, but worth a follow-up ticket?
debug!("Block env channel closed, shutting down");
break;
}
trace!("Block env changed, refetching all bundles");
Maybe worth changing to debug!? I guess be consistent with how the corresponding log line is handled for the tx stream (ref this comment).
[Claude Code]
Done in 13b25a4 — bumped the env-change refetch log from trace! to debug!, matching the agreed change on tx.rs.
*stream = self.reconnect(outbound, backoff).await;
}
None => {
warn!("SSE bundle stream ended, reconnecting");
[Claude Code]
When subscribe() (line 98) fails to open the connection, it logs "Failed to open SSE bundle subscription" and returns stream::empty() as a sentinel. The first sse_stream.next() then yields None immediately, which lands here and emits "SSE bundle stream ended, reconnecting". Two warns per failed subscribe cycle, and the second one is a lie - the stream never opened.
Suggest having subscribe() return Option<SseStream> (.ok().map(...) instead of unwrap_or_else(empty)) and adding a None-branch in task_future/reconnect that just schedules a retry without going through handle_sse_item. That keeps the "stream ended" warn for the case it actually describes (a real stream that closed mid-flight) and avoids double-logging.
[Claude Code]
Done in 13b25a4 — subscribe() now returns Option<SseStream>; on None, the caller (initial path in task_future or the Err/None arms of handle_sse_item) hands off to reconnect, which loops internally with backoff until either a real stream is established (Some) or the outbound channel closes (None, signalling the task to shut down). The misleading "stream ended, reconnecting" warn no longer fires after a failed subscribe — the only warn comes from subscribe's actual error path.
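The reconnect contract described here (loop with backoff until a stream is obtained, or return `None` once the outbound channel is closed) could be modelled as follows. This is a generic, synchronous sketch: the `subscribe`, `is_closed`, and `sleep` closures and the `max_backoff` parameter are assumptions introduced for illustration, and the ordering of sleep versus subscribe in the real code is not stated in the PR.

```rust
use std::time::Duration;

/// Retries `subscribe` until it yields a stream (`Some`) or the consumer
/// is gone (`None`). The backoff doubles after each failure, up to
/// `max_backoff`.
fn reconnect<S>(
    mut subscribe: impl FnMut() -> Option<S>,
    is_closed: impl Fn() -> bool,
    mut sleep: impl FnMut(Duration),
    backoff: &mut Duration,
    max_backoff: Duration,
) -> Option<S> {
    loop {
        // Nobody is listening any more: tell the caller to shut down.
        if is_closed() {
            return None;
        }
        sleep(*backoff);
        if let Some(stream) = subscribe() {
            return Some(stream);
        }
        *backoff = backoff.saturating_mul(2).min(max_backoff);
    }
}
```

Returning `Option` lets the caller distinguish "got a live stream" from "shut down", so a "stream ended" warning is only needed for a stream that actually opened and later closed.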
async fn fetch_and_forward(&self, outbound: &mpsc::UnboundedSender<CachedBundle>) {
let span = trace_span!("BundlePoller::fetch_and_forward", url = %self.config.tx_pool_url);
crate::metrics::inc_bundle_poll_count();
This metric (and other ones with "poll" in their names) should maybe be renamed as they don't really measure individual poll attempts any more.
[Claude Code]
Acknowledged. Deferring this rename to its own PR — it cuts across BundlePoller and TxPoller metrics plus any dashboards/alerts, so I'd rather not mix it with the SSE work here. Will track as a follow-up.
Addresses dylanlott + Fraser999 comments on the BundlePoller SSE
PR:
- Wrap task_future with #[instrument], adding url + block_number
fields to the parent span; record block_number on every env
change so tracing reflects current state. (dylan)
- Guard the task_future select loop with outbound.is_closed() so a
dropped consumer breaks the loop instead of letting reconnect
retry forever. (dylan)
- Subscribe now returns Option<SseStream>; reconnect loops
internally with backoff until it gets a real stream or the
outbound closes (returning Option). Kills the misleading double
warn ("Failed to open SSE bundle subscription" + "stream
ended") on subscribe failure. (Fraser/Claude)
- Add inc_sse_subscribe_errors metric, called from subscribe's
error path. Generic name (not bundle-specific) since the existing
inc_sse_reconnect_attempts is also generic. (dylan)
- Increment inc_sse_reconnect_attempts in BundlePoller::reconnect,
matching tx.rs. (dylan)
- Bump "Block env changed" log from trace to debug, matching the
agreed change on tx.rs. (Fraser)
- Generalize SSE_RECONNECT_ATTEMPTS_HELP from "transaction stream"
to just "SSE stream" since the metric is shared.
Defers (per plan): renaming "poll" metrics to "fetch" — that's a
cross-cutting metrics rename that should ride in its own PR.
- Collapse the duplicated reconnect match arms in handle_sse_item into let-else, matching the project's terse Option/Result style.
- Promote record_block_number from a free fn to a &self method, shrinking the call sites and using SimEnv::rollup_block_number() instead of inlining the rollup_env().number.to::<u64>() chain.
- Drop a stray WHAT-comment on reconnect's loop tail.

Description
Mirrors #259 (tx-poller SSE) for bundles. Replaces the 1s polling loop in BundlePoller with an SSE subscription to /bundles/feed via BuilderTxCache::subscribe_bundles (newly exposed by the SDK, matched by the tx-pool-webservice endpoint).
The structure matches TxPoller exactly:
- full_fetch paginates through stream_bundles to seed the cache on startup.
- subscribe opens the SSE stream and yields CachedBundles in real time.
- NotOurSlot is logged at trace level (expected when the builder is not slot-permissioned) in both the full-fetch and subscribe paths; this avoids spurious warn-level noise.
The public check_bundle_cache() wrapper is dropped; the integration test now uses BuilderTxCache::stream_bundles directly, matching the style of tx_poller_test.rs.
Related Issue
Stacked on #259.
Testing
- make fmt passes
- make clippy passes
- make test passes
- cargo doc --no-deps passes with -D warnings
- cargo test --features test-utils --no-run builds all integration tests