
feat: replace TxPoller polling with SSE streaming #259

Open
Evalir wants to merge 8 commits into main from evalir/eop/sse-tx-poller

Conversation

Member

@Evalir Evalir commented Apr 2, 2026

Summary

Replaces the 1s timer-based polling loop in TxPoller with SSE streaming for real-time transaction delivery from the tx-pool. The new task lifecycle:

  1. Startup: full paginated fetch of all transactions currently in the cache
  2. Steady state: SSE stream (/transactions/feed) pushes new transactions as they arrive — no more redundant refetches
  3. Block env change: full refetch to ensure consistency (covers any items the SSE stream may have missed)

On SSE disconnect or error, the poller reconnects with exponential backoff (1s initial, doubling up to 30s cap) and does a full refetch to cover the gap. Backoff resets on each successfully received transaction.
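The backoff policy described above can be sketched as a small pure helper. This is an illustrative sketch based on the description in this PR body (1s initial, doubling, 30s cap, reset on success), not the PR's actual code; all names are hypothetical.

```rust
use std::time::Duration;

// Illustrative constants matching the PR description: 1s initial delay,
// doubling on each failed reconnect, capped at 30s.
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Doubles the current delay, saturating at the cap.
fn next_backoff(current: Duration) -> Duration {
    (current * 2).min(MAX_BACKOFF)
}

/// Called after each successfully received transaction.
fn reset_backoff() -> Duration {
    INITIAL_BACKOFF
}
```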

Changes

  • Cargo.toml: enable sse feature on init4-bin-base (transitively enables signet-tx-cache/sse)
  • src/tasks/cache/tx.rs: rewrite TxPoller — replace poll loop with full_fetch() + subscribe() + select! over SSE items and block env changes. Add reconnect() with exponential backoff. Remove poll_interval_ms, poll_duration(), Default impl.
  • src/tasks/cache/system.rs: pass block_env watch receiver to TxPoller::new()
  • tests/tx_poller_test.rs: update integration test to use TxCache directly (no more check_tx_cache() method)

BundlePoller is unchanged — the /bundles/feed server endpoint is not yet available.

Test plan

  • make clippy passes clean
  • make test — all 8 unit tests pass, integration tests correctly ignored
  • Manual test against tx-pool: verify SSE subscription log, real-time tx delivery, refetch on block env change, reconnect with backoff on disconnect

🤖 Generated with Claude Code

@Evalir Evalir marked this pull request as ready for review April 15, 2026 16:07
Member Author

Evalir commented Apr 15, 2026

Comment thread src/tasks/cache/tx.rs Outdated
// full_fetch below serves the same purpose the env arm would have.
_ = self.envs.changed() => {}
}
*backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF);
Member


I don't love putting the exponential backoff inline instead of using an existing implementation, or having it be an unbounded number of attempts. At what point is a failure deemed permanent?
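One way to answer the "when is failure permanent?" question is a bounded schedule: after a fixed number of consecutive failed reconnects, give up. This is a sketch of that idea only; the attempt cap and constants are hypothetical, not something this PR implements.

```rust
use std::time::Duration;

const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(30);
// Hypothetical cap: after this many consecutive failures, treat the
// outage as permanent and let the task exit with an error.
const MAX_ATTEMPTS: u32 = 10;

/// Delay before reconnect attempt `attempt` (0-based), or `None` once
/// the failure should be deemed permanent.
fn backoff_for_attempt(attempt: u32) -> Option<Duration> {
    if attempt >= MAX_ATTEMPTS {
        return None;
    }
    // 2^attempt growth, clamped so the shift cannot overflow and the
    // delay never exceeds the cap.
    let delay = INITIAL_BACKOFF
        .checked_mul(1u32 << attempt.min(5))
        .unwrap_or(MAX_BACKOFF);
    Some(delay.min(MAX_BACKOFF))
}
```

A crate such as backon could replace this hand-rolled schedule, as the review suggests.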

Comment thread src/tasks/cache/tx.rs Outdated
counter!("signet.builder.cache.tx_poll_count").increment(1);
if let Ok(transactions) = self
.tx_cache
.stream_transactions()
Member


SDK API thing: we now have "stream transactions" and "subscribe", which are not clear about their behavior.

Member Author


Yeah I don't love this personally. How about fetch_ vs watch_?

Comment thread src/tasks/cache/tx.rs Outdated
self.tx_cache.stream_transactions().try_collect().await
/// Fetches all transactions from the cache, forwarding each to nonce
/// checking before it reaches the cache task.
async fn full_fetch(&self, outbound: &mpsc::UnboundedSender<ReceivedTx>) {
Member


Architectural: why was check_tx_cache deleted if its logic is repeated inline here?

This function also does more than fetch; it dispatches tasks, so its name should reflect that.

Member Author


There wasn't a principled decision here; the fn just got absorbed into the refactored big fetch function. Aside from what Claude thinks, I agree it's better to keep it separate for the sake of keeping everything maintainable.

Evalir added a commit that referenced this pull request Apr 23, 2026
Addresses the obvious nits from prestwich's review on
src/tasks/cache/tx.rs:

- Move backoff constants to module level (was assoc consts)
- Use crate::metrics::inc_*/record_* helpers instead of bare
  counter!/histogram! macros (matches #263's metrics module)
- Rewrite subscribe() as a combinator chain over the Result
- Bias the select! in reconnect() so env changes preempt the
  backoff sleep, with an inline rationale
- Run full_fetch and subscribe concurrently via tokio::join!
  in reconnect()
- Extract handle_sse_item helper to flatten the nested match
  inside task_future's SSE select arm

Deferred to a follow-up (need design decisions):
- Split full_fetch into fetch + dispatch with better name
- Replace inline backoff with backon + permanence criterion
- Rename SDK stream_transactions/subscribe_transactions

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Evalir Evalir force-pushed the evalir/eop/sse-tx-poller branch from bba3625 to 1809ad1 on April 23, 2026 19:04
@Evalir Evalir requested a review from prestwich April 27, 2026 10:51
Evalir added a commit that referenced this pull request Apr 27, 2026
Address PR #259 nit #2: full_fetch was named like a pure data
accessor but actually dispatched nonce-check tasks per tx.

Restore check_tx_cache as a private pure-fetch helper returning
Result<Vec<TxEnvelope>, _>, and rename full_fetch to
fetch_and_dispatch — its name now matches what it does. The
orchestrator uses let-else over the fetch result to drop a level
of indentation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
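The split this commit describes, a pure fetch helper plus a dispatching wrapper using let-else, can be sketched roughly as follows. The types and bodies here are stand-ins (a `u64` in place of the real `TxEnvelope`, a stub fetch, a std channel in place of the tokio one), not the PR's actual code.

```rust
use std::sync::mpsc;

// Hypothetical stand-in for the real transaction envelope type.
type TxEnvelope = u64;

/// Pure fetch helper: returns the cached transactions and nothing else.
/// In the real task this would hit the tx-cache HTTP API.
fn check_tx_cache() -> Result<Vec<TxEnvelope>, String> {
    Ok(vec![1, 2, 3])
}

/// Fetches and then forwards each transaction onward. The name now
/// reflects that it does more than fetch, and let-else over the fetch
/// result drops a level of indentation, as the commit message notes.
fn fetch_and_dispatch(outbound: &mpsc::Sender<TxEnvelope>) {
    let Ok(txs) = check_tx_cache() else {
        return;
    };
    for tx in txs {
        let _ = outbound.send(tx);
    }
}
```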
Evalir added a commit that referenced this pull request Apr 27, 2026
Mirrors PR #259's 9bc0ff4 split applied to BundlePoller. Restore
check_bundle_cache as a private pure-fetch helper returning
Result<Vec<CachedBundle>, _>, and rename full_fetch to
fetch_and_forward — its name now matches what it does (fetch +
forward to the outbound channel). Use let-else over the fetch
result to drop a level of indentation.
Contributor

@Fraser999 Fraser999 left a comment


We could add a counter to metrics for the number of SSE stream reconnect attempts.
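In its simplest form, the suggested reconnect counter is just a monotonically increasing integer bumped once per reconnect call. This sketch uses a bare AtomicU64 for illustration; the PR itself wires the counter through the crate's metrics helpers instead.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative stand-in for the sse_reconnect_attempts metric.
static SSE_RECONNECT_ATTEMPTS: AtomicU64 = AtomicU64::new(0);

/// Increments the counter once per reconnect call and returns the new total.
fn record_reconnect_attempt() -> u64 {
    SSE_RECONNECT_ATTEMPTS.fetch_add(1, Ordering::Relaxed) + 1
}
```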

Comment thread src/tasks/cache/tx.rs Outdated
Evalir added a commit that referenced this pull request Apr 28, 2026
- Move struct doc to reflect SSE behavior (was still describing
  the old polling implementation); drop the redundant impl-block
  doc.
- Run initial fetch + SSE subscribe concurrently in task_future
  via tokio::join!, mirroring the reconnect path.
- Bump "Block env changed" log from trace to debug — env changes
  are infrequent and worth seeing in normal debug output.
- Add a sse_reconnect_attempts counter; increment once per
  reconnect call.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Evalir Evalir requested a review from Fraser999 April 29, 2026 08:23
Evalir and others added 6 commits April 29, 2026 10:25
Switch TxPoller from 1s timer-based polling to SSE streaming for
real-time transaction delivery. The new lifecycle:
1. Full fetch of all transactions at startup
2. SSE stream for real-time new transaction delivery
3. Full refetch on each block environment change

Adds exponential backoff (1s-30s) on SSE reconnection to prevent
tight loops when the endpoint is unavailable.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Expand tokio import for nightly rustfmt, remove unresolved
`CacheTask` rustdoc link.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Race the backoff sleep against envs.changed() so a block env change
arriving during reconnect cuts the sleep short, instead of buffering
up to 30s while the simulator operates on a stale cache.

Also replace the nested let-else + unwrap_err in the SSE arm with a
single match — no behavior change, drops the double-unwrap.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Evalir Evalir force-pushed the evalir/eop/sse-tx-poller branch from a54ea30 to ddf5548 on April 29, 2026 08:29
Move the SSE reconnect backoff from a `&mut Duration` plumbed
through `reconnect` and `handle_sse_item` to a field on `TxPoller`.
The state was already specific to the running task; carrying it
on self drops two parameters and one local in `task_future`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
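Carrying the backoff on the struct, as this commit describes, might look roughly like the sketch below. All fields and method names here are hypothetical; the real TxPoller has more state and drives async sleeps rather than returning delays.

```rust
use std::time::Duration;

// Minimal stand-in for the task struct; only the backoff field is shown.
struct TxPoller {
    backoff: Duration,
}

impl TxPoller {
    /// On disconnect: use the current delay, then double it (capped at 30s)
    /// for the next attempt. No parameters need to be threaded through.
    fn on_disconnect(&mut self) -> Duration {
        let delay = self.backoff;
        self.backoff = (self.backoff * 2).min(Duration::from_secs(30));
        delay
    }

    /// On each successfully received transaction, reset the backoff.
    fn on_tx_received(&mut self) {
        self.backoff = Duration::from_secs(1);
    }
}
```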
Contributor

@Fraser999 Fraser999 left a comment


This LGTM now, with the proviso that I think this comment potentially isn't resolved.

