Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4e4d3ef
Add redo and is_shutdown to log_checkpoint
fabubaker Mar 10, 2026
d89cb57
Add ControlFileOps and NoControlFile
fabubaker Mar 11, 2026
0b1ecbb
Remove new and load from WalOps
fabubaker Mar 11, 2026
257ee27
Attach control_file to strategy structs
fabubaker Mar 11, 2026
97d77b8
Expose control_file in PersistenceStrategy
fabubaker Mar 11, 2026
d84b5e2
Expose control_file in DurabilityOps
fabubaker Mar 11, 2026
50fc86c
Move DurabilityOps out of mutation
fabubaker Mar 11, 2026
166e895
Write to control file on drop
fabubaker Mar 12, 2026
337023a
Add sketch of recovery
fabubaker Mar 12, 2026
7569cac
Add read to WalOps
fabubaker Mar 12, 2026
05faeb8
Add read_checkpoint
fabubaker Mar 12, 2026
7eab0fc
Return DBState
fabubaker Mar 13, 2026
8830b10
Remove wal rotate
fabubaker Mar 13, 2026
7b8b949
Modify replay to take a start LSN
fabubaker Mar 16, 2026
f26bca2
Fix crash recovery with byte offsets
fabubaker Mar 17, 2026
0fd0764
Remove replay_iter
fabubaker Mar 17, 2026
3bece44
Run fmt
fabubaker Mar 26, 2026
1082672
Rename next_lsn to position
fabubaker Mar 27, 2026
875c688
Replace raw_bytes with next_lsn
fabubaker Mar 27, 2026
0f15401
Remove stray print
fabubaker Mar 27, 2026
5c6c6e8
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/lsn-b…
fabubaker Mar 27, 2026
5372d75
Fix stray merge conflicts
fabubaker Mar 27, 2026
1d8fdf2
Create graph folder in path.init()
fabubaker Mar 27, 2026
83dea34
Run fmt
fabubaker Mar 27, 2026
85b04fa
chore: apply tidy-public auto-fixes
github-actions[bot] Mar 28, 2026
181b05b
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/lsn-b…
fabubaker Mar 30, 2026
124389f
Simplify shutdown
fabubaker Mar 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions db4-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub type GS<P> = GraphPropSegmentView<P>;
pub type Layer<P> = GraphStore<NS<P>, ES<P>, GS<P>, P>;

pub type Wal = <Extension as PersistenceStrategy>::Wal;
pub type ControlFile = <Extension as PersistenceStrategy>::ControlFile;
pub type Config = <Extension as PersistenceStrategy>::Config;
pub type GIDResolver = MappingResolver;

Expand Down
46 changes: 24 additions & 22 deletions db4-storage/src/pages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ use crate::{
api::{edges::EdgeSegmentOps, graph_props::GraphPropSegmentOps, nodes::NodeSegmentOps},
error::StorageError,
pages::{edge_store::ReadLockedEdgeStorage, node_store::ReadLockedNodeStorage},
persist::{config::ConfigOps, strategy::PersistenceStrategy},
persist::{
config::ConfigOps,
control_file::{ControlFileOps, DBState},
strategy::PersistenceStrategy,
},
properties::props_meta_writer::PropsMetaWriter,
segments::{edge::segment::MemEdgeSegment, node::segment::MemNodeSegment},
state::StateIndex,
wal::{GraphWalOps, WalOps},
Expand Down Expand Up @@ -350,32 +355,29 @@ impl<
> Drop for GraphStore<NS, ES, GS, EXT>
{
fn drop(&mut self) {
let wal = self.ext.wal();
let control_file = self.ext.control_file();

match self.flush() {
Ok(_) => {
let wal = self.ext.wal();

// INVARIANTS:
// 1. No new writes can occur since we are in a drop.
// 2. flush() has persisted all the segments to disk.
//
// Thus, we can safely discard all records with LSN <= latest_lsn_on_disk
// by rotating the WAL.
let latest_lsn_on_disk = wal.next_lsn() - 1;

if let Err(e) = wal.rotate(latest_lsn_on_disk) {
eprintln!("Failed to rotate WAL in drop: {}", e);
}

// FIXME: If the process crashes here after rotation, we lose the
// checkpoint record. Write next LSN to a separate file before rotation.

// Log a checkpoint record so we can restore the next LSN after reload.
// Log a checkpoint record in the WAL, indicating that the DB was shutdown
// with all the segments flushed to disk.
// On startup, recovery is skipped since there are no pending writes to replay.
let checkpoint_lsn = wal
.log_checkpoint(latest_lsn_on_disk)
.expect("Failed to log checkpoint in drop");
.log_shutdown_checkpoint()
.expect("Failed to log shutdown checkpoint in drop");

wal.flush(checkpoint_lsn)
// Flush up to the end of the WAL stream.
let flush_lsn = wal.position();
wal.flush(flush_lsn)
.expect("Failed to flush checkpoint record in drop");

// Record the checkpoint and shutdown state and write control file to disk.
control_file.set_checkpoint(checkpoint_lsn);
control_file.set_db_state(DBState::Shutdown);
control_file
.save()
.expect("Failed to save control file in drop");
}
Err(err) => {
eprintln!("Failed to flush storage in drop: {err}")
Expand Down
7 changes: 4 additions & 3 deletions db4-storage/src/persist/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use tracing::error;

pub const DEFAULT_MAX_PAGE_LEN_NODES: u32 = 131_072; // 2^17
pub const DEFAULT_MAX_PAGE_LEN_EDGES: u32 = 1_048_576; // 2^20
pub const CONFIG_FILE: &str = "config.json";

const CONFIG_FILE_NAME: &str = "config.json";

pub trait ConfigOps: Serialize + DeserializeOwned + Args + Sized {
fn max_node_page_len(&self) -> u32;
Expand All @@ -25,14 +26,14 @@ pub trait ConfigOps: Serialize + DeserializeOwned + Args + Sized {
fn with_node_types(&self, node_types: impl IntoIterator<Item = impl AsRef<str>>) -> Self;

fn load_from_dir(dir: &Path) -> Result<Self, StorageError> {
let config_file = dir.join(CONFIG_FILE);
let config_file = dir.join(CONFIG_FILE_NAME);
let config_file = std::fs::File::open(config_file)?;
let config = serde_json::from_reader(config_file)?;
Ok(config)
}

fn save_to_dir(&self, dir: &Path) -> Result<(), StorageError> {
let config_file = dir.join(CONFIG_FILE);
let config_file = dir.join(CONFIG_FILE_NAME);
let config_file = std::fs::File::create(&config_file)?;
serde_json::to_writer_pretty(config_file, self)?;
Ok(())
Expand Down
53 changes: 53 additions & 0 deletions db4-storage/src/persist/control_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use crate::{error::StorageError, wal::LSN};
use serde::{Deserialize, Serialize};
use std::path::Path;

/// Lifecycle state of the database as recorded in the control file.
///
/// Written to disk alongside the last checkpoint LSN; on startup the
/// recorded state determines whether WAL replay is needed (a clean
/// `Shutdown` means there are no pending writes to replay).
//
// NOTE: fieldless enum — `Copy` added so callers returning `DBState` by
// value (e.g. `ControlFileOps::db_state`) need no clones.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum DBState {
    /// The database is open; unflushed writes may exist.
    Running,
    /// The database was shut down cleanly with all segments flushed.
    Shutdown,
    /// Crash recovery (WAL replay) is in progress — TODO(review): confirm
    /// exact transition points once recovery is wired up.
    CrashRecovery,
    /// Control-file state tracking is not supported by this strategy
    /// (returned by `NoControlFile`).
    NotSupported,
}

/// Starting value for `last_checkpoint` in the control file, before any
/// checkpoint record has been logged.
pub const LAST_CHECKPOINT_INIT: LSN = 0;

/// Operations on the control file: a small on-disk record of the database's
/// lifecycle state and last checkpoint LSN.
///
/// The shutdown path records a checkpoint and the `Shutdown` state, then
/// persists via [`save`](Self::save); on startup [`load`](Self::load) reads
/// it back so recovery can decide whether WAL replay is required.
///
/// NOTE(review): the setters take `&self`, so implementations presumably use
/// interior mutability — confirm against the concrete implementation.
pub trait ControlFileOps: Sized {
    /// Loads the control file from the given directory.
    fn load(dir: &Path) -> Result<Self, StorageError>;

    /// Persists the current control-file contents to disk.
    fn save(&self) -> Result<(), StorageError>;

    /// Returns the recorded database lifecycle state.
    fn db_state(&self) -> DBState;

    /// Returns the LSN of the last recorded checkpoint.
    fn last_checkpoint(&self) -> LSN;

    /// Records a new database lifecycle state; callers persist it with
    /// [`save`](Self::save).
    fn set_db_state(&self, state: DBState);

    /// Records the LSN of the latest checkpoint; callers persist it with
    /// [`save`](Self::save).
    fn set_checkpoint(&self, lsn: LSN);
}

#[derive(Debug, Clone)]
pub struct NoControlFile;

impl ControlFileOps for NoControlFile {
fn load(_dir: &Path) -> Result<Self, StorageError> {
Ok(NoControlFile)
}

fn save(&self) -> Result<(), StorageError> {
Ok(())
}

fn db_state(&self) -> DBState {
DBState::NotSupported
}

fn last_checkpoint(&self) -> LSN {
0
}

fn set_db_state(&self, state: DBState) {}

fn set_checkpoint(&self, lsn: LSN) {}
}
1 change: 1 addition & 0 deletions db4-storage/src/persist/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/// Storage configuration (`ConfigOps`) and its JSON load/save helpers.
pub mod config;
/// Control-file tracking of DB lifecycle state and checkpoints across restarts.
pub mod control_file;
/// The `PersistenceStrategy` trait and built-in strategies.
pub mod strategy;
17 changes: 15 additions & 2 deletions db4-storage/src/persist/strategy.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{
api::{edges::EdgeSegmentOps, graph_props::GraphPropSegmentOps, nodes::NodeSegmentOps},
error::StorageError,
persist::config::{BaseConfig, ConfigOps},
persist::{
config::{BaseConfig, ConfigOps},
control_file::{ControlFileOps, NoControlFile},
},
segments::{
edge::segment::{EdgeSegmentView, MemEdgeSegment},
graph_prop::{GraphPropSegmentView, segment::MemGraphPropSegment},
Expand All @@ -25,6 +28,7 @@ pub trait PersistenceStrategy: Debug + Clone + Send + Sync + 'static {
type GS: GraphPropSegmentOps;
type Wal: WalOps + GraphWalOps;
type Config: ConfigOps;
type ControlFile: ControlFileOps;

fn new(config: Self::Config, graph_dir: Option<&Path>) -> Result<Self, StorageError>;

Expand All @@ -38,6 +42,8 @@ pub trait PersistenceStrategy: Debug + Clone + Send + Sync + 'static {

fn wal(&self) -> &Self::Wal;

fn control_file(&self) -> &Self::ControlFile;

/// Called after every write and checks memory limits to decide if a flush is needed
fn persist_node_segment<MP: DerefMut<Target = MemNodeSegment>>(
&self,
Expand Down Expand Up @@ -81,6 +87,7 @@ pub struct NoOpStrategy {
config: BaseConfig,
memory_tracker: Arc<AtomicUsize>,
wal: NoWal,
control_file: NoControlFile,
}

impl PersistenceStrategy for NoOpStrategy {
Expand All @@ -89,12 +96,14 @@ impl PersistenceStrategy for NoOpStrategy {
type GS = GraphPropSegmentView<Self>;
type Wal = NoWal;
type Config = BaseConfig;
type ControlFile = NoControlFile;

fn new(config: BaseConfig, _graph_dir: Option<&Path>) -> Result<Self, StorageError> {
Ok(Self {
config,
memory_tracker: Arc::new(AtomicUsize::new(0)),
wal: NoWal,
control_file: NoControlFile,
memory_tracker: Arc::new(AtomicUsize::new(0)),
})
}

Expand All @@ -118,6 +127,10 @@ impl PersistenceStrategy for NoOpStrategy {
&self.wal
}

fn control_file(&self) -> &Self::ControlFile {
&self.control_file
}

fn persist_node_segment<MP: DerefMut<Target = MemNodeSegment>>(
&self,
_node_page: &Self::NS,
Expand Down
24 changes: 20 additions & 4 deletions db4-storage/src/wal/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,31 @@ impl GraphWalOps for NoWal {
Ok(0)
}

fn log_checkpoint(&self, _lsn: LSN) -> Result<LSN, StorageError> {
fn log_checkpoint(&self, _redo: LSN) -> Result<LSN, StorageError> {
Ok(0)
}

fn replay_iter(&self) -> impl Iterator<Item = Result<(LSN, ()), StorageError>> {
std::iter::empty()
fn log_shutdown_checkpoint(&self) -> Result<LSN, StorageError> {
Ok(0)
}

fn read_checkpoint(&self, _lsn: LSN) -> Result<LSN, StorageError> {
Err(StorageError::GenericFailure(
"read_checkpoint is not supported for NoWAL".to_string(),
))
}

fn replay_to_graph<G: GraphReplay>(&self, _graph: &mut G) -> Result<(), StorageError> {
fn read_shutdown_checkpoint(&self, _lsn: LSN) -> Result<LSN, StorageError> {
Err(StorageError::GenericFailure(
"read_shutdown_checkpoint is not supported for NoWAL".to_string(),
))
}

fn replay_to_graph<G: GraphReplay>(
&self,
_graph: &mut G,
_start: LSN,
) -> Result<LSN, StorageError> {
panic!("NoWAL does not support replay")
}
}
75 changes: 38 additions & 37 deletions db4-storage/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use raphtory_core::{
entities::{EID, GID, VID},
storage::timeindex::EventTime,
};
use std::path::Path;

pub mod entry;
pub mod no_wal;
Expand All @@ -14,36 +13,25 @@ pub type TransactionID = u64;

/// Core Wal methods.
pub trait WalOps {
type Config;

fn new(dir: Option<&Path>, config: Self::Config) -> Result<Self, StorageError>
where
Self: Sized;

/// Loads an existing WAL file from the given directory in append mode.
fn load(dir: Option<&Path>, config: Self::Config) -> Result<Self, StorageError>
where
Self: Sized;

/// Appends data to the WAL and returns the assigned LSN.
fn append(&self, data: &[u8]) -> Result<LSN, StorageError>;

/// Flushes in-memory WAL entries up to the given LSN to disk.
/// Returns immediately if the given LSN is already flushed to disk.
fn flush(&self, lsn: LSN) -> Result<(), StorageError>;

/// Rotates the underlying WAL file.
/// All records with LSN > `cutoff_lsn` are copied to the new WAL file.
fn rotate(&self, cutoff_lsn: LSN) -> Result<(), StorageError>;
/// Reads the WAL record at the given LSN.
/// Returns `Ok(None)` if there is no record at that LSN.
fn read(&self, lsn: LSN) -> Result<Option<ReplayRecord>, StorageError>;

/// Returns an iterator over the entries in the wal.
fn replay(&self) -> impl Iterator<Item = Result<ReplayRecord, StorageError>>;
/// Returns an iterator over the entries in the wal, starting from the given LSN.
fn replay(&self, start: LSN) -> impl Iterator<Item = Result<ReplayRecord, StorageError>>;

/// Returns true if there are entries in the WAL file on disk.
fn has_entries(&self) -> Result<bool, StorageError>;
/// Returns the current position in the WAL stream.
fn position(&self) -> LSN;

/// Returns the LSN that will be assigned to the next appended record.
fn next_lsn(&self) -> LSN;
/// Sets the position in the WAL stream.
fn set_position(&self, lsn: LSN) -> Result<(), StorageError>;
}

#[derive(Debug)]
Expand All @@ -52,29 +40,30 @@ pub struct ReplayRecord {

data: Vec<u8>,

/// The raw bytes of the WAL entry stored on disk, including CRC data.
raw_bytes: Vec<u8>,
/// LSN immediately after this record in the WAL stream.
next_lsn: LSN,
}

impl ReplayRecord {
pub fn new(lsn: LSN, data: Vec<u8>, raw_bytes: Vec<u8>) -> Self {
pub fn new(lsn: LSN, data: Vec<u8>, next_lsn: LSN) -> Self {
Self {
lsn,
data,
raw_bytes,
next_lsn,
}
}

pub fn lsn(&self) -> LSN {
self.lsn
}

pub fn data(&self) -> &[u8] {
&self.data
/// Returns the LSN immediately following this record in the WAL stream.
pub fn next_lsn(&self) -> LSN {
self.next_lsn
}

pub fn raw_bytes(&self) -> &[u8] {
&self.raw_bytes
pub fn data(&self) -> &[u8] {
&self.data
}
}

Expand Down Expand Up @@ -156,16 +145,28 @@ pub trait GraphWalOps {
props: Vec<(&str, usize, Prop)>,
) -> Result<LSN, StorageError>;

/// Logs a checkpoint record, indicating that all Wal operations upto and including
/// `lsn` has been persisted to disk.
fn log_checkpoint(&self, lsn: LSN) -> Result<LSN, StorageError>;
/// Logs a checkpoint indicating that all LSN < `redo` are persisted.
/// On recovery, replay will start from `redo` in the WAL stream.
fn log_checkpoint(&self, redo: LSN) -> Result<LSN, StorageError>;

/// Logs a shutdown checkpoint indicating a clean shutdown with all writes persisted.
fn log_shutdown_checkpoint(&self) -> Result<LSN, StorageError>;

/// Reads and decodes the WAL entry at the given LSN and validates that it is a checkpoint.
/// Returns the checkpoint redo LSN, denoting where replay should start from.
fn read_checkpoint(&self, lsn: LSN) -> Result<LSN, StorageError>;

/// Returns an iterator over the entries in the wal.
fn replay_iter(&self) -> impl Iterator<Item = Result<(LSN, Self::ReplayEntry), StorageError>>;
/// Reads and decodes the WAL entry at the given LSN and validates that it is a shutdown checkpoint.
/// Returns the LSN immediately after this record, marking the end of the WAL stream.
fn read_shutdown_checkpoint(&self, lsn: LSN) -> Result<LSN, StorageError>;

/// Replays and applies all the entries in the wal to the given graph.
/// Subsequent appends to the WAL will start from the LSN of the last replayed entry.
fn replay_to_graph<G: GraphReplay>(&self, graph: &mut G) -> Result<(), StorageError>;
/// Replays and applies all the entries in the wal to the given graph, starting from the given LSN.
/// Returns the LSN immediately after the last entry in the WAL stream on success.
fn replay_to_graph<G: GraphReplay>(
&self,
graph: &mut G,
start: LSN,
) -> Result<LSN, StorageError>;
}

/// Trait for defining callbacks for replaying from wal.
Expand Down
Loading
Loading