Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn benchmark_aligned_file_reader(c: &mut Criterion) {
let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect();

// Read the same data from disk over and over again. We guarantee that it is not all zeros.
let mut aligned_reads: Vec<AlignedRead<'_, u8>> = mem_slices
let mut aligned_reads: Vec<AlignedRead<'_, u8, _>> = mem_slices
.iter_mut()
.map(|slice| AlignedRead::new(0, slice).unwrap())
.collect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn benchmark_aligned_file_reader_iai() {
let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect();

// Read the same data from disk over and over again. We guarantee that it is not all zeros.
let mut aligned_reads: Vec<AlignedRead<'_, u8>> = mem_slices
let mut aligned_reads: Vec<AlignedRead<'_, u8, _>> = mem_slices
.iter_mut()
.map(|slice| AlignedRead::new(0, slice).unwrap())
.collect();
Expand Down
1 change: 1 addition & 0 deletions diskann-disk/src/search/provider/disk_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ where
query,
})
}

Comment thread
doliawu marked this conversation as resolved.
fn ensure_loaded(&mut self, ids: &[u32]) -> Result<(), ANNError> {
if ids.is_empty() {
return Ok(());
Expand Down
18 changes: 6 additions & 12 deletions diskann-disk/src/search/provider/disk_sector_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@
use std::ops::Deref;

use diskann::{ANNError, ANNResult};
use diskann_quantization::{
alloc::{AlignedAllocator, Poly},
num::PowerOfTwo,
};
use diskann_quantization::alloc::{AlignedAllocator, Poly};

use crate::{
data_model::GraphHeader,
utils::aligned_file_reader::{traits::AlignedFileReader, AlignedRead},
utils::aligned_file_reader::{traits::AlignedFileReader, AlignedRead, Alignment},
};

const DEFAULT_DISK_SECTOR_LEN: usize = 4096;
Expand Down Expand Up @@ -79,9 +76,7 @@ impl<AlignedReaderType: AlignedFileReader> DiskSectorGraph<AlignedReaderType> {
sectors_data: Poly::broadcast(
0u8,
max_n_batch_sector_read * num_sectors_per_node * block_size,
AlignedAllocator::new(
PowerOfTwo::new(block_size).map_err(ANNError::log_index_error)?,
),
AlignedAllocator::new(AlignedReaderType::Alignment::VALUE),
)
.map_err(ANNError::log_index_error)?,
cur_sector_idx: 0,
Expand All @@ -100,9 +95,7 @@ impl<AlignedReaderType: AlignedFileReader> DiskSectorGraph<AlignedReaderType> {
self.sectors_data = Poly::broadcast(
0u8,
max_n_batch_sector_read * self.num_sectors_per_node * self.block_size,
AlignedAllocator::new(
PowerOfTwo::new(self.block_size).map_err(ANNError::log_index_error)?,
),
AlignedAllocator::new(AlignedReaderType::Alignment::VALUE),
)
.map_err(ANNError::log_index_error)?;
}
Expand Down Expand Up @@ -143,7 +136,8 @@ impl<AlignedReaderType: AlignedFileReader> DiskSectorGraph<AlignedReaderType> {
);
let mut sector_slices: Vec<&mut [u8]> =
self.sectors_data[range].chunks_mut(len_per_node).collect();
let mut read_requests = Vec::with_capacity(sector_slices.len());
let mut read_requests: Vec<AlignedRead<'_, u8, AlignedReaderType::Alignment>> =
Vec::with_capacity(sector_slices.len());
for (local_sector_idx, slice) in sector_slices.iter_mut().enumerate() {
let sector_id = sectors_to_fetch[local_sector_idx];
read_requests.push(AlignedRead::new(sector_id * self.block_size as u64, slice)?);
Expand Down
16 changes: 8 additions & 8 deletions diskann-disk/src/search/traits/vertex_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use diskann::ANNResult;

/// `VertexProvider` is a trait that abstracts the access to Vertex data.
///
/// This trait provides an interface to interact with different types of vertex providers structures such as `DiskVertexProvider` and `JetVertexProvider`.
/// This trait provides an interface to interact with vertex provider structures such as `DiskVertexProvider`.
///
/// # Types
///
Expand Down Expand Up @@ -47,7 +47,7 @@ pub trait VertexProvider<Data: GraphDataType>: Send + Sync {
vertex_id: &Data::VectorIdType,
) -> ANNResult<&[Data::VectorIdType]>;

// Gets the associated data for a given vertex id.
/// Gets the associated data for a given vertex id.
///
/// The `get_associated_data` function attempts to retrieve the associated data for the
/// specified `vertex_id`. This function returns an `ANNResult` that wraps a reference to the associated data.
Expand Down Expand Up @@ -80,24 +80,24 @@ pub trait VertexProvider<Data: GraphDataType>: Send + Sync {
/// If it fails, returns an `ANNError`.
fn load_vertices(&mut self, vertex_ids: &[Data::VectorIdType]) -> ANNResult<()>;

/// This function to process the loaded node
/// Processes a vertex previously loaded by `load_vertices`, materializing its full-precision vector and adjacency list from the raw on-disk record so they can be served by the `get_*` accessors.
/// # Parameters
///
/// * `vertex_id`: A Data::VectorIdType value representing the id of the vertex for which to process.
/// * `idx`: A usize value representing the index of the vertex in the loaded node list.
/// * `vertex_id`: A Data::VectorIdType value representing the id of the vertex to process.
/// * `idx`: A usize value representing the index of the vertex in the slice passed to the preceding `load_vertices` call.
///
/// # Returns
/// * `ANNResult<()>`: If the operation is successful, returns Ok.
///
/// If it fails, returns an `ANNError`.
fn process_loaded_node(&mut self, vertex_id: &Data::VectorIdType, idx: usize) -> ANNResult<()>;

// Returns the number of IO operations performed by the vertex provider.
/// Returns the number of IO operations performed by the vertex provider.
fn io_operations(&self) -> u32;

// Returns the number of vertices loaded by the vertex provider.
/// Returns the number of vertices loaded by the vertex provider.
fn vertices_loaded_count(&self) -> u32;

// Clears the members of the vertex provider.
/// Clears the members of the vertex provider.
fn clear(&mut self);
}
16 changes: 8 additions & 8 deletions diskann-disk/src/search/traits/vertex_provider_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ use diskann::ANNResult;
use super::VertexProvider;
use crate::data_model::GraphHeader;

/// The `VertexProviderFactory` trait provides an interface to create a GraphProvider`. This trait forms an important part
/// of the interaction between the `ANNWrapper` and the creation of `DiskIndexSearcher`. The `ANNWrapper` passes a `VertexProviderFactory` when
/// it initializes a `DiskIndexSearcher` When serving each search request, the `DiskIndexSearcher` opens a `VertexProvider`
/// using the provided `VertexProviderFactory`. There will be two flavors of VertexProviderFactory, one that reads vertex data from data another that reads vertex data from a stream.
/// The `VertexProviderFactory` trait provides an interface to create a `VertexProvider`. This trait forms an important part
/// of the interaction between the `DiskIndexSearcher` and a `VertexProvider`. A `VertexProviderFactory` is passed to `DiskIndexSearcher` when
/// it is constructed. When serving each search request, the `DiskIndexSearcher` opens a `VertexProvider`
/// using the provided `VertexProviderFactory`.
///
/// This trait has an associated VertexProvider type that signifies the specific type of VertexProvider which this `VertexProviderFactory` will create.
///
/// # Parameters
/// * `GraphMetadata`: This contains the metadata of the disk index graph, like the number of points, dimension, max_node_length, etc.
/// * `Data`: A `GraphDataType` that defines the vector element type, id type, and associated payload type for the graph.
///
/// # Functions
/// * `create_vertex_provider`: This function takes a `Metadata` object as an argument and returns a `VertexProvider` object. It also accepts a max batch read sizes which is
/// used to control the maximum number of nodes it can get in one batch.
/// * `get_header`: This function returns the metadata of the graph.
/// * `create_vertex_provider`: This function takes a `GraphHeader` reference and a max batch size and returns a `VertexProvider` object.
/// The max batch size controls the maximum number of nodes that can be loaded in a single batch.
/// * `get_header`: This function returns the header of the graph.
pub trait VertexProviderFactory<Data: GraphDataType>: Send + Sync {
type VertexProviderType: VertexProvider<Data>;

Expand Down
152 changes: 98 additions & 54 deletions diskann-disk/src/utils/aligned_file_reader/aligned_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,78 @@
* Licensed under the MIT license.
*/

use std::marker::PhantomData;

use diskann::{ANNError, ANNResult};
use diskann_quantization::num::PowerOfTwo;

/// Type-level memory-alignment witness for [`AlignedRead`]. Each implementor is
/// a unit type carrying a single `PowerOfTwo` value.
///
/// Custom readers can define their own marker (e.g. `A4096`) by adding a unit
/// type and an `impl Alignment` with the desired `VALUE`.
pub trait Alignment {
/// The alignment, in bytes.
const VALUE: PowerOfTwo;
}

pub const DISK_IO_ALIGNMENT: usize = 512;
macro_rules! alignment_marker {
($name:ident, $value:expr) => {
#[doc = concat!("Alignment witness for ", stringify!($value), " bytes.")]
#[derive(Debug, Clone, Copy)]
pub struct $name;
impl Alignment for $name {
const VALUE: PowerOfTwo = $value;
}
};
}

/// Aligned read struct for disk IO.
pub struct AlignedRead<'a, T> {
/// where to read from
/// offset needs to be aligned with DISK_IO_ALIGNMENT
alignment_marker!(A1, PowerOfTwo::V1);
alignment_marker!(A512, PowerOfTwo::V512);

/// Disk-IO read request, parameterized by its required memory alignment `A`.
///
/// Three constraints govern a read:
/// 1. Disk offset alignment.
/// 2. Buffer length alignment.
/// 3. Buffer pointer alignment in memory.
///
/// All three are checked against `A::VALUE` at construction time by
/// [`AlignedRead::new`]. A typed `AlignedRead<T, A>` is therefore a witness
/// that the request satisfies its declared alignment, and the file reader's
/// `read` method can rely on it without re-checking.
#[derive(Debug)]
pub struct AlignedRead<'a, T, A: Alignment = A1> {
offset: u64,

/// where to read into
/// aligned_buf and its len need to be aligned with DISK_IO_ALIGNMENT
aligned_buf: &'a mut [T],
_alignment: PhantomData<A>,
}

impl<'a, T> AlignedRead<'a, T> {
impl<'a, T, A: Alignment> AlignedRead<'a, T, A> {
/// Build an `AlignedRead` after validating that `offset`, the buffer
/// length (in bytes), and the buffer pointer all satisfy `A::VALUE`.
pub fn new(offset: u64, aligned_buf: &'a mut [T]) -> ANNResult<Self> {
Self::assert_is_aligned(offset as usize)?;
Self::assert_is_aligned(std::mem::size_of_val(aligned_buf))?;

Self::assert_is_aligned(aligned_buf.as_ptr() as usize, "buffer pointer")?;
Self::assert_is_aligned(std::mem::size_of_val(aligned_buf), "buffer length")?;
Self::assert_is_aligned(offset as usize, "offset")?;
Ok(Self {
offset,
aligned_buf,
_alignment: PhantomData,
})
}

fn assert_is_aligned(val: usize) -> ANNResult<()> {
match val % DISK_IO_ALIGNMENT {
0 => Ok(()),
_ => Err(ANNError::log_disk_io_request_alignment_error(format!(
"The offset or length of AlignedRead request is not {} bytes aligned",
DISK_IO_ALIGNMENT
))),
fn assert_is_aligned(val: usize, kind: &str) -> ANNResult<()> {
let align = A::VALUE.raw();
if val.is_multiple_of(align) {
Ok(())
} else {
Err(ANNError::log_disk_io_request_alignment_error(format!(
"{kind} {val} not aligned to {align}",
)))
}
}

/// where to read from
/// offset needs to be aligned with DISK_IO_ALIGNMENT
pub fn offset(&self) -> u64 {
self.offset
}
Expand All @@ -49,8 +83,6 @@ impl<'a, T> AlignedRead<'a, T> {
self.aligned_buf
}

/// where to read into
/// aligned_buf and its len need to be aligned with DISK_IO_ALIGNMENT
pub fn aligned_buf_mut(&mut self) -> &mut [T] {
self.aligned_buf
}
Expand All @@ -59,54 +91,66 @@ impl<'a, T> AlignedRead<'a, T> {
#[cfg(test)]
mod tests {
use super::*;
use diskann::ANNErrorKind;
use diskann_quantization::alloc::{AlignedAllocator, Poly};

fn aligned_512(len: usize) -> Poly<[u8], AlignedAllocator> {
Poly::broadcast(0u8, len, AlignedAllocator::A512).unwrap()
}

#[test]
fn test_aligned_read_valid() {
fn aligned_read_carries_offset_and_buffer() {
let mut buffer = vec![0u8; 512];
let aligned_read = AlignedRead::new(0, &mut buffer);

assert!(aligned_read.is_ok());
let aligned_read = aligned_read.unwrap();
assert_eq!(aligned_read.offset(), 0);
assert_eq!(aligned_read.aligned_buf().len(), 512);
let read = AlignedRead::<u8, A1>::new(512, &mut buffer).unwrap();
assert_eq!(read.offset(), 512);
assert_eq!(read.aligned_buf().len(), 512);
}

#[test]
fn test_aligned_read_valid_offset() {
let mut buffer = vec![0u8; 1024];
let aligned_read = AlignedRead::new(512, &mut buffer);

assert!(aligned_read.is_ok());
let aligned_read = aligned_read.unwrap();
assert_eq!(aligned_read.offset(), 512);
fn aligned_read_buffer_access() {
let mut buffer = vec![42u8; 512];
let mut read = AlignedRead::<u8, A1>::new(0, &mut buffer).unwrap();
assert_eq!(read.aligned_buf()[0], 42);
read.aligned_buf_mut()[0] = 100;
assert_eq!(read.aligned_buf()[0], 100);
}

#[test]
fn test_aligned_read_invalid_offset() {
let mut buffer = vec![0u8; 512];
let aligned_read = AlignedRead::new(100, &mut buffer);

assert!(aligned_read.is_err());
fn a512_accepts_fully_aligned_request() {
let mut buf = aligned_512(512);
AlignedRead::<u8, A512>::new(0, &mut buf).expect("aligned request should pass");
}

#[test]
fn test_aligned_read_invalid_buffer_size() {
fn a1_default_accepts_anything() {
let mut buffer = vec![0u8; 100];
let aligned_read = AlignedRead::new(0, &mut buffer);

assert!(aligned_read.is_err());
AlignedRead::<u8, A1>::new(1, &mut buffer).expect("A1 alignment should accept any request");
}

#[test]
fn test_aligned_read_buffer_access() {
let mut buffer = vec![42u8; 512];
let mut aligned_read = AlignedRead::new(0, &mut buffer).unwrap();
fn rejects_unaligned_buffer_pointer() {
let mut buf = aligned_512(1024);
let slice = &mut buf[1..513]; // ptr offset by 1; length 512 ✓; offset 0 ✓
let err = AlignedRead::<u8, A512>::new(0, slice)
.expect_err("misaligned buffer pointer should be rejected");
assert_eq!(err.kind(), ANNErrorKind::DiskIOAlignmentError);
}

// Test immutable access
assert_eq!(aligned_read.aligned_buf()[0], 42);
#[test]
fn rejects_unaligned_buffer_length() {
let mut buf = aligned_512(1024);
let slice = &mut buf[..100]; // ptr ✓; length 100 ✗; offset 0 ✓
let err = AlignedRead::<u8, A512>::new(0, slice)
.expect_err("buffer length 100 (not a multiple of 512) should be rejected");
assert_eq!(err.kind(), ANNErrorKind::DiskIOAlignmentError);
}

// Test mutable access
aligned_read.aligned_buf_mut()[0] = 100;
assert_eq!(aligned_read.aligned_buf()[0], 100);
#[test]
fn rejects_unaligned_offset() {
let mut buf = aligned_512(1024);
let slice = &mut buf[..512]; // ptr ✓; length 512 ✓; offset 1 ✗
let err = AlignedRead::<u8, A512>::new(1, slice)
.expect_err("offset 1 (not a multiple of 512) should be rejected");
assert_eq!(err.kind(), ANNErrorKind::DiskIOAlignmentError);
}
}
Loading
Loading