From 36ebe82a427d23f70b2b8142e6efc5bcae4762e3 Mon Sep 17 00:00:00 2001 From: evalir Date: Thu, 23 Apr 2026 23:27:05 +0200 Subject: [PATCH 1/5] feat: replace BundlePoller polling with SSE streaming Mirrors the TxPoller SSE change: subscribe to /bundles/feed via BuilderTxCache::subscribe_bundles for real-time delivery of new bundles, with an initial full_fetch on startup/block-env change and exponential backoff reconnect on error or stream end. Drops the 1s polling loop. --- src/tasks/cache/bundle.rs | 207 +++++++++++++++++++++++------------- src/tasks/cache/system.rs | 2 +- tests/bundle_poller_test.rs | 7 +- 3 files changed, 141 insertions(+), 75 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index a45fff03..470b3964 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,113 +1,176 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. -use crate::config::BuilderConfig; -use futures_util::{TryFutureExt, TryStreamExt}; +use crate::{config::BuilderConfig, tasks::env::SimEnv}; +use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; +use std::{ops::ControlFlow, pin::Pin, time::Duration}; use tokio::{ - sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, + sync::{mpsc, watch}, task::JoinHandle, - time::{self, Duration}, + time, }; use tracing::{Instrument, debug, trace, trace_span, warn}; -/// Poll interval for the bundle poller in milliseconds. -const POLL_INTERVAL_MS: u64 = 1000; +type SseStream = Pin> + Send>>; -/// The BundlePoller polls the tx-pool for bundles. +const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); +const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); + +/// The BundlePoller fetches bundles from the tx-pool on startup and on each +/// block environment change, and subscribes to an SSE stream for real-time +/// delivery of new bundles in between. #[derive(Debug)] pub struct BundlePoller { /// The builder configuration values. config: &'static BuilderConfig, - /// Client for the tx cache. tx_cache: BuilderTxCache, - - /// Defines the interval at which the bundler polls the tx-pool for bundles. - poll_interval_ms: u64, -} - -impl Default for BundlePoller { - fn default() -> Self { - Self::new() - } + /// Receiver for block environment updates, used to trigger refetches. + envs: watch::Receiver>, } -/// Implements a poller for the block builder to pull bundles from the tx-pool. impl BundlePoller { - /// Creates a new BundlePoller from the provided builder config. - pub fn new() -> Self { - Self::new_with_poll_interval_ms(POLL_INTERVAL_MS) - } - - /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. - pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { + /// Returns a new [`BundlePoller`] with the given block environment receiver. + pub fn new(envs: watch::Receiver>) -> Self { let config = crate::config(); let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token()); - Self { config, tx_cache, poll_interval_ms } + Self { config, tx_cache, envs } } - /// Returns the poll duration as a [`Duration`]. - const fn poll_duration(&self) -> Duration { - Duration::from_millis(self.poll_interval_ms) + async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { + let span = trace_span!("BundlePoller::full_fetch", url = %self.config.tx_pool_url); + + crate::metrics::inc_bundle_poll_count(); + if let Ok(bundles) = self + .tx_cache + .stream_bundles() + .try_collect::>() + // NotOurSlot is expected whenever the builder isn't slot-permissioned; + // don't bump the error counter or warn. + .inspect_err(|error| match error { + BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { + trace!("Not our slot to fetch bundles"); + } + _ => { + crate::metrics::inc_bundle_poll_errors(); + warn!(%error, "Failed to fetch bundles from tx-cache"); + } + }) + .instrument(span.clone()) + .await + { + let _guard = span.entered(); + crate::metrics::record_bundles_fetched(bundles.len()); + trace!(count = bundles.len(), "found bundles"); + for bundle in bundles { + if outbound.send(bundle).is_err() { + debug!("Outbound channel closed during full fetch"); + return; + } + } + } } - /// Fetches all bundles from the tx-cache, paginating through all available pages. - pub async fn check_bundle_cache(&self) -> Result, BuilderTxCacheError> { - self.tx_cache.stream_bundles().try_collect().await + /// Returns an empty stream on connection failure so the caller can handle + /// reconnection uniformly. + async fn subscribe(&self) -> SseStream { + self.tx_cache + .subscribe_bundles() + .await + .inspect( + |_| debug!(url = %self.config.tx_pool_url, "SSE bundle subscription established"), + ) + .inspect_err(|error| match error { + BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { + trace!("Not our slot to subscribe to bundles"); + } + _ => warn!(%error, "Failed to open SSE bundle subscription"), + }) + .map(|s| Box::pin(s) as SseStream) + .unwrap_or_else(|_| Box::pin(futures_util::stream::empty())) } - async fn task_future(self, outbound: UnboundedSender) { - loop { - let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); + /// Runs a full refetch concurrently with re-subscribing, to cover any + /// items missed while disconnected. + async fn reconnect( + &mut self, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + ) -> SseStream { + tokio::select! { + // Biased: a block env change wins over the backoff sleep. An env + // change triggers a full refetch below anyway, which supersedes the + // sleep-then-reconnect path — so there's no point waiting out the + // backoff. + biased; + _ = self.envs.changed() => {} + _ = time::sleep(*backoff) => {} + } + *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); + let (_, stream) = tokio::join!(self.full_fetch(outbound), self.subscribe()); + stream + } - // Check this here to avoid making the web request if we know - // we don't need the results. - if outbound.is_closed() { - span.in_scope(|| trace!("No receivers left, shutting down")); - break; + /// Returns `Break` when the outbound channel has closed and the task + /// should shut down. + async fn handle_sse_item( + &mut self, + item: Option>, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + stream: &mut SseStream, + ) -> ControlFlow<()> { + match item { + Some(Ok(bundle)) => { + *backoff = INITIAL_RECONNECT_BACKOFF; + if outbound.send(bundle).is_err() { + trace!("No receivers left, shutting down"); + return ControlFlow::Break(()); + } + } + Some(Err(error)) => { + warn!(%error, "SSE bundle stream interrupted, reconnecting"); + *stream = self.reconnect(outbound, backoff).await; + } + None => { + warn!("SSE bundle stream ended, reconnecting"); + *stream = self.reconnect(outbound, backoff).await; } + } + ControlFlow::Continue(()) + } - crate::metrics::inc_bundle_poll_count(); - let Ok(bundles) = self - .check_bundle_cache() - .inspect_err(|error| match error { - BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { - trace!("Not our slot to fetch bundles"); - } - _ => { - crate::metrics::inc_bundle_poll_errors(); - warn!(%error, "Failed to fetch bundles from tx-cache"); - } - }) - .instrument(span.clone()) - .await - else { - time::sleep(self.poll_duration()).await; - continue; - }; + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + let (_, mut sse_stream) = tokio::join!(self.full_fetch(&outbound), self.subscribe()); + let mut backoff = INITIAL_RECONNECT_BACKOFF; - { - let _guard = span.entered(); - crate::metrics::record_bundles_fetched(bundles.len()); - trace!(count = bundles.len(), "fetched bundles from tx-cache"); - for bundle in bundles { - if let Err(err) = outbound.send(bundle) { - debug!(?err, "Failed to send bundle - channel is dropped"); + loop { + tokio::select! { + item = sse_stream.next() => { + if self + .handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream) + .await + .is_break() + { + break; + } + } + res = self.envs.changed() => { + if res.is_err() { + debug!("Block env channel closed, shutting down"); break; } + trace!("Block env changed, refetching all bundles"); + self.full_fetch(&outbound).await; } } - - time::sleep(self.poll_duration()).await; } } - /// Spawns a task that sends bundles it finds to its channel sender. - pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { - let (outbound, inbound) = unbounded_channel(); - + /// Spawns the task future and returns a receiver for bundles it finds. + pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = mpsc::unbounded_channel(); let jh = tokio::spawn(self.task_future(outbound)); - (inbound, jh) } } diff --git a/src/tasks/cache/system.rs b/src/tasks/cache/system.rs index 89698a4d..26a1a5b6 100644 --- a/src/tasks/cache/system.rs +++ b/src/tasks/cache/system.rs @@ -27,7 +27,7 @@ impl CacheTasks { let (tx_receiver, tx_poller) = tx_poller.spawn(); // Bundle Poller pulls bundles from the cache - let bundle_poller = BundlePoller::new(); + let bundle_poller = BundlePoller::new(self.block_env.clone()); let (bundle_receiver, bundle_poller) = bundle_poller.spawn(); // Set up the cache task diff --git a/tests/bundle_poller_test.rs b/tests/bundle_poller_test.rs index 9acfe483..1dc7c3c4 100644 --- a/tests/bundle_poller_test.rs +++ b/tests/bundle_poller_test.rs @@ -2,15 +2,18 @@ use builder::test_utils::{setup_logging, setup_test_config}; use eyre::Result; +use futures_util::TryStreamExt; +use init4_bin_base::perms::tx_cache::BuilderTxCache; #[tokio::test] async fn test_bundle_poller_roundtrip() -> Result<()> { setup_logging(); setup_test_config(); - let bundle_poller = builder::tasks::cache::BundlePoller::new(); + let config = builder::config(); + let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token()); - let _ = bundle_poller.check_bundle_cache().await?; + let _bundles: Vec<_> = tx_cache.stream_bundles().try_collect().await?; Ok(()) } From 58c25a14d02d9bd9bd20e9a1dad98495ae5c1837 Mon Sep 17 00:00:00 2001 From: evalir Date: Mon, 27 Apr 2026 13:38:33 +0200 Subject: [PATCH 2/5] refactor: split fetch from forward in BundlePoller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors PR #259's 9bc0ff4 split applied to BundlePoller. Restore check_bundle_cache as a private pure-fetch helper returning Result, _>, and rename full_fetch to fetch_and_forward — its name now matches what it does (fetch + forward to the outbound channel). Use let-else over the fetch result to drop a level of indentation. --- src/tasks/cache/bundle.rs | 48 +++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 470b3964..050288dd 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -37,16 +37,22 @@ impl BundlePoller { Self { config, tx_cache, envs } } - async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { - let span = trace_span!("BundlePoller::full_fetch", url = %self.config.tx_pool_url); + /// Pulls every bundle currently in the cache, paginating until the stream + /// is exhausted. Pure fetch — no metrics, no forwarding. + async fn check_bundle_cache(&self) -> Result, BuilderTxCacheError> { + self.tx_cache.stream_bundles().try_collect().await + } + + /// Fetches all bundles from the cache and forwards each to the outbound + /// channel. Records poll metrics around the fetch. + async fn fetch_and_forward(&self, outbound: &mpsc::UnboundedSender) { + let span = trace_span!("BundlePoller::fetch_and_forward", url = %self.config.tx_pool_url); crate::metrics::inc_bundle_poll_count(); - if let Ok(bundles) = self - .tx_cache - .stream_bundles() - .try_collect::>() - // NotOurSlot is expected whenever the builder isn't slot-permissioned; - // don't bump the error counter or warn. + // NotOurSlot is expected whenever the builder isn't slot-permissioned; + // don't bump the error counter or warn. + let Ok(bundles) = self + .check_bundle_cache() .inspect_err(|error| match error { BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { trace!("Not our slot to fetch bundles"); @@ -58,15 +64,17 @@ impl BundlePoller { }) .instrument(span.clone()) .await - { - let _guard = span.entered(); - crate::metrics::record_bundles_fetched(bundles.len()); - trace!(count = bundles.len(), "found bundles"); - for bundle in bundles { - if outbound.send(bundle).is_err() { - debug!("Outbound channel closed during full fetch"); - return; - } + else { + return; + }; + + let _guard = span.entered(); + crate::metrics::record_bundles_fetched(bundles.len()); + trace!(count = bundles.len(), "found bundles"); + for bundle in bundles { + if outbound.send(bundle).is_err() { + debug!("Outbound channel closed, dropping remaining bundles"); + return; } } } @@ -107,7 +115,7 @@ impl BundlePoller { _ = time::sleep(*backoff) => {} } *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); - let (_, stream) = tokio::join!(self.full_fetch(outbound), self.subscribe()); + let (_, stream) = tokio::join!(self.fetch_and_forward(outbound), self.subscribe()); stream } @@ -141,7 +149,7 @@ impl BundlePoller { } async fn task_future(mut self, outbound: mpsc::UnboundedSender) { - let (_, mut sse_stream) = tokio::join!(self.full_fetch(&outbound), self.subscribe()); + let (_, mut sse_stream) = tokio::join!(self.fetch_and_forward(&outbound), self.subscribe()); let mut backoff = INITIAL_RECONNECT_BACKOFF; loop { @@ -161,7 +169,7 @@ impl BundlePoller { break; } trace!("Block env changed, refetching all bundles"); - self.full_fetch(&outbound).await; + self.fetch_and_forward(&outbound).await; } } } From 13b25a4af850047e552be8f6375d2c7cbdd4a154 Mon Sep 17 00:00:00 2001 From: evalir Date: Wed, 29 Apr 2026 11:49:34 +0200 Subject: [PATCH 3/5] refactor: address PR #269 review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses dylanlott + Fraser999 comments on the BundlePoller SSE PR: - Wrap task_future with #[instrument], adding url + block_number fields to the parent span; record block_number on every env change so tracing reflects current state. (dylan) - Guard the task_future select loop with outbound.is_closed() so a dropped consumer breaks the loop instead of letting reconnect retry forever. (dylan) - Subscribe now returns Option; reconnect loops internally with backoff until it gets a real stream or the outbound closes (returning Option). Kills the misleading double warn ("Failed to open SSE bundle subscription" + "stream ended") on subscribe failure. (Fraser/Claude) - Add inc_sse_subscribe_errors metric, called from subscribe's error path. Generic name (not bundle-specific) since the existing inc_sse_reconnect_attempts is also generic. (dylan) - Increment inc_sse_reconnect_attempts in BundlePoller::reconnect, matching tx.rs. (dylan) - Bump "Block env changed" log from trace to debug, matching the agreed change on tx.rs. (Fraser) - Generalize SSE_RECONNECT_ATTEMPTS_HELP from "transaction stream" to just "SSE stream" since the metric is shared. Defers (per plan): renaming "poll" metrics to "fetch" — that's a cross-cutting metrics rename that should ride in its own PR. --- src/metrics.rs | 18 +++++++ src/tasks/cache/bundle.rs | 104 ++++++++++++++++++++++++++------------ 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 14c57dbd..bf96c4c4 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -33,6 +33,12 @@ const TX_POLL_ERRORS_HELP: &str = "Transaction cache poll errors."; const TXS_FETCHED: &str = "signet.builder.cache.txs_fetched"; const TXS_FETCHED_HELP: &str = "Transactions fetched per poll cycle."; +const SSE_RECONNECT_ATTEMPTS: &str = "signet.builder.cache.sse_reconnect_attempts"; +const SSE_RECONNECT_ATTEMPTS_HELP: &str = "SSE stream reconnect attempts."; + +const SSE_SUBSCRIBE_ERRORS: &str = "signet.builder.cache.sse_subscribe_errors"; +const SSE_SUBSCRIBE_ERRORS_HELP: &str = "SSE stream subscription failures."; + const BUNDLE_POLL_COUNT: &str = "signet.builder.cache.bundle_poll_count"; const BUNDLE_POLL_COUNT_HELP: &str = "Bundle cache poll attempts."; @@ -148,6 +154,8 @@ static DESCRIPTIONS: LazyLock<()> = LazyLock::new(|| { describe_counter!(TX_POLL_COUNT, TX_POLL_COUNT_HELP); describe_counter!(TX_POLL_ERRORS, TX_POLL_ERRORS_HELP); describe_histogram!(TXS_FETCHED, TXS_FETCHED_HELP); + describe_counter!(SSE_RECONNECT_ATTEMPTS, SSE_RECONNECT_ATTEMPTS_HELP); + describe_counter!(SSE_SUBSCRIBE_ERRORS, SSE_SUBSCRIBE_ERRORS_HELP); describe_counter!(BUNDLE_POLL_COUNT, BUNDLE_POLL_COUNT_HELP); describe_counter!(BUNDLE_POLL_ERRORS, BUNDLE_POLL_ERRORS_HELP); describe_histogram!(BUNDLES_FETCHED, BUNDLES_FETCHED_HELP); @@ -234,6 +242,16 @@ pub(crate) fn record_txs_fetched(count: usize) { histogram!(TXS_FETCHED).record(count as f64); } +/// Increment the SSE reconnect attempts counter. +pub(crate) fn inc_sse_reconnect_attempts() { + counter!(SSE_RECONNECT_ATTEMPTS).increment(1); +} + +/// Increment the SSE subscribe error counter. +pub(crate) fn inc_sse_subscribe_errors() { + counter!(SSE_SUBSCRIBE_ERRORS).increment(1); +} + /// Increment the bundle poll attempt counter. pub(crate) fn inc_bundle_poll_count() { counter!(BUNDLE_POLL_COUNT).increment(1); diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 050288dd..6880f321 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -9,7 +9,7 @@ use tokio::{ task::JoinHandle, time, }; -use tracing::{Instrument, debug, trace, trace_span, warn}; +use tracing::{Span, debug, instrument, trace, warn}; type SseStream = Pin> + Send>>; @@ -46,8 +46,6 @@ impl BundlePoller { /// Fetches all bundles from the cache and forwards each to the outbound /// channel. Records poll metrics around the fetch. async fn fetch_and_forward(&self, outbound: &mpsc::UnboundedSender) { - let span = trace_span!("BundlePoller::fetch_and_forward", url = %self.config.tx_pool_url); - crate::metrics::inc_bundle_poll_count(); // NotOurSlot is expected whenever the builder isn't slot-permissioned; // don't bump the error counter or warn. @@ -62,13 +60,11 @@ impl BundlePoller { warn!(%error, "Failed to fetch bundles from tx-cache"); } }) - .instrument(span.clone()) .await else { return; }; - let _guard = span.entered(); crate::metrics::record_bundles_fetched(bundles.len()); trace!(count = bundles.len(), "found bundles"); for bundle in bundles { @@ -79,44 +75,60 @@ impl BundlePoller { } } - /// Returns an empty stream on connection failure so the caller can handle - /// reconnection uniformly. - async fn subscribe(&self) -> SseStream { + /// Returns `None` on connection failure; the caller is responsible for + /// scheduling a retry. Avoids the empty-stream sentinel pattern that + /// would double-log "stream ended" on a failure that never opened. + async fn subscribe(&self) -> Option { self.tx_cache .subscribe_bundles() .await .inspect( |_| debug!(url = %self.config.tx_pool_url, "SSE bundle subscription established"), ) - .inspect_err(|error| match error { - BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { - trace!("Not our slot to subscribe to bundles"); + .inspect_err(|error| { + crate::metrics::inc_sse_subscribe_errors(); + match error { + BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { + trace!("Not our slot to subscribe to bundles"); + } + _ => warn!(%error, "Failed to open SSE bundle subscription"), } - _ => warn!(%error, "Failed to open SSE bundle subscription"), }) + .ok() .map(|s| Box::pin(s) as SseStream) - .unwrap_or_else(|_| Box::pin(futures_util::stream::empty())) } - /// Runs a full refetch concurrently with re-subscribing, to cover any - /// items missed while disconnected. + /// Loops with exponential backoff until either a fresh SSE stream is + /// established (returned as `Some`) or the outbound channel is closed + /// (returned as `None`, signalling the task should shut down). Runs a + /// full refetch alongside each subscribe attempt to cover items missed + /// while disconnected. async fn reconnect( &mut self, outbound: &mpsc::UnboundedSender, backoff: &mut Duration, - ) -> SseStream { - tokio::select! { - // Biased: a block env change wins over the backoff sleep. An env - // change triggers a full refetch below anyway, which supersedes the - // sleep-then-reconnect path — so there's no point waiting out the - // backoff. - biased; - _ = self.envs.changed() => {} - _ = time::sleep(*backoff) => {} + ) -> Option { + loop { + if outbound.is_closed() { + return None; + } + crate::metrics::inc_sse_reconnect_attempts(); + tokio::select! { + // Biased: a block env change wins over the backoff sleep. An env + // change triggers a full refetch below anyway, which supersedes the + // sleep-then-reconnect path — so there's no point waiting out the + // backoff. + biased; + _ = self.envs.changed() => {} + _ = time::sleep(*backoff) => {} + } + *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); + let (_, stream) = tokio::join!(self.fetch_and_forward(outbound), self.subscribe()); + if let Some(stream) = stream { + return Some(stream); + } + // subscribe failed; loop with longer backoff (no extra warn). } - *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); - let (_, stream) = tokio::join!(self.fetch_and_forward(outbound), self.subscribe()); - stream } /// Returns `Break` when the outbound channel has closed and the task @@ -138,21 +150,44 @@ impl BundlePoller { } Some(Err(error)) => { warn!(%error, "SSE bundle stream interrupted, reconnecting"); - *stream = self.reconnect(outbound, backoff).await; + match self.reconnect(outbound, backoff).await { + Some(s) => *stream = s, + None => return ControlFlow::Break(()), + } } None => { warn!("SSE bundle stream ended, reconnecting"); - *stream = self.reconnect(outbound, backoff).await; + match self.reconnect(outbound, backoff).await { + Some(s) => *stream = s, + None => return ControlFlow::Break(()), + } } } ControlFlow::Continue(()) } + #[instrument( + skip_all, + fields(url = %self.config.tx_pool_url, block_number = tracing::field::Empty), + )] async fn task_future(mut self, outbound: mpsc::UnboundedSender) { - let (_, mut sse_stream) = tokio::join!(self.fetch_and_forward(&outbound), self.subscribe()); + record_block_number(&self.envs); + + let (_, sub) = tokio::join!(self.fetch_and_forward(&outbound), self.subscribe()); let mut backoff = INITIAL_RECONNECT_BACKOFF; + let mut sse_stream = match sub { + Some(s) => s, + None => match self.reconnect(&outbound, &mut backoff).await { + Some(s) => s, + None => return, + }, + }; loop { + if outbound.is_closed() { + debug!("Outbound channel closed, shutting down"); + break; + } tokio::select! { item = sse_stream.next() => { if self @@ -168,7 +203,8 @@ impl BundlePoller { debug!("Block env channel closed, shutting down"); break; } - trace!("Block env changed, refetching all bundles"); + record_block_number(&self.envs); + debug!("Block env changed, refetching all bundles"); self.fetch_and_forward(&outbound).await; } } @@ -182,3 +218,9 @@ impl BundlePoller { (inbound, jh) } } + +fn record_block_number(envs: &watch::Receiver>) { + if let Some(env) = envs.borrow().as_ref() { + Span::current().record("block_number", env.rollup_env().number.to::()); + } +} From 27049efdc593cd3dc1b457adb2de55d9b01d6998 Mon Sep 17 00:00:00 2001 From: evalir Date: Wed, 29 Apr 2026 11:58:43 +0200 Subject: [PATCH 4/5] refactor: simplify-pass cleanup on BundlePoller - Collapse the duplicated reconnect match arms in handle_sse_item into let-else, matching the project's terse Option/Result style. - Promote record_block_number from a free fn to a &self method, shrinking the call sites and using SimEnv::rollup_block_number() instead of inlining the rollup_env().number.to::() chain. - Drop a stray WHAT-comment on reconnect's loop tail. --- src/tasks/cache/bundle.rs | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 6880f321..fbe50f88 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -127,7 +127,6 @@ impl BundlePoller { if let Some(stream) = stream { return Some(stream); } - // subscribe failed; loop with longer backoff (no extra warn). } } @@ -150,17 +149,17 @@ impl BundlePoller { } Some(Err(error)) => { warn!(%error, "SSE bundle stream interrupted, reconnecting"); - match self.reconnect(outbound, backoff).await { - Some(s) => *stream = s, - None => return ControlFlow::Break(()), - } + let Some(s) = self.reconnect(outbound, backoff).await else { + return ControlFlow::Break(()); + }; + *stream = s; } None => { warn!("SSE bundle stream ended, reconnecting"); - match self.reconnect(outbound, backoff).await { - Some(s) => *stream = s, - None => return ControlFlow::Break(()), - } + let Some(s) = self.reconnect(outbound, backoff).await else { + return ControlFlow::Break(()); + }; + *stream = s; } } ControlFlow::Continue(()) @@ -171,7 +170,7 @@ impl BundlePoller { fields(url = %self.config.tx_pool_url, block_number = tracing::field::Empty), )] async fn task_future(mut self, outbound: mpsc::UnboundedSender) { - record_block_number(&self.envs); + self.record_block_number(); let (_, sub) = tokio::join!(self.fetch_and_forward(&outbound), self.subscribe()); let mut backoff = INITIAL_RECONNECT_BACKOFF; @@ -203,7 +202,7 @@ impl BundlePoller { debug!("Block env channel closed, shutting down"); break; } - record_block_number(&self.envs); + self.record_block_number(); debug!("Block env changed, refetching all bundles"); self.fetch_and_forward(&outbound).await; } @@ -211,6 +210,12 @@ impl BundlePoller { } } + fn record_block_number(&self) { + if let Some(env) = self.envs.borrow().as_ref() { + Span::current().record("block_number", env.rollup_block_number()); + } + } + /// Spawns the task future and returns a receiver for bundles it finds. pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = mpsc::unbounded_channel(); @@ -218,9 +223,3 @@ impl BundlePoller { (inbound, jh) } } - -fn record_block_number(envs: &watch::Receiver>) { - if let Some(env) = envs.borrow().as_ref() { - Span::current().record("block_number", env.rollup_env().number.to::()); - } -} From 53e49ff048f9516a3ab96ac92653ccf7bb7a8f06 Mon Sep 17 00:00:00 2001 From: evalir Date: Wed, 29 Apr 2026 22:09:58 +0200 Subject: [PATCH 5/5] refactor: use EnvTask's BlockConstruction span instead of ad-hoc instrument MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses Fraser's review on PR #269: - Drop the #[instrument] attribute from BundlePoller::task_future. Long-lived tasks shouldn't keep a single span open forever, and the Span::current() lookup inside record_block_number was flaky depending on the runtime log level. - Drop the record_block_number helper. Per-block context is already carried by the BlockConstruction span attached to each SimEnv (see EnvTask, env.rs:294), which already populates sim.ru.number, sim.host.number, sim.slot, etc. — the field names Fraser pointed to. - In the env-change branch of the select loop, capture env.clone_span() and use .instrument() on the inline async block that runs the refetch. This mirrors how SubmitTask::task_future picks up SimResult::clone_span() per work item, and how CacheTask::task_future enters env.span() for its sync work. --- src/tasks/cache/bundle.rs | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index fbe50f88..6564c7cb 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -9,7 +9,7 @@ use tokio::{ task::JoinHandle, time, }; -use tracing::{Span, debug, instrument, trace, warn}; +use tracing::{Instrument, debug, trace, warn}; type SseStream = Pin> + Send>>; @@ -165,13 +165,7 @@ impl BundlePoller { ControlFlow::Continue(()) } - #[instrument( - skip_all, - fields(url = %self.config.tx_pool_url, block_number = tracing::field::Empty), - )] async fn task_future(mut self, outbound: mpsc::UnboundedSender) { - self.record_block_number(); - let (_, sub) = tokio::join!(self.fetch_and_forward(&outbound), self.subscribe()); let mut backoff = INITIAL_RECONNECT_BACKOFF; let mut sse_stream = match sub { @@ -202,20 +196,25 @@ impl BundlePoller { debug!("Block env channel closed, shutting down"); break; } - self.record_block_number(); - debug!("Block env changed, refetching all bundles"); - self.fetch_and_forward(&outbound).await; + // Run the refetch under the BlockConstruction span built by + // EnvTask, so its sim.ru.number / sim.host.number fields + // attach to anything the refetch logs. + let span = self + .envs + .borrow() + .as_ref() + .map_or_else(tracing::Span::none, |env| env.clone_span()); + async { + debug!("Block env changed, refetching all bundles"); + self.fetch_and_forward(&outbound).await; + } + .instrument(span) + .await; } } } } - fn record_block_number(&self) { - if let Some(env) = self.envs.borrow().as_ref() { - Span::current().record("block_number", env.rollup_block_number()); - } - } - /// Spawns the task future and returns a receiver for bundles it finds. pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = mpsc::unbounded_channel();