diff --git a/src/metrics.rs b/src/metrics.rs index 14c57dbd..bf96c4c4 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -33,6 +33,12 @@ const TX_POLL_ERRORS_HELP: &str = "Transaction cache poll errors."; const TXS_FETCHED: &str = "signet.builder.cache.txs_fetched"; const TXS_FETCHED_HELP: &str = "Transactions fetched per poll cycle."; +const SSE_RECONNECT_ATTEMPTS: &str = "signet.builder.cache.sse_reconnect_attempts"; +const SSE_RECONNECT_ATTEMPTS_HELP: &str = "SSE stream reconnect attempts."; + +const SSE_SUBSCRIBE_ERRORS: &str = "signet.builder.cache.sse_subscribe_errors"; +const SSE_SUBSCRIBE_ERRORS_HELP: &str = "SSE stream subscription failures."; + const BUNDLE_POLL_COUNT: &str = "signet.builder.cache.bundle_poll_count"; const BUNDLE_POLL_COUNT_HELP: &str = "Bundle cache poll attempts."; @@ -148,6 +154,8 @@ static DESCRIPTIONS: LazyLock<()> = LazyLock::new(|| { describe_counter!(TX_POLL_COUNT, TX_POLL_COUNT_HELP); describe_counter!(TX_POLL_ERRORS, TX_POLL_ERRORS_HELP); describe_histogram!(TXS_FETCHED, TXS_FETCHED_HELP); + describe_counter!(SSE_RECONNECT_ATTEMPTS, SSE_RECONNECT_ATTEMPTS_HELP); + describe_counter!(SSE_SUBSCRIBE_ERRORS, SSE_SUBSCRIBE_ERRORS_HELP); describe_counter!(BUNDLE_POLL_COUNT, BUNDLE_POLL_COUNT_HELP); describe_counter!(BUNDLE_POLL_ERRORS, BUNDLE_POLL_ERRORS_HELP); describe_histogram!(BUNDLES_FETCHED, BUNDLES_FETCHED_HELP); @@ -234,6 +242,16 @@ pub(crate) fn record_txs_fetched(count: usize) { histogram!(TXS_FETCHED).record(count as f64); } +/// Increment the SSE reconnect attempts counter. +pub(crate) fn inc_sse_reconnect_attempts() { + counter!(SSE_RECONNECT_ATTEMPTS).increment(1); +} + +/// Increment the SSE subscribe error counter. +pub(crate) fn inc_sse_subscribe_errors() { + counter!(SSE_SUBSCRIBE_ERRORS).increment(1); +} + /// Increment the bundle poll attempt counter. 
pub(crate) fn inc_bundle_poll_count() { counter!(BUNDLE_POLL_COUNT).increment(1); diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index a45fff03..6564c7cb 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,113 +1,224 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. -use crate::config::BuilderConfig; -use futures_util::{TryFutureExt, TryStreamExt}; +use crate::{config::BuilderConfig, tasks::env::SimEnv}; +use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; +use std::{ops::ControlFlow, pin::Pin, time::Duration}; use tokio::{ -    sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, +    sync::{mpsc, watch}, task::JoinHandle, -    time::{self, Duration}, +    time, }; -use tracing::{Instrument, debug, trace, trace_span, warn}; +use tracing::{Instrument, debug, trace, warn}; -/// Poll interval for the bundle poller in milliseconds. -const POLL_INTERVAL_MS: u64 = 1000; +type SseStream = Pin<Box<dyn Stream<Item = Result<CachedBundle, BuilderTxCacheError>> + Send>>; -/// The BundlePoller polls the tx-pool for bundles. +const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); +const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); + +/// The BundlePoller fetches bundles from the tx-pool on startup and on each +/// block environment change, and subscribes to an SSE stream for real-time +/// delivery of new bundles in between. #[derive(Debug)] pub struct BundlePoller { /// The builder configuration values. config: &'static BuilderConfig, /// Client for the tx cache. tx_cache: BuilderTxCache, - - /// Defines the interval at which the bundler polls the tx-pool for bundles. - poll_interval_ms: u64, -} - -impl Default for BundlePoller { - fn default() -> Self { - Self::new() - } + /// Receiver for block environment updates, used to trigger refetches.
+ envs: watch::Receiver<Option<SimEnv>>, } -/// Implements a poller for the block builder to pull bundles from the tx-pool. impl BundlePoller { -    /// Creates a new BundlePoller from the provided builder config. -    pub fn new() -> Self { -        Self::new_with_poll_interval_ms(POLL_INTERVAL_MS) -    } - -    /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. -    pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { +    /// Returns a new [`BundlePoller`] with the given block environment receiver. +    pub fn new(envs: watch::Receiver<Option<SimEnv>>) -> Self { let config = crate::config(); let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token()); -        Self { config, tx_cache, poll_interval_ms } +        Self { config, tx_cache, envs } } -    /// Returns the poll duration as a [`Duration`]. -    const fn poll_duration(&self) -> Duration { -        Duration::from_millis(self.poll_interval_ms) +    /// Pulls every bundle currently in the cache, paginating until the stream +    /// is exhausted. Pure fetch — no metrics, no forwarding. +    async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> { +        self.tx_cache.stream_bundles().try_collect().await } -    /// Fetches all bundles from the tx-cache, paginating through all available pages. -    pub async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> { -        self.tx_cache.stream_bundles().try_collect().await +    /// Fetches all bundles from the cache and forwards each to the outbound +    /// channel. Records poll metrics around the fetch. +    async fn fetch_and_forward(&self, outbound: &mpsc::UnboundedSender<CachedBundle>) { +        crate::metrics::inc_bundle_poll_count(); +        // NotOurSlot is expected whenever the builder isn't slot-permissioned; +        // don't bump the error counter or warn.
+ let Ok(bundles) = self + .check_bundle_cache() + .inspect_err(|error| match error { + BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { + trace!("Not our slot to fetch bundles"); + } + _ => { + crate::metrics::inc_bundle_poll_errors(); + warn!(%error, "Failed to fetch bundles from tx-cache"); + } + }) + .await + else { + return; + }; + + crate::metrics::record_bundles_fetched(bundles.len()); + trace!(count = bundles.len(), "found bundles"); + for bundle in bundles { + if outbound.send(bundle).is_err() { + debug!("Outbound channel closed, dropping remaining bundles"); + return; + } + } } - async fn task_future(self, outbound: UnboundedSender<CachedBundle>) { + /// Returns `None` on connection failure; the caller is responsible for + /// scheduling a retry. Avoids the empty-stream sentinel pattern that + /// would double-log "stream ended" on a failure that never opened. + async fn subscribe(&self) -> Option<SseStream> { + self.tx_cache + .subscribe_bundles() + .await + .inspect( + |_| debug!(url = %self.config.tx_pool_url, "SSE bundle subscription established"), + ) + .inspect_err(|error| { + crate::metrics::inc_sse_subscribe_errors(); + match error { + BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { + trace!("Not our slot to subscribe to bundles"); + } + _ => warn!(%error, "Failed to open SSE bundle subscription"), + } + }) + .ok() + .map(|s| Box::pin(s) as SseStream) + } + + /// Loops with exponential backoff until either a fresh SSE stream is + /// established (returned as `Some`) or the outbound channel is closed + /// (returned as `None`, signalling the task should shut down). Runs a + /// full refetch alongside each subscribe attempt to cover items missed + /// while disconnected.
+ async fn reconnect( + &mut self, + outbound: &mpsc::UnboundedSender<CachedBundle>, + backoff: &mut Duration, + ) -> Option<SseStream> { loop { - let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); + if outbound.is_closed() { + return None; + } + crate::metrics::inc_sse_reconnect_attempts(); + tokio::select! { + // Biased: a block env change wins over the backoff sleep. An env + // change triggers a full refetch below anyway, which supersedes the + // sleep-then-reconnect path — so there's no point waiting out the + // backoff. + biased; + _ = self.envs.changed() => {} + _ = time::sleep(*backoff) => {} + } + *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); + let (_, stream) = tokio::join!(self.fetch_and_forward(outbound), self.subscribe()); + if let Some(stream) = stream { + return Some(stream); + } + } + } + + /// Returns `Break` when the outbound channel has closed and the task + /// should shut down. + async fn handle_sse_item( + &mut self, + item: Option<Result<CachedBundle, BuilderTxCacheError>>, + outbound: &mpsc::UnboundedSender<CachedBundle>, + backoff: &mut Duration, + stream: &mut SseStream, + ) -> ControlFlow<()> { + match item { + Some(Ok(bundle)) => { + *backoff = INITIAL_RECONNECT_BACKOFF; + if outbound.send(bundle).is_err() { + trace!("No receivers left, shutting down"); + return ControlFlow::Break(()); + } + } + Some(Err(error)) => { + warn!(%error, "SSE bundle stream interrupted, reconnecting"); + let Some(s) = self.reconnect(outbound, backoff).await else { + return ControlFlow::Break(()); + }; + *stream = s; + } + None => { + warn!("SSE bundle stream ended, reconnecting"); + let Some(s) = self.reconnect(outbound, backoff).await else { + return ControlFlow::Break(()); + }; + *stream = s; + } + } + ControlFlow::Continue(()) + } + + async fn task_future(mut self, outbound: mpsc::UnboundedSender<CachedBundle>) { + let (_, sub) = tokio::join!(self.fetch_and_forward(&outbound), self.subscribe()); + let mut backoff = INITIAL_RECONNECT_BACKOFF; + let mut sse_stream = match sub { + Some(s) => s, + None => match
self.reconnect(&outbound, &mut backoff).await { + Some(s) => s, + None => return, + }, + }; - // Check this here to avoid making the web request if we know - // we don't need the results. + loop { if outbound.is_closed() { - span.in_scope(|| trace!("No receivers left, shutting down")); + debug!("Outbound channel closed, shutting down"); break; } - - crate::metrics::inc_bundle_poll_count(); - let Ok(bundles) = self - .check_bundle_cache() - .inspect_err(|error| match error { - BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { - trace!("Not our slot to fetch bundles"); - } - _ => { - crate::metrics::inc_bundle_poll_errors(); - warn!(%error, "Failed to fetch bundles from tx-cache"); + tokio::select! { + item = sse_stream.next() => { + if self + .handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream) + .await + .is_break() + { + break; } - }) - .instrument(span.clone()) - .await - else { - time::sleep(self.poll_duration()).await; - continue; - }; - - { - let _guard = span.entered(); - crate::metrics::record_bundles_fetched(bundles.len()); - trace!(count = bundles.len(), "fetched bundles from tx-cache"); - for bundle in bundles { - if let Err(err) = outbound.send(bundle) { - debug!(?err, "Failed to send bundle - channel is dropped"); + } + res = self.envs.changed() => { + if res.is_err() { + debug!("Block env channel closed, shutting down"); break; } + // Run the refetch under the BlockConstruction span built by + // EnvTask, so its sim.ru.number / sim.host.number fields + // attach to anything the refetch logs. + let span = self + .envs + .borrow() + .as_ref() + .map_or_else(tracing::Span::none, |env| env.clone_span()); + async { + debug!("Block env changed, refetching all bundles"); + self.fetch_and_forward(&outbound).await; + } + .instrument(span) + .await; } } - - time::sleep(self.poll_duration()).await; } } - /// Spawns a task that sends bundles it finds to its channel sender. 
- pub fn spawn(self) -> (UnboundedReceiver<CachedBundle>, JoinHandle<()>) { - let (outbound, inbound) = unbounded_channel(); - + /// Spawns the task future and returns a receiver for bundles it finds. + pub fn spawn(self) -> (mpsc::UnboundedReceiver<CachedBundle>, JoinHandle<()>) { + let (outbound, inbound) = mpsc::unbounded_channel(); let jh = tokio::spawn(self.task_future(outbound)); - (inbound, jh) } } diff --git a/src/tasks/cache/system.rs b/src/tasks/cache/system.rs index 89698a4d..26a1a5b6 100644 --- a/src/tasks/cache/system.rs +++ b/src/tasks/cache/system.rs @@ -27,7 +27,7 @@ impl CacheTasks { let (tx_receiver, tx_poller) = tx_poller.spawn(); // Bundle Poller pulls bundles from the cache - let bundle_poller = BundlePoller::new(); + let bundle_poller = BundlePoller::new(self.block_env.clone()); let (bundle_receiver, bundle_poller) = bundle_poller.spawn(); // Set up the cache task diff --git a/tests/bundle_poller_test.rs b/tests/bundle_poller_test.rs index 9acfe483..1dc7c3c4 100644 --- a/tests/bundle_poller_test.rs +++ b/tests/bundle_poller_test.rs @@ -2,15 +2,18 @@ use builder::test_utils::{setup_logging, setup_test_config}; use eyre::Result; +use futures_util::TryStreamExt; +use init4_bin_base::perms::tx_cache::BuilderTxCache; #[tokio::test] async fn test_bundle_poller_roundtrip() -> Result<()> { setup_logging(); setup_test_config(); - let bundle_poller = builder::tasks::cache::BundlePoller::new(); + let config = builder::config(); + let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token()); - let _ = bundle_poller.check_bundle_cache().await?; + let _bundles: Vec<_> = tx_cache.stream_bundles().try_collect().await?; Ok(()) }