Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions benchmarks/compress-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ pub mod vortex;
pub fn chunked_to_vec_record_batch(
chunked: ChunkedArray,
) -> anyhow::Result<(Vec<RecordBatch>, Arc<Schema>)> {
let chunks_vec = chunked.chunks();
assert!(!chunks_vec.is_empty(), "empty chunks");
assert!(chunked.nchunks() > 0, "empty chunks");

let batches = chunks_vec
.iter()
let batches = chunked
.iter_chunks()
.map(|array| {
// TODO(connor)[ListView]: The rust Parquet implementation does not support writing
// `ListView` to Parquet files yet.
Expand Down
24 changes: 10 additions & 14 deletions encodings/alp/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ pub fn vortex_alp::ALP::buffer_name(_array: &vortex_alp::ALPArray, _idx: usize)

pub fn vortex_alp::ALP::build(dtype: &vortex_array::dtype::DType, len: usize, metadata: &Self::Metadata, _buffers: &[vortex_array::buffer::BufferHandle], children: &dyn vortex_array::serde::ArrayChildren) -> vortex_error::VortexResult<vortex_alp::ALPArray>

pub fn vortex_alp::ALP::child(array: &vortex_alp::ALPArray, idx: usize) -> vortex_array::array::ArrayRef

pub fn vortex_alp::ALP::child_name(array: &vortex_alp::ALPArray, idx: usize) -> alloc::string::String

pub fn vortex_alp::ALP::deserialize(bytes: &[u8], _dtype: &vortex_array::dtype::DType, _len: usize, _buffers: &[vortex_array::buffer::BufferHandle], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Metadata>

pub fn vortex_alp::ALP::dtype(array: &vortex_alp::ALPArray) -> &vortex_array::dtype::DType
Expand All @@ -84,15 +80,17 @@ pub fn vortex_alp::ALP::metadata(array: &vortex_alp::ALPArray) -> vortex_error::

pub fn vortex_alp::ALP::nbuffers(_array: &vortex_alp::ALPArray) -> usize

pub fn vortex_alp::ALP::nchildren(array: &vortex_alp::ALPArray) -> usize

pub fn vortex_alp::ALP::reduce_parent(array: &Self::Array, parent: &vortex_array::array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::ArrayRef>>

pub fn vortex_alp::ALP::serialize(metadata: Self::Metadata) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_alp::ALP::slot_name(_array: &vortex_alp::ALPArray, idx: usize) -> alloc::string::String

pub fn vortex_alp::ALP::slots(array: &vortex_alp::ALPArray) -> &[core::option::Option<vortex_array::array::ArrayRef>]

pub fn vortex_alp::ALP::stats(array: &vortex_alp::ALPArray) -> vortex_array::stats::array::StatsSetRef<'_>

pub fn vortex_alp::ALP::with_children(array: &mut Self::Array, children: alloc::vec::Vec<vortex_array::array::ArrayRef>) -> vortex_error::VortexResult<()>
pub fn vortex_alp::ALP::with_slots(array: &mut vortex_alp::ALPArray, slots: alloc::vec::Vec<core::option::Option<vortex_array::array::ArrayRef>>) -> vortex_error::VortexResult<()>

impl vortex_array::vtable::operations::OperationsVTable<vortex_alp::ALP> for vortex_alp::ALP

Expand Down Expand Up @@ -220,10 +218,6 @@ pub fn vortex_alp::ALPRD::buffer_name(_array: &vortex_alp::ALPRDArray, _idx: usi

pub fn vortex_alp::ALPRD::build(dtype: &vortex_array::dtype::DType, len: usize, metadata: &Self::Metadata, _buffers: &[vortex_array::buffer::BufferHandle], children: &dyn vortex_array::serde::ArrayChildren) -> vortex_error::VortexResult<vortex_alp::ALPRDArray>

pub fn vortex_alp::ALPRD::child(array: &vortex_alp::ALPRDArray, idx: usize) -> vortex_array::array::ArrayRef

pub fn vortex_alp::ALPRD::child_name(array: &vortex_alp::ALPRDArray, idx: usize) -> alloc::string::String

pub fn vortex_alp::ALPRD::deserialize(bytes: &[u8], _dtype: &vortex_array::dtype::DType, _len: usize, _buffers: &[vortex_array::buffer::BufferHandle], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Metadata>

pub fn vortex_alp::ALPRD::dtype(array: &vortex_alp::ALPRDArray) -> &vortex_array::dtype::DType
Expand All @@ -240,15 +234,17 @@ pub fn vortex_alp::ALPRD::metadata(array: &vortex_alp::ALPRDArray) -> vortex_err

pub fn vortex_alp::ALPRD::nbuffers(_array: &vortex_alp::ALPRDArray) -> usize

pub fn vortex_alp::ALPRD::nchildren(array: &vortex_alp::ALPRDArray) -> usize

pub fn vortex_alp::ALPRD::reduce_parent(array: &Self::Array, parent: &vortex_array::array::ArrayRef, child_idx: usize) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::ArrayRef>>

pub fn vortex_alp::ALPRD::serialize(metadata: Self::Metadata) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_alp::ALPRD::slot_name(_array: &vortex_alp::ALPRDArray, idx: usize) -> alloc::string::String

pub fn vortex_alp::ALPRD::slots(array: &vortex_alp::ALPRDArray) -> &[core::option::Option<vortex_array::array::ArrayRef>]

pub fn vortex_alp::ALPRD::stats(array: &vortex_alp::ALPRDArray) -> vortex_array::stats::array::StatsSetRef<'_>

pub fn vortex_alp::ALPRD::with_children(array: &mut Self::Array, children: alloc::vec::Vec<vortex_array::array::ArrayRef>) -> vortex_error::VortexResult<()>
pub fn vortex_alp::ALPRD::with_slots(array: &mut vortex_alp::ALPRDArray, slots: alloc::vec::Vec<core::option::Option<vortex_array::array::ArrayRef>>) -> vortex_error::VortexResult<()>

impl vortex_array::vtable::operations::OperationsVTable<vortex_alp::ALPRD> for vortex_alp::ALPRD

Expand Down
154 changes: 76 additions & 78 deletions encodings/alp/src/alp/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,10 @@ use vortex_array::vtable::ArrayId;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityChild;
use vortex_array::vtable::ValidityVTableFromChild;
use vortex_array::vtable::patches_child;
use vortex_array::vtable::patches_child_name;
use vortex_array::vtable::patches_nchildren;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;

Expand All @@ -59,7 +55,7 @@ impl VTable for ALP {
}

fn len(array: &ALPArray) -> usize {
array.encoded.len()
array.encoded().len()
}

fn dtype(array: &ALPArray) -> &DType {
Expand All @@ -72,14 +68,14 @@ impl VTable for ALP {

fn array_hash<H: std::hash::Hasher>(array: &ALPArray, state: &mut H, precision: Precision) {
array.dtype.hash(state);
array.encoded.array_hash(state, precision);
array.encoded().array_hash(state, precision);
array.exponents.hash(state);
array.patches.array_hash(state, precision);
}

fn array_eq(array: &ALPArray, other: &ALPArray, precision: Precision) -> bool {
array.dtype == other.dtype
&& array.encoded.array_eq(&other.encoded, precision)
&& array.encoded().array_eq(other.encoded(), precision)
&& array.exponents == other.exponents
&& array.patches.array_eq(&other.patches, precision)
}
Expand All @@ -96,32 +92,41 @@ impl VTable for ALP {
None
}

fn nchildren(array: &ALPArray) -> usize {
1 + array.patches().map_or(0, patches_nchildren)
fn slots(array: &ALPArray) -> &[Option<ArrayRef>] {
&array.slots
}

fn child(array: &ALPArray, idx: usize) -> ArrayRef {
match idx {
0 => array.encoded().clone(),
_ => {
let patches = array
.patches()
.unwrap_or_else(|| vortex_panic!("ALPArray child index {idx} out of bounds"));
patches_child(patches, idx - 1)
}
}
fn slot_name(_array: &ALPArray, idx: usize) -> String {
SLOT_NAMES[idx].to_string()
}

fn child_name(array: &ALPArray, idx: usize) -> String {
match idx {
0 => "encoded".to_string(),
_ => {
if array.patches().is_none() {
vortex_panic!("ALPArray child_name index {idx} out of bounds");
}
patches_child_name(idx - 1).to_string()
fn with_slots(array: &mut ALPArray, slots: Vec<Option<ArrayRef>>) -> VortexResult<()> {
vortex_ensure!(
slots.len() == NUM_SLOTS,
"ALPArray expects {} slots, got {}",
NUM_SLOTS,
slots.len()
);

// Reconstruct patches from slots + existing metadata
array.patches = match (&slots[PATCH_INDICES_SLOT], &slots[PATCH_VALUES_SLOT]) {
(Some(indices), Some(values)) => {
let old = array
.patches
.as_ref()
.vortex_expect("ALPArray had patch slots but no patches metadata");
Some(Patches::new(
old.array_len(),
old.offset(),
indices.clone(),
values.clone(),
slots[PATCH_CHUNK_OFFSETS_SLOT].clone(),
)?)
}
}
_ => None,
};
array.slots = slots;
Ok(())
}

fn metadata(array: &ALPArray) -> VortexResult<Self::Metadata> {
Expand Down Expand Up @@ -190,51 +195,6 @@ impl VTable for ALP {
)
}

fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
// Children: encoded, patches (if present): indices, values, chunk_offsets (optional)
let patches_info = array
.patches
.as_ref()
.map(|p| (p.array_len(), p.offset(), p.chunk_offsets().is_some()));

let expected_children = match &patches_info {
Some((_, _, has_chunk_offsets)) => 1 + 2 + if *has_chunk_offsets { 1 } else { 0 },
None => 1,
};

vortex_ensure!(
children.len() == expected_children,
"ALPArray expects {} children, got {}",
expected_children,
children.len()
);

let mut children_iter = children.into_iter();
array.encoded = children_iter
.next()
.ok_or_else(|| vortex_err!("Expected encoded child"))?;

if let Some((array_len, offset, _has_chunk_offsets)) = patches_info {
let indices = children_iter
.next()
.ok_or_else(|| vortex_err!("Expected patch indices child"))?;
let values = children_iter
.next()
.ok_or_else(|| vortex_err!("Expected patch values child"))?;
let chunk_offsets = children_iter.next();

array.patches = Some(Patches::new(
array_len,
offset,
indices,
values,
chunk_offsets,
)?);
}

Ok(())
}

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
// TODO(joe): take by value
Ok(ExecutionStep::Done(
Expand All @@ -260,9 +220,21 @@ impl VTable for ALP {
}
}

pub(super) const ENCODED_SLOT: usize = 0;
pub(super) const PATCH_INDICES_SLOT: usize = 1;
pub(super) const PATCH_VALUES_SLOT: usize = 2;
pub(super) const PATCH_CHUNK_OFFSETS_SLOT: usize = 3;
pub(super) const NUM_SLOTS: usize = 4;
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = [
"encoded",
"patch_indices",
"patch_values",
"patch_chunk_offsets",
];

#[derive(Clone, Debug)]
pub struct ALPArray {
encoded: ArrayRef,
slots: Vec<Option<ArrayRef>>,
patches: Option<Patches>,
dtype: DType,
exponents: Exponents,
Expand Down Expand Up @@ -431,9 +403,11 @@ impl ALPArray {
_ => unreachable!(),
};

let slots = Self::make_slots(&encoded, &patches);

Ok(Self {
dtype,
encoded,
slots,
exponents,
patches,
stats_set: Default::default(),
Expand All @@ -450,21 +424,42 @@ impl ALPArray {
patches: Option<Patches>,
dtype: DType,
) -> Self {
let slots = Self::make_slots(&encoded, &patches);

Self {
dtype,
encoded,
slots,
exponents,
patches,
stats_set: Default::default(),
}
}

fn make_slots(encoded: &ArrayRef, patches: &Option<Patches>) -> Vec<Option<ArrayRef>> {
let (patch_indices, patch_values, patch_chunk_offsets) = match patches {
Some(p) => (
Some(p.indices().clone()),
Some(p.values().clone()),
p.chunk_offsets().clone(),
),
None => (None, None, None),
};
vec![
Some(encoded.clone()),
patch_indices,
patch_values,
patch_chunk_offsets,
]
}

pub fn ptype(&self) -> PType {
self.dtype.as_ptype()
}

pub fn encoded(&self) -> &ArrayRef {
&self.encoded
self.slots[ENCODED_SLOT]
.as_ref()
.vortex_expect("ALPArray encoded slot")
}

#[inline]
Expand All @@ -479,7 +474,10 @@ impl ALPArray {
/// Consumes the array and returns its parts.
#[inline]
pub fn into_parts(self) -> (ArrayRef, Exponents, Option<Patches>, DType) {
(self.encoded, self.exponents, self.patches, self.dtype)
let encoded = self.slots[ENCODED_SLOT]
.clone()
.vortex_expect("ALPArray encoded slot");
(encoded, self.exponents, self.patches, self.dtype)
}
}

Expand Down
Loading
Loading