Skip to content
Open
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ name = "zenith-builder-example"
path = "bin/builder.rs"

[dependencies]
init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] }
init4-bin-base = { version = "0.18.0", features = ["perms", "aws", "pylon", "sse"] }

signet-constants = { version = "0.16.0-rc.17" }
signet-sim = { version = "0.16.0-rc.17" }
Expand Down
9 changes: 9 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const TX_POLL_ERRORS_HELP: &str = "Transaction cache poll errors.";
const TXS_FETCHED: &str = "signet.builder.cache.txs_fetched";
const TXS_FETCHED_HELP: &str = "Transactions fetched per poll cycle.";

const SSE_RECONNECT_ATTEMPTS: &str = "signet.builder.cache.sse_reconnect_attempts";
const SSE_RECONNECT_ATTEMPTS_HELP: &str = "SSE transaction stream reconnect attempts.";

const BUNDLE_POLL_COUNT: &str = "signet.builder.cache.bundle_poll_count";
const BUNDLE_POLL_COUNT_HELP: &str = "Bundle cache poll attempts.";

Expand Down Expand Up @@ -148,6 +151,7 @@ static DESCRIPTIONS: LazyLock<()> = LazyLock::new(|| {
describe_counter!(TX_POLL_COUNT, TX_POLL_COUNT_HELP);
describe_counter!(TX_POLL_ERRORS, TX_POLL_ERRORS_HELP);
describe_histogram!(TXS_FETCHED, TXS_FETCHED_HELP);
describe_counter!(SSE_RECONNECT_ATTEMPTS, SSE_RECONNECT_ATTEMPTS_HELP);
describe_counter!(BUNDLE_POLL_COUNT, BUNDLE_POLL_COUNT_HELP);
describe_counter!(BUNDLE_POLL_ERRORS, BUNDLE_POLL_ERRORS_HELP);
describe_histogram!(BUNDLES_FETCHED, BUNDLES_FETCHED_HELP);
Expand Down Expand Up @@ -234,6 +238,11 @@ pub(crate) fn record_txs_fetched(count: usize) {
histogram!(TXS_FETCHED).record(count as f64);
}

/// Bumps the counter tracking SSE transaction-stream reconnect attempts.
pub(crate) fn inc_sse_reconnect_attempts() {
    let reconnects = counter!(SSE_RECONNECT_ATTEMPTS);
    reconnects.increment(1);
}

/// Increment the bundle poll attempt counter.
pub(crate) fn inc_bundle_poll_count() {
counter!(BUNDLE_POLL_COUNT).increment(1);
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/cache/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl CacheTasks {
/// [`CacheTask`], [`TxPoller`], and [`BundlePoller`] internally and yields their [`JoinHandle`]s.
pub fn spawn(&self) -> CacheSystem {
// Tx Poller pulls transactions from the cache
let tx_poller = TxPoller::new();
let tx_poller = TxPoller::new(self.block_env.clone());
let (tx_receiver, tx_poller) = tx_poller.spawn();

// Bundle Poller pulls bundles from the cache
Expand Down
227 changes: 161 additions & 66 deletions src/tasks/cache/tx.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,68 @@
//! Transaction service responsible for fetching and sending transactions to the simulator.
use crate::config::BuilderConfig;
use crate::{config::BuilderConfig, tasks::env::SimEnv};
use alloy::{
consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable},
providers::Provider,
};
use futures_util::{TryFutureExt, TryStreamExt};
use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder};
use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use signet_tx_cache::{TxCache, TxCacheError};
use std::time::Duration;
use tokio::{sync::mpsc, task::JoinHandle, time};
use tracing::{Instrument, debug, debug_span, trace, trace_span};

/// Poll interval for the transaction poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
use std::{ops::ControlFlow, pin::Pin, time::Duration};
use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
time,
};
use tracing::{Instrument, debug, debug_span, trace, trace_span, warn};

/// Boxed, pinned stream of transactions (or cache errors) yielded by the
/// tx-cache SSE subscription; type-erased so connect/reconnect paths can
/// return a uniform stream (including an empty fallback).
type SseStream = Pin<Box<dyn Stream<Item = Result<TxEnvelope, TxCacheError>> + Send>>;

/// First delay yielded by [`reconnect_backoff`] after an SSE disconnect.
const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_millis(250);
/// Cap on the exponentially growing reconnect delay.
const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30);

/// Builds the SSE reconnect backoff iterator.
///
/// Yields delays of 250ms, 500ms, 1s, 2s, 4s, 8s, 16s, 30s, 30s, … — doubling
/// each step, capped at [`MAX_RECONNECT_BACKOFF`] and unbounded in count.
/// Retries are intentionally unbounded: the per-block-env full refetch in
/// [`TxPoller::task_future`] is the floor on correctness, so SSE is a
/// best-effort latency optimization on top of it.
fn reconnect_backoff() -> ExponentialBackoff {
ExponentialBuilder::default()
.with_factor(2.0)
.with_min_delay(INITIAL_RECONNECT_BACKOFF)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so looking at this, the sequence would be 1,2,4,8,16,32,32,... and never stop, right? the behavior should be documented. Is 1 second a good starting place? or should it be shorter?

.with_max_delay(MAX_RECONNECT_BACKOFF)
.without_max_times()
.build()
}

/// Implements a poller for the block builder to pull transactions from the
/// transaction pool.
#[derive(Debug, Clone)]
/// Fetches transactions from the transaction pool on startup and on each
/// block environment change, and subscribes to an SSE stream for real-time
/// delivery of new transactions in between.
#[derive(Debug)]
pub struct TxPoller {
/// Config values from the Builder.
config: &'static BuilderConfig,
/// Client for the tx cache.
tx_cache: TxCache,
/// Defines the interval at which the service should poll the cache.
poll_interval_ms: u64,
/// Receiver for block environment updates, used to trigger refetches.
envs: watch::Receiver<Option<SimEnv>>,
/// SSE reconnect backoff. Reconnect attempts are intentionally unbounded:
/// this poller is a long-lived subscriber, so tx-cache outages should
/// degrade delivery until recovery rather than permanently disabling it.
reconnect_backoff: ExponentialBackoff,
}

// NOTE(review): this looks like stale pre-change text retained by the diff
// rendering — `Self::new()` is called with no arguments here, but the updated
// `TxPoller::new` requires a `watch::Receiver<Option<SimEnv>>`, so this
// `Default` impl cannot compile against the new constructor and was
// presumably deleted in the actual change. TODO confirm against the real diff.
impl Default for TxPoller {
fn default() -> Self {
Self::new()
}
}

/// [`TxPoller`] implements a poller task that fetches transactions from the transaction pool
/// and sends them into the provided channel sender.
impl TxPoller {
/// Returns a new [`TxPoller`] with the given config.
/// * Defaults to 1000ms poll interval (1s).
pub fn new() -> Self {
Self::new_with_poll_interval_ms(POLL_INTERVAL_MS)
}

/// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds.
pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self {
/// Returns a new [`TxPoller`] with the given block environment receiver.
pub fn new(envs: watch::Receiver<Option<SimEnv>>) -> Self {
let config = crate::config();
let tx_cache = TxCache::new(config.tx_pool_url.clone());
Self { config, tx_cache, poll_interval_ms }
}

/// Returns the poll duration as a [`Duration`].
const fn poll_duration(&self) -> Duration {
Duration::from_millis(self.poll_interval_ms)
Self { config, tx_cache, envs, reconnect_backoff: reconnect_backoff() }
}

// Spawn a tokio task to check the nonce of a transaction before sending
// it to the cachetask via the outbound channel.
/// Spawn a tokio task to check the nonce of a transaction before sending
/// it to the cachetask via the outbound channel.
fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender<ReceivedTx>) {
tokio::spawn(async move {
let span = debug_span!("check_nonce", tx_id = %tx.tx_hash());
Expand Down Expand Up @@ -95,46 +104,132 @@ impl TxPoller {
});
}

/// Polls the transaction cache for transactions, paginating through all available pages.
pub async fn check_tx_cache(&self) -> Result<Vec<TxEnvelope>, TxCacheError> {
/// Pulls every transaction currently in the cache, paginating until the
/// stream is exhausted. Pure fetch — no metrics, no dispatch.
async fn check_tx_cache(&self) -> Result<Vec<TxEnvelope>, TxCacheError> {
self.tx_cache.stream_transactions().try_collect().await
}

async fn task_future(self, outbound: mpsc::UnboundedSender<ReceivedTx>) {
loop {
let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url);
/// Fetches all transactions from the cache and dispatches each one to
/// a nonce-check task. Records poll metrics around the fetch.
async fn fetch_and_dispatch(&self, outbound: &mpsc::UnboundedSender<ReceivedTx>) {
let span = trace_span!("TxPoller::fetch_and_dispatch", url = %self.config.tx_pool_url);

crate::metrics::inc_tx_poll_count();
let Ok(transactions) = self
.check_tx_cache()
.inspect_err(|error| {
crate::metrics::inc_tx_poll_errors();
debug!(%error, "Error fetching transactions");
})
.instrument(span.clone())
.await
else {
return;
};

let _guard = span.entered();
crate::metrics::record_txs_fetched(transactions.len());
trace!(count = transactions.len(), "found transactions");
for tx in transactions {
self.spawn_check_nonce(tx, outbound.clone());
}
}

// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
span.in_scope(|| trace!("No receivers left, shutting down"));
break;
}
/// Opens an SSE subscription to the transaction feed. Returns an empty
/// stream on connection failure so the caller can handle reconnection
/// uniformly.
async fn subscribe(&self) -> SseStream {
self.tx_cache
.subscribe_transactions()
.await
.inspect(|_| debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established"))
.inspect_err(|error| warn!(%error, "Failed to open SSE transaction subscription"))
.map(|s| Box::pin(s) as SseStream)
.unwrap_or_else(|_| Box::pin(futures_util::stream::empty()))
}

crate::metrics::inc_tx_poll_count();
if let Ok(transactions) = self
.check_tx_cache()
.inspect_err(|error| {
crate::metrics::inc_tx_poll_errors();
debug!(%error, "Error fetching transactions");
})
.instrument(span.clone())
.await
{
let _guard = span.entered();
crate::metrics::record_txs_fetched(transactions.len());
trace!(count = transactions.len(), "found transactions");
for tx in transactions.into_iter() {
self.spawn_check_nonce(tx, outbound.clone());
/// Reconnects the SSE stream with backoff. Performs a full refetch to
/// cover any items missed while disconnected.
async fn reconnect(&mut self, outbound: &mpsc::UnboundedSender<ReceivedTx>) -> SseStream {
crate::metrics::inc_sse_reconnect_attempts();
let delay = self.reconnect_backoff.next().expect("backoff is unbounded");
tokio::select! {
// Biased: a block env change wins over the backoff sleep. An env
// change triggers a full refetch below anyway, which supersedes the
// sleep-then-reconnect path — so there's no point waiting out the
// backoff.
biased;
_ = self.envs.changed() => {}
_ = time::sleep(delay) => {}
}
let (_, stream) = tokio::join!(self.fetch_and_dispatch(outbound), self.subscribe());
stream
}

/// Handles one item from the SSE stream: a delivered transaction is sent
/// off for nonce checking (and resets the reconnect backoff); an error or
/// end-of-stream swaps in a freshly reconnected stream. Returns `Break`
/// once the outbound channel has no receivers and the task should stop.
async fn handle_sse_item(
    &mut self,
    item: Option<Result<TxEnvelope, TxCacheError>>,
    outbound: &mpsc::UnboundedSender<ReceivedTx>,
    stream: &mut SseStream,
) -> ControlFlow<()> {
    // Guard: a finished stream means the connection dropped cleanly.
    let Some(result) = item else {
        warn!("SSE transaction stream ended, reconnecting");
        *stream = self.reconnect(outbound).await;
        return ControlFlow::Continue(());
    };

    match result {
        Ok(tx) => {
            if outbound.is_closed() {
                trace!("No receivers left, shutting down");
                return ControlFlow::Break(());
            }
            // A successful delivery proves the connection is healthy again.
            self.reconnect_backoff = reconnect_backoff();
            self.spawn_check_nonce(tx, outbound.clone());
        }
        Err(error) => {
            warn!(%error, "SSE transaction stream error, reconnecting");
            *stream = self.reconnect(outbound).await;
        }
    }
    ControlFlow::Continue(())
}

time::sleep(self.poll_duration()).await;
async fn task_future(mut self, outbound: mpsc::UnboundedSender<ReceivedTx>) {
// Initial full fetch of all currently-cached transactions, plus SSE
// subscription for real-time delivery.
let (_, mut sse_stream) =
tokio::join!(self.fetch_and_dispatch(&outbound), self.subscribe());

loop {
tokio::select! {
item = sse_stream.next() => {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this arm can be collapsed a bit by adding stream combinators that transform the sse_stream into a future that only ever returns on ControlFlow::Break?

sketch:

let stream = sse_stream
   .then(|item| self.handle_sse_item(...))
   .skip_while(|flow| flow.is_continue)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing the &mut sse_stream as an arg break this tho

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but that's a bit of a code smell anyway, so it might be worth re-flowing to remove it by adding more information in the ControlFLow

if self
.handle_sse_item(item, &outbound, &mut sse_stream)
.await
.is_break()
{
break;
}
}
res = self.envs.changed() => {
if res.is_err() {
debug!("Block env channel closed, shutting down");
break;
}
debug!("Block env changed, refetching all transactions");
self.fetch_and_dispatch(&outbound).await;
}
}
}
}

/// Spawns a task that continuously polls the cache for transactions and sends any it finds to
/// its sender.
/// Spawns a task that fetches all current transactions, then subscribes
/// to the SSE feed for real-time updates, refetching on each new block
/// environment.
pub fn spawn(self) -> (mpsc::UnboundedReceiver<ReceivedTx>, JoinHandle<()>) {
let (outbound, inbound) = mpsc::unbounded_channel();
let jh = tokio::spawn(self.task_future(outbound));
Expand Down
13 changes: 5 additions & 8 deletions tests/tx_poller_test.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#![cfg(feature = "test-utils")]

use alloy::{primitives::U256, signers::local::PrivateKeySigner};
use builder::{
tasks::cache::TxPoller,
test_utils::{new_signed_tx, setup_logging, setup_test_config},
};
use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config};
use eyre::{Ok, Result};
use futures_util::TryStreamExt;
use signet_tx_cache::TxCache;

#[tokio::test]
async fn test_tx_roundtrip() -> Result<()> {
Expand All @@ -15,11 +14,9 @@ async fn test_tx_roundtrip() -> Result<()> {
// Post a transaction to the cache
post_tx().await?;

// Create a new poller
let poller = TxPoller::new();

// Fetch transactions from the pool
let transactions = poller.check_tx_cache().await?;
let tx_cache = TxCache::new(builder::config().tx_pool_url.clone());
let transactions: Vec<_> = tx_cache.stream_transactions().try_collect().await?;

// Ensure at least one transaction exists
assert!(!transactions.is_empty());
Expand Down