diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index c9397ca0fe3..dc2889c41d2 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -1,11 +1,11 @@ use std::{ io, + num::NonZeroUsize, path::PathBuf, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, Arc, }, - time::Duration, }; use futures::{FutureExt as _, TryFutureExt as _}; @@ -19,7 +19,6 @@ use thiserror::Error; use tokio::{ sync::{futures::OwnedNotified, mpsc, oneshot, watch, Notify}, task::{spawn_blocking, AbortHandle}, - time::{interval, MissedTickBehavior}, }; use tracing::{instrument, Span}; @@ -30,23 +29,29 @@ pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; /// [`Local`] configuration. #[derive(Clone, Copy, Debug)] pub struct Options { - /// Periodically flush and sync the log this often. + /// The number of elements to reserve for batching transactions. /// - /// Default: 50ms - pub sync_interval: Duration, - /// If `true`, flush (but not sync) each transaction. + /// This puts an upper bound on the buffer capacity, while not preventing + /// reallocations when the number of queued transactions exceeds it. /// - /// Default: false - pub flush_each_tx: bool, + /// In other words, the durability actor will attempt to receive all + /// transactions that are currently in the queue, but shrink the buffer to + /// `batch_capacity` if it had to make additional space during a burst. + /// + /// Default: 4096 + pub batch_capacity: NonZeroUsize, /// [`Commitlog`] configuration. 
pub commitlog: spacetimedb_commitlog::Options, } +impl Options { + pub const DEFAULT_BATCH_CAPACITY: NonZeroUsize = NonZeroUsize::new(4096).unwrap(); +} + impl Default for Options { fn default() -> Self { Self { - sync_interval: Duration::from_millis(50), - flush_each_tx: false, + batch_capacity: Self::DEFAULT_BATCH_CAPACITY, commitlog: Default::default(), } } @@ -134,8 +139,7 @@ impl Local { durable_offset: durable_tx, queue_depth: queue_depth.clone(), - sync_interval: opts.sync_interval, - flush_each_tx: opts.flush_each_tx, + batch_capacity: opts.batch_capacity, lock, } @@ -193,8 +197,7 @@ struct Actor { durable_offset: watch::Sender>, queue_depth: Arc, - sync_interval: Duration, - flush_each_tx: bool, + batch_capacity: NonZeroUsize, #[allow(unused)] lock: Lock, @@ -209,8 +212,7 @@ impl Actor { ) { info!("starting durability actor"); - let mut sync_interval = interval(self.sync_interval); - sync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + let mut tx_buf = Vec::with_capacity(self.batch_capacity.get()); // `flush_and_sync` when the loop exits without panicking, // or `flush_and_sync` inside the loop failed. let mut sync_on_exit = true; @@ -220,10 +222,6 @@ impl Actor { // Biased towards the shutdown channel, // so that we stop accepting new data promptly after // `Durability::close` was called. - // - // Note that periodic `flush_and_sync` needs to be polled before - // the txdata channel, so that we don't delay `fsync(2)` under - // high transaction throughput. biased; Some(reply) = shutdown_rx.recv() => { @@ -231,31 +229,31 @@ impl Actor { let _ = reply.send(self.lock.notified()); }, - _ = sync_interval.tick() => { - if self.flush_and_sync().await.is_err() { - sync_on_exit = false; + // Pop as many elements from the channel as possible, + // potentially requiring the `tx_buf` to allocate additional + // capacity. + // We'll reclaim capacity in excess of `self.batch_capacity` below. 
+ n = transactions_rx.recv_many(&mut tx_buf, usize::MAX) => { + if n == 0 { break; } - }, - - tx = transactions_rx.recv() => { - let Some(tx) = tx else { - break; - }; - self.queue_depth.fetch_sub(1, Relaxed); + self.queue_depth.fetch_sub(n as u64, Relaxed); let clog = self.clog.clone(); - let flush = self.flush_each_tx; - spawn_blocking(move || -> io::Result<()> { - clog.commit([tx])?; - if flush { - clog.flush()?; + tx_buf = spawn_blocking(move || -> io::Result>>> { + for tx in tx_buf.drain(..) { + clog.commit([tx])?; } - - Ok(()) + Ok(tx_buf) }) .await .expect("commitlog write panicked") .expect("commitlog write failed"); + if self.flush_and_sync().await.is_err() { + sync_on_exit = false; + break; + } + // Reclaim burst capacity. + tx_buf.shrink_to(self.batch_capacity.get()); }, } }