Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 35 additions & 37 deletions crates/durability/src/imp/local.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{
io,
num::NonZeroUsize,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering::Relaxed},
Arc,
},
time::Duration,
};

use futures::{FutureExt as _, TryFutureExt as _};
Expand All @@ -19,7 +19,6 @@ use thiserror::Error;
use tokio::{
sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify},
task::{spawn_blocking, AbortHandle},
time::{interval, MissedTickBehavior},
};
use tracing::{instrument, Span};

Expand All @@ -30,23 +29,29 @@ pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk};
/// [`Local`] configuration.
#[derive(Clone, Copy, Debug)]
pub struct Options {
/// Periodically flush and sync the log this often.
/// The number of elements to reserve for batching transactions.
///
/// Default: 50ms
pub sync_interval: Duration,
/// If `true`, flush (but not sync) each transaction.
/// This puts an upper bound on the buffer capacity, while not preventing
/// reallocations when the number of queued transactions exceeds it.
///
/// Default: false
pub flush_each_tx: bool,
/// In other words, the durability actor will attempt to receive all
/// transactions that are currently in the queue, but shrink the buffer to
/// `batch_capacity` if it had to make additional space during a burst.
///
/// Default: 4096
pub batch_capacity: NonZeroUsize,
/// [`Commitlog`] configuration.
pub commitlog: spacetimedb_commitlog::Options,
}

impl Options {
pub const DEFAULT_BATCH_CAPACITY: NonZeroUsize = NonZeroUsize::new(4096).unwrap();
}

impl Default for Options {
fn default() -> Self {
Self {
sync_interval: Duration::from_millis(50),
flush_each_tx: false,
batch_capacity: Self::DEFAULT_BATCH_CAPACITY,
commitlog: Default::default(),
}
}
Expand Down Expand Up @@ -134,8 +139,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
durable_offset: durable_tx,
queue_depth: queue_depth.clone(),

sync_interval: opts.sync_interval,
flush_each_tx: opts.flush_each_tx,
batch_capacity: opts.batch_capacity,

lock,
}
Expand Down Expand Up @@ -193,8 +197,7 @@ struct Actor<T> {
durable_offset: watch::Sender<Option<TxOffset>>,
queue_depth: Arc<AtomicU64>,

sync_interval: Duration,
flush_each_tx: bool,
batch_capacity: NonZeroUsize,

#[allow(unused)]
lock: Lock,
Expand All @@ -209,8 +212,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
) {
info!("starting durability actor");

let mut sync_interval = interval(self.sync_interval);
sync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut tx_buf = Vec::with_capacity(self.batch_capacity.get());
// `flush_and_sync` when the loop exits without panicking,
// or `flush_and_sync` inside the loop failed.
let mut sync_on_exit = true;
Expand All @@ -220,42 +222,38 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
// Biased towards the shutdown channel,
// so that we stop accepting new data promptly after
// `Durability::close` was called.
//
// Note that periodic `flush_and_sync` needs to be polled before
// the txdata channel, so that we don't delay `fsync(2)` under
// high transaction throughput.
biased;

Some(reply) = shutdown_rx.recv() => {
transactions_rx.close();
let _ = reply.send(self.lock.notified());
},

_ = sync_interval.tick() => {
if self.flush_and_sync().await.is_err() {
sync_on_exit = false;
// Pop as many elements from the channel as possible,
// potentially requiring the `tx_buf` to allocate additional
// capacity.
// We'll reclaim capacity in excess of `self.batch_size` below.
n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => {
if n == 0 {
break;
}
},

tx = transactions_rx.recv() => {
let Some(tx) = tx else {
break;
};
self.queue_depth.fetch_sub(1, Relaxed);
self.queue_depth.fetch_sub(n as u64, Relaxed);
let clog = self.clog.clone();
let flush = self.flush_each_tx;
spawn_blocking(move || -> io::Result<()> {
clog.commit([tx])?;
if flush {
clog.flush()?;
tx_buf = spawn_blocking(move || -> io::Result<Vec<Transaction<Txdata<T>>>> {
for tx in tx_buf.drain(..) {
clog.commit([tx])?;
}

Ok(())
Ok(tx_buf)
})
.await
.expect("commitlog write panicked")
.expect("commitlog write failed");
if self.flush_and_sync().await.is_err() {
sync_on_exit = false;
break;
}
// Reclaim burst capacity.
tx_buf.shrink_to(self.batch_capacity.get());
},
}
}
Expand Down
Loading