feat: replace TxPoller polling with SSE streaming (#259)
Conversation
This stack of pull requests is managed by Graphite.
```rust
// full_fetch below serves the same purpose the env arm would have.
_ = self.envs.changed() => {}
}
*backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF);
```
i don't love putting the exponential backoff in-line instead of using an existing implementation, or having it be an unbounded number of attempts. at what point is a failure deemed permanent?
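One shape a bounded version could take, as a sketch: the constants mirror the PR's 1s-30s range, but `MAX_ATTEMPTS` and the function itself are illustrative, not this PR's code.

```rust
use std::time::Duration;

// Illustrative constants; only the 1s initial / 30s cap mirror the PR.
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30);
const MAX_ATTEMPTS: u32 = 10;

/// Returns the sleep duration for the given 0-based attempt, or `None`
/// once the failure should be deemed permanent.
fn next_backoff(attempt: u32) -> Option<Duration> {
    if attempt >= MAX_ATTEMPTS {
        return None; // permanence criterion: give up after MAX_ATTEMPTS
    }
    let exp = INITIAL_BACKOFF.saturating_mul(2u32.saturating_pow(attempt));
    Some(exp.min(MAX_RECONNECT_BACKOFF))
}
```

Returning `Option` forces the caller to decide what "permanent" means at the call site instead of looping forever.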
```rust
counter!("signet.builder.cache.tx_poll_count").increment(1);
if let Ok(transactions) = self
    .tx_cache
    .stream_transactions()
```
sdk API thing. we now have "stream transactions" and "subscribe", which are not clear about their behavior
Yeah I don't love this personally. How about fetch_ vs watch_?
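A sketch of what the suggested naming split could look like on the SDK surface. The trait, the signatures, and the mock are all hypothetical, not the real signet-tx-cache API: `fetch_*` for a one-shot read, `watch_*` for a long-lived subscription.

```rust
/// Hypothetical trait illustrating the fetch_/watch_ naming convention:
/// fetch_* is a one-shot read, watch_* is a long-lived subscription.
trait TxCacheApi {
    type Tx;
    type Stream: Iterator<Item = Self::Tx>;

    /// One-shot: return the current cache contents.
    fn fetch_transactions(&self) -> Vec<Self::Tx>;
    /// Long-lived: yield new transactions as they arrive.
    fn watch_transactions(&self) -> Self::Stream;
}

/// Toy in-memory stand-in (a real impl would return an SSE-backed stream).
struct MockCache {
    txs: Vec<u64>,
}

impl TxCacheApi for MockCache {
    type Tx = u64;
    type Stream = std::vec::IntoIter<u64>;

    fn fetch_transactions(&self) -> Vec<u64> {
        self.txs.clone()
    }
    fn watch_transactions(&self) -> Self::Stream {
        self.txs.clone().into_iter()
    }
}
```

The point of the convention is that the verb alone tells the caller whether the call completes or stays open.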
```rust
self.tx_cache.stream_transactions().try_collect().await

/// Fetches all transactions from the cache, forwarding each to nonce
/// checking before it reaches the cache task.
async fn full_fetch(&self, outbound: &mpsc::UnboundedSender<ReceivedTx>) {
```
architectural:
why was check_tx_cache deleted if its logic is repeated inline here?
This function also does more than fetch; it dispatches tasks, so its name should reflect that.
there wasn't a principled decision here; the fn just got absorbed into the refactored big fetch function. Aside from what claude thinks, I think it's better to keep it separate just for the sake of keeping everything else maintainable.
Addresses the obvious nits from prestwich's review on src/tasks/cache/tx.rs:

- Move backoff constants to module level (were assoc consts)
- Use crate::metrics::inc_*/record_* helpers instead of bare counter!/histogram! macros (matches #263's metrics module)
- Rewrite subscribe() as a combinator chain over the Result
- Bias the select! in reconnect() so env changes preempt the backoff sleep, with an inline rationale
- Run full_fetch and subscribe concurrently via tokio::join! in reconnect()
- Extract a handle_sse_item helper to flatten the nested match inside task_future's SSE select arm

Deferred to a follow-up (needs design decisions):

- Split full_fetch into fetch + dispatch with a better name
- Replace the inline backoff with backon and a permanence criterion
- Rename the SDK's stream_transactions/subscribe_transactions

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Force-pushed from bba3625 to 1809ad1.
Address PR #259 nit #2: full_fetch was named like a pure data accessor but actually dispatched nonce-check tasks per tx. Restore check_tx_cache as a private pure-fetch helper returning Result<Vec<TxEnvelope>, _>, and rename full_fetch to fetch_and_dispatch, so its name now matches what it does. The orchestrator uses let-else over the fetch result to drop a level of indentation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
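The split described in this commit can be sketched with stand-in types (the `String` envelope and `Vec` outbound are simplified stand-ins for the real `TxEnvelope` and channel machinery; this is not the PR's code):

```rust
type TxEnvelope = String; // stand-in for the real envelope type

struct TxPoller {
    cached: Vec<TxEnvelope>,
}

impl TxPoller {
    /// Pure fetch: returns the cache contents and nothing else.
    fn check_tx_cache(&self) -> Result<Vec<TxEnvelope>, String> {
        Ok(self.cached.clone())
    }

    /// Fetch + dispatch; the name reflects both actions. A let-else
    /// over the fetch result drops a level of indentation.
    fn fetch_and_dispatch(&self, outbound: &mut Vec<TxEnvelope>) {
        let Ok(txs) = self.check_tx_cache() else {
            return;
        };
        for tx in txs {
            outbound.push(tx); // stand-in for spawning a nonce-check task
        }
    }
}
```

Keeping the pure fetch separate means it can be tested (and reused) without any dispatch side effects.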
Mirrors PR #259's 9bc0ff4 split applied to BundlePoller. Restore check_bundle_cache as a private pure-fetch helper returning Result<Vec<CachedBundle>, _>, and rename full_fetch to fetch_and_forward, so its name now matches what it does (fetch + forward to the outbound channel). Use let-else over the fetch result to drop a level of indentation.
Fraser999 left a comment:
We could add a counter to metrics for the number of SSE stream reconnect attempts.
- Move the struct doc to reflect SSE behavior (it still described the old polling implementation); drop the redundant impl-block doc.
- Run the initial fetch + SSE subscribe concurrently in task_future via tokio::join!, mirroring the reconnect path.
- Bump the "Block env changed" log from trace to debug: env changes are infrequent and worth seeing in normal debug output.
- Add an sse_reconnect_attempts counter; increment it once per reconnect call.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Switch TxPoller from 1s timer-based polling to SSE streaming for real-time transaction delivery. The new lifecycle:

1. Full fetch of all transactions at startup
2. SSE stream for real-time delivery of new transactions
3. Full refetch on each block environment change

Adds exponential backoff (1s-30s) on SSE reconnection to prevent tight loops when the endpoint is unavailable.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Expand the tokio import for nightly rustfmt; remove the unresolved `CacheTask` rustdoc link.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Race the backoff sleep against envs.changed() so a block env change arriving during reconnect cuts the sleep short, instead of buffering up to 30s while the simulator operates on a stale cache. Also replace the nested let-else + unwrap_err in the SSE arm with a single match; no behavior change, and it drops the double-unwrap.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
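The race this commit describes can be sketched synchronously with a channel timeout. The real code uses tokio::select!; here std's `recv_timeout` stands in for racing the backoff sleep against `envs.changed()`, and the function name is illustrative.

```rust
use std::sync::mpsc;
use std::time::Duration;

/// Wait out the backoff, but return early (true) if an env change
/// arrives first. Err covers both a timeout and a closed channel,
/// i.e. the backoff ran to completion uninterrupted.
fn backoff_or_env_change(envs: &mpsc::Receiver<()>, backoff: Duration) -> bool {
    envs.recv_timeout(backoff).is_ok()
}
```

A pending env change preempts even a long backoff immediately, which is exactly the "don't sit on a stale cache for 30s" behavior the commit wants.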
Force-pushed from a54ea30 to ddf5548.
Move the SSE reconnect backoff from a `&mut Duration` plumbed through `reconnect` and `handle_sse_item` to a field on `TxPoller`. The state was already specific to the running task; carrying it on self drops two parameters and one local in `task_future`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Fraser999 left a comment:
This LGTM now, with the proviso that I think this comment potentially isn't resolved.
# Conflicts:
#	Cargo.lock

Summary
Replaces the 1s timer-based polling loop in `TxPoller` with SSE streaming for real-time transaction delivery from the tx-pool. The new task lifecycle:

1. A full fetch of all transactions at startup.
2. The SSE stream (`/transactions/feed`) pushes new transactions as they arrive, so there are no more redundant refetches.
3. A full refetch on each block environment change.

On SSE disconnect or error, the poller reconnects with exponential backoff (1s initial, doubling up to a 30s cap) and does a full refetch to cover the gap. The backoff resets on each successfully received transaction.
Changes
- `Cargo.toml`: enables the `sse` feature on `init4-bin-base` (transitively enables `signet-tx-cache/sse`)
- `src/tasks/cache/tx.rs`: rewrites `TxPoller`, replacing the poll loop with `full_fetch()` + `subscribe()` + a `select!` over SSE items and block env changes. Adds `reconnect()` with exponential backoff. Removes `poll_interval_ms`, `poll_duration()`, and the `Default` impl.
- `src/tasks/cache/system.rs`: passes the `block_env` watch receiver to `TxPoller::new()`
- `tests/tx_poller_test.rs`: updates the integration test to use `TxCache` directly (there is no longer a `check_tx_cache()` method)
- `BundlePoller` is unchanged; the `/bundles/feed` server endpoint is not yet available.

Test plan
- `make clippy` passes clean
- `make test`: all 8 unit tests pass, integration tests correctly ignored

🤖 Generated with Claude Code