feat: replace BundlePoller polling with SSE streaming #269

Open
Evalir wants to merge 4 commits into graphite-base/269 from evalir/eop/sse-bundle-poller

Member

@Evalir Evalir commented Apr 23, 2026

Description

Mirrors #259 (tx-poller SSE) for bundles. Replaces the 1s polling loop in BundlePoller with an SSE subscription to /bundles/feed via BuilderTxCache::subscribe_bundles (newly exposed by the SDK, matched by the tx-pool-webservice endpoint).

The structure matches TxPoller exactly:

  • Initial full_fetch paginates through stream_bundles to seed the cache on startup.
  • subscribe opens the SSE stream and yields CachedBundles in real time.
  • On stream error / stream end, reconnects with exponential backoff (1s → 30s cap), racing the sleep against the block-env watcher so an env change wins and triggers a fresh full refetch immediately.
  • On block-env change, the full_fetch runs again to cover anything missed during disconnection.
  • NotOurSlot is logged at trace level (expected when the builder is not slot-permissioned) in both the full-fetch and subscribe paths — avoids spurious warn-level noise.
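The reconnect schedule described above can be sketched as follows. This is a minimal illustration, not the PR's code: the constant and function names are hypothetical, and the doubling factor is an assumption (the description only states the 1s start and the 30s cap). The race against the block-env watcher is left out to keep the sketch dependency-free.

```rust
use std::time::Duration;

/// Initial reconnect delay (1s, per the description above).
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
/// Upper bound on the reconnect delay (30s cap, per the description above).
const MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Doubles the delay and clamps it at `MAX_BACKOFF`.
/// Assumption: the growth factor is 2; the PR text does not state it.
fn next_backoff(current: Duration) -> Duration {
    current.saturating_mul(2).min(MAX_BACKOFF)
}

/// Returns the delays used for the first `attempts` consecutive failed reconnects.
fn backoff_schedule(attempts: usize) -> Vec<Duration> {
    let mut delay = INITIAL_BACKOFF;
    let mut out = Vec::with_capacity(attempts);
    for _ in 0..attempts {
        out.push(delay);
        delay = next_backoff(delay);
    }
    out
}
```

Under these assumptions, seven consecutive failures would wait 1, 2, 4, 8, 16, 30 and 30 seconds.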

The public check_bundle_cache() wrapper is dropped; the integration test now uses BuilderTxCache::stream_bundles directly, matching the style of tx_poller_test.rs.

Related Issue

Stacked on #259.

Testing

  • make fmt passes
  • make clippy passes
  • make test passes
  • cargo doc --no-deps passes with -D warnings
  • cargo test --features test-utils --no-run builds all integration tests
  • Exercise against a live tx-pool-webservice to verify SSE reconnect and env-driven refetch behavior end-to-end

Member Author

Evalir commented Apr 23, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@Evalir Evalir force-pushed the evalir/eop/sse-bundle-poller branch from b4f7193 to 8c472ae April 23, 2026 21:45
Mirrors the TxPoller SSE change: subscribe to /bundles/feed via
BuilderTxCache::subscribe_bundles for real-time delivery of new bundles,
with an initial full_fetch on startup/block-env change and exponential
backoff reconnect on error or stream end. Drops the 1s polling loop.
@Evalir Evalir changed the base branch from evalir/eop/sse-tx-poller to graphite-base/269 April 27, 2026 11:33
@Evalir Evalir marked this pull request as ready for review April 27, 2026 11:34
Mirrors PR #259's 9bc0ff4 split applied to BundlePoller. Restore
check_bundle_cache as a private pure-fetch helper returning
Result<Vec<CachedBundle>, _>, and rename full_fetch to
fetch_and_forward — its name now matches what it does (fetch +
forward to the outbound channel). Use let-else over the fetch
result to drop a level of indentation.
@Evalir Evalir force-pushed the evalir/eop/sse-bundle-poller branch from 8c472ae to 58c25a1 April 27, 2026 11:38
@Evalir Evalir changed the base branch from graphite-base/269 to evalir/eop/sse-tx-poller April 27, 2026 11:39
Contributor

@dylanlott dylanlott left a comment

We should test these changes in Parmigiana ASAP before taking them to mainnet.

Comment thread src/tasks/cache/bundle.rs
time::sleep(self.poll_duration()).await;
continue;
};
async fn task_future(mut self, outbound: mpsc::UnboundedSender<CachedBundle>) {
Contributor

should span this and pass the span to the lifecycle methods as necessary

Contributor

And as I think about it more, we probably want to add at least the block number to the span's log fields.

Member Author

[Claude Code]

Done in 13b25a4: task_future now carries an #[instrument] span with url and block_number fields. block_number is recorded on startup and re-recorded on every env-change branch so the field tracks the current block. The lifecycle methods (fetch_and_forward, subscribe, reconnect, handle_sse_item) inherit this span automatically via async context propagation, so I dropped the redundant per-method trace_span!s.

Comment thread src/tasks/cache/bundle.rs

// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
Contributor

This check was removed from this arm, but it is still made in the tx cache arm. We need to check outbound.is_closed() somewhere in this arm. As it stands, the simulator could drop its receiver while the SSE stream is in the error or None branch, and the poller would repeat its reconnection loop forever.

Member Author

[Claude Code]

Done in 13b25a4 — added an if outbound.is_closed() { break; } guard at the top of task_future's select loop, plus an early outbound.is_closed() check at the top of reconnect's loop body. Together those break the forever-reconnect cycle: if the simulator drops its receiver while we're stuck in a stream error/None loop, the next iteration of either loop exits cleanly.

Comment thread src/tasks/cache/bundle.rs Outdated
BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => {
trace!("Not our slot to subscribe to bundles");
}
_ => warn!(%error, "Failed to open SSE bundle subscription"),
Contributor

should add a metric for SSE failures in the subscribe loop to match the inc_tx_poll_errors() in the fetch and dispatch loop.

Member Author

[Claude Code]

Done in 13b25a4 — added inc_sse_subscribe_errors and called it from subscribe's inspect_err. Used a generic name (no bundle_ prefix) to match the existing inc_sse_reconnect_attempts pattern; tx.rs can adopt the same helper later without renaming.
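For readers following along, here is a rough, dependency-free sketch of counting subscribe failures from an inspect_err call. It is not the code from the commit: the AtomicU64 stands in for the crate's real metrics backend, and the `open` closure is a hypothetical stand-in for the SDK call that opens the SSE stream. Only the helper name inc_sse_subscribe_errors and the use of inspect_err come from the reply above.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Stand-in counter; the real helper presumably reports to the crate's metrics system.
static SSE_SUBSCRIBE_ERRORS: AtomicU64 = AtomicU64::new(0);

/// Bumps the subscribe-error counter by one.
fn inc_sse_subscribe_errors() {
    SSE_SUBSCRIBE_ERRORS.fetch_add(1, Ordering::Relaxed);
}

/// Hypothetical subscribe wrapper: `open` stands in for the call that opens the stream.
/// On failure the counter is bumped once and the error is converted to `None`.
fn subscribe<S, E>(open: impl FnOnce() -> Result<S, E>) -> Option<S> {
    open()
        // `inspect_err` runs the closure only on `Err` and passes the result through unchanged.
        .inspect_err(|_error| inc_sse_subscribe_errors())
        .ok()
}
```

Keeping the increment inside inspect_err means the success path never touches the counter, and each failed open is counted exactly once.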

Comment thread src/tasks/cache/bundle.rs Outdated
outbound: &mpsc::UnboundedSender<CachedBundle>,
backoff: &mut Duration,
) -> SseStream {
tokio::select! {
Contributor

should add a metric like inc_tx_sse_reconnects() as well

Member Author

[Claude Code]

Done in 13b25a4: BundlePoller::reconnect now increments the existing inc_sse_reconnect_attempts counter once per attempt, matching how TxPoller::reconnect uses it. Also generalized the metric's description from "SSE transaction stream reconnect attempts" to "SSE stream reconnect attempts" since both pollers share it.

Contributor

@Fraser999 Fraser999 left a comment

We could use actual integration tests for this stream and the tx stream (I think the existing one is more like a connectivity smoke test). Not blocking, but worth a follow-up ticket?

Comment thread src/tasks/cache/bundle.rs Outdated
debug!("Block env channel closed, shutting down");
break;
}
trace!("Block env changed, refetching all bundles");
Contributor

Maybe worth changing to debug!? I guess be consistent with how the corresponding log line is handled for the tx stream (ref this comment).

Member Author

[Claude Code]

Done in 13b25a4 — bumped the env-change refetch log from trace! to debug!, matching the agreed change on tx.rs.

Comment thread src/tasks/cache/bundle.rs
*stream = self.reconnect(outbound, backoff).await;
}
None => {
warn!("SSE bundle stream ended, reconnecting");
Contributor

[Claude Code]

When subscribe() (line 98) fails to open the connection, it logs "Failed to open SSE bundle subscription" and returns stream::empty() as a sentinel. The first sse_stream.next() then yields None immediately, which lands here and emits "SSE bundle stream ended, reconnecting". Two warns per failed subscribe cycle, and the second one is a lie - the stream never opened.

Suggest having subscribe() return Option<SseStream> (.ok().map(...) instead of unwrap_or_else(empty)) and adding a None-branch in task_future/reconnect that just schedules a retry without going through handle_sse_item. That keeps the "stream ended" warn for the case it actually describes (a real stream that closed mid-flight) and avoids double-logging.

Member Author

[Claude Code]

Done in 13b25a4: subscribe() now returns Option<SseStream>; on None, the caller (initial path in task_future or the Err/None arms of handle_sse_item) hands off to reconnect, which loops internally with backoff until either a real stream is established (Some) or the outbound channel closes (None, signalling the task to shut down). The misleading "stream ended, reconnecting" warn no longer fires after a failed subscribe; the only warn comes from subscribe's actual error path.
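The reconnect contract described in this reply might look roughly like the sketch below. It is a synchronous, std-only approximation rather than the async code in bundle.rs: the closures `is_closed`, `try_subscribe` and `sleep` are hypothetical stand-ins for outbound.is_closed(), subscribe() and the timed wait, and the doubling with a 30s cap is assumed from the PR description.

```rust
use std::time::Duration;

/// Assumed cap on the retry delay (30s, per the PR description).
const MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Retries `try_subscribe` until it yields a stream (`Some`), or returns `None`
/// as soon as the outbound side is observed closed, signalling shutdown.
fn reconnect<S>(
    is_closed: impl Fn() -> bool,
    mut try_subscribe: impl FnMut() -> Option<S>,
    mut sleep: impl FnMut(Duration),
    backoff: &mut Duration,
) -> Option<S> {
    loop {
        // Guard at the top of the loop body: nobody is listening, so stop retrying.
        if is_closed() {
            return None;
        }
        // Wait out the current delay, then grow it for the next attempt.
        sleep(*backoff);
        *backoff = (*backoff).saturating_mul(2).min(MAX_BACKOFF);
        if let Some(stream) = try_subscribe() {
            return Some(stream);
        }
    }
}
```

Because the closed check runs before every wait, a dropped receiver ends the loop on the next iteration instead of retrying forever, and the caller can treat a None return as the signal to shut the task down.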

Comment thread src/tasks/cache/bundle.rs
async fn fetch_and_forward(&self, outbound: &mpsc::UnboundedSender<CachedBundle>) {
let span = trace_span!("BundlePoller::fetch_and_forward", url = %self.config.tx_pool_url);

crate::metrics::inc_bundle_poll_count();
Contributor

This metric (and other ones with "poll" in their names) should maybe be renamed as they don't really measure individual poll attempts any more.

Member Author

[Claude Code]

Acknowledged. Deferring this rename to its own PR — it cuts across BundlePoller and TxPoller metrics plus any dashboards/alerts, so I'd rather not mix it with the SSE work here. Will track as a follow-up.

@Evalir Evalir changed the base branch from evalir/eop/sse-tx-poller to graphite-base/269 April 28, 2026 15:54
Addresses dylanlott + Fraser999 comments on the BundlePoller SSE
PR:

- Wrap task_future with #[instrument], adding url + block_number
  fields to the parent span; record block_number on every env
  change so tracing reflects current state. (dylan)
- Guard the task_future select loop with outbound.is_closed() so a
  dropped consumer breaks the loop instead of letting reconnect
  retry forever. (dylan)
- Subscribe now returns Option<SseStream>; reconnect loops
  internally with backoff until it gets a real stream or the
  outbound closes (returning Option). Kills the misleading double
  warn ("Failed to open SSE bundle subscription" + "stream
  ended") on subscribe failure. (Fraser/Claude)
- Add inc_sse_subscribe_errors metric, called from subscribe's
  error path. Generic name (not bundle-specific) since the existing
  inc_sse_reconnect_attempts is also generic. (dylan)
- Increment inc_sse_reconnect_attempts in BundlePoller::reconnect,
  matching tx.rs. (dylan)
- Bump "Block env changed" log from trace to debug, matching the
  agreed change on tx.rs. (Fraser)
- Generalize SSE_RECONNECT_ATTEMPTS_HELP from "transaction stream"
  to just "SSE stream" since the metric is shared.

Defers (per plan): renaming "poll" metrics to "fetch" — that's a
cross-cutting metrics rename that should ride in its own PR.
@Evalir Evalir requested review from Fraser999 and dylanlott April 29, 2026 09:54
- Collapse the duplicated reconnect match arms in handle_sse_item
  into let-else, matching the project's terse Option/Result style.
- Promote record_block_number from a free fn to a &self method,
  shrinking the call sites and using SimEnv::rollup_block_number()
  instead of inlining the rollup_env().number.to::<u64>() chain.
- Drop a stray WHAT-comment on reconnect's loop tail.
3 participants