Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion diskann-benchmark/src/backend/index/product.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mod imp {
train_data.as_view(),
self.input.num_pq_chunks,
&mut StdRng::seed_from_u64(self.input.seed),
build.num_threads,
&diskann_providers::utils::create_thread_pool(build.num_threads)?,
)?
};

Expand Down
4 changes: 2 additions & 2 deletions diskann-disk/src/build/builder/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,13 @@ where

let generator = QuantDataGenerator::<
Data::VectorDataType,
PQGeneration<Data::VectorDataType, StorageProvider, &RayonThreadPool>,
PQGeneration<Data::VectorDataType, StorageProvider>,
>::new(
self.index_writer.get_dataset_file(),
generator_context,
&quantizer_context,
)?;
let progress = generator.generate_data(storage_provider, &pool, &self.chunking_config)?;
let progress = generator.generate_data(storage_provider, pool, &self.chunking_config)?;

checkpoint_context.update(progress.clone())?;
if let Progress::Processed(progress_point) = progress {
Expand Down
2 changes: 1 addition & 1 deletion diskann-disk/src/build/builder/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ impl<'a> MergedVamanaIndexWorkflow<'a> {
builder.disk_build_param.build_memory_limit().in_bytes() as f64;
// calculate how many partitions we need, in order to fit in RAM budget
// save id_map for each partition to disk
partition_with_ram_budget::<Data::VectorDataType, _, _, _>(
partition_with_ram_budget::<Data::VectorDataType, _, _>(
&self.dataset_file,
builder.index_configuration.dim,
sampling_rate,
Expand Down
4 changes: 2 additions & 2 deletions diskann-disk/src/build/builder/quantizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use diskann_providers::{
FixedChunkPQTable, IndexConfiguration, MAX_PQ_TRAINING_SET_SIZE,
},
storage::{PQStorage, SQStorage},
utils::{BridgeErr, PQPathNames},
utils::{create_thread_pool, BridgeErr, PQPathNames},
};
use diskann_quantization::scalar::train::ScalarQuantizationParameters;
use diskann_utils::views::MatrixView;
Expand Down Expand Up @@ -63,7 +63,7 @@ impl BuildQuantizer {
MatrixView::try_from(&train_data, train_size, train_dim).bridge_err()?,
num_chunks,
&mut rnd,
index_configuration.num_threads,
&create_thread_pool(index_configuration.num_threads)?,
)?
};
// Save at checkpoint. Note the the compressed data path and pivots path here
Expand Down
13 changes: 5 additions & 8 deletions diskann-disk/src/storage/quant/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use std::{

use diskann::{error::IntoANNResult, utils::VectorRepr, ANNError, ANNResult};
use diskann_providers::storage::{StorageReadProvider, StorageWriteProvider};
use diskann_providers::{
forward_threadpool,
utils::{load_metadata_from_file, AsThreadPool, BridgeErr, ParallelIteratorInPool, Timer},
use diskann_providers::utils::{
load_metadata_from_file, BridgeErr, ParallelIteratorInPool, RayonThreadPool, Timer,
};
use diskann_utils::{io::Metadata, views};
use rayon::iter::IndexedParallelIterator;
Expand Down Expand Up @@ -99,15 +98,14 @@ where
/// 4. Processes data in blocks of size given by chunking_config.data_compression_chunk_vector_count = 50_000
/// 5. Compresses each block in small batch sizes in parallel to (potentially) take advantage of batch compression with quantizer
/// 6. Writes compressed blocks to the output file.
pub fn generate_data<Storage, Pool>(
pub fn generate_data<Storage>(
&self,
storage_provider: &Storage, // Provider for reading source data and writing compressed results
pool: &Pool, // Thread pool for parallel processing
pool: &RayonThreadPool, // Thread pool for parallel processing
chunking_config: &ChunkingConfig, // Configuration for batching and checkpoint handling
) -> ANNResult<Progress>
where
Storage: StorageReadProvider + StorageWriteProvider,
Pool: AsThreadPool,
{
let timer = Timer::new();

Expand Down Expand Up @@ -157,7 +155,6 @@ where

let mut compressed_buffer = vec![0_u8; block_size * compressed_size];

forward_threadpool!(pool = pool: Pool);
//Every block has size exactly block_size, except for potentially the last one
let action = |block_index| -> ANNResult<()> {
let start_index: usize = offset + block_index * block_size;
Expand Down Expand Up @@ -431,7 +428,7 @@ mod generator_tests {
)
.unwrap();
// Run generator
let result = generator.generate_data(storage_provider, &&pool, chunking_config);
let result = generator.generate_data(storage_provider, &pool, chunking_config);
(generator, result)
}

Expand Down
33 changes: 13 additions & 20 deletions diskann-disk/src/storage/quant/pq/pq_generation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ use std::marker::PhantomData;
use diskann::{utils::VectorRepr, ANNError};
use diskann_providers::storage::{StorageReadProvider, StorageWriteProvider};
use diskann_providers::{
forward_threadpool,
model::{
pq::{accum_row_inplace, generate_pq_pivots},
GeneratePivotArguments,
},
storage::PQStorage,
utils::{AsThreadPool, BridgeErr, Timer},
utils::{BridgeErr, RayonThreadPool, Timer},
};
use diskann_quantization::{product::TransposedTable, CompressInto};
use diskann_utils::views::MatrixBase;
Expand All @@ -23,43 +22,39 @@ use tracing::info;

use crate::storage::quant::compressor::{CompressionStage, QuantCompressor};

pub struct PQGenerationContext<'a, Storage, Pool>
pub struct PQGenerationContext<'a, Storage>
where
Storage: StorageReadProvider + StorageWriteProvider,
Pool: AsThreadPool,
{
pub pq_storage: PQStorage,
pub num_chunks: usize,
pub seed: Option<u64>,
pub p_val: f64,
pub storage_provider: &'a Storage,
pub pool: Pool,
pub pool: &'a RayonThreadPool,
pub metric: Metric,
pub dim: usize,
pub max_kmeans_reps: usize,
pub num_centers: usize,
}

pub struct PQGeneration<'a, T, Storage, Pool>
pub struct PQGeneration<'a, T, Storage>
where
T: VectorRepr,
Storage: StorageReadProvider + StorageWriteProvider + 'a,
Pool: AsThreadPool,
{
table: TransposedTable,
num_chunks: usize,
phantom_data: PhantomData<T>,
phantom_storage: PhantomData<&'a Storage>,
phantom_pool: PhantomData<Pool>,
}

impl<'a, T, Storage, Pool> QuantCompressor<T> for PQGeneration<'a, T, Storage, Pool>
impl<'a, T, Storage> QuantCompressor<T> for PQGeneration<'a, T, Storage>
where
T: VectorRepr,
Storage: StorageReadProvider + StorageWriteProvider + 'a,
Pool: AsThreadPool,
{
type CompressorContext = PQGenerationContext<'a, Storage, Pool>;
type CompressorContext = PQGenerationContext<'a, Storage>;

fn new_at_stage(
stage: CompressionStage,
Expand All @@ -76,8 +71,7 @@ where
.pq_storage
.pivot_data_exist(context.storage_provider);

let pool = &context.pool;
forward_threadpool!(pool = pool: Pool);
let pool = context.pool;

if !pivots_exists {
if stage == CompressionStage::Resume {
Expand Down Expand Up @@ -156,7 +150,6 @@ where
table,
num_chunks,
phantom_data: PhantomData,
phantom_pool: PhantomData,
phantom_storage: PhantomData,
})
}
Expand Down Expand Up @@ -188,7 +181,7 @@ mod pq_generation_tests {
use diskann_providers::storage::{
PQStorage, StorageReadProvider, StorageWriteProvider, VirtualStorageProvider,
};
use diskann_providers::utils::{create_thread_pool_for_test, AsThreadPool};
use diskann_providers::utils::{create_thread_pool_for_test, RayonThreadPool};
use diskann_utils::{
io::{read_bin, write_bin},
test_data_root,
Expand All @@ -212,21 +205,21 @@ mod pq_generation_tests {
100.0f32, 100.0f32, 100.0f32, 100.0f32, 100.0f32, 100.0f32, 100.0f32,
];
#[allow(clippy::too_many_arguments)]
fn create_new_compressor<'a, R: AsThreadPool, F: vfs::FileSystem>(
fn create_new_compressor<'a, F: vfs::FileSystem>(
stage: CompressionStage,
provider: &'a VirtualStorageProvider<F>,
dim: usize,
num_chunks: usize,
max_kmeans_reps: usize,
num_centers: usize,
p_val: f64,
pool: R,
pool: &'a RayonThreadPool,
pivots_path: String,
compressed_path: String,
data_path: Option<&str>,
) -> Result<PQGeneration<'a, f32, VirtualStorageProvider<F>, R>, ANNError> {
) -> Result<PQGeneration<'a, f32, VirtualStorageProvider<F>>, ANNError> {
let pq_storage = PQStorage::new(&pivots_path, &compressed_path, data_path);
let context = PQGenerationContext::<'_, _, _> {
let context = PQGenerationContext::<'_, _> {
pq_storage,
num_chunks,
num_centers,
Expand All @@ -238,7 +231,7 @@ mod pq_generation_tests {
metric: Metric::L2,
dim,
};
PQGeneration::<_, _, _>::new_at_stage(stage, &context)
PQGeneration::<_, _>::new_at_stage(stage, &context)
}

#[rstest]
Expand Down
21 changes: 7 additions & 14 deletions diskann-disk/src/utils/kmeans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
use std::cmp::min;

use diskann::{ANNError, ANNResult};
use diskann_providers::{
forward_threadpool,
utils::{AsThreadPool, ParallelIteratorInPool, RayonThreadPool},
};
use diskann_providers::utils::{ParallelIteratorInPool, RayonThreadPool};
use diskann_vector::{distance::SquaredL2, PureDistanceFunction};
use hashbrown::HashSet;
use rand::{
Expand Down Expand Up @@ -118,15 +115,15 @@ fn lloyds_iter(
/// new vec<usize> [num_centers]`, and `closest_center = new size_t[num_points]`
/// Final centers are output in centers as row-major num_centers * dim.
#[allow(clippy::too_many_arguments)]
pub fn run_lloyds<Pool: AsThreadPool>(
pub fn run_lloyds(
data: &[f32],
num_points: usize,
dim: usize,
centers: &mut [f32],
num_centers: usize,
max_reps: usize,
cancellation_token: &mut bool,
pool: Pool,
pool: &RayonThreadPool,
) -> ANNResult<(Vec<Vec<usize>>, Vec<u32>, f32)> {
let mut residual = f32::MAX;

Expand All @@ -135,7 +132,6 @@ pub fn run_lloyds<Pool: AsThreadPool>(

let mut docs_l2sq = vec![0.0; num_points];

forward_threadpool!(pool = pool);
compute_vecs_l2sq(&mut docs_l2sq, data, num_points, dim, pool)?;

let mut old_residual;
Expand Down Expand Up @@ -231,15 +227,15 @@ fn select_random_pivots(
/// If there are are fewer than num_center distinct points, pick all unique points as pivots,
/// and sample data randomly for the remaining pivots.
#[allow(clippy::too_many_arguments)]
pub fn k_meanspp_selecting_pivots<Pool: AsThreadPool>(
pub fn k_meanspp_selecting_pivots(
data: &[f32],
num_points: usize,
dim: usize,
pivot_data: &mut [f32],
num_centers: usize,
rng: &mut impl Rng,
cancellation_token: &mut bool,
pool: Pool,
pool: &RayonThreadPool,
) -> ANNResult<()> {
if num_points > (1 << 23) {
return Err(ANNError::log_kmeans_error(format!(
Expand Down Expand Up @@ -280,7 +276,6 @@ pub fn k_meanspp_selecting_pivots<Pool: AsThreadPool>(

let mut dist = vec![0.0; num_points];

forward_threadpool!(pool = pool);
// Calculate the distance between each node and the first pivot and store the result in dist.
dist.par_iter_mut()
.enumerate()
Expand Down Expand Up @@ -394,7 +389,7 @@ pub fn k_meanspp_selecting_pivots<Pool: AsThreadPool>(

/// k-means algorithm interface
#[allow(clippy::too_many_arguments)]
pub fn k_means_clustering<Pool: AsThreadPool>(
pub fn k_means_clustering(
data: &[f32],
num_points: usize,
dim: usize,
Expand All @@ -403,10 +398,8 @@ pub fn k_means_clustering<Pool: AsThreadPool>(
max_reps: usize,
rng: &mut impl Rng,
cancellation_token: &mut bool,
pool: Pool,
pool: &RayonThreadPool,
) -> ANNResult<(Vec<Vec<usize>>, Vec<u32>, f32)> {
forward_threadpool!(pool = pool);

k_meanspp_selecting_pivots(
data,
num_points,
Expand Down
16 changes: 5 additions & 11 deletions diskann-disk/src/utils/math_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ use std::{cmp::Ordering, collections::BinaryHeap};

use diskann::{ANNError, ANNResult};
use diskann_linalg::{self, Transpose};
use diskann_providers::{
forward_threadpool,
utils::{AsThreadPool, ParallelIteratorInPool, RayonThreadPool},
};
use diskann_providers::utils::{ParallelIteratorInPool, RayonThreadPool};
use rayon::prelude::*;

// This is the chunk size applied when computing the closest centers in a block.
Expand Down Expand Up @@ -90,12 +87,12 @@ fn compute_vec_l2sq(data: &[f32], index: usize, dim: usize) -> f32 {

/// Compute L2-squared norms of data stored in row-major num_points * dim,
/// need to be pre-allocated
pub fn compute_vecs_l2sq<Pool: AsThreadPool>(
pub fn compute_vecs_l2sq(
vecs_l2sq: &mut [f32],
data: &[f32],
num_points: usize,
dim: usize,
pool: Pool,
pool: &RayonThreadPool,
) -> ANNResult<()> {
if data.len() != num_points * dim {
return Err(ANNError::log_pq_error(format_args!(
Expand All @@ -111,7 +108,6 @@ pub fn compute_vecs_l2sq<Pool: AsThreadPool>(
*vec_l2sq = compute_vec_l2sq(data, i, dim);
}
} else {
forward_threadpool!(pool = pool);
vecs_l2sq
.par_iter_mut()
.enumerate()
Expand Down Expand Up @@ -246,7 +242,7 @@ pub fn compute_closest_centers_in_block(
/// indices is an empty vector. Additionally, if pts_norms_squared is not null,
/// then it will assume that point norms are pre-computed and use those values
#[allow(clippy::too_many_arguments)]
pub fn compute_closest_centers<Pool: AsThreadPool>(
pub fn compute_closest_centers(
data: &[f32],
num_points: usize,
dim: usize,
Expand All @@ -256,7 +252,7 @@ pub fn compute_closest_centers<Pool: AsThreadPool>(
closest_centers_ivf: &mut [u32],
mut inverted_index: Option<&mut Vec<Vec<usize>>>,
pts_norms_squared: Option<&[f32]>,
pool: Pool,
pool: &RayonThreadPool,
) -> ANNResult<()> {
if k > num_centers {
return Err(ANNError::log_index_error(format_args!(
Expand All @@ -265,8 +261,6 @@ pub fn compute_closest_centers<Pool: AsThreadPool>(
)));
}

forward_threadpool!(pool = pool);

let pts_norms_squared = if let Some(pts_norms) = pts_norms_squared {
pts_norms.to_vec()
} else {
Expand Down
Loading
Loading