Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ const TX_POLL_ERRORS_HELP: &str = "Transaction cache poll errors.";
const TXS_FETCHED: &str = "signet.builder.cache.txs_fetched";
const TXS_FETCHED_HELP: &str = "Transactions fetched per poll cycle.";

const SSE_RECONNECT_ATTEMPTS: &str = "signet.builder.cache.sse_reconnect_attempts";
const SSE_RECONNECT_ATTEMPTS_HELP: &str = "SSE stream reconnect attempts.";

const SSE_SUBSCRIBE_ERRORS: &str = "signet.builder.cache.sse_subscribe_errors";
const SSE_SUBSCRIBE_ERRORS_HELP: &str = "SSE stream subscription failures.";

const BUNDLE_POLL_COUNT: &str = "signet.builder.cache.bundle_poll_count";
const BUNDLE_POLL_COUNT_HELP: &str = "Bundle cache poll attempts.";

Expand Down Expand Up @@ -148,6 +154,8 @@ static DESCRIPTIONS: LazyLock<()> = LazyLock::new(|| {
describe_counter!(TX_POLL_COUNT, TX_POLL_COUNT_HELP);
describe_counter!(TX_POLL_ERRORS, TX_POLL_ERRORS_HELP);
describe_histogram!(TXS_FETCHED, TXS_FETCHED_HELP);
describe_counter!(SSE_RECONNECT_ATTEMPTS, SSE_RECONNECT_ATTEMPTS_HELP);
describe_counter!(SSE_SUBSCRIBE_ERRORS, SSE_SUBSCRIBE_ERRORS_HELP);
describe_counter!(BUNDLE_POLL_COUNT, BUNDLE_POLL_COUNT_HELP);
describe_counter!(BUNDLE_POLL_ERRORS, BUNDLE_POLL_ERRORS_HELP);
describe_histogram!(BUNDLES_FETCHED, BUNDLES_FETCHED_HELP);
Expand Down Expand Up @@ -234,6 +242,16 @@ pub(crate) fn record_txs_fetched(count: usize) {
histogram!(TXS_FETCHED).record(count as f64);
}

/// Increment the SSE reconnect attempts counter.
///
/// Bumped once per reconnect-loop iteration (i.e. per attempt, not per
/// successful reconnection), before the backoff sleep elapses.
pub(crate) fn inc_sse_reconnect_attempts() {
    counter!(SSE_RECONNECT_ATTEMPTS).increment(1);
}

/// Increment the SSE subscribe error counter.
///
/// Recorded for every failed attempt to open the SSE bundle subscription,
/// including expected `NotOurSlot` rejections.
pub(crate) fn inc_sse_subscribe_errors() {
    counter!(SSE_SUBSCRIBE_ERRORS).increment(1);
}

/// Increment the bundle poll attempt counter.
pub(crate) fn inc_bundle_poll_count() {
counter!(BUNDLE_POLL_COUNT).increment(1);
Expand Down
252 changes: 182 additions & 70 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,113 +1,225 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
use crate::config::BuilderConfig;
use futures_util::{TryFutureExt, TryStreamExt};
use crate::{config::BuilderConfig, tasks::env::SimEnv};
use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError};
use signet_tx_cache::{TxCacheError, types::CachedBundle};
use std::{ops::ControlFlow, pin::Pin, time::Duration};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
sync::{mpsc, watch},
task::JoinHandle,
time::{self, Duration},
time,
};
use tracing::{Instrument, debug, trace, trace_span, warn};
use tracing::{Span, debug, instrument, trace, warn};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
type SseStream = Pin<Box<dyn Stream<Item = Result<CachedBundle, BuilderTxCacheError>> + Send>>;

/// The BundlePoller polls the tx-pool for bundles.
const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1);
const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30);

/// The BundlePoller fetches bundles from the tx-pool on startup and on each
/// block environment change, and subscribes to an SSE stream for real-time
/// delivery of new bundles in between.
#[derive(Debug)]
pub struct BundlePoller {
/// The builder configuration values.
config: &'static BuilderConfig,

/// Client for the tx cache.
tx_cache: BuilderTxCache,

/// Defines the interval at which the bundler polls the tx-pool for bundles.
poll_interval_ms: u64,
}

impl Default for BundlePoller {
fn default() -> Self {
Self::new()
}
/// Receiver for block environment updates, used to trigger refetches.
envs: watch::Receiver<Option<SimEnv>>,
}

/// Implements a poller for the block builder to pull bundles from the tx-pool.
impl BundlePoller {
/// Creates a new BundlePoller from the provided builder config.
pub fn new() -> Self {
Self::new_with_poll_interval_ms(POLL_INTERVAL_MS)
}

/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self {
/// Returns a new [`BundlePoller`] with the given block environment receiver.
///
/// Reads the global builder configuration and constructs the tx-cache
/// client from its pool URL and OAuth token. The `envs` receiver is used
/// to trigger a full refetch on each block environment change.
pub fn new(envs: watch::Receiver<Option<SimEnv>>) -> Self {
    let config = crate::config();
    let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token());
    Self { config, tx_cache, envs }
}

/// Returns the poll duration as a [`Duration`].
const fn poll_duration(&self) -> Duration {
Duration::from_millis(self.poll_interval_ms)
/// Pulls every bundle currently in the cache, paginating until the stream
/// is exhausted. Pure fetch — no metrics, no forwarding.
///
/// # Errors
///
/// Returns the first [`BuilderTxCacheError`] produced by the paginated
/// stream; `try_collect` short-circuits on it.
async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> {
    self.tx_cache.stream_bundles().try_collect().await
}

/// Fetches all bundles from the tx-cache, paginating through all available pages.
pub async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> {
self.tx_cache.stream_bundles().try_collect().await
/// Fetches all bundles from the cache and forwards each to the outbound
/// channel. Records poll metrics around the fetch.
async fn fetch_and_forward(&self, outbound: &mpsc::UnboundedSender<CachedBundle>) {
crate::metrics::inc_bundle_poll_count();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This metric (and other ones with "poll" in their names) should maybe be renamed as they don't really measure individual poll attempts any more.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

Acknowledged. Deferring this rename to its own PR — it cuts across BundlePoller and TxPoller metrics plus any dashboards/alerts, so I'd rather not mix it with the SSE work here. Will track as a follow-up.

// NotOurSlot is expected whenever the builder isn't slot-permissioned;
// don't bump the error counter or warn.
let Ok(bundles) = self
.check_bundle_cache()
.inspect_err(|error| match error {
BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => {
trace!("Not our slot to fetch bundles");
}
_ => {
crate::metrics::inc_bundle_poll_errors();
warn!(%error, "Failed to fetch bundles from tx-cache");
}
})
.await
else {
return;
};

crate::metrics::record_bundles_fetched(bundles.len());
trace!(count = bundles.len(), "found bundles");
for bundle in bundles {
if outbound.send(bundle).is_err() {
debug!("Outbound channel closed, dropping remaining bundles");
return;
}
}
}

async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {
/// Opens the SSE bundle subscription on the tx cache.
///
/// Returns `None` on connection failure; the caller is responsible for
/// scheduling a retry. Avoids the empty-stream sentinel pattern that
/// would double-log "stream ended" on a failure that never opened.
async fn subscribe(&self) -> Option<SseStream> {
    self.tx_cache
        .subscribe_bundles()
        .await
        .inspect(
            |_| debug!(url = %self.config.tx_pool_url, "SSE bundle subscription established"),
        )
        .inspect_err(|error| {
            // Count every failed subscribe, but only warn on unexpected
            // errors: NotOurSlot is routine when the builder is not
            // slot-permissioned.
            crate::metrics::inc_sse_subscribe_errors();
            match error {
                BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => {
                    trace!("Not our slot to subscribe to bundles");
                }
                _ => warn!(%error, "Failed to open SSE bundle subscription"),
            }
        })
        .ok()
        // Erase the concrete stream type behind the pinned trait object.
        .map(|s| Box::pin(s) as SseStream)
}

/// Loops with exponential backoff until either a fresh SSE stream is
/// established (returned as `Some`) or the outbound channel is closed
/// (returned as `None`, signalling the task should shut down). Runs a
/// full refetch alongside each subscribe attempt to cover items missed
/// while disconnected.
async fn reconnect(
    &mut self,
    outbound: &mpsc::UnboundedSender<CachedBundle>,
    backoff: &mut Duration,
) -> Option<SseStream> {
    loop {
        // Bail out if the simulator dropped its receiver; otherwise a
        // persistent subscribe failure would spin this loop forever.
        if outbound.is_closed() {
            return None;
        }
        crate::metrics::inc_sse_reconnect_attempts();
        tokio::select! {
            // Biased: a block env change wins over the backoff sleep. An env
            // change triggers a full refetch below anyway, which supersedes the
            // sleep-then-reconnect path — so there's no point waiting out the
            // backoff.
            biased;
            _ = self.envs.changed() => {}
            _ = time::sleep(*backoff) => {}
        }
        // Double the delay for the next attempt, capped at the maximum.
        *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF);
        // Refetch and resubscribe concurrently so bundles that arrived while
        // disconnected are forwarded even if the subscribe fails again.
        let (_, stream) = tokio::join!(self.fetch_and_forward(outbound), self.subscribe());
        if let Some(stream) = stream {
            return Some(stream);
        }
    }
}

/// Returns `Break` when the outbound channel has closed and the task
/// should shut down.
async fn handle_sse_item(
&mut self,
item: Option<Result<CachedBundle, BuilderTxCacheError>>,
outbound: &mpsc::UnboundedSender<CachedBundle>,
backoff: &mut Duration,
stream: &mut SseStream,
) -> ControlFlow<()> {
match item {
Some(Ok(bundle)) => {
*backoff = INITIAL_RECONNECT_BACKOFF;
if outbound.send(bundle).is_err() {
trace!("No receivers left, shutting down");
return ControlFlow::Break(());
}
}
Some(Err(error)) => {
warn!(%error, "SSE bundle stream interrupted, reconnecting");
let Some(s) = self.reconnect(outbound, backoff).await else {
return ControlFlow::Break(());
};
*stream = s;
}
None => {
warn!("SSE bundle stream ended, reconnecting");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

When subscribe() (line 98) fails to open the connection, it logs "Failed to open SSE bundle subscription" and returns stream::empty() as a sentinel. The first sse_stream.next() then yields None immediately, which lands here and emits "SSE bundle stream ended, reconnecting". Two warns per failed subscribe cycle, and the second one is a lie - the stream never opened.

Suggest having subscribe() return Option<SseStream> (.ok().map(...) instead of unwrap_or_else(empty)) and adding a None-branch in task_future/reconnect that just schedules a retry without going through handle_sse_item. That keeps the "stream ended" warn for the case it actually describes (a real stream that closed mid-flight) and avoids double-logging.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

Done in 13b25a4 — subscribe() now returns Option<SseStream>; on None, the caller (initial path in task_future or the Err/None arms of handle_sse_item) hands off to reconnect, which loops internally with backoff until either a real stream is established (Some) or the outbound channel closes (None, signalling the task to shut down). The misleading "stream ended, reconnecting" warn no longer fires after a failed subscribe — the only warn comes from subscribe's actual error path.

let Some(s) = self.reconnect(outbound, backoff).await else {
return ControlFlow::Break(());
};
*stream = s;
}
}
ControlFlow::Continue(())
}

#[instrument(
skip_all,
fields(url = %self.config.tx_pool_url, block_number = tracing::field::Empty),
)]
async fn task_future(mut self, outbound: mpsc::UnboundedSender<CachedBundle>) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should span this and pass the span to the lifecycle methods as necessary

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and as I think about it more, we probably want to add at least the block number to span's log fields.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

Done in 13b25a4 — task_future now carries an #[instrument] span with url and block_number fields. block_number is recorded on startup and re-recorded on every env-change branch so the field tracks the current block. The lifecycle methods (fetch_and_forward, subscribe, reconnect, handle_sse_item) inherit this span automatically via async context propagation, so I dropped the redundant per-method trace_span!s.

self.record_block_number();

let (_, sub) = tokio::join!(self.fetch_and_forward(&outbound), self.subscribe());
let mut backoff = INITIAL_RECONNECT_BACKOFF;
let mut sse_stream = match sub {
Some(s) => s,
None => match self.reconnect(&outbound, &mut backoff).await {
Some(s) => s,
None => return,
},
};

// Check this here to avoid making the web request if we know
// we don't need the results.
loop {
if outbound.is_closed() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is removed from this arm, but it's still called in the tx cache arm. Need to check outbound.is_closed somewhere in this arm. In its current state, the simulator could drop its receiver while the SSE is in the error or none branch and would repeat its reconnection loop forever.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

Done in 13b25a4 — added an if outbound.is_closed() { break; } guard at the top of task_future's select loop, plus an early outbound.is_closed() check at the top of reconnect's loop body. Together those break the forever-reconnect cycle: if the simulator drops its receiver while we're stuck in a stream error/None loop, the next iteration of either loop exits cleanly.

span.in_scope(|| trace!("No receivers left, shutting down"));
debug!("Outbound channel closed, shutting down");
break;
}

crate::metrics::inc_bundle_poll_count();
let Ok(bundles) = self
.check_bundle_cache()
.inspect_err(|error| match error {
BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => {
trace!("Not our slot to fetch bundles");
}
_ => {
crate::metrics::inc_bundle_poll_errors();
warn!(%error, "Failed to fetch bundles from tx-cache");
tokio::select! {
item = sse_stream.next() => {
if self
.handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream)
.await
.is_break()
{
break;
}
})
.instrument(span.clone())
.await
else {
time::sleep(self.poll_duration()).await;
continue;
};

{
let _guard = span.entered();
crate::metrics::record_bundles_fetched(bundles.len());
trace!(count = bundles.len(), "fetched bundles from tx-cache");
for bundle in bundles {
if let Err(err) = outbound.send(bundle) {
debug!(?err, "Failed to send bundle - channel is dropped");
}
res = self.envs.changed() => {
if res.is_err() {
debug!("Block env channel closed, shutting down");
break;
}
self.record_block_number();
debug!("Block env changed, refetching all bundles");
self.fetch_and_forward(&outbound).await;
}
}

time::sleep(self.poll_duration()).await;
}
}

/// Spawns a task that sends bundles it finds to its channel sender.
pub fn spawn(self) -> (UnboundedReceiver<CachedBundle>, JoinHandle<()>) {
let (outbound, inbound) = unbounded_channel();
/// Records the rollup block number of the current block environment on
/// the active tracing span's `block_number` field.
///
/// No-op when no block environment has been received yet (`envs` holds
/// `None`).
fn record_block_number(&self) {
    if let Some(env) = self.envs.borrow().as_ref() {
        Span::current().record("block_number", env.rollup_block_number());
    }
}

/// Spawns the bundle-polling task future, returning the receiving half of
/// the bundle channel together with the spawned task's join handle.
pub fn spawn(self) -> (mpsc::UnboundedReceiver<CachedBundle>, JoinHandle<()>) {
    // The sender moves into the task; callers consume bundles via `rx`.
    let (tx, rx) = mpsc::unbounded_channel();
    let handle = tokio::spawn(self.task_future(tx));
    (rx, handle)
}
}
2 changes: 1 addition & 1 deletion src/tasks/cache/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl CacheTasks {
let (tx_receiver, tx_poller) = tx_poller.spawn();

// Bundle Poller pulls bundles from the cache
let bundle_poller = BundlePoller::new();
let bundle_poller = BundlePoller::new(self.block_env.clone());
let (bundle_receiver, bundle_poller) = bundle_poller.spawn();

// Set up the cache task
Expand Down
7 changes: 5 additions & 2 deletions tests/bundle_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

use builder::test_utils::{setup_logging, setup_test_config};
use eyre::Result;
use futures_util::TryStreamExt;
use init4_bin_base::perms::tx_cache::BuilderTxCache;

#[tokio::test]
async fn test_bundle_poller_roundtrip() -> Result<()> {
setup_logging();
setup_test_config();

let bundle_poller = builder::tasks::cache::BundlePoller::new();
let config = builder::config();
let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token());

let _ = bundle_poller.check_bundle_cache().await?;
let _bundles: Vec<_> = tx_cache.stream_bundles().try_collect().await?;

Ok(())
}