From 4e4d3ef368986efdc63572ba2f03474d8dd95511 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Tue, 10 Mar 2026 15:22:45 -0400 Subject: [PATCH 01/25] Add redo and is_shutdown to log_checkpoint --- db4-storage/src/pages/mod.rs | 2 +- db4-storage/src/wal/entry.rs | 2 +- db4-storage/src/wal/mod.rs | 7 ++++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/db4-storage/src/pages/mod.rs b/db4-storage/src/pages/mod.rs index 176ab5c3bd..2053929def 100644 --- a/db4-storage/src/pages/mod.rs +++ b/db4-storage/src/pages/mod.rs @@ -579,7 +579,7 @@ impl< // Log a checkpoint record so we can restore the next LSN after reload. let checkpoint_lsn = wal - .log_checkpoint(latest_lsn_on_disk) + .log_checkpoint(latest_lsn_on_disk, true) .expect("Failed to log checkpoint in drop"); wal.flush(checkpoint_lsn) diff --git a/db4-storage/src/wal/entry.rs b/db4-storage/src/wal/entry.rs index dcdec55d04..56685e4364 100644 --- a/db4-storage/src/wal/entry.rs +++ b/db4-storage/src/wal/entry.rs @@ -101,7 +101,7 @@ impl GraphWalOps for NoWal { Ok(0) } - fn log_checkpoint(&self, _lsn: LSN) -> Result { + fn log_checkpoint(&self, _redo: LSN, _is_shutdown: bool) -> Result { Ok(0) } diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index b6d8101adc..a7c2f936b5 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -156,9 +156,10 @@ pub trait GraphWalOps { props: Vec<(&str, usize, Prop)>, ) -> Result; - /// Logs a checkpoint record, indicating that all Wal operations upto and including - /// `lsn` has been persisted to disk. - fn log_checkpoint(&self, lsn: LSN) -> Result; + /// Logs a checkpoint indicating that all LSN < `redo` are persisted. + /// On recovery, replay will start from `redo` in the WAL stream. + /// Set `is_shutdown` to true on a clean shutdown to differentiate from periodic checkpoints. + fn log_checkpoint(&self, redo: LSN, is_shutdown: bool) -> Result; /// Returns an iterator over the entries in the wal. fn replay_iter(&self) -> impl Iterator>; From d89cb57a296d3d9bc76bd8d2504ff79ef8bec5db Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Wed, 11 Mar 2026 09:14:43 -0400 Subject: [PATCH 02/25] Add ControlFileOps and NoControlFile --- db4-storage/src/persist/config.rs | 6 +-- db4-storage/src/persist/control_file.rs | 57 +++++++++++++++++++++++++ db4-storage/src/persist/mod.rs | 1 + 3 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 db4-storage/src/persist/control_file.rs diff --git a/db4-storage/src/persist/config.rs b/db4-storage/src/persist/config.rs index 94eef349df..570ccccb7a 100644 --- a/db4-storage/src/persist/config.rs +++ b/db4-storage/src/persist/config.rs @@ -9,7 +9,7 @@ use tracing::error; pub const DEFAULT_MAX_PAGE_LEN_NODES: u32 = 131_072; // 2^17 pub const DEFAULT_MAX_PAGE_LEN_EDGES: u32 = 1_048_576; // 2^20 -pub const CONFIG_FILE: &str = "config.json"; +pub const CONFIG_FILE_NAME: &str = "config.json"; pub trait ConfigOps: Serialize + DeserializeOwned + Args + Sized { fn max_node_page_len(&self) -> u32; @@ -25,14 +25,14 @@ pub trait ConfigOps: Serialize + DeserializeOwned + Args + Sized { fn with_node_types(&self, node_types: impl IntoIterator>) -> Self; fn load_from_dir(dir: &Path) -> Result { - let config_file = dir.join(CONFIG_FILE); + let config_file = dir.join(CONFIG_FILE_NAME); let config_file = std::fs::File::open(config_file)?; let config = serde_json::from_reader(config_file)?; Ok(config) } fn save_to_dir(&self, dir: &Path) -> Result<(), StorageError> { - let config_file = dir.join(CONFIG_FILE); + let config_file = dir.join(CONFIG_FILE_NAME); let config_file = std::fs::File::create(&config_file)?; serde_json::to_writer_pretty(config_file, self)?; Ok(()) diff --git a/db4-storage/src/persist/control_file.rs b/db4-storage/src/persist/control_file.rs new file mode 100644 index 0000000000..e7a455cb1e --- /dev/null +++ b/db4-storage/src/persist/control_file.rs @@ -0,0 +1,57 @@ +use std::path::Path; +use crate::{error::StorageError, wal::LSN}; + +pub const CONTROL_FILE_NAME: &str = "control.json"; + +pub enum DBState { + Running, + Shutdown, +} + +pub trait ControlFileOps: Sized { + fn load_from_dir(dir: &Path) -> Result; + + fn save_to_dir(&self, dir: &Path) -> Result<(), StorageError>; + + fn db_state(&self) -> &DBState; + + fn last_checkpoint(&self) -> LSN; + + fn set_db_state(&self, state: DBState) -> Result<(), StorageError>; + + fn set_last_checkpoint(&self, lsn: LSN) -> Result<(), StorageError>; +} + +pub struct NoControlFile; + +impl NoControlFile { + pub fn new() -> Self { + NoControlFile + } +} + +impl ControlFileOps for NoControlFile { + fn load_from_dir(_dir: &Path) -> Result { + Ok(NoControlFile) + } + + fn save_to_dir(&self, _dir: &Path) -> Result<(), StorageError> { + Ok(()) + } + + fn db_state(&self) -> &DBState { + &DBState::Running + } + + fn last_checkpoint(&self) -> LSN { + 0 + } + + fn set_db_state(&self, state: DBState) -> Result<(), StorageError> { + Ok(()) + } + + fn set_last_checkpoint(&self, lsn: LSN) -> Result<(), StorageError> { + Ok(()) + } +} diff --git a/db4-storage/src/persist/mod.rs b/db4-storage/src/persist/mod.rs index 43275c62a7..7609d5b63e 100644 --- a/db4-storage/src/persist/mod.rs +++ b/db4-storage/src/persist/mod.rs @@ -1,2 +1,3 @@ pub mod config; +pub mod control_file; pub mod strategy; From 0b1ecbb9bfbb7abef38c99c9addbcb57a27907fb Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Wed, 11 Mar 2026 10:16:34 -0400 Subject: [PATCH 03/25] Remove new and load from WalOps --- db4-storage/src/persist/control_file.rs | 7 +------ db4-storage/src/wal/mod.rs | 12 ------------ db4-storage/src/wal/no_wal.rs | 12 ------------ 3 files changed, 1 insertion(+), 30 deletions(-) diff --git a/db4-storage/src/persist/control_file.rs b/db4-storage/src/persist/control_file.rs index e7a455cb1e..0eef888ed5 100644 --- a/db4-storage/src/persist/control_file.rs +++ b/db4-storage/src/persist/control_file.rs @@ -6,6 +6,7 @@ pub const CONTROL_FILE_NAME: &str = "control.json"; pub enum DBState { Running, Shutdown, + CrashRecovery, } pub trait ControlFileOps: Sized { @@ -24,12 +25,6 @@ pub trait ControlFileOps: Sized { pub struct NoControlFile; -impl NoControlFile { - pub fn new() -> Self { - NoControlFile - } -} - impl ControlFileOps for NoControlFile { fn load_from_dir(_dir: &Path) -> Result { Ok(NoControlFile) diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index a7c2f936b5..1ba2d29928 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -4,7 +4,6 @@ use raphtory_core::{ entities::{EID, GID, VID}, storage::timeindex::EventTime, }; -use std::path::Path; pub mod entry; pub mod no_wal; @@ -14,17 +13,6 @@ pub type TransactionID = u64; /// Core Wal methods. pub trait WalOps { - type Config; - - fn new(dir: Option<&Path>, config: Self::Config) -> Result - where - Self: Sized; - - /// Loads an existing WAL file from the given directory in append mode. - fn load(dir: Option<&Path>, config: Self::Config) -> Result - where - Self: Sized; - /// Appends data to the WAL and returns the assigned LSN. fn append(&self, data: &[u8]) -> Result; diff --git a/db4-storage/src/wal/no_wal.rs b/db4-storage/src/wal/no_wal.rs index 8d5b865bde..5b9e40104f 100644 --- a/db4-storage/src/wal/no_wal.rs +++ b/db4-storage/src/wal/no_wal.rs @@ -1,5 +1,3 @@ -use std::path::Path; - use crate::{ error::StorageError, wal::{LSN, ReplayRecord, WalOps}, @@ -11,16 +9,6 @@ use crate::{ pub struct NoWal; impl WalOps for NoWal { - type Config = (); - - fn new(_dir: Option<&Path>, _config: ()) -> Result { - Ok(Self) - } - - fn load(_dir: Option<&Path>, _config: ()) -> Result { - Ok(Self) - } - fn append(&self, _data: &[u8]) -> Result { Ok(0) } From 257ee279d3bc6f10ca44464a8319d4c5116f4288 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Wed, 11 Mar 2026 11:01:09 -0400 Subject: [PATCH 04/25] Attach control_file to strategy structs --- db4-storage/src/persist/config.rs | 3 ++- db4-storage/src/persist/control_file.rs | 4 ++-- db4-storage/src/persist/strategy.rs | 9 +++++---- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/db4-storage/src/persist/config.rs b/db4-storage/src/persist/config.rs index 570ccccb7a..80435eaa16 100644 --- a/db4-storage/src/persist/config.rs +++ b/db4-storage/src/persist/config.rs @@ -9,7 +9,8 @@ use tracing::error; pub const DEFAULT_MAX_PAGE_LEN_NODES: u32 = 131_072; // 2^17 pub const DEFAULT_MAX_PAGE_LEN_EDGES: u32 = 1_048_576; // 2^20 -pub const CONFIG_FILE_NAME: &str = "config.json"; + +const CONFIG_FILE_NAME: &str = "config.json"; pub trait ConfigOps: Serialize + DeserializeOwned + Args + Sized { fn max_node_page_len(&self) -> u32; diff --git a/db4-storage/src/persist/control_file.rs b/db4-storage/src/persist/control_file.rs index 0eef888ed5..817a557486 100644 --- a/db4-storage/src/persist/control_file.rs +++ b/db4-storage/src/persist/control_file.rs @@ -1,8 +1,7 @@ use std::path::Path; use crate::{error::StorageError, wal::LSN}; -pub const CONTROL_FILE_NAME: &str = "control.json"; - +#[derive(Debug)] pub enum DBState { Running, Shutdown, @@ -23,6 +22,7 @@ pub trait ControlFileOps: Sized { fn set_last_checkpoint(&self, lsn: LSN) -> Result<(), StorageError>; } +#[derive(Debug, Clone)] pub struct NoControlFile; impl ControlFileOps for NoControlFile { diff --git a/db4-storage/src/persist/strategy.rs b/db4-storage/src/persist/strategy.rs index 9abed5c3ab..dadd1f269e 100644 --- a/db4-storage/src/persist/strategy.rs +++ b/db4-storage/src/persist/strategy.rs @@ -1,13 +1,13 @@ use crate::{ api::{edges::EdgeSegmentOps, graph_props::GraphPropSegmentOps, nodes::NodeSegmentOps}, error::StorageError, - persist::config::{BaseConfig, ConfigOps}, + persist::{config::{BaseConfig, ConfigOps}, control_file::NoControlFile}, segments::{ edge::segment::{EdgeSegmentView, MemEdgeSegment}, - graph_prop::{GraphPropSegmentView, segment::MemGraphPropSegment}, + graph_prop::{segment::MemGraphPropSegment, GraphPropSegmentView}, node::segment::{MemNodeSegment, NodeSegmentView}, }, - wal::{GraphWalOps, WalOps, no_wal::NoWal}, + wal::{no_wal::NoWal, GraphWalOps, WalOps}, }; use std::{fmt::Debug, ops::DerefMut, path::Path}; @@ -63,6 +63,7 @@ pub trait PersistenceStrategy: Debug + Clone + Send + Sync + 'static { pub struct NoOpStrategy { config: BaseConfig, wal: NoWal, + control_file: NoControlFile, } impl PersistenceStrategy for NoOpStrategy { @@ -73,7 +74,7 @@ impl PersistenceStrategy for NoOpStrategy { type Config = BaseConfig; fn new(config: BaseConfig, _graph_dir: Option<&Path>) -> Result { - Ok(Self { config, wal: NoWal }) + Ok(Self { config, wal: NoWal, control_file: NoControlFile }) } fn load(_graph_dir: &Path) -> Result { From 97d77b8d735c19da982331e0bfc3fb4de510b259 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Wed, 11 Mar 2026 14:02:25 -0400 Subject: [PATCH 05/25] Expose control_file in PersistenceStrategy --- db4-storage/src/persist/control_file.rs | 4 +++- db4-storage/src/persist/strategy.rs | 12 ++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/db4-storage/src/persist/control_file.rs b/db4-storage/src/persist/control_file.rs index 817a557486..8d17ab39cb 100644 --- a/db4-storage/src/persist/control_file.rs +++ b/db4-storage/src/persist/control_file.rs @@ -35,7 +35,9 @@ impl ControlFileOps for NoControlFile { } fn db_state(&self) -> &DBState { - &DBState::Running + // Without a control file there is no persistence, hence this always reports a clean + // shutdown state so that no recovery is attempted. + &DBState::Shutdown } fn last_checkpoint(&self) -> LSN { diff --git a/db4-storage/src/persist/strategy.rs b/db4-storage/src/persist/strategy.rs index dadd1f269e..461544103f 100644 --- a/db4-storage/src/persist/strategy.rs +++ b/db4-storage/src/persist/strategy.rs @@ -1,7 +1,7 @@ use crate::{ api::{edges::EdgeSegmentOps, graph_props::GraphPropSegmentOps, nodes::NodeSegmentOps}, error::StorageError, - persist::{config::{BaseConfig, ConfigOps}, control_file::NoControlFile}, + persist::{config::{BaseConfig, ConfigOps}, control_file::{ControlFileOps, NoControlFile}}, segments::{ edge::segment::{EdgeSegmentView, MemEdgeSegment}, graph_prop::{segment::MemGraphPropSegment, GraphPropSegmentView}, @@ -17,6 +17,7 @@ pub trait PersistenceStrategy: Debug + Clone + Send + Sync + 'static { type GS: GraphPropSegmentOps; type Wal: WalOps + GraphWalOps; type Config: ConfigOps; + type ControlFile: ControlFileOps; fn new(config: Self::Config, graph_dir: Option<&Path>) -> Result; @@ -34,6 +35,8 @@ pub trait PersistenceStrategy: Debug + Clone + Send + Sync + 'static { fn wal(&self) -> &Self::Wal; + fn control_file(&self) -> &Self::ControlFile; + fn persist_node_segment>( &self, node_segment: &Self::NS, @@ -67,11 +70,12 @@ pub struct NoOpStrategy { } impl PersistenceStrategy for NoOpStrategy { - type ES = EdgeSegmentView; type NS = NodeSegmentView; + type ES = EdgeSegmentView; type GS = GraphPropSegmentView; type Wal = NoWal; type Config = BaseConfig; + type ControlFile = NoControlFile; fn new(config: BaseConfig, _graph_dir: Option<&Path>) -> Result { Ok(Self { config, wal: NoWal, control_file: NoControlFile }) @@ -93,6 +97,10 @@ impl PersistenceStrategy for NoOpStrategy { &self.wal } + fn control_file(&self) -> &Self::ControlFile { + &self.control_file + } + fn persist_node_segment>( &self, _node_page: &Self::NS, From d84b5e2fe68af1e01859c979226534fd4894d424 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Wed, 11 Mar 2026 14:17:16 -0400 Subject: [PATCH 06/25] Expose control_file in DurabilityOps --- db4-storage/src/lib.rs | 1 + raphtory-storage/src/mutation/addition_ops_ext.rs | 6 +++++- raphtory-storage/src/mutation/durability_ops.rs | 10 ++++++++-- raphtory/src/db/api/storage/storage.rs | 1 + 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/db4-storage/src/lib.rs b/db4-storage/src/lib.rs index c56a9c2921..98f5a1d678 100644 --- a/db4-storage/src/lib.rs +++ b/db4-storage/src/lib.rs @@ -54,6 +54,7 @@ pub type GS

= GraphPropSegmentView

; pub type Layer

= GraphStore, ES

, GS

, P>; pub type Wal = ::Wal; +pub type ControlFile = ::ControlFile; pub type Config = ::Config; pub type GIDResolver = MappingResolver; diff --git a/raphtory-storage/src/mutation/addition_ops_ext.rs b/raphtory-storage/src/mutation/addition_ops_ext.rs index e0e5edbae3..e98275a1e0 100644 --- a/raphtory-storage/src/mutation/addition_ops_ext.rs +++ b/raphtory-storage/src/mutation/addition_ops_ext.rs @@ -32,7 +32,7 @@ use storage::{ resolver::{GIDResolverOps, Initialiser, MaybeInit}, transaction::TransactionManager, wal::LSN, - Extension, LocalPOS, Wal, ES, GS, NS, + Extension, LocalPOS, Wal, ES, GS, NS, ControlFile, }; pub struct AtomicAddEdge<'a, EXT> @@ -704,4 +704,8 @@ impl DurabilityOps for TemporalGraph { fn wal(&self) -> Result<&Wal, MutationError> { Ok(&self.extension().wal()) } + + fn control_file(&self) -> Result<&ControlFile, MutationError> { + Ok(&self.extension().control_file()) + } } diff --git a/raphtory-storage/src/mutation/durability_ops.rs b/raphtory-storage/src/mutation/durability_ops.rs index 0a2205b5a8..663aa63164 100644 --- a/raphtory-storage/src/mutation/durability_ops.rs +++ b/raphtory-storage/src/mutation/durability_ops.rs @@ -1,11 +1,13 @@ use crate::{graph::graph::GraphStorage, mutation::MutationError}; -use storage::{transaction::TransactionManager, Wal}; +use storage::{transaction::TransactionManager, ControlFile, Wal}; -/// Accessor methods for transactions and write-ahead logging. +/// Accessor methods for supporting durability. pub trait DurabilityOps { fn transaction_manager(&self) -> Result<&TransactionManager, MutationError>; fn wal(&self) -> Result<&Wal, MutationError>; + + fn control_file(&self) -> Result<&ControlFile, MutationError>; } impl DurabilityOps for GraphStorage { @@ -16,4 +18,8 @@ impl DurabilityOps for GraphStorage { fn wal(&self) -> Result<&Wal, MutationError> { self.mutable()?.wal() } + + fn control_file(&self) -> Result<&ControlFile, MutationError> { + self.mutable()?.control_file() + } } diff --git a/raphtory/src/db/api/storage/storage.rs b/raphtory/src/db/api/storage/storage.rs index e0c36299a2..dd3a62210f 100644 --- a/raphtory/src/db/api/storage/storage.rs +++ b/raphtory/src/db/api/storage/storage.rs @@ -141,6 +141,7 @@ impl Storage { fn load_with_extension(path: &Path, ext: Extension) -> Result { let temporal_graph = TemporalGraph::load(path, ext)?; let wal = temporal_graph.wal()?; + let control_file = temporal_graph.control_file()?; // Replay any pending writes from the WAL. if wal.has_entries()? { From 50fc86cf141c7ab3db7e30e7ad372b150598ccac Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Wed, 11 Mar 2026 17:14:12 -0400 Subject: [PATCH 07/25] Move DurabilityOps out of mutation --- raphtory-storage/src/{mutation => }/durability_ops.rs | 0 raphtory-storage/src/lib.rs | 1 + raphtory-storage/src/mutation/addition_ops_ext.rs | 8 +++++--- raphtory-storage/src/mutation/mod.rs | 1 - raphtory/src/db/api/mutation/addition_ops.rs | 2 +- raphtory/src/db/api/mutation/deletion_ops.rs | 2 +- raphtory/src/db/api/mutation/property_addition_ops.rs | 3 ++- raphtory/src/db/api/storage/storage.rs | 3 ++- raphtory/src/db/graph/edge.rs | 2 +- raphtory/src/db/graph/node.rs | 2 +- 10 files changed, 14 insertions(+), 10 deletions(-) rename raphtory-storage/src/{mutation => }/durability_ops.rs (100%) diff --git a/raphtory-storage/src/mutation/durability_ops.rs b/raphtory-storage/src/durability_ops.rs similarity index 100% rename from raphtory-storage/src/mutation/durability_ops.rs rename to raphtory-storage/src/durability_ops.rs diff --git a/raphtory-storage/src/lib.rs b/raphtory-storage/src/lib.rs index 8fba0f0625..1822f83745 100644 --- a/raphtory-storage/src/lib.rs +++ b/raphtory-storage/src/lib.rs @@ -1,4 +1,5 @@ pub mod core_ops; +pub mod durability_ops; pub mod graph; pub mod layer_ops; pub mod mutation; diff --git a/raphtory-storage/src/mutation/addition_ops_ext.rs b/raphtory-storage/src/mutation/addition_ops_ext.rs index e98275a1e0..b4d70b3fb5 100644 --- a/raphtory-storage/src/mutation/addition_ops_ext.rs +++ b/raphtory-storage/src/mutation/addition_ops_ext.rs @@ -1,7 +1,9 @@ -use crate::mutation::{ - addition_ops::{EdgeWriteLock, InternalAdditionOps, NodeWriteLock, SessionAdditionOps}, +use crate::{ durability_ops::DurabilityOps, - MutationError, NodeWriterT, + mutation::{ + addition_ops::{EdgeWriteLock, InternalAdditionOps, NodeWriteLock, SessionAdditionOps}, + MutationError, NodeWriterT, + }, }; use db4_graph::{TemporalGraph, WriteLockedGraph}; use raphtory_api::core::{ diff --git a/raphtory-storage/src/mutation/mod.rs b/raphtory-storage/src/mutation/mod.rs index 2a8136ae0c..679dee7daa 100644 --- a/raphtory-storage/src/mutation/mod.rs +++ b/raphtory-storage/src/mutation/mod.rs @@ -34,7 +34,6 @@ use thiserror::Error; pub mod addition_ops; pub mod addition_ops_ext; pub mod deletion_ops; -pub mod durability_ops; pub mod property_addition_ops; pub type NodeWriterT<'a> = NodeWriter<'a, RwLockWriteGuard<'a, MemNodeSegment>, NS>; diff --git a/raphtory/src/db/api/mutation/addition_ops.rs b/raphtory/src/db/api/mutation/addition_ops.rs index 4f3718dae6..b80b3fd986 100644 --- a/raphtory/src/db/api/mutation/addition_ops.rs +++ b/raphtory/src/db/api/mutation/addition_ops.rs @@ -18,9 +18,9 @@ use raphtory_api::core::{ }; use raphtory_storage::mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps, NodeWriteLock}, - durability_ops::DurabilityOps, MutationError, }; +use raphtory_storage::durability_ops::DurabilityOps; use storage::wal::{GraphWalOps, WalOps}; pub trait AdditionOps: StaticGraphViewOps + InternalAdditionOps> { diff --git a/raphtory/src/db/api/mutation/deletion_ops.rs b/raphtory/src/db/api/mutation/deletion_ops.rs index 31edfdcaaf..48e3a75fce 100644 --- a/raphtory/src/db/api/mutation/deletion_ops.rs +++ b/raphtory/src/db/api/mutation/deletion_ops.rs @@ -13,8 +13,8 @@ use raphtory_api::core::{entities::edges::edge_ref::EdgeRef, utils::time::IntoTi use raphtory_storage::mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps}, deletion_ops::InternalDeletionOps, - durability_ops::DurabilityOps, }; +use raphtory_storage::durability_ops::DurabilityOps; use storage::wal::{GraphWalOps, WalOps}; pub trait DeletionOps: diff --git a/raphtory/src/db/api/mutation/property_addition_ops.rs b/raphtory/src/db/api/mutation/property_addition_ops.rs index 7f475fdb68..130554dc59 100644 --- a/raphtory/src/db/api/mutation/property_addition_ops.rs +++ b/raphtory/src/db/api/mutation/property_addition_ops.rs @@ -5,8 +5,9 @@ use crate::{ use raphtory_api::core::entities::properties::prop::Prop; use raphtory_storage::{ core_ops::CoreGraphOps, + durability_ops::DurabilityOps, mutation::{ - addition_ops::InternalAdditionOps, durability_ops::DurabilityOps, + addition_ops::InternalAdditionOps, property_addition_ops::InternalPropertyAdditionOps, }, }; diff --git a/raphtory/src/db/api/storage/storage.rs b/raphtory/src/db/api/storage/storage.rs index dd3a62210f..5add9501d5 100644 --- a/raphtory/src/db/api/storage/storage.rs +++ b/raphtory/src/db/api/storage/storage.rs @@ -19,13 +19,13 @@ use raphtory_api::core::{ }; use raphtory_storage::{ core_ops::InheritCoreGraphOps, + durability_ops::DurabilityOps, graph::graph::GraphStorage, layer_ops::InheritLayerOps, mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps, SessionAdditionOps}, addition_ops_ext::{AtomicAddEdge, AtomicAddNode, UnlockedSession}, deletion_ops::InternalDeletionOps, - durability_ops::DurabilityOps, property_addition_ops::InternalPropertyAdditionOps, EdgeWriterT, GraphPropWriterT, NodeWriterT, }, @@ -36,6 +36,7 @@ use std::{ sync::Arc, }; use storage::wal::{GraphWalOps, WalOps, LSN}; +use storage::persist::control_file::{ControlFileOps, DBState}; #[cfg(feature = "search")] use { diff --git a/raphtory/src/db/graph/edge.rs b/raphtory/src/db/graph/edge.rs index 0680e3db3f..735343fa94 100644 --- a/raphtory/src/db/graph/edge.rs +++ b/raphtory/src/db/graph/edge.rs @@ -41,11 +41,11 @@ use raphtory_core::entities::{ nodes::node_ref::{AsNodeRef, NodeRef}, }; use raphtory_storage::{ + durability_ops::DurabilityOps, graph::edges::edge_storage_ops::EdgeStorageOps, mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps}, deletion_ops::InternalDeletionOps, - durability_ops::DurabilityOps, property_addition_ops::InternalPropertyAdditionOps, }, }; diff --git a/raphtory/src/db/graph/node.rs b/raphtory/src/db/graph/node.rs index f3d724bdd8..002a396f48 100644 --- a/raphtory/src/db/graph/node.rs +++ b/raphtory/src/db/graph/node.rs @@ -38,10 +38,10 @@ use raphtory_api::core::{ }; use raphtory_storage::{ core_ops::CoreGraphOps, + durability_ops::DurabilityOps, graph::graph::GraphStorage, mutation::{ addition_ops::{InternalAdditionOps, NodeWriteLock}, - durability_ops::DurabilityOps, MutationError, }, }; From 166e89597111fd38b4d9b11fbfac76219ec3b788 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Thu, 12 Mar 2026 10:28:14 -0400 Subject: [PATCH 08/25] Write to control file on drop --- db4-storage/src/pages/mod.rs | 46 +++++++++++-------- db4-storage/src/persist/control_file.rs | 22 ++++----- db4-storage/src/persist/strategy.rs | 15 ++++-- raphtory-storage/src/lib.rs | 1 + .../src/mutation/addition_ops_ext.rs | 5 +- raphtory-storage/src/recovery_ops.rs | 30 ++++++++++++ raphtory/src/db/api/mutation/addition_ops.rs | 10 ++-- raphtory/src/db/api/mutation/deletion_ops.rs | 10 ++-- .../db/api/mutation/property_addition_ops.rs | 3 +- raphtory/src/db/api/storage/storage.rs | 18 ++++---- 10 files changed, 102 insertions(+), 58 deletions(-) create mode 100644 raphtory-storage/src/recovery_ops.rs diff --git a/db4-storage/src/pages/mod.rs b/db4-storage/src/pages/mod.rs index 2053929def..78c6544f42 100644 --- a/db4-storage/src/pages/mod.rs +++ b/db4-storage/src/pages/mod.rs @@ -3,7 +3,11 @@ use crate::{ api::{edges::EdgeSegmentOps, graph_props::GraphPropSegmentOps, nodes::NodeSegmentOps}, error::StorageError, pages::{edge_store::ReadLockedEdgeStorage, node_store::ReadLockedNodeStorage}, - persist::{config::ConfigOps, strategy::PersistenceStrategy}, + persist::{ + config::ConfigOps, + control_file::{ControlFileOps, DBState}, + strategy::PersistenceStrategy, + }, properties::props_meta_writer::PropsMetaWriter, segments::{edge::segment::MemEdgeSegment, node::segment::MemNodeSegment}, wal::{GraphWalOps, LSN, WalOps}, @@ -558,32 +562,34 @@ impl< > Drop for GraphStore { fn drop(&mut self) { - match self.flush() { - Ok(_) => { - let wal = self.ext.wal(); - - // INVARIANTS: - // 1. No new writes can occur since we are in a drop. - // 2. flush() has persisted all the segments to disk. - // - // Thus, we can safely discard all records with LSN <= latest_lsn_on_disk - // by rotating the WAL. - let latest_lsn_on_disk = wal.next_lsn() - 1; - - if let Err(e) = wal.rotate(latest_lsn_on_disk) { - eprintln!("Failed to rotate WAL in drop: {}", e); - } + let wal = self.ext.wal(); + let control_file = self.ext.control_file(); - // FIXME: If the process crashes here after rotation, we lose the - // checkpoint record. Write next LSN to a separate file before rotation. + // Since we are in a Drop, no more writes can occur. + // Thus, next_lsn == end of the WAL stream. + // So we can safely set this as the redo LSN for the checkpoint (i.e. nothing to redo). + let redo_lsn = wal.next_lsn(); - // Log a checkpoint record so we can restore the next LSN after reload. + match self.flush() { + Ok(_) => { + // Log a checkpoint record in the WAL, indicating that the DB was shutdown + // with all the segments flushed to disk. + // Now, redo_lsn points to this checkpoint record. + // On startup, recovery is skipped since there are no pending writes to replay. + let is_shutdown = true; let checkpoint_lsn = wal - .log_checkpoint(latest_lsn_on_disk, true) + .log_checkpoint(redo_lsn, is_shutdown) .expect("Failed to log checkpoint in drop"); wal.flush(checkpoint_lsn) .expect("Failed to flush checkpoint record in drop"); + + // Record the checkpoint and shutdown state and write control file to disk. + control_file.set_checkpoint(checkpoint_lsn); + control_file.set_db_state(DBState::Shutdown); + control_file + .save() + .expect("Failed to save control file in drop"); } Err(err) => { eprintln!("Failed to flush storage in drop: {err}") diff --git a/db4-storage/src/persist/control_file.rs b/db4-storage/src/persist/control_file.rs index 8d17ab39cb..1180968fab 100644 --- a/db4-storage/src/persist/control_file.rs +++ b/db4-storage/src/persist/control_file.rs @@ -1,5 +1,5 @@ -use std::path::Path; use crate::{error::StorageError, wal::LSN}; +use std::path::Path; #[derive(Debug)] pub enum DBState { @@ -9,28 +9,28 @@ pub enum DBState { } pub trait ControlFileOps: Sized { - fn load_from_dir(dir: &Path) -> Result; + fn load(dir: &Path) -> Result; - fn save_to_dir(&self, dir: &Path) -> Result<(), StorageError>; + fn save(&self) -> Result<(), StorageError>; fn db_state(&self) -> &DBState; fn last_checkpoint(&self) -> LSN; - fn set_db_state(&self, state: DBState) -> Result<(), StorageError>; + fn set_db_state(&self, state: DBState); - fn set_last_checkpoint(&self, lsn: LSN) -> Result<(), StorageError>; + fn set_checkpoint(&self, lsn: LSN); } #[derive(Debug, Clone)] pub struct NoControlFile; impl ControlFileOps for NoControlFile { - fn load_from_dir(_dir: &Path) -> Result { + fn load(_dir: &Path) -> Result { Ok(NoControlFile) } - fn save_to_dir(&self, _dir: &Path) -> Result<(), StorageError> { + fn save(&self) -> Result<(), StorageError> { Ok(()) } @@ -44,11 +44,7 @@ impl ControlFileOps for NoControlFile { 0 } - fn set_db_state(&self, state: DBState) -> Result<(), StorageError> { - Ok(()) - } + fn set_db_state(&self, state: DBState) {} - fn set_last_checkpoint(&self, lsn: LSN) -> Result<(), StorageError> { - Ok(()) - } + fn set_checkpoint(&self, lsn: LSN) {} } diff --git a/db4-storage/src/persist/strategy.rs b/db4-storage/src/persist/strategy.rs index 461544103f..df43338f4d 100644 --- a/db4-storage/src/persist/strategy.rs +++ b/db4-storage/src/persist/strategy.rs @@ -1,13 +1,16 @@ use crate::{ api::{edges::EdgeSegmentOps, graph_props::GraphPropSegmentOps, nodes::NodeSegmentOps}, error::StorageError, - persist::{config::{BaseConfig, ConfigOps}, control_file::{ControlFileOps, NoControlFile}}, + persist::{ + config::{BaseConfig, ConfigOps}, + control_file::{ControlFileOps, NoControlFile}, + }, segments::{ edge::segment::{EdgeSegmentView, MemEdgeSegment}, - graph_prop::{segment::MemGraphPropSegment, GraphPropSegmentView}, + graph_prop::{GraphPropSegmentView, segment::MemGraphPropSegment}, node::segment::{MemNodeSegment, NodeSegmentView}, }, - wal::{no_wal::NoWal, GraphWalOps, WalOps}, + wal::{GraphWalOps, WalOps, no_wal::NoWal}, }; use std::{fmt::Debug, ops::DerefMut, path::Path}; @@ -78,7 +81,11 @@ impl PersistenceStrategy for NoOpStrategy { type ControlFile = NoControlFile; fn new(config: BaseConfig, _graph_dir: Option<&Path>) -> Result { - Ok(Self { config, wal: NoWal, control_file: NoControlFile }) + Ok(Self { + config, + wal: NoWal, + control_file: NoControlFile, + }) } fn load(_graph_dir: &Path) -> Result { diff --git a/raphtory-storage/src/lib.rs b/raphtory-storage/src/lib.rs index 1822f83745..5e9308d61d 100644 --- a/raphtory-storage/src/lib.rs +++ b/raphtory-storage/src/lib.rs @@ -3,3 +3,4 @@ pub mod durability_ops; pub mod graph; pub mod layer_ops; pub mod mutation; +pub mod recovery_ops; diff --git a/raphtory-storage/src/mutation/addition_ops_ext.rs b/raphtory-storage/src/mutation/addition_ops_ext.rs index b4d70b3fb5..6b84b13caa 100644 --- a/raphtory-storage/src/mutation/addition_ops_ext.rs +++ b/raphtory-storage/src/mutation/addition_ops_ext.rs @@ -4,6 +4,7 @@ use crate::{ addition_ops::{EdgeWriteLock, InternalAdditionOps, NodeWriteLock, SessionAdditionOps}, MutationError, NodeWriterT, }, + recovery_ops::RecoveryOps, }; use db4_graph::{TemporalGraph, WriteLockedGraph}; use raphtory_api::core::{ @@ -34,7 +35,7 @@ use storage::{ resolver::{GIDResolverOps, Initialiser, MaybeInit}, transaction::TransactionManager, wal::LSN, - Extension, LocalPOS, Wal, ES, GS, NS, ControlFile, + ControlFile, Extension, LocalPOS, Wal, ES, GS, NS, }; pub struct AtomicAddEdge<'a, EXT> @@ -711,3 +712,5 @@ impl DurabilityOps for TemporalGraph { Ok(&self.extension().control_file()) } } + +impl RecoveryOps for TemporalGraph {} diff --git a/raphtory-storage/src/recovery_ops.rs b/raphtory-storage/src/recovery_ops.rs new file mode 100644 index 0000000000..696e03bfa5 --- /dev/null +++ b/raphtory-storage/src/recovery_ops.rs @@ -0,0 +1,30 @@ +use storage::persist::control_file::{ControlFileOps, DBState}; + +use crate::{ + durability_ops::DurabilityOps, + mutation::{addition_ops::InternalAdditionOps, MutationError}, +}; + +pub trait RecoveryOps: DurabilityOps + InternalAdditionOps { + /// Recover from a crash if needed by replaying updates from the WAL. + fn run_recovery(&self) -> Result<(), MutationError> { + let transaction_manager = self.transaction_manager()?; + let wal = self.wal()?; + let control_file = self.control_file()?; + + match control_file.db_state() { + DBState::Shutdown => { + // DB was shutdown cleanly, no need for recovery. + return Ok(()); + } + DBState::Running => { + todo!() + } + DBState::CrashRecovery => { + todo!() + } + } + + Ok(()) + } +} diff --git a/raphtory/src/db/api/mutation/addition_ops.rs b/raphtory/src/db/api/mutation/addition_ops.rs index b80b3fd986..0193f22340 100644 --- a/raphtory/src/db/api/mutation/addition_ops.rs +++ b/raphtory/src/db/api/mutation/addition_ops.rs @@ -16,11 +16,13 @@ use raphtory_api::core::{ }, utils::time::{IntoTimeWithFormat, TryIntoInputTime}, }; -use raphtory_storage::mutation::{ - addition_ops::{EdgeWriteLock, InternalAdditionOps, NodeWriteLock}, - MutationError, +use raphtory_storage::{ + durability_ops::DurabilityOps, + mutation::{ + addition_ops::{EdgeWriteLock, InternalAdditionOps, NodeWriteLock}, + MutationError, + }, }; -use raphtory_storage::durability_ops::DurabilityOps; use storage::wal::{GraphWalOps, WalOps}; pub trait AdditionOps: StaticGraphViewOps + InternalAdditionOps> { diff --git a/raphtory/src/db/api/mutation/deletion_ops.rs b/raphtory/src/db/api/mutation/deletion_ops.rs index 48e3a75fce..b6a38d85a7 100644 --- a/raphtory/src/db/api/mutation/deletion_ops.rs +++ b/raphtory/src/db/api/mutation/deletion_ops.rs @@ -10,11 +10,13 @@ use crate::{ errors::{into_graph_err, GraphError}, }; use raphtory_api::core::{entities::edges::edge_ref::EdgeRef, utils::time::IntoTimeWithFormat}; -use raphtory_storage::mutation::{ - addition_ops::{EdgeWriteLock, InternalAdditionOps}, - deletion_ops::InternalDeletionOps, +use raphtory_storage::{ + durability_ops::DurabilityOps, + mutation::{ + addition_ops::{EdgeWriteLock, InternalAdditionOps}, + deletion_ops::InternalDeletionOps, + }, }; -use raphtory_storage::durability_ops::DurabilityOps; use storage::wal::{GraphWalOps, WalOps}; pub trait DeletionOps: diff --git a/raphtory/src/db/api/mutation/property_addition_ops.rs b/raphtory/src/db/api/mutation/property_addition_ops.rs index 130554dc59..e0c143ad0b 100644 --- a/raphtory/src/db/api/mutation/property_addition_ops.rs +++ b/raphtory/src/db/api/mutation/property_addition_ops.rs @@ -7,8 +7,7 @@ use raphtory_storage::{ core_ops::CoreGraphOps, durability_ops::DurabilityOps, mutation::{ - addition_ops::InternalAdditionOps, - property_addition_ops::InternalPropertyAdditionOps, + addition_ops::InternalAdditionOps, property_addition_ops::InternalPropertyAdditionOps, }, }; use storage::wal::{GraphWalOps, WalOps}; diff --git a/raphtory/src/db/api/storage/storage.rs b/raphtory/src/db/api/storage/storage.rs index 5add9501d5..0ed8493001 100644 --- a/raphtory/src/db/api/storage/storage.rs +++ b/raphtory/src/db/api/storage/storage.rs @@ -19,7 +19,6 @@ use raphtory_api::core::{ }; use raphtory_storage::{ core_ops::InheritCoreGraphOps, - durability_ops::DurabilityOps, graph::graph::GraphStorage, layer_ops::InheritLayerOps, mutation::{ @@ -29,14 +28,17 @@ use raphtory_storage::{ property_addition_ops::InternalPropertyAdditionOps, EdgeWriterT, GraphPropWriterT, NodeWriterT, }, + recovery_ops::RecoveryOps, }; use std::{ fmt::{Display, Formatter}, path::Path, sync::Arc, }; -use storage::wal::{GraphWalOps, WalOps, LSN}; -use storage::persist::control_file::{ControlFileOps, DBState}; +use storage::{ + persist::control_file::{ControlFileOps, DBState}, + wal::{GraphWalOps, WalOps, LSN}, +}; #[cfg(feature = "search")] use { @@ -141,14 +143,8 @@ impl Storage { fn load_with_extension(path: &Path, ext: Extension) -> Result { let temporal_graph = TemporalGraph::load(path, ext)?; - let wal = temporal_graph.wal()?; - let control_file = temporal_graph.control_file()?; - // Replay any pending writes from the WAL. - if wal.has_entries()? { - let mut write_locked_graph = temporal_graph.write_lock()?; - wal.replay_to_graph(&mut write_locked_graph)?; - } + temporal_graph.run_recovery()?; Ok(Self { graph: GraphStorage::Unlocked(Arc::new(temporal_graph)), @@ -160,12 +156,14 @@ impl Storage { pub fn load(path: impl AsRef) -> Result { let path = path.as_ref(); let ext = Extension::load(path)?; + Self::load_with_extension(path, ext) } pub fn load_with_config(path: impl AsRef, config: Config) -> Result { let path = path.as_ref(); let ext = Extension::load_with_config(path, config)?; + Self::load_with_extension(path, ext) } From 337023a374aeed3484cd1e1e538c8a28e6469a9b Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Thu, 12 Mar 2026 10:56:41 -0400 Subject: [PATCH 09/25] Add sketch of recovery --- db4-storage/src/persist/control_file.rs | 5 ++--- db4-storage/src/wal/mod.rs | 3 +++ db4-storage/src/wal/no_wal.rs | 5 ++++- raphtory-storage/src/recovery_ops.rs | 12 +++++++----- raphtory/src/db/api/storage/storage.rs | 1 + 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/db4-storage/src/persist/control_file.rs b/db4-storage/src/persist/control_file.rs index 1180968fab..30ab8f0926 100644 --- a/db4-storage/src/persist/control_file.rs +++ b/db4-storage/src/persist/control_file.rs @@ -6,6 +6,7 @@ pub enum DBState { Running, Shutdown, CrashRecovery, + NotSupported, } pub trait ControlFileOps: Sized { @@ -35,9 +36,7 @@ impl ControlFileOps for NoControlFile { } fn db_state(&self) -> &DBState { - // Without a control file there is no persistence, hence this always reports a clean - // shutdown state so that no recovery is attempted. - &DBState::Shutdown + &DBState::NotSupported } fn last_checkpoint(&self) -> LSN { diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index 1ba2d29928..4a1cadcfbc 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -32,6 +32,9 @@ pub trait WalOps { /// Returns the LSN that will be assigned to the next appended record. fn next_lsn(&self) -> LSN; + + /// Sets the next LSN to be assigned to a record. + fn set_next_lsn(&self, lsn: LSN); } #[derive(Debug)] diff --git a/db4-storage/src/wal/no_wal.rs b/db4-storage/src/wal/no_wal.rs index 5b9e40104f..eb553730de 100644 --- a/db4-storage/src/wal/no_wal.rs +++ b/db4-storage/src/wal/no_wal.rs @@ -31,6 +31,9 @@ impl WalOps for NoWal { } fn next_lsn(&self) -> LSN { - 1 + 0 + } + + fn set_next_lsn(&self, _lsn: LSN) { } } diff --git a/raphtory-storage/src/recovery_ops.rs b/raphtory-storage/src/recovery_ops.rs index 696e03bfa5..fcd5af5d3e 100644 --- a/raphtory-storage/src/recovery_ops.rs +++ b/raphtory-storage/src/recovery_ops.rs @@ -8,20 +8,22 @@ use crate::{ pub trait RecoveryOps: DurabilityOps + InternalAdditionOps { /// Recover from a crash if needed by replaying updates from the WAL. fn run_recovery(&self) -> Result<(), MutationError> { - let transaction_manager = self.transaction_manager()?; let wal = self.wal()?; let control_file = self.control_file()?; match control_file.db_state() { DBState::Shutdown => { - // DB was shutdown cleanly, no need for recovery. + let checkpoint_lsn = control_file.last_checkpoint(); + // let (record, next_lsn) = wal.read(checkpoint_lsn)?; + // Make sure record is Checkpoint and record.is_shutdown() is true. + // wal.set_next_lsn(next_lsn); return Ok(()); } - DBState::Running => { + DBState::Running | DBState::CrashRecovery => { todo!() } - DBState::CrashRecovery => { - todo!() + DBState::NotSupported => { + // Recovery is not supported, skip. } } diff --git a/raphtory/src/db/api/storage/storage.rs b/raphtory/src/db/api/storage/storage.rs index 0ed8493001..b7d2a3c7fa 100644 --- a/raphtory/src/db/api/storage/storage.rs +++ b/raphtory/src/db/api/storage/storage.rs @@ -144,6 +144,7 @@ impl Storage { fn load_with_extension(path: &Path, ext: Extension) -> Result { let temporal_graph = TemporalGraph::load(path, ext)?; + // Run crash recovery if needed. temporal_graph.run_recovery()?; Ok(Self { From 7569cacd861ce96b18e883521c9e9d9a8fc2d362 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Thu, 12 Mar 2026 11:10:01 -0400 Subject: [PATCH 10/25] Add read to WalOps --- db4-storage/src/wal/mod.rs | 5 +++-- db4-storage/src/wal/no_wal.rs | 6 ++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index 4a1cadcfbc..8e73089c2f 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -27,8 +27,9 @@ pub trait WalOps { /// Returns an iterator over the entries in the wal. fn replay(&self) -> impl Iterator>; - /// Returns true if there are entries in the WAL file on disk. - fn has_entries(&self) -> Result; + /// Reads the WAL record at the given LSN. + /// Returns `Ok(None)` if there is no record at that LSN. + fn read(&self, lsn: LSN) -> Result, StorageError>; /// Returns the LSN that will be assigned to the next appended record. fn next_lsn(&self) -> LSN; diff --git a/db4-storage/src/wal/no_wal.rs b/db4-storage/src/wal/no_wal.rs index eb553730de..3b77f542f9 100644 --- a/db4-storage/src/wal/no_wal.rs +++ b/db4-storage/src/wal/no_wal.rs @@ -26,8 +26,10 @@ impl WalOps for NoWal { std::iter::once(Err(StorageError::GenericFailure(error.to_string()))) } - fn has_entries(&self) -> Result { - Ok(false) + fn read(&self, _lsn: LSN) -> Result, StorageError> { + Err(StorageError::GenericFailure( + "read is not supported for NoWAL".to_string(), + )) } fn next_lsn(&self) -> LSN { From 05faeb8c31bf06c38ad69a1b310c3c5a8ba63ff7 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Thu, 12 Mar 2026 19:42:46 -0400 Subject: [PATCH 11/25] Add read_checkpoint --- db4-storage/src/wal/entry.rs | 12 ++++++++++++ db4-storage/src/wal/mod.rs | 20 ++++++++++++++++---- raphtory-storage/src/recovery_ops.rs | 10 +++++++--- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/db4-storage/src/wal/entry.rs b/db4-storage/src/wal/entry.rs index 56685e4364..d563e923ab 100644 --- a/db4-storage/src/wal/entry.rs +++ b/db4-storage/src/wal/entry.rs @@ -105,6 +105,18 @@ impl GraphWalOps for NoWal { Ok(0) } + fn read_checkpoint(&self, _lsn: LSN) -> Result { + Err(StorageError::GenericFailure( + "read_checkpoint is not supported for NoWAL".to_string(), + )) + } + + fn read_shutdown_checkpoint(&self, _lsn: LSN) -> Result { + Err(StorageError::GenericFailure( + "read_shutdown_checkpoint is not supported for NoWAL".to_string(), + )) + } + fn replay_iter(&self) -> impl Iterator> { std::iter::empty() } diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index 8e73089c2f..d0554f134c 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -24,13 +24,13 @@ pub trait WalOps { /// All records with LSN > `cutoff_lsn` are copied to the new WAL file. fn rotate(&self, cutoff_lsn: LSN) -> Result<(), StorageError>; - /// Returns an iterator over the entries in the wal. - fn replay(&self) -> impl Iterator>; - /// Reads the WAL record at the given LSN. /// Returns `Ok(None)` if there is no record at that LSN. fn read(&self, lsn: LSN) -> Result, StorageError>; + /// Returns an iterator over the entries in the wal. + fn replay(&self) -> impl Iterator>; + /// Returns the LSN that will be assigned to the next appended record. fn next_lsn(&self) -> LSN; @@ -61,6 +61,11 @@ impl ReplayRecord { self.lsn } + /// Returns the LSN immediately following this record in the WAL stream. + pub fn next_lsn(&self) -> LSN { + self.lsn + self.raw_bytes.len() as LSN + } + pub fn data(&self) -> &[u8] { &self.data } @@ -153,11 +158,18 @@ pub trait GraphWalOps { /// Set `is_shutdown` to true on a clean shutdown to differentiate from periodic checkpoints. fn log_checkpoint(&self, redo: LSN, is_shutdown: bool) -> Result; + /// Reads and decodes the WAL entry at the given LSN and validates that it is a checkpoint. + /// Returns the checkpoint redo LSN, denoting where replay should start from. + fn read_checkpoint(&self, lsn: LSN) -> Result; + + /// Reads and decodes the WAL entry at the given LSN and validates that it is a shutdown checkpoint. + /// Returns the LSN immediately after this record which marks the end of the WAL stream. + fn read_shutdown_checkpoint(&self, lsn: LSN) -> Result; + /// Returns an iterator over the entries in the wal. fn replay_iter(&self) -> impl Iterator>; /// Replays and applies all the entries in the wal to the given graph. - /// Subsequent appends to the WAL will start from the LSN of the last replayed entry. fn replay_to_graph(&self, graph: &mut G) -> Result<(), StorageError>; } diff --git a/raphtory-storage/src/recovery_ops.rs b/raphtory-storage/src/recovery_ops.rs index fcd5af5d3e..d3b755f1b1 100644 --- a/raphtory-storage/src/recovery_ops.rs +++ b/raphtory-storage/src/recovery_ops.rs @@ -1,4 +1,5 @@ use storage::persist::control_file::{ControlFileOps, DBState}; +use storage::wal::{GraphWalOps, WalOps}; use crate::{ durability_ops::DurabilityOps, @@ -14,9 +15,12 @@ pub trait RecoveryOps: DurabilityOps + InternalAdditionOps { let checkpoint_lsn = control_file.last_checkpoint(); - // let (record, next_lsn) = wal.read(checkpoint_lsn)?; - // Make sure record is Checkpoint and record.is_shutdown() is true. - // wal.set_next_lsn(next_lsn); + let end_of_wal_lsn = wal.read_shutdown_checkpoint(checkpoint_lsn)?; + + // LSN after the shutdown checkpoint points to the end of WAL stream. + // Set this as the next LSN for future writes. + wal.set_next_lsn(end_of_wal_lsn); + return Ok(()); } DBState::Running | DBState::CrashRecovery => { From 7eab0fc879837059a853bad892facb5e7a18fb77 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Fri, 13 Mar 2026 17:01:33 -0400 Subject: [PATCH 12/25] Return DBState --- db4-storage/src/persist/control_file.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/db4-storage/src/persist/control_file.rs b/db4-storage/src/persist/control_file.rs index 30ab8f0926..45e448c964 100644 --- a/db4-storage/src/persist/control_file.rs +++ b/db4-storage/src/persist/control_file.rs @@ -1,7 +1,8 @@ use crate::{error::StorageError, wal::LSN}; +use serde::{Deserialize, Serialize}; use std::path::Path; -#[derive(Debug)] +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub enum DBState { Running, Shutdown, @@ -14,7 +15,7 @@ pub trait ControlFileOps: Sized { fn save(&self) -> Result<(), StorageError>; - fn db_state(&self) -> &DBState; + fn db_state(&self) -> DBState; fn last_checkpoint(&self) -> LSN; @@ -35,8 +36,8 @@ impl ControlFileOps for NoControlFile { Ok(()) } - fn db_state(&self) -> &DBState { - &DBState::NotSupported + fn db_state(&self) -> DBState { + DBState::NotSupported } fn last_checkpoint(&self) -> LSN { From 8830b102de7e738c67974feb32c9cc590461a17a Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Fri, 13 Mar 2026 17:40:16 -0400 Subject: [PATCH 13/25] Remove wal rotate --- db4-storage/src/wal/mod.rs | 4 ---- db4-storage/src/wal/no_wal.rs | 4 ---- 2 files changed, 8 deletions(-) diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index d0554f134c..5fa9108c8f 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -20,10 +20,6 @@ pub trait WalOps { /// Returns immediately if the given LSN is already flushed to disk. fn flush(&self, lsn: LSN) -> Result<(), StorageError>; - /// Rotates the underlying WAL file. - /// All records with LSN > `cutoff_lsn` are copied to the new WAL file. - fn rotate(&self, cutoff_lsn: LSN) -> Result<(), StorageError>; - /// Reads the WAL record at the given LSN. /// Returns `Ok(None)` if there is no record at that LSN. fn read(&self, lsn: LSN) -> Result, StorageError>; diff --git a/db4-storage/src/wal/no_wal.rs b/db4-storage/src/wal/no_wal.rs index 3b77f542f9..07a63ccb85 100644 --- a/db4-storage/src/wal/no_wal.rs +++ b/db4-storage/src/wal/no_wal.rs @@ -17,10 +17,6 @@ impl WalOps for NoWal { Ok(()) } - fn rotate(&self, _cutoff_lsn: LSN) -> Result<(), StorageError> { - Ok(()) - } - fn replay(&self) -> impl Iterator> { let error = "Recovery is not supported for NoWAL"; std::iter::once(Err(StorageError::GenericFailure(error.to_string()))) From 7b8b9497cc17eaf4b0139e37d47a16562f95a309 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Mon, 16 Mar 2026 13:43:45 -0400 Subject: [PATCH 14/25] Modify replay to take a start LSN --- db4-storage/src/pages/mod.rs | 4 +++- db4-storage/src/wal/entry.rs | 8 ++++++-- db4-storage/src/wal/mod.rs | 19 +++++++++++++------ db4-storage/src/wal/no_wal.rs | 3 ++- 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/db4-storage/src/pages/mod.rs b/db4-storage/src/pages/mod.rs index 78c6544f42..5a0b402495 100644 --- a/db4-storage/src/pages/mod.rs +++ b/db4-storage/src/pages/mod.rs @@ -581,7 +581,9 @@ impl< .log_checkpoint(redo_lsn, is_shutdown) .expect("Failed to log checkpoint in drop"); - wal.flush(checkpoint_lsn) + // Flush up to the end of the WAL stream. + let flush_lsn = wal.next_lsn(); + wal.flush(flush_lsn) .expect("Failed to flush checkpoint record in drop"); // Record the checkpoint and shutdown state and write control file to disk. diff --git a/db4-storage/src/wal/entry.rs b/db4-storage/src/wal/entry.rs index d563e923ab..2d09a48d63 100644 --- a/db4-storage/src/wal/entry.rs +++ b/db4-storage/src/wal/entry.rs @@ -117,11 +117,15 @@ impl GraphWalOps for NoWal { )) } - fn replay_iter(&self) -> impl Iterator> { + fn replay_iter(&self, _start: LSN) -> impl Iterator> { std::iter::empty() } - fn replay_to_graph(&self, _graph: &mut G) -> Result<(), StorageError> { + fn replay_to_graph( + &self, + _graph: &mut G, + _start: LSN, + ) -> Result<(), StorageError> { panic!("NoWAL does not support replay") } } diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index 5fa9108c8f..c40c14b7ed 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -24,8 +24,8 @@ pub trait WalOps { /// Returns `Ok(None)` if there is no record at that LSN. fn read(&self, lsn: LSN) -> Result, StorageError>; - /// Returns an iterator over the entries in the wal. - fn replay(&self) -> impl Iterator>; + /// Returns an iterator over the entries in the wal, starting from the given LSN. + fn replay(&self, start: LSN) -> impl Iterator>; /// Returns the LSN that will be assigned to the next appended record. fn next_lsn(&self) -> LSN; @@ -162,11 +162,18 @@ pub trait GraphWalOps { /// Returns the LSN immediately after this record which marks the end of the WAL stream. fn read_shutdown_checkpoint(&self, lsn: LSN) -> Result; - /// Returns an iterator over the entries in the wal. - fn replay_iter(&self) -> impl Iterator>; + /// Returns an iterator over the entries in the wal, starting from the given LSN. + fn replay_iter( + &self, + start: LSN, + ) -> impl Iterator>; - /// Replays and applies all the entries in the wal to the given graph. - fn replay_to_graph(&self, graph: &mut G) -> Result<(), StorageError>; + /// Replays and applies all the entries in the wal to the given graph, starting from the given LSN. + fn replay_to_graph( + &self, + graph: &mut G, + start: LSN, + ) -> Result<(), StorageError>; } /// Trait for defining callbacks for replaying from wal. diff --git a/db4-storage/src/wal/no_wal.rs b/db4-storage/src/wal/no_wal.rs index 07a63ccb85..90bf0471e1 100644 --- a/db4-storage/src/wal/no_wal.rs +++ b/db4-storage/src/wal/no_wal.rs @@ -17,7 +17,7 @@ impl WalOps for NoWal { Ok(()) } - fn replay(&self) -> impl Iterator> { + fn replay(&self, _start: LSN) -> impl Iterator> { let error = "Recovery is not supported for NoWAL"; std::iter::once(Err(StorageError::GenericFailure(error.to_string()))) } @@ -33,5 +33,6 @@ impl WalOps for NoWal { } fn set_next_lsn(&self, _lsn: LSN) { + panic!("set_next_lsn is not supported for NoWAL"); } } From f26bca2b0e5fff35d65dc4a14613d8707b8cec47 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Tue, 17 Mar 2026 12:29:59 -0400 Subject: [PATCH 15/25] Fix crash recovery with byte offsets --- db4-storage/src/persist/control_file.rs | 3 +++ db4-storage/src/wal/entry.rs | 2 +- db4-storage/src/wal/mod.rs | 5 ++-- raphtory-storage/src/recovery_ops.rs | 32 +++++++++++++++++++++---- 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/db4-storage/src/persist/control_file.rs b/db4-storage/src/persist/control_file.rs index 45e448c964..9c8c942884 100644 --- a/db4-storage/src/persist/control_file.rs +++ b/db4-storage/src/persist/control_file.rs @@ -10,6 +10,9 @@ pub enum DBState { NotSupported, } +// Starting value for `last_checkpoint` in the control file. +pub const LAST_CHECKPOINT_INIT: LSN = 0; + pub trait ControlFileOps: Sized { fn load(dir: &Path) -> Result; diff --git a/db4-storage/src/wal/entry.rs b/db4-storage/src/wal/entry.rs index 2d09a48d63..81347fe64c 100644 --- a/db4-storage/src/wal/entry.rs +++ b/db4-storage/src/wal/entry.rs @@ -125,7 +125,7 @@ impl GraphWalOps for NoWal { &self, _graph: &mut G, _start: LSN, - ) -> Result<(), StorageError> { + ) -> Result { panic!("NoWAL does not support replay") } } diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index c40c14b7ed..9e4bcdca4a 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -159,7 +159,7 @@ pub trait GraphWalOps { fn read_checkpoint(&self, lsn: LSN) -> Result; /// Reads and decodes the WAL entry at the given LSN and validates that it is a shutdown checkpoint. - /// Returns the LSN immediately after this record which marks the end of the WAL stream. + /// Returns the LSN immediately after this record, marking the end of the WAL stream. fn read_shutdown_checkpoint(&self, lsn: LSN) -> Result; /// Returns an iterator over the entries in the wal, starting from the given LSN. @@ -169,11 +169,12 @@ pub trait GraphWalOps { ) -> impl Iterator>; /// Replays and applies all the entries in the wal to the given graph, starting from the given LSN. + /// Returns the LSN immediately after the last entry in the WAL stream on success. fn replay_to_graph( &self, graph: &mut G, start: LSN, - ) -> Result<(), StorageError>; + ) -> Result; } /// Trait for defining callbacks for replaying from wal. diff --git a/raphtory-storage/src/recovery_ops.rs b/raphtory-storage/src/recovery_ops.rs index d3b755f1b1..d6674d6f3b 100644 --- a/raphtory-storage/src/recovery_ops.rs +++ b/raphtory-storage/src/recovery_ops.rs @@ -1,4 +1,4 @@ -use storage::persist::control_file::{ControlFileOps, DBState}; +use storage::persist::control_file::{ControlFileOps, DBState, LAST_CHECKPOINT_INIT}; use storage::wal::{GraphWalOps, WalOps}; use crate::{ @@ -20,17 +20,41 @@ pub trait RecoveryOps: DurabilityOps + InternalAdditionOps { - todo!() + let checkpoint_lsn = control_file.last_checkpoint(); + + let redo_lsn = if checkpoint_lsn == LAST_CHECKPOINT_INIT { + // No successful checkpoint has been written yet, + // replay from the start of the WAL stream. + 0 + } else { + wal.read_checkpoint(checkpoint_lsn)? + }; + + println!("redo_lsn: {redo_lsn}"); + + // Set db state to indicate that recovery is in progress. + control_file.set_db_state(DBState::CrashRecovery); + control_file.save()?; + + let mut write_locked_graph = self.write_lock()?; + let end_of_wal_lsn = wal.replay_to_graph(&mut write_locked_graph, redo_lsn)?; + + // Set the next LSN for future writes to the end of the WAL stream. + wal.set_next_lsn(end_of_wal_lsn); } DBState::NotSupported => { // Recovery is not supported, skip. } } + println!("next_lsn: {}", wal.next_lsn()); + + // Always set db state to Running after recovery completes. + control_file.set_db_state(DBState::Running); + control_file.save()?; + Ok(()) } } From 0fd0764b760ede1bf23124cb2e8b2de2f2c17860 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Tue, 17 Mar 2026 12:53:47 -0400 Subject: [PATCH 16/25] Remove replay_iter --- db4-storage/src/wal/entry.rs | 4 ---- db4-storage/src/wal/mod.rs | 6 ------ 2 files changed, 10 deletions(-) diff --git a/db4-storage/src/wal/entry.rs b/db4-storage/src/wal/entry.rs index 81347fe64c..91b7c1883c 100644 --- a/db4-storage/src/wal/entry.rs +++ b/db4-storage/src/wal/entry.rs @@ -117,10 +117,6 @@ impl GraphWalOps for NoWal { )) } - fn replay_iter(&self, _start: LSN) -> impl Iterator> { - std::iter::empty() - } - fn replay_to_graph( &self, _graph: &mut G, diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index 9e4bcdca4a..18c0c1b046 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -162,12 +162,6 @@ pub trait GraphWalOps { /// Returns the LSN immediately after this record, marking the end of the WAL stream. fn read_shutdown_checkpoint(&self, lsn: LSN) -> Result; - /// Returns an iterator over the entries in the wal, starting from the given LSN. - fn replay_iter( - &self, - start: LSN, - ) -> impl Iterator>; - /// Replays and applies all the entries in the wal to the given graph, starting from the given LSN. /// Returns the LSN immediately after the last entry in the WAL stream on success. fn replay_to_graph( From 3bece447209658a20f51497d0516287407761360 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Thu, 26 Mar 2026 17:11:51 -0400 Subject: [PATCH 17/25] Run fmt --- raphtory-storage/src/recovery_ops.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/raphtory-storage/src/recovery_ops.rs b/raphtory-storage/src/recovery_ops.rs index d6674d6f3b..b578da96e2 100644 --- a/raphtory-storage/src/recovery_ops.rs +++ b/raphtory-storage/src/recovery_ops.rs @@ -1,5 +1,7 @@ -use storage::persist::control_file::{ControlFileOps, DBState, LAST_CHECKPOINT_INIT}; -use storage::wal::{GraphWalOps, WalOps}; +use storage::{ + persist::control_file::{ControlFileOps, DBState, LAST_CHECKPOINT_INIT}, + wal::{GraphWalOps, WalOps}, +}; use crate::{ durability_ops::DurabilityOps, From 10826726e0bd6697960aea8b464a9ca0a5de6846 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Fri, 27 Mar 2026 17:28:17 -0400 Subject: [PATCH 18/25] Rename next_lsn to position --- db4-storage/src/pages/mod.rs | 4 ++-- db4-storage/src/wal/mod.rs | 8 ++++---- db4-storage/src/wal/no_wal.rs | 8 +++++--- raphtory-storage/src/recovery_ops.rs | 6 ++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/db4-storage/src/pages/mod.rs b/db4-storage/src/pages/mod.rs index 5a0b402495..364cf9c5e7 100644 --- a/db4-storage/src/pages/mod.rs +++ b/db4-storage/src/pages/mod.rs @@ -568,7 +568,7 @@ impl< // Since we are in a Drop, no more writes can occur. // Thus, next_lsn == end of the WAL stream. // So we can safely set this as the redo LSN for the checkpoint (i.e. nothing to redo). - let redo_lsn = wal.next_lsn(); + let redo_lsn = wal.position(); match self.flush() { Ok(_) => { @@ -582,7 +582,7 @@ impl< .expect("Failed to log checkpoint in drop"); // Flush up to the end of the WAL stream. - let flush_lsn = wal.next_lsn(); + let flush_lsn = wal.position(); wal.flush(flush_lsn) .expect("Failed to flush checkpoint record in drop"); diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index 18c0c1b046..0c6afa045c 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -27,11 +27,11 @@ pub trait WalOps { /// Returns an iterator over the entries in the wal, starting from the given LSN. fn replay(&self, start: LSN) -> impl Iterator>; - /// Returns the LSN that will be assigned to the next appended record. - fn next_lsn(&self) -> LSN; + /// Returns the current position in the WAL stream. + fn position(&self) -> LSN; - /// Sets the next LSN to be assigned to a record. - fn set_next_lsn(&self, lsn: LSN); + /// Sets the position in the WAL stream. + fn set_position(&self, lsn: LSN) -> Result<(), StorageError>; } #[derive(Debug)] diff --git a/db4-storage/src/wal/no_wal.rs b/db4-storage/src/wal/no_wal.rs index 90bf0471e1..7f563080ca 100644 --- a/db4-storage/src/wal/no_wal.rs +++ b/db4-storage/src/wal/no_wal.rs @@ -28,11 +28,13 @@ impl WalOps for NoWal { )) } - fn next_lsn(&self) -> LSN { + fn position(&self) -> LSN { 0 } - fn set_next_lsn(&self, _lsn: LSN) { - panic!("set_next_lsn is not supported for NoWAL"); + fn set_position(&self, _lsn: LSN) -> Result<(), StorageError> { + Err(StorageError::GenericFailure( + "set_position is not supported for NoWAL".to_string(), + )) } } diff --git a/raphtory-storage/src/recovery_ops.rs b/raphtory-storage/src/recovery_ops.rs index b578da96e2..643fc800c2 100644 --- a/raphtory-storage/src/recovery_ops.rs +++ b/raphtory-storage/src/recovery_ops.rs @@ -21,7 +21,7 @@ pub trait RecoveryOps: DurabilityOps + InternalAdditionOps { let checkpoint_lsn = control_file.last_checkpoint(); @@ -44,15 +44,13 @@ pub trait RecoveryOps: DurabilityOps + InternalAdditionOps { // Recovery is not supported, skip. } } - println!("next_lsn: {}", wal.next_lsn()); - // Always set db state to Running after recovery completes. control_file.set_db_state(DBState::Running); control_file.save()?; From 875c68879eac36e978ea3efcbcbf49cec8599246 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Fri, 27 Mar 2026 18:00:48 -0400 Subject: [PATCH 19/25] Replace raw_bytes with next_lsn --- db4-storage/src/wal/mod.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index 0c6afa045c..c6f2120610 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -40,16 +40,16 @@ pub struct ReplayRecord { data: Vec, - /// The raw bytes of the WAL entry stored on disk, including CRC data. - raw_bytes: Vec, + /// LSN immediately after this record in the WAL stream. + next_lsn: LSN, } impl ReplayRecord { - pub fn new(lsn: LSN, data: Vec, raw_bytes: Vec) -> Self { + pub fn new(lsn: LSN, data: Vec, next_lsn: LSN) -> Self { Self { lsn, data, - raw_bytes, + next_lsn, } } @@ -59,16 +59,12 @@ impl ReplayRecord { /// Returns the LSN immediately following this record in the WAL stream. pub fn next_lsn(&self) -> LSN { - self.lsn + self.raw_bytes.len() as LSN + self.next_lsn } pub fn data(&self) -> &[u8] { &self.data } - - pub fn raw_bytes(&self) -> &[u8] { - &self.raw_bytes - } } // Raphtory-specific logging & replay methods. From 0f15401ae1f174b95b43cb81ccfc613b2d6e7864 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Fri, 27 Mar 2026 18:27:22 -0400 Subject: [PATCH 20/25] Remove stray print --- raphtory-storage/src/recovery_ops.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/raphtory-storage/src/recovery_ops.rs b/raphtory-storage/src/recovery_ops.rs index 643fc800c2..1f94473a9a 100644 --- a/raphtory-storage/src/recovery_ops.rs +++ b/raphtory-storage/src/recovery_ops.rs @@ -34,8 +34,6 @@ pub trait RecoveryOps: DurabilityOps + InternalAdditionOps Date: Fri, 27 Mar 2026 18:51:38 -0400 Subject: [PATCH 21/25] Fix stray merge conflicts --- raphtory-storage/src/mutation/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/raphtory-storage/src/mutation/mod.rs b/raphtory-storage/src/mutation/mod.rs index 3c8e77cb56..7b8b5579fb 100644 --- a/raphtory-storage/src/mutation/mod.rs +++ b/raphtory-storage/src/mutation/mod.rs @@ -32,8 +32,6 @@ use thiserror::Error; pub mod addition_ops; pub mod addition_ops_ext; -pub mod deletion_ops; -pub mod durability_ops; pub mod property_addition_ops; pub type NodeWriterT<'a> = NodeWriter<'a, RwLockWriteGuard<'a, MemNodeSegment>, NS>; From 1d8fdf22e315644ee12b0a438777ba6dbcb73431 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Fri, 27 Mar 2026 19:22:27 -0400 Subject: [PATCH 22/25] Create graph folder in path.init() --- raphtory/src/db/api/mutation/deletion_ops.rs | 6 +----- raphtory/src/db/api/storage/storage.rs | 7 +------ raphtory/src/db/graph/edge.rs | 2 -- raphtory/src/serialise/graph_folder.rs | 13 ++++++++++--- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/raphtory/src/db/api/mutation/deletion_ops.rs b/raphtory/src/db/api/mutation/deletion_ops.rs index 88e3faa676..2293e8c084 100644 --- a/raphtory/src/db/api/mutation/deletion_ops.rs +++ b/raphtory/src/db/api/mutation/deletion_ops.rs @@ -11,12 +11,8 @@ use crate::{ }; use raphtory_api::core::{entities::edges::edge_ref::EdgeRef, utils::time::IntoTimeWithFormat}; use raphtory_storage::{ - mutation::addition_ops::{EdgeWriteLock, InternalAdditionOps}, durability_ops::DurabilityOps, - mutation::{ - addition_ops::{EdgeWriteLock, InternalAdditionOps}, - deletion_ops::InternalDeletionOps, - }, + mutation::addition_ops::{EdgeWriteLock, InternalAdditionOps}, }; use storage::wal::{GraphWalOps, WalOps}; diff --git a/raphtory/src/db/api/storage/storage.rs b/raphtory/src/db/api/storage/storage.rs index c3b581b962..924151c9eb 100644 --- a/raphtory/src/db/api/storage/storage.rs +++ b/raphtory/src/db/api/storage/storage.rs @@ -24,8 +24,6 @@ use raphtory_storage::{ mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps, SessionAdditionOps}, addition_ops_ext::{AtomicAddEdge, AtomicAddNode, UnlockedSession}, - deletion_ops::InternalDeletionOps, - durability_ops::DurabilityOps, property_addition_ops::InternalPropertyAdditionOps, EdgeWriterT, GraphPropWriterT, NodeWriterT, }, @@ -36,10 +34,7 @@ use std::{ path::Path, sync::Arc, }; -use storage::{ - persist::control_file::{ControlFileOps, DBState}, - wal::{GraphWalOps, WalOps, LSN}, -}; +use storage::wal::LSN; #[cfg(feature = "search")] use { diff --git a/raphtory/src/db/graph/edge.rs b/raphtory/src/db/graph/edge.rs index 270f6e8920..472a64aec5 100644 --- a/raphtory/src/db/graph/edge.rs +++ b/raphtory/src/db/graph/edge.rs @@ -45,8 +45,6 @@ use raphtory_storage::{ graph::edges::edge_storage_ops::EdgeStorageOps, mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps}, - deletion_ops::InternalDeletionOps, - durability_ops::DurabilityOps, property_addition_ops::InternalPropertyAdditionOps, }, }; diff --git a/raphtory/src/serialise/graph_folder.rs b/raphtory/src/serialise/graph_folder.rs index abd4a2c6df..2b9faf178d 100644 --- a/raphtory/src/serialise/graph_folder.rs +++ b/raphtory/src/serialise/graph_folder.rs @@ -268,12 +268,19 @@ pub trait GraphPaths { } else { fs::create_dir_all(self.root())? } - let meta_path = self.relative_data_path()?; - fs::create_dir(self.root().join(&meta_path))?; + + // Create the data folder and have the root metadata file point to it. + let data_path = self.relative_data_path()?; + fs::create_dir(self.root().join(&data_path))?; fs::write( self.root_meta_path(), - serde_json::to_string(&RelativePath { path: meta_path })?, + serde_json::to_string(&RelativePath { path: data_path })?, )?; + + // Create the graph folder inside the data folder. + let graph_path = self.graph_path()?; + fs::create_dir(&graph_path)?; + Ok(()) } } From 83dea34210f612e7dbe04bdd190a772f9d3b004d Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Fri, 27 Mar 2026 19:41:55 -0400 Subject: [PATCH 23/25] Run fmt --- raphtory/src/algorithms/components/connected_components.rs | 4 ++-- raphtory/src/python/packages/algorithms.rs | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/raphtory/src/algorithms/components/connected_components.rs b/raphtory/src/algorithms/components/connected_components.rs index f105de6435..954d29b55e 100644 --- a/raphtory/src/algorithms/components/connected_components.rs +++ b/raphtory/src/algorithms/components/connected_components.rs @@ -8,6 +8,7 @@ use crate::{ }, prelude::GraphViewOps, }; +use disjoint_sets::AUnionFind; use raphtory_api::core::entities::VID; use raphtory_core::entities::LayerIds; use rayon::prelude::*; @@ -16,7 +17,6 @@ use std::{ mem, sync::atomic::{AtomicUsize, Ordering}, }; -use disjoint_sets::AUnionFind; /// Keeps track of node assignments to weakly-connected components /// @@ -244,4 +244,4 @@ where }); let result = NodeState::new_from_eval(g.clone(), dss.to_vec()); result -} \ No newline at end of file +} diff --git a/raphtory/src/python/packages/algorithms.rs b/raphtory/src/python/packages/algorithms.rs index 785f290362..3670dc440b 100644 --- a/raphtory/src/python/packages/algorithms.rs +++ b/raphtory/src/python/packages/algorithms.rs @@ -151,7 +151,9 @@ pub fn weakly_connected_components(graph: &PyGraphView) -> NodeState<'static, us /// NodeStateUsize: Mapping of nodes to their component ids. #[pyfunction] #[pyo3(signature = (graph))] -pub fn weakly_connected_components_ds(graph: &PyGraphView) -> NodeState<'static, usize, DynamicGraph> { +pub fn weakly_connected_components_ds( + graph: &PyGraphView, +) -> NodeState<'static, usize, DynamicGraph> { components::weakly_connected_components_ds(&graph.graph) } From 85b04fad19d1b9dcf46d5f7c8e707b01b15bb1df Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sat, 28 Mar 2026 01:00:41 +0000 Subject: [PATCH 24/25] chore: apply tidy-public auto-fixes --- python/python/raphtory/algorithms/__init__.pyi | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/python/python/raphtory/algorithms/__init__.pyi b/python/python/raphtory/algorithms/__init__.pyi index 6ae4cb0718..0920041416 100644 --- a/python/python/raphtory/algorithms/__init__.pyi +++ b/python/python/raphtory/algorithms/__init__.pyi @@ -54,6 +54,7 @@ __all__ = [ "local_clustering_coefficient", "local_clustering_coefficient_batch", "weakly_connected_components", + "weakly_connected_components_ds", "strongly_connected_components", "in_components", "in_component", @@ -425,6 +426,20 @@ def weakly_connected_components(graph: GraphView) -> NodeStateUsize: NodeStateUsize: Mapping of nodes to their component ids. """ +def weakly_connected_components_ds(graph: GraphView) -> NodeStateUsize: + """ + Weakly connected components (Disjoint Set Union) -- partitions the graph into node sets which are mutually reachable by an undirected path + + This function assigns a component id to each node such that nodes with the same component id are mutually reachable + by an undirected path. + + Arguments: + graph (GraphView): Raphtory graph + + Returns: + NodeStateUsize: Mapping of nodes to their component ids. + """ + def strongly_connected_components(graph: GraphView) -> NodeStateUsize: """ Strongly connected components From 124389feb10bd3acff0f8cdb5881bda0ce0efe81 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Mon, 30 Mar 2026 11:41:13 -0400 Subject: [PATCH 25/25] Simplify shutdown --- db4-storage/src/pages/mod.rs | 11 ++--------- db4-storage/src/wal/entry.rs | 6 +++++- db4-storage/src/wal/mod.rs | 6 ++++-- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/db4-storage/src/pages/mod.rs b/db4-storage/src/pages/mod.rs index 56ba68ff5b..55faad5179 100644 --- a/db4-storage/src/pages/mod.rs +++ b/db4-storage/src/pages/mod.rs @@ -358,21 +358,14 @@ impl< let wal = self.ext.wal(); let control_file = self.ext.control_file(); - // Since we are in a Drop, no more writes can occur. - // Thus, next_lsn == end of the WAL stream. - // So we can safely set this as the redo LSN for the checkpoint (i.e. nothing to redo). - let redo_lsn = wal.position(); - match self.flush() { Ok(_) => { // Log a checkpoint record in the WAL, indicating that the DB was shutdown // with all the segments flushed to disk. - // Now, redo_lsn points to this checkpoint record. // On startup, recovery is skipped since there are no pending writes to replay. - let is_shutdown = true; let checkpoint_lsn = wal - .log_checkpoint(redo_lsn, is_shutdown) - .expect("Failed to log checkpoint in drop"); + .log_shutdown_checkpoint() + .expect("Failed to log shutdown checkpoint in drop"); // Flush up to the end of the WAL stream. let flush_lsn = wal.position(); diff --git a/db4-storage/src/wal/entry.rs b/db4-storage/src/wal/entry.rs index 91b7c1883c..cf410046e0 100644 --- a/db4-storage/src/wal/entry.rs +++ b/db4-storage/src/wal/entry.rs @@ -101,7 +101,11 @@ impl GraphWalOps for NoWal { Ok(0) } - fn log_checkpoint(&self, _redo: LSN, _is_shutdown: bool) -> Result { + fn log_checkpoint(&self, _redo: LSN) -> Result { + Ok(0) + } + + fn log_shutdown_checkpoint(&self) -> Result { Ok(0) } diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index c6f2120610..228d0b3822 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -147,8 +147,10 @@ pub trait GraphWalOps { /// Logs a checkpoint indicating that all LSN < `redo` are persisted. /// On recovery, replay will start from `redo` in the WAL stream. - /// Set `is_shutdown` to true on a clean shutdown to differentiate from periodic checkpoints. - fn log_checkpoint(&self, redo: LSN, is_shutdown: bool) -> Result; + fn log_checkpoint(&self, redo: LSN) -> Result; + + /// Logs a shutdown checkpoint indicating a clean shutdown with all writes persisted. + fn log_shutdown_checkpoint(&self) -> Result; /// Reads and decodes the WAL entry at the given LSN and validates that it is a checkpoint. /// Returns the checkpoint redo LSN, denoting where replay should start from.