Skip to content

Commit 45dd698

Browse files
committed
complete journal integration into node
1 parent 71e300e commit 45dd698

19 files changed

Lines changed: 1988 additions & 65 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ signet-types = "0.19"
5757
signet-zenith = "0.19"
5858
signet-journal = "0.19"
5959
signet-journal-chain = "0.1"
60+
signet-journal-client = "0.1"
6061
signet-storage = "0.10"
6162
signet-cold = "0.10"
6263
signet-hot = "0.10"

crates/host-reth/src/notifier.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use futures_util::StreamExt;
99
use reth::{
1010
chainspec::EthChainSpec,
1111
primitives::{EthPrimitives, Receipt},
12-
providers::{BlockIdReader, BlockReader, HeaderProvider, ReceiptProvider},
12+
providers::{BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ReceiptProvider},
1313
};
1414
use reth_exex::{ExExContext, ExExEvent, ExExNotifications, ExExNotificationsStream};
1515
use reth_node_api::{FullNodeComponents, NodeTypes};
@@ -179,7 +179,9 @@ where
179179
}
180180
}
181181

182-
// Phase 2: live ExEx notifications.
182+
// Phase 2: live ExEx notifications. A journal-syncing node drains here in `WithoutHead`
183+
// mode (set_head not called yet); the handoff relies on reth tolerating a later
184+
// `set_with_head`, and on `DbBackfill` re-reading the never-acked (retained) blocks.
183185
let notification = self.notifications.next().await?;
184186
let notification = match notification {
185187
Ok(n) => n,
@@ -261,4 +263,14 @@ where
261263
self.events.send(ExExEvent::FinishedHeight(BlockNumHash { number: block_number, hash }))?;
262264
Ok(())
263265
}
266+
267+
async fn host_tip(&self) -> Result<u64, Self::Error> {
268+
Ok(self.provider.best_block_number()?)
269+
}
270+
271+
// A reth ExEx shares the host's notification pipeline; unconsumed notifications fill reth's
272+
// buffer and stall its pipeline, so a journal-syncing node must drain them.
273+
fn backpressures_host(&self) -> bool {
274+
true
275+
}
264276
}

crates/host-rpc/src/notifier.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,4 +711,8 @@ where
711711
// No-op: no ExEx to notify for an RPC follower.
712712
Ok(())
713713
}
714+
715+
async fn host_tip(&self) -> Result<u64, Self::Error> {
716+
Ok(self.provider.get_block_number().await?)
717+
}
714718
}

crates/node-config/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ alloy.workspace = true
2525
eyre.workspace = true
2626
reqwest.workspace = true
2727
serde.workspace = true
28+
thiserror.workspace = true
2829
tokio-util.workspace = true
2930
tracing.workspace = true
3031
signet-genesis.workspace = true

crates/node-config/src/journal.rs

Lines changed: 225 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,54 @@
1-
use core::num::NonZeroU64;
2-
use init4_bin_base::utils::from_env::FromEnv;
1+
use core::{num::NonZeroU64, str::FromStr, time::Duration};
2+
use init4_bin_base::utils::from_env::{FromEnv, FromEnvErr, FromEnvVar};
33
use signet_journal_chain::SAFETY_MARGIN;
44
use tracing::warn;
55

6+
/// How a node sources rollup state.
7+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Deserialize)]
8+
#[serde(rename_all = "lowercase")]
9+
pub enum SyncStrategy {
10+
/// Execute host blocks to derive state (the current, default behaviour).
11+
#[default]
12+
Blocks,
13+
/// Apply pre-computed journals from upstream sources without executing blocks.
14+
Journals,
15+
}
16+
17+
impl FromStr for SyncStrategy {
18+
type Err = ParseSyncStrategyError;
19+
20+
fn from_str(input: &str) -> Result<Self, Self::Err> {
21+
match input.trim().to_ascii_lowercase().as_str() {
22+
"blocks" => Ok(Self::Blocks),
23+
"journals" => Ok(Self::Journals),
24+
other => Err(ParseSyncStrategyError(other.to_owned())),
25+
}
26+
}
27+
}
28+
29+
impl FromEnvVar for SyncStrategy {
30+
fn from_env_var(env_var: &str) -> Result<Self, FromEnvErr> {
31+
let raw = String::from_env_var(env_var)?;
32+
raw.parse().map_err(|error| FromEnvErr::parse_error(env_var, error))
33+
}
34+
}
35+
36+
/// Error parsing a [`SyncStrategy`] from a string.
37+
#[derive(Debug, Clone, thiserror::Error)]
38+
#[error("invalid journal sync strategy '{0}', expected 'blocks' or 'journals'")]
39+
pub struct ParseSyncStrategyError(String);
40+
41+
/// Error returned by [`JournalConfig::validate`].
42+
#[derive(Debug, Clone, Copy, thiserror::Error)]
43+
pub enum JournalConfigError {
44+
/// `sync_strategy` is [`SyncStrategy::Journals`] but no upstream sources were configured.
45+
#[error(
46+
"journal sync strategy is 'journals' but no upstream sources were configured \
47+
(set SIGNET_JOURNAL_SOURCES)"
48+
)]
49+
MissingSources,
50+
}
51+
652
/// Default maximum total byte size of the journal ring buffer (64 MiB).
753
pub const DEFAULT_RING_BUFFER_MAX_BYTES: u64 = 64 * 1024 * 1024;
854

@@ -24,9 +70,45 @@ pub const DEFAULT_MAX_SUBSCRIBER_LAG: u64 = 100;
2470
/// All fields are optional. When unset, [`JournalConfig`] returns the
2571
/// constants above via its accessors. Configurable via environment variables
2672
/// (`SIGNET_JOURNAL_*`) or via serde for file-based config.
27-
#[derive(Debug, Clone, Copy, Default, serde::Deserialize, FromEnv)]
73+
#[derive(Debug, Clone, Default, serde::Deserialize, FromEnv)]
2874
#[serde(rename_all = "camelCase", default)]
2975
pub struct JournalConfig {
76+
/// Sync strategy: execute host blocks (`blocks`, default) or apply journals
77+
/// from upstream sources (`journals`).
78+
#[from_env(
79+
var = "SIGNET_JOURNAL_SYNC_STRATEGY",
80+
desc = "Journal sync strategy: 'blocks' or 'journals' [default: blocks]",
81+
optional
82+
)]
83+
sync_strategy: Option<SyncStrategy>,
84+
85+
/// Prioritised upstream journal WebSocket source URLs (comma-separated).
86+
/// Required when `sync_strategy` is `journals`.
87+
#[from_env(
88+
var = "SIGNET_JOURNAL_SOURCES",
89+
desc = "Comma-separated upstream journal WebSocket URLs (required for journals strategy)",
90+
optional
91+
)]
92+
sources: Option<Vec<String>>,
93+
94+
/// Per-source stall timeout in milliseconds for the journal client. Falls
95+
/// back to the client default (60s) when unset.
96+
#[from_env(
97+
var = "SIGNET_JOURNAL_CLIENT_SOURCE_STALL_TIMEOUT_MS",
98+
desc = "Journal client per-source stall timeout in ms [default: 60000]",
99+
optional
100+
)]
101+
client_source_stall_timeout_ms: Option<u64>,
102+
103+
/// Faulty-source backoff in milliseconds for the journal client. Falls back
104+
/// to the client default (30s) when unset.
105+
#[from_env(
106+
var = "SIGNET_JOURNAL_CLIENT_SOURCE_BACKOFF_MS",
107+
desc = "Journal client faulty-source backoff in ms [default: 30000]",
108+
optional
109+
)]
110+
client_source_backoff_ms: Option<u64>,
111+
30112
/// Maximum total byte size of the journal ring buffer.
31113
#[from_env(
32114
var = "SIGNET_JOURNAL_RING_BUFFER_MAX_BYTES",
@@ -86,11 +168,37 @@ impl JournalConfig {
86168
NonZeroU64::new(value).expect("DEFAULT_MAX_SUBSCRIBER_LAG is non-zero")
87169
}
88170

171+
/// The configured sync strategy, defaulting to [`SyncStrategy::Blocks`].
172+
pub fn sync_strategy(&self) -> SyncStrategy {
173+
self.sync_strategy.unwrap_or_default()
174+
}
175+
176+
/// Upstream journal WebSocket source URLs (as raw strings). Empty when none
177+
/// are configured. Required when [`Self::sync_strategy`] is
178+
/// [`SyncStrategy::Journals`].
179+
pub fn sources(&self) -> &[String] {
180+
self.sources.as_deref().unwrap_or(&[])
181+
}
182+
183+
/// Per-source stall timeout for the journal client, when overridden. `None`
184+
/// lets the client's own default (60s) stand.
185+
pub fn client_source_stall_timeout(&self) -> Option<Duration> {
186+
self.client_source_stall_timeout_ms.map(Duration::from_millis)
187+
}
188+
189+
/// Faulty-source backoff for the journal client, when overridden. `None`
190+
/// lets the client's own default (30s) stand.
191+
pub fn client_source_backoff(&self) -> Option<Duration> {
192+
self.client_source_backoff_ms.map(Duration::from_millis)
193+
}
194+
89195
/// Emit a warning for any field that is explicitly set to a value the
90196
/// journal chain will silently normalize. Covers a zero
91197
/// `max_subscriber_lag` (which the chain rejects, so the default is
92198
/// substituted) and a `ring_buffer_max_count` below [`SAFETY_MARGIN`]
93-
/// (which the chain clamps up). Intended to be called once at startup.
199+
/// (which the chain clamps up). Also warns when journal-client-only or
200+
/// `journals`-strategy-only options are set but the strategy will ignore
201+
/// them. Intended to be called once at startup.
94202
pub fn warn_on_misconfiguration(&self) {
95203
if self.max_subscriber_lag == Some(0) {
96204
warn!(
@@ -109,5 +217,118 @@ impl JournalConfig {
109217
margin and will be clamped up"
110218
);
111219
}
220+
// The journal-sync inputs (sources and client tuning knobs) are only consulted under
221+
// the `journals` strategy. If they are set while the node will execute blocks, they are
222+
// dead config - surface that rather than silently ignoring them.
223+
if self.sync_strategy() != SyncStrategy::Journals {
224+
if !self.sources().is_empty() {
225+
warn!(
226+
"SIGNET_JOURNAL_SOURCES is set but the sync strategy is not 'journals'; \
227+
the configured sources will be ignored"
228+
);
229+
}
230+
if self.client_source_stall_timeout_ms.is_some()
231+
|| self.client_source_backoff_ms.is_some()
232+
{
233+
warn!(
234+
"journal client tuning knobs are set but the sync strategy is not \
235+
'journals'; they will be ignored"
236+
);
237+
}
238+
}
239+
}
240+
241+
/// Validate cross-field invariants. Intended to be called once at startup,
242+
/// after [`Self::warn_on_misconfiguration`].
243+
///
244+
/// # Errors
245+
///
246+
/// Returns [`JournalConfigError::MissingSources`] when the strategy is
247+
/// [`SyncStrategy::Journals`] but no upstream sources are configured.
248+
pub fn validate(&self) -> Result<(), JournalConfigError> {
249+
if self.sync_strategy() == SyncStrategy::Journals && self.sources().is_empty() {
250+
return Err(JournalConfigError::MissingSources);
251+
}
252+
Ok(())
253+
}
254+
255+
/// Construct a journal-sync configuration ([`SyncStrategy::Journals`]) pointing at the given
256+
/// upstream sources. The client stall timeout is deliberately generous so a working but
257+
/// idle source (e.g. a test that has served all its journals and is waiting to be torn
258+
/// down) is never mistaken for a dead one and exhausted mid-test. All other fields take
259+
/// their defaults. Use [`Self::journal_sync_for_test_fail_fast`] to exercise exhaustion.
260+
#[cfg(any(test, feature = "test_utils"))]
261+
pub fn journal_sync_for_test(sources: Vec<String>) -> Self {
262+
Self {
263+
sync_strategy: Some(SyncStrategy::Journals),
264+
sources: Some(sources),
265+
client_source_stall_timeout_ms: Some(30_000),
266+
client_source_backoff_ms: Some(100),
267+
..Default::default()
268+
}
269+
}
270+
271+
/// Like [`Self::journal_sync_for_test`] but with short client timeouts so a node pointed at
272+
/// dead sources exhausts them quickly. Only for tests that assert on source exhaustion;
273+
/// other tests should use [`Self::journal_sync_for_test`] to avoid a spurious exhaustion
274+
/// racing test teardown.
275+
#[cfg(any(test, feature = "test_utils"))]
276+
pub fn journal_sync_for_test_fail_fast(sources: Vec<String>) -> Self {
277+
Self {
278+
sync_strategy: Some(SyncStrategy::Journals),
279+
sources: Some(sources),
280+
client_source_stall_timeout_ms: Some(200),
281+
client_source_backoff_ms: Some(50),
282+
..Default::default()
283+
}
284+
}
285+
}
286+
287+
#[cfg(test)]
288+
mod tests {
289+
use super::*;
290+
291+
#[test]
292+
fn sync_strategy_parses_case_insensitively() {
293+
assert_eq!("blocks".parse::<SyncStrategy>().unwrap(), SyncStrategy::Blocks);
294+
assert_eq!("Journals".parse::<SyncStrategy>().unwrap(), SyncStrategy::Journals);
295+
assert_eq!(" JOURNALS ".parse::<SyncStrategy>().unwrap(), SyncStrategy::Journals);
296+
"neither".parse::<SyncStrategy>().unwrap_err();
297+
}
298+
299+
#[test]
300+
fn default_strategy_is_blocks() {
301+
assert_eq!(JournalConfig::default().sync_strategy(), SyncStrategy::Blocks);
302+
}
303+
304+
#[test]
305+
fn validate_requires_sources_for_journals() {
306+
let config =
307+
JournalConfig { sync_strategy: Some(SyncStrategy::Journals), ..Default::default() };
308+
config.validate().unwrap_err();
309+
310+
let config = JournalConfig {
311+
sync_strategy: Some(SyncStrategy::Journals),
312+
sources: Some(vec!["ws://host:9545".to_owned()]),
313+
..Default::default()
314+
};
315+
config.validate().unwrap();
316+
}
317+
318+
#[test]
319+
fn validate_allows_blocks_without_sources() {
320+
JournalConfig::default().validate().unwrap();
321+
}
322+
323+
#[test]
324+
fn client_timeouts_convert_from_millis() {
325+
let config = JournalConfig {
326+
client_source_stall_timeout_ms: Some(1500),
327+
client_source_backoff_ms: Some(250),
328+
..Default::default()
329+
};
330+
assert_eq!(config.client_source_stall_timeout(), Some(Duration::from_millis(1500)));
331+
assert_eq!(config.client_source_backoff(), Some(Duration::from_millis(250)));
332+
assert_eq!(JournalConfig::default().client_source_stall_timeout(), None);
112333
}
113334
}

crates/node-config/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub use core::SignetNodeConfig;
2828
mod journal;
2929
pub use journal::{
3030
DEFAULT_MAX_SUBSCRIBER_LAG, DEFAULT_RING_BUFFER_MAX_BYTES, DEFAULT_RING_BUFFER_MAX_COUNT,
31-
JournalConfig,
31+
JournalConfig, JournalConfigError, ParseSyncStrategyError, SyncStrategy,
3232
};
3333

3434
mod storage;

crates/node-config/src/test_utils.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,17 @@ use std::borrow::Cow;
77

88
/// Make a test config.
99
pub fn test_config() -> SignetNodeConfig {
10+
test_config_with_journal(JournalConfig::default())
11+
}
12+
13+
/// Make a test config with a caller-supplied [`JournalConfig`]. Used by journal-sync tests to
14+
/// point the node at an upstream WebSocket source.
15+
pub const fn test_config_with_journal(journal: JournalConfig) -> SignetNodeConfig {
1016
SignetNodeConfig::new(
1117
BlobFetcherConfig::new(Cow::Borrowed("")),
1218
StorageConfig::new(Cow::Borrowed("NOP"), Cow::Borrowed("NOP")),
1319
None,
14-
JournalConfig::default(),
20+
journal,
1521
GenesisSpec::Known(KnownChains::Test),
1622
SlotCalculator::new(0, 0, 12),
1723
)

crates/node-tests/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ tracing.workspace = true
3737
tracing-subscriber.workspace = true
3838

3939
[dev-dependencies]
40+
bytes.workspace = true
4041
serde_json.workspace = true
4142
serial_test = "3.2.0"
43+
signet-journal-chain = { workspace = true, features = ["test-utils", "signet-extract"] }
4244
signet-journal.workspace = true
45+
tokio-util.workspace = true
46+
trevm.workspace = true

0 commit comments

Comments
 (0)