Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions diskann-disk/benches/benchmarks/aligned_file_reader_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use diskann_disk::utils::aligned_file_reader::{
traits::{AlignedFileReader, AlignedReaderFactory},
AlignedFileReaderFactory, AlignedRead,
};
use diskann_providers::common::AlignedBoxWithSlice;
use diskann_quantization::{alloc::aligned_slice, num::PowerOfTwo};

pub const TEST_INDEX_PATH: &str =
"../test_data/disk_index_misc/disk_index_siftsmall_learn_256pts_R4_L50_A1.2_aligned_reader_test.index";
Expand All @@ -33,12 +33,11 @@ pub fn benchmark_aligned_file_reader(c: &mut Criterion) {

let read_length = 512;
let num_read = MAX_IO_CONCURRENCY * 100; // The LinuxAlignedFileReader batches reads according to MAX_IO_CONCURRENCY. Make sure we have many batches to handle.
let mut aligned_mem = AlignedBoxWithSlice::<u8>::new(read_length * num_read, 512).unwrap();
let mut aligned_mem =
aligned_slice::<u8>(read_length * num_read, PowerOfTwo::new(512).unwrap()).unwrap();

// create and add AlignedReads to the vector
let mut mem_slices = aligned_mem
.split_into_nonoverlapping_mut_slices(0..aligned_mem.len(), read_length)
.unwrap();
let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect();

// Read the same data from disk over and over again. We guarantee that it is not all zeros.
let mut aligned_reads: Vec<AlignedRead<'_, u8>> = mem_slices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use diskann_disk::utils::aligned_file_reader::{
traits::{AlignedFileReader, AlignedReaderFactory},
AlignedFileReaderFactory, AlignedRead,
};
use diskann_providers::common::AlignedBoxWithSlice;
use diskann_quantization::{alloc::aligned_slice, num::PowerOfTwo};

pub const TEST_INDEX_PATH: &str =
"../test_data/disk_index_misc/disk_index_siftsmall_learn_256pts_R4_L50_A1.2_aligned_reader_test.index";
Expand All @@ -24,12 +24,11 @@ pub fn benchmark_aligned_file_reader_iai() {

let read_length = 512;
let num_read = MAX_IO_CONCURRENCY * 100; // The LinuxAlignedFileReader batches reads according to MAX_IO_CONCURRENCY. Make sure we have many batches to handle.
let mut aligned_mem = AlignedBoxWithSlice::<u8>::new(read_length * num_read, 512).unwrap();
let mut aligned_mem =
aligned_slice::<u8>(read_length * num_read, PowerOfTwo::new(512).unwrap()).unwrap();

// create and add AlignedReads to the vector
let mut mem_slices = aligned_mem
.split_into_nonoverlapping_mut_slices(0..aligned_mem.len(), read_length)
.unwrap();
let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect();

// Read the same data from disk over and over again. We guarantee that it is not all zeros.
let mut aligned_reads: Vec<AlignedRead<'_, u8>> = mem_slices
Expand Down
36 changes: 23 additions & 13 deletions diskann-disk/src/search/pq/pq_scratch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@

use diskann::{error::IntoANNResult, utils::VectorRepr, ANNError, ANNResult};

use diskann_providers::common::AlignedBoxWithSlice;
use diskann_quantization::{
alloc::{aligned_slice, AlignedSlice},
num::PowerOfTwo,
};

#[derive(Debug)]
/// PQ scratch
pub struct PQScratch {
/// Aligned pq table distance scratch, the length must be at least [256 * NCHUNKS]. 256 is the number of PQ centroids.
/// This is used to store the distance between each chunk in the query vector to each centroid, which is why the length is num of centroids * num of chunks
pub aligned_pqtable_dist_scratch: AlignedBoxWithSlice<f32>,
pub aligned_pqtable_dist_scratch: AlignedSlice<f32>,

/// Aligned dist scratch, must be at least diskann MAX_DEGREE
/// This is used to temporarily save the pq distance between query vector to the candidate vectors.
pub aligned_dist_scratch: AlignedBoxWithSlice<f32>,
pub aligned_dist_scratch: AlignedSlice<f32>,

/// Aligned pq coord scratch, must be at least [N_CHUNKS * MAX_DEGREE]
/// This is used to store the pq coordinates of the candidate vectors.
pub aligned_pq_coord_scratch: AlignedBoxWithSlice<u8>,
pub aligned_pq_coord_scratch: AlignedSlice<u8>,

/// Query scratch buffer stored as `f32`. `set` initializes it by copying/converting the
/// raw query values; `PQTable.PreprocessQuery` can then rotate or otherwise preprocess it.
Expand All @@ -30,7 +33,10 @@ pub struct PQScratch {

impl PQScratch {
/// 128 bytes alignment to optimize for the L2 Adjacent Cache Line Prefetcher.
const ALIGNED_ALLOC_128: usize = 128;
const ALIGNED_ALLOC_128: PowerOfTwo = match PowerOfTwo::new(128) {
Ok(v) => v,
Err(_) => unreachable!(),
};

/// Create a new pq scratch
pub fn new(
Expand All @@ -40,11 +46,13 @@ impl PQScratch {
num_centers: usize,
) -> ANNResult<Self> {
let aligned_pq_coord_scratch =
AlignedBoxWithSlice::new(graph_degree * num_pq_chunks, PQScratch::ALIGNED_ALLOC_128)?;
aligned_slice(graph_degree * num_pq_chunks, PQScratch::ALIGNED_ALLOC_128)
.map_err(ANNError::log_index_error)?;
let aligned_pqtable_dist_scratch =
AlignedBoxWithSlice::new(num_centers * num_pq_chunks, PQScratch::ALIGNED_ALLOC_128)?;
let aligned_dist_scratch =
AlignedBoxWithSlice::new(graph_degree, PQScratch::ALIGNED_ALLOC_128)?;
aligned_slice(num_centers * num_pq_chunks, PQScratch::ALIGNED_ALLOC_128)
.map_err(ANNError::log_index_error)?;
let aligned_dist_scratch = aligned_slice(graph_degree, PQScratch::ALIGNED_ALLOC_128)
.map_err(ANNError::log_index_error)?;
let rotated_query = vec![0.0f32; dim];

Ok(Self {
Expand Down Expand Up @@ -102,18 +110,20 @@ mod tests {
let mut pq_scratch: PQScratch =
PQScratch::new(graph_degree, dim, num_pq_chunks, num_centers).unwrap();

// Check alignment of the remaining AlignedBoxWithSlice buffers.
// Check alignment of the remaining AlignedSlice buffers.
assert_eq!(
(pq_scratch.aligned_pqtable_dist_scratch.as_ptr() as usize)
% PQScratch::ALIGNED_ALLOC_128,
% PQScratch::ALIGNED_ALLOC_128.raw(),
0
);
assert_eq!(
(pq_scratch.aligned_dist_scratch.as_ptr() as usize) % PQScratch::ALIGNED_ALLOC_128,
(pq_scratch.aligned_dist_scratch.as_ptr() as usize)
% PQScratch::ALIGNED_ALLOC_128.raw(),
0
);
assert_eq!(
(pq_scratch.aligned_pq_coord_scratch.as_ptr() as usize) % PQScratch::ALIGNED_ALLOC_128,
(pq_scratch.aligned_pq_coord_scratch.as_ptr() as usize)
% PQScratch::ALIGNED_ALLOC_128.raw(),
0
);

Expand Down
6 changes: 3 additions & 3 deletions diskann-disk/src/search/pq/quantizer_preprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub fn quantizer_preprocess(
let dim = table.dim();
let expected_len = table.ncenters() * table.nchunks();
let dst = diskann_utils::views::MutMatrixView::try_from(
&mut pq_scratch.aligned_pqtable_dist_scratch.as_mut_slice()[..expected_len],
&mut (*pq_scratch.aligned_pqtable_dist_scratch)[..expected_len],
table.nchunks(),
table.ncenters(),
)
Expand Down Expand Up @@ -69,13 +69,13 @@ pub fn quantizer_preprocess(
// Compute the distance between each chunk of the query to each pq centroids.
table.populate_chunk_distances(
pq_scratch.rotated_query.as_slice(),
pq_scratch.aligned_pqtable_dist_scratch.as_mut_slice(),
&mut pq_scratch.aligned_pqtable_dist_scratch,
)?;
}
Metric::InnerProduct => {
table.populate_chunk_inner_products(
pq_scratch.rotated_query.as_slice(),
pq_scratch.aligned_pqtable_dist_scratch.as_mut_slice(),
&mut pq_scratch.aligned_pqtable_dist_scratch,
)?;
}
}
Expand Down
1 change: 0 additions & 1 deletion diskann-disk/src/search/provider/disk_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@ where
.scratch
.pq_scratch
.aligned_pqtable_dist_scratch
.as_slice()
.to_vec(),
})
}
Expand Down
44 changes: 30 additions & 14 deletions diskann-disk/src/search/provider/disk_sector_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
use std::ops::Deref;

use diskann::{ANNError, ANNResult};
use diskann_providers::common::AlignedBoxWithSlice;
use diskann_quantization::{
alloc::{aligned_slice, AlignedSlice},
num::PowerOfTwo,
};

use crate::{
data_model::GraphHeader,
Expand All @@ -34,7 +37,7 @@ pub struct DiskSectorGraph<AlignedReaderType: AlignedFileReader> {
/// index info for multi-sector nodes
/// node `i` is in sector: [i * max_node_len.div_ceil(block_size)]
/// offset in sector: [0]
sectors_data: AlignedBoxWithSlice<u8>,
sectors_data: AlignedSlice<u8>,
/// Current sector index into which the next read reads data
cur_sector_idx: u64,

Expand Down Expand Up @@ -73,10 +76,11 @@ impl<AlignedReaderType: AlignedFileReader> DiskSectorGraph<AlignedReaderType> {

Ok(Self {
sector_reader,
sectors_data: AlignedBoxWithSlice::new(
sectors_data: aligned_slice(
max_n_batch_sector_read * num_sectors_per_node * block_size,
block_size,
)?,
PowerOfTwo::new(block_size).map_err(ANNError::log_index_error)?,
)
.map_err(ANNError::log_index_error)?,
cur_sector_idx: 0,
num_nodes_per_sector,
node_len,
Expand All @@ -90,10 +94,11 @@ impl<AlignedReaderType: AlignedFileReader> DiskSectorGraph<AlignedReaderType> {
pub fn reconfigure(&mut self, max_n_batch_sector_read: usize) -> ANNResult<()> {
if max_n_batch_sector_read > self.max_n_batch_sector_read {
self.max_n_batch_sector_read = max_n_batch_sector_read;
self.sectors_data = AlignedBoxWithSlice::new(
self.sectors_data = aligned_slice(
max_n_batch_sector_read * self.num_sectors_per_node * self.block_size,
self.block_size,
)?;
PowerOfTwo::new(self.block_size).map_err(ANNError::log_index_error)?,
)
.map_err(ANNError::log_index_error)?;
}
Ok(())
}
Expand All @@ -116,11 +121,22 @@ impl<AlignedReaderType: AlignedFileReader> DiskSectorGraph<AlignedReaderType> {
}

let len_per_node = self.num_sectors_per_node * self.block_size;
let mut sector_slices = self.sectors_data.split_into_nonoverlapping_mut_slices(
cur_sector_idx_usize * len_per_node
..(cur_sector_idx_usize + sectors_to_fetch.len()) * len_per_node,
len_per_node,
)?;
if len_per_node == 0 {
return Err(ANNError::log_index_error(format_args!(
"len_per_node is 0 (num_sectors_per_node={}, block_size={})",
self.num_sectors_per_node, self.block_size,
)));
}
let range = cur_sector_idx_usize * len_per_node
..(cur_sector_idx_usize + sectors_to_fetch.len()) * len_per_node;
debug_assert!(
range.len() % len_per_node == 0,
"range length {} is not divisible by {}",
range.len(),
len_per_node
);
Comment thread
JordanMaples marked this conversation as resolved.
let mut sector_slices: Vec<&mut [u8]> =
self.sectors_data[range].chunks_mut(len_per_node).collect();
let mut read_requests = Vec::with_capacity(sector_slices.len());
for (local_sector_idx, slice) in sector_slices.iter_mut().enumerate() {
let sector_id = sectors_to_fetch[local_sector_idx];
Expand Down Expand Up @@ -204,7 +220,7 @@ mod disk_sector_graph_test {
) -> DiskSectorGraph<<AlignedFileReaderFactory as AlignedReaderFactory>::AlignedReaderType>
{
DiskSectorGraph {
sectors_data: AlignedBoxWithSlice::new(512, 512).unwrap(),
sectors_data: aligned_slice(512, PowerOfTwo::new(512).unwrap()).unwrap(),
sector_reader,
cur_sector_idx: 0,
num_nodes_per_sector,
Expand Down
12 changes: 8 additions & 4 deletions diskann-disk/src/search/provider/disk_vertex_provider_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{cmp::min, collections::VecDeque, sync::Arc, time::Instant};

use crate::data_model::GraphDataType;
use diskann::{graph::AdjacencyList, utils::TryIntoVectorId, ANNError, ANNResult};
use diskann_providers::common::AlignedBoxWithSlice;
use diskann_quantization::{alloc::aligned_slice, num::PowerOfTwo};
use hashbrown::HashSet;
use tracing::info;

Expand Down Expand Up @@ -52,14 +52,18 @@ where
// since this is the implementation for the disk vertex provider, there're only two kinds of sector lengths: 4096 and 512.
// it's okay to hardcoded at this place.
let buffer_len = GraphHeader::get_size().next_multiple_of(DEFAULT_DISK_SECTOR_LEN);
let mut read_buf = AlignedBoxWithSlice::<u8>::new(buffer_len, buffer_len)?;
let aligned_read = AlignedRead::new(0_u64, read_buf.as_mut_slice())?;
let mut read_buf = aligned_slice::<u8>(
buffer_len,
PowerOfTwo::new(buffer_len).map_err(ANNError::log_index_error)?,
)
.map_err(ANNError::log_index_error)?;
let aligned_read = AlignedRead::new(0_u64, &mut read_buf)?;
self.aligned_reader_factory
.build()?
.read(&mut [aligned_read])?;

// Create a GraphHeader from the buffer.
GraphHeader::try_from(&read_buf.as_slice()[8..])
GraphHeader::try_from(&read_buf[8..])
}

fn create_vertex_provider(
Expand Down
2 changes: 1 addition & 1 deletion diskann-disk/src/utils/aligned_file_reader/aligned_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use diskann::{ANNError, ANNResult};

pub const DISK_IO_ALIGNMENT: usize = 512;

/// Aligned read struct for disk IO, it takes the ownership of the AlignedBoxedSlice and returns the AlignedBoxWithSlice data immutably.
/// Aligned read struct for disk IO.
pub struct AlignedRead<'a, T> {
/// where to read from
/// offset needs to be aligned with DISK_IO_ALIGNMENT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ mod tests {
use serde::{Deserialize, Serialize};

use super::*;
use diskann_providers::common::AlignedBoxWithSlice;
use diskann_quantization::{alloc::aligned_slice, num::PowerOfTwo};
pub const TEST_INDEX_PATH: &str =
"../test_data/disk_index_misc/disk_index_siftsmall_learn_256pts_R4_L50_A1.2_aligned_reader_test.index";
pub const TRUTH_NODE_DATA_PATH: &str =
Expand Down Expand Up @@ -170,12 +170,11 @@ mod tests {

let read_length = 512; // adjust according to your logic
let num_read = 10;
let mut aligned_mem = AlignedBoxWithSlice::<u8>::new(read_length * num_read, 512).unwrap();
let mut aligned_mem =
aligned_slice::<u8>(read_length * num_read, PowerOfTwo::new(512).unwrap()).unwrap();

// create and add AlignedReads to the vector
let mut mem_slices = aligned_mem
.split_into_nonoverlapping_mut_slices(0..aligned_mem.len(), read_length)
.unwrap();
let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect();

let mut aligned_reads: Vec<AlignedRead<'_, u8>> = mem_slices
.iter_mut()
Expand Down Expand Up @@ -215,12 +214,11 @@ mod tests {

let read_length = 512;
let num_read = MAX_IO_CONCURRENCY * 100; // The LinuxAlignedFileReader batches reads according to MAX_IO_CONCURRENCY. Make sure we have many batches to handle.
let mut aligned_mem = AlignedBoxWithSlice::<u8>::new(read_length * num_read, 512).unwrap();
let mut aligned_mem =
aligned_slice::<u8>(read_length * num_read, PowerOfTwo::new(512).unwrap()).unwrap();

// create and add AlignedReads to the vector
let mut mem_slices = aligned_mem
.split_into_nonoverlapping_mut_slices(0..aligned_mem.len(), read_length)
.unwrap();
let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect();

// Read the same data from disk over and over again. We guarantee that it is not all zeros.
let mut aligned_reads: Vec<AlignedRead<'_, u8>> = mem_slices
Expand Down Expand Up @@ -257,12 +255,10 @@ mod tests {
let read_length = 512; // adjust according to your logic
let num_sector = 10;
let mut aligned_mem =
AlignedBoxWithSlice::<u8>::new(read_length * num_sector, 512).unwrap();
aligned_slice::<u8>(read_length * num_sector, PowerOfTwo::new(512).unwrap()).unwrap();

// Each slice will be used as the buffer for a read request of a sector.
let mut mem_slices = aligned_mem
.split_into_nonoverlapping_mut_slices(0..aligned_mem.len(), read_length)
.unwrap();
let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect();

let mut aligned_reads: Vec<AlignedRead<'_, u8>> = mem_slices
.iter_mut()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ mod tests {
use diskann_utils::test_data_root;

use super::*;
use diskann_providers::common::AlignedBoxWithSlice;
use diskann_quantization::{alloc::aligned_slice, num::PowerOfTwo};

fn test_index_path() -> String {
"/disk_index_misc/disk_index_siftsmall_learn_256pts_R4_L50_A1.2_aligned_reader_test.index"
Expand All @@ -78,12 +78,11 @@ mod tests {

let read_length = 512;
let num_read = 10;
let mut aligned_mem = AlignedBoxWithSlice::<u8>::new(read_length * num_read, 512).unwrap();
let mut aligned_mem =
aligned_slice::<u8>(read_length * num_read, PowerOfTwo::new(512).unwrap()).unwrap();

// create and add AlignedReads to the vector
let mut mem_slices = aligned_mem
.split_into_nonoverlapping_mut_slices(0..aligned_mem.len(), read_length)
.unwrap();
let mut mem_slices: Vec<&mut [u8]> = aligned_mem.chunks_mut(read_length).collect();

let mut aligned_reads: Vec<AlignedRead<'_, u8>> = mem_slices
.iter_mut()
Expand Down
Loading
Loading