-
Notifications
You must be signed in to change notification settings - Fork 2
feat: replace BundlePoller polling with SSE streaming #269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: graphite-base/269
Are you sure you want to change the base?
Changes from all commits
36ebe82
58c25a1
13b25a4
27049ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,113 +1,225 @@ | ||
| //! Bundler service responsible for fetching bundles and sending them to the simulator. | ||
| use crate::config::BuilderConfig; | ||
| use futures_util::{TryFutureExt, TryStreamExt}; | ||
| use crate::{config::BuilderConfig, tasks::env::SimEnv}; | ||
| use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; | ||
| use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; | ||
| use signet_tx_cache::{TxCacheError, types::CachedBundle}; | ||
| use std::{ops::ControlFlow, pin::Pin, time::Duration}; | ||
| use tokio::{ | ||
| sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, | ||
| sync::{mpsc, watch}, | ||
| task::JoinHandle, | ||
| time::{self, Duration}, | ||
| time, | ||
| }; | ||
| use tracing::{Instrument, debug, trace, trace_span, warn}; | ||
| use tracing::{Span, debug, instrument, trace, warn}; | ||
|
|
||
| /// Poll interval for the bundle poller in milliseconds. | ||
| const POLL_INTERVAL_MS: u64 = 1000; | ||
| type SseStream = Pin<Box<dyn Stream<Item = Result<CachedBundle, BuilderTxCacheError>> + Send>>; | ||
|
|
||
| /// The BundlePoller polls the tx-pool for bundles. | ||
| const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); | ||
| const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); | ||
|
|
||
| /// The BundlePoller fetches bundles from the tx-pool on startup and on each | ||
| /// block environment change, and subscribes to an SSE stream for real-time | ||
| /// delivery of new bundles in between. | ||
| #[derive(Debug)] | ||
| pub struct BundlePoller { | ||
| /// The builder configuration values. | ||
| config: &'static BuilderConfig, | ||
|
|
||
| /// Client for the tx cache. | ||
| tx_cache: BuilderTxCache, | ||
|
|
||
| /// Defines the interval at which the bundler polls the tx-pool for bundles. | ||
| poll_interval_ms: u64, | ||
| } | ||
|
|
||
| impl Default for BundlePoller { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| /// Receiver for block environment updates, used to trigger refetches. | ||
| envs: watch::Receiver<Option<SimEnv>>, | ||
| } | ||
|
|
||
| /// Implements a poller for the block builder to pull bundles from the tx-pool. | ||
| impl BundlePoller { | ||
| /// Creates a new BundlePoller from the provided builder config. | ||
| pub fn new() -> Self { | ||
| Self::new_with_poll_interval_ms(POLL_INTERVAL_MS) | ||
| } | ||
|
|
||
| /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. | ||
| pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { | ||
| /// Returns a new [`BundlePoller`] with the given block environment receiver. | ||
| pub fn new(envs: watch::Receiver<Option<SimEnv>>) -> Self { | ||
| let config = crate::config(); | ||
| let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token()); | ||
| Self { config, tx_cache, poll_interval_ms } | ||
| Self { config, tx_cache, envs } | ||
| } | ||
|
|
||
| /// Returns the poll duration as a [`Duration`]. | ||
| const fn poll_duration(&self) -> Duration { | ||
| Duration::from_millis(self.poll_interval_ms) | ||
| /// Pulls every bundle currently in the cache, paginating until the stream | ||
| /// is exhausted. Pure fetch — no metrics, no forwarding. | ||
| async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> { | ||
| self.tx_cache.stream_bundles().try_collect().await | ||
| } | ||
|
|
||
| /// Fetches all bundles from the tx-cache, paginating through all available pages. | ||
| pub async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> { | ||
| self.tx_cache.stream_bundles().try_collect().await | ||
| /// Fetches all bundles from the cache and forwards each to the outbound | ||
| /// channel. Records poll metrics around the fetch. | ||
| async fn fetch_and_forward(&self, outbound: &mpsc::UnboundedSender<CachedBundle>) { | ||
| crate::metrics::inc_bundle_poll_count(); | ||
| // NotOurSlot is expected whenever the builder isn't slot-permissioned; | ||
| // don't bump the error counter or warn. | ||
| let Ok(bundles) = self | ||
| .check_bundle_cache() | ||
| .inspect_err(|error| match error { | ||
| BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { | ||
| trace!("Not our slot to fetch bundles"); | ||
| } | ||
| _ => { | ||
| crate::metrics::inc_bundle_poll_errors(); | ||
| warn!(%error, "Failed to fetch bundles from tx-cache"); | ||
| } | ||
| }) | ||
| .await | ||
| else { | ||
| return; | ||
| }; | ||
|
|
||
| crate::metrics::record_bundles_fetched(bundles.len()); | ||
| trace!(count = bundles.len(), "found bundles"); | ||
| for bundle in bundles { | ||
| if outbound.send(bundle).is_err() { | ||
| debug!("Outbound channel closed, dropping remaining bundles"); | ||
| return; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| async fn task_future(self, outbound: UnboundedSender<CachedBundle>) { | ||
| /// Returns `None` on connection failure; the caller is responsible for | ||
| /// scheduling a retry. Avoids the empty-stream sentinel pattern that | ||
| /// would double-log "stream ended" on a failure that never opened. | ||
| async fn subscribe(&self) -> Option<SseStream> { | ||
| self.tx_cache | ||
| .subscribe_bundles() | ||
| .await | ||
| .inspect( | ||
| |_| debug!(url = %self.config.tx_pool_url, "SSE bundle subscription established"), | ||
| ) | ||
| .inspect_err(|error| { | ||
| crate::metrics::inc_sse_subscribe_errors(); | ||
| match error { | ||
| BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { | ||
| trace!("Not our slot to subscribe to bundles"); | ||
| } | ||
| _ => warn!(%error, "Failed to open SSE bundle subscription"), | ||
| } | ||
| }) | ||
| .ok() | ||
| .map(|s| Box::pin(s) as SseStream) | ||
| } | ||
|
|
||
| /// Loops with exponential backoff until either a fresh SSE stream is | ||
| /// established (returned as `Some`) or the outbound channel is closed | ||
| /// (returned as `None`, signalling the task should shut down). Runs a | ||
| /// full refetch alongside each subscribe attempt to cover items missed | ||
| /// while disconnected. | ||
| async fn reconnect( | ||
| &mut self, | ||
| outbound: &mpsc::UnboundedSender<CachedBundle>, | ||
| backoff: &mut Duration, | ||
| ) -> Option<SseStream> { | ||
| loop { | ||
| let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); | ||
| if outbound.is_closed() { | ||
| return None; | ||
| } | ||
| crate::metrics::inc_sse_reconnect_attempts(); | ||
| tokio::select! { | ||
| // Biased: a block env change wins over the backoff sleep. An env | ||
| // change triggers a full refetch below anyway, which supersedes the | ||
| // sleep-then-reconnect path — so there's no point waiting out the | ||
| // backoff. | ||
| biased; | ||
| _ = self.envs.changed() => {} | ||
| _ = time::sleep(*backoff) => {} | ||
| } | ||
| *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); | ||
| let (_, stream) = tokio::join!(self.fetch_and_forward(outbound), self.subscribe()); | ||
| if let Some(stream) = stream { | ||
| return Some(stream); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Returns `Break` when the outbound channel has closed and the task | ||
| /// should shut down. | ||
| async fn handle_sse_item( | ||
| &mut self, | ||
| item: Option<Result<CachedBundle, BuilderTxCacheError>>, | ||
| outbound: &mpsc::UnboundedSender<CachedBundle>, | ||
| backoff: &mut Duration, | ||
| stream: &mut SseStream, | ||
| ) -> ControlFlow<()> { | ||
| match item { | ||
| Some(Ok(bundle)) => { | ||
| *backoff = INITIAL_RECONNECT_BACKOFF; | ||
| if outbound.send(bundle).is_err() { | ||
| trace!("No receivers left, shutting down"); | ||
| return ControlFlow::Break(()); | ||
| } | ||
| } | ||
| Some(Err(error)) => { | ||
| warn!(%error, "SSE bundle stream interrupted, reconnecting"); | ||
| let Some(s) = self.reconnect(outbound, backoff).await else { | ||
| return ControlFlow::Break(()); | ||
| }; | ||
| *stream = s; | ||
| } | ||
| None => { | ||
| warn!("SSE bundle stream ended, reconnecting"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Claude Code] When Suggest having
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Claude Code] Done in 13b25a4 — |
||
| let Some(s) = self.reconnect(outbound, backoff).await else { | ||
| return ControlFlow::Break(()); | ||
| }; | ||
| *stream = s; | ||
| } | ||
| } | ||
| ControlFlow::Continue(()) | ||
| } | ||
|
|
||
| #[instrument( | ||
| skip_all, | ||
| fields(url = %self.config.tx_pool_url, block_number = tracing::field::Empty), | ||
| )] | ||
| async fn task_future(mut self, outbound: mpsc::UnboundedSender<CachedBundle>) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. should span this and pass the span to the lifecycle methods as necessary
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. and as I think about it more, we probably want to add at least the block number to the span's log fields.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Claude Code] Done in 13b25a4 — |
||
| self.record_block_number(); | ||
|
|
||
| let (_, sub) = tokio::join!(self.fetch_and_forward(&outbound), self.subscribe()); | ||
| let mut backoff = INITIAL_RECONNECT_BACKOFF; | ||
| let mut sse_stream = match sub { | ||
| Some(s) => s, | ||
| None => match self.reconnect(&outbound, &mut backoff).await { | ||
| Some(s) => s, | ||
| None => return, | ||
| }, | ||
| }; | ||
|
|
||
| // Check this here to avoid making the web request if we know | ||
| // we don't need the results. | ||
| loop { | ||
| if outbound.is_closed() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This check is removed from this arm, but it's still called in the tx cache arm. Need to check
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Claude Code] Done in 13b25a4 — added an |
||
| span.in_scope(|| trace!("No receivers left, shutting down")); | ||
| debug!("Outbound channel closed, shutting down"); | ||
| break; | ||
| } | ||
|
|
||
| crate::metrics::inc_bundle_poll_count(); | ||
| let Ok(bundles) = self | ||
| .check_bundle_cache() | ||
| .inspect_err(|error| match error { | ||
| BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { | ||
| trace!("Not our slot to fetch bundles"); | ||
| } | ||
| _ => { | ||
| crate::metrics::inc_bundle_poll_errors(); | ||
| warn!(%error, "Failed to fetch bundles from tx-cache"); | ||
| tokio::select! { | ||
| item = sse_stream.next() => { | ||
| if self | ||
| .handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream) | ||
| .await | ||
| .is_break() | ||
| { | ||
| break; | ||
| } | ||
| }) | ||
| .instrument(span.clone()) | ||
| .await | ||
| else { | ||
| time::sleep(self.poll_duration()).await; | ||
| continue; | ||
| }; | ||
|
|
||
| { | ||
| let _guard = span.entered(); | ||
| crate::metrics::record_bundles_fetched(bundles.len()); | ||
| trace!(count = bundles.len(), "fetched bundles from tx-cache"); | ||
| for bundle in bundles { | ||
| if let Err(err) = outbound.send(bundle) { | ||
| debug!(?err, "Failed to send bundle - channel is dropped"); | ||
| } | ||
| res = self.envs.changed() => { | ||
| if res.is_err() { | ||
| debug!("Block env channel closed, shutting down"); | ||
| break; | ||
| } | ||
| self.record_block_number(); | ||
| debug!("Block env changed, refetching all bundles"); | ||
| self.fetch_and_forward(&outbound).await; | ||
| } | ||
| } | ||
|
|
||
| time::sleep(self.poll_duration()).await; | ||
| } | ||
| } | ||
|
|
||
| /// Spawns a task that sends bundles it finds to its channel sender. | ||
| pub fn spawn(self) -> (UnboundedReceiver<CachedBundle>, JoinHandle<()>) { | ||
| let (outbound, inbound) = unbounded_channel(); | ||
| fn record_block_number(&self) { | ||
| if let Some(env) = self.envs.borrow().as_ref() { | ||
| Span::current().record("block_number", env.rollup_block_number()); | ||
| } | ||
| } | ||
|
|
||
| /// Spawns the task future and returns a receiver for bundles it finds. | ||
| pub fn spawn(self) -> (mpsc::UnboundedReceiver<CachedBundle>, JoinHandle<()>) { | ||
| let (outbound, inbound) = mpsc::unbounded_channel(); | ||
| let jh = tokio::spawn(self.task_future(outbound)); | ||
|
|
||
| (inbound, jh) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This metric (and other ones with "poll" in their names) should maybe be renamed as they don't really measure individual poll attempts any more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Claude Code]
Acknowledged. Deferring this rename to its own PR — it cuts across
`BundlePoller` and `TxPoller` metrics plus any dashboards/alerts, so I'd rather not mix it with the SSE work here. Will track as a follow-up.