diff --git a/Cargo.lock b/Cargo.lock index 32430a2b181..cb3dc000cdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2976,6 +2976,26 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "enum-map" +version = "2.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6866f3bfdf8207509a033af1a75a7b08abda06bbaaeae6669323fd5a097df2e9" +dependencies = [ + "enum-map-derive", +] + +[[package]] +name = "enum-map-derive" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f282cfdfe92516eb26c2af8589c274c7c17681f5ecc03c18255fe741c6aa64eb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "env_filter" version = "0.1.3" @@ -8524,6 +8544,7 @@ dependencies = [ "cfg-if", "codspeed-divan-compat", "enum-iterator", + "enum-map", "flatbuffers", "futures", "getrandom 0.3.4", @@ -8550,13 +8571,16 @@ dependencies = [ "termtree", "vortex-array", "vortex-buffer", + "vortex-compute", "vortex-dtype", "vortex-error", "vortex-flatbuffers", + "vortex-io", "vortex-mask", "vortex-metrics", "vortex-scalar", "vortex-utils", + "vortex-vector", ] [[package]] diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index 9249714487b..71f20824384 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -32,7 +32,7 @@ impl VTable for ALPVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.alp") diff --git a/encodings/alp/src/alp_rd/array.rs b/encodings/alp/src/alp_rd/array.rs index 07e0334d2dd..8df4578da2e 100644 --- a/encodings/alp/src/alp_rd/array.rs +++ b/encodings/alp/src/alp_rd/array.rs @@ -35,7 +35,7 @@ impl VTable for ALPRDVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.alprd") diff --git a/encodings/bytebool/src/array.rs b/encodings/bytebool/src/array.rs index 695913e1da9..5202a12600d 100644 --- a/encodings/bytebool/src/array.rs +++ b/encodings/bytebool/src/array.rs @@ -34,7 +34,7 @@ impl VTable for ByteBoolVTable { type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.bytebool") diff --git a/encodings/datetime-parts/src/array.rs b/encodings/datetime-parts/src/array.rs index 5cab986a726..7a778aa5e96 100644 --- a/encodings/datetime-parts/src/array.rs +++ b/encodings/datetime-parts/src/array.rs @@ -28,7 +28,7 @@ impl VTable for DateTimePartsVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.datetimeparts") diff --git a/encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs b/encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs index 3ac7b9e6bfc..95334c9a1f9 100644 --- a/encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs +++ b/encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs @@ -35,7 +35,7 @@ impl VTable for DecimalBytePartsVTable { type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.decimal_byte_parts") diff --git a/encodings/dict/src/array.rs b/encodings/dict/src/array.rs index cf10112ef4f..01c2108bb12 100644 --- a/encodings/dict/src/array.rs +++ b/encodings/dict/src/array.rs @@ -28,7 +28,7 @@ impl VTable for DictVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.dict") diff --git a/encodings/fastlanes/src/bitpacking/mod.rs b/encodings/fastlanes/src/bitpacking/mod.rs index e8136d697b8..efb80640b01 100644 --- a/encodings/fastlanes/src/bitpacking/mod.rs +++ b/encodings/fastlanes/src/bitpacking/mod.rs @@ -45,7 +45,7 @@ impl VTable for BitPackedVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = Self; + type OperatorVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("fastlanes.bitpacked") diff --git a/encodings/fastlanes/src/bitpacking/operator/mod.rs b/encodings/fastlanes/src/bitpacking/operator/mod.rs index 9f780cf25cc..a01e7bc183f 100644 --- a/encodings/fastlanes/src/bitpacking/operator/mod.rs +++ b/encodings/fastlanes/src/bitpacking/operator/mod.rs @@ -13,7 +13,7 @@ use vortex_array::operator::{ LengthBounds, Operator, OperatorEq, OperatorHash, OperatorId, OperatorRef, }; use vortex_array::pipeline::{BindContext, Kernel, PipelinedOperator, RowSelection}; -use vortex_array::vtable::PipelineVTable; +use vortex_array::vtable::OperatorVTable; use vortex_buffer::Buffer; use vortex_dtype::{DType, PhysicalPType, match_each_integer_ptype}; use vortex_error::VortexResult; @@ -21,7 +21,7 @@ use vortex_error::VortexResult; use crate::operator::aligned_kernel::BitPackedKernel; use crate::{BitPackedArray, BitPackedVTable}; -impl PipelineVTable for BitPackedVTable { +impl OperatorVTable for BitPackedVTable { fn to_operator(array: &BitPackedArray) -> VortexResult> { if array.dtype.is_nullable() { log::trace!("BitPackedVTable does not support nullable arrays"); diff --git a/encodings/fastlanes/src/delta/mod.rs b/encodings/fastlanes/src/delta/mod.rs index 956fd7f1829..d9f916d6243 100644 --- a/encodings/fastlanes/src/delta/mod.rs +++ b/encodings/fastlanes/src/delta/mod.rs @@ -40,7 +40,7 @@ impl VTable for DeltaVTable { type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("fastlanes.delta") diff --git a/encodings/fastlanes/src/for/mod.rs b/encodings/fastlanes/src/for/mod.rs index cc9de46b51d..13fd9572cfa 100644 --- a/encodings/fastlanes/src/for/mod.rs +++ b/encodings/fastlanes/src/for/mod.rs @@ -35,7 +35,7 @@ impl VTable for FoRVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = Self; + type OperatorVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("fastlanes.for") diff --git a/encodings/fastlanes/src/for/pipeline.rs b/encodings/fastlanes/src/for/pipeline.rs index 647c40cfbff..428c334c88e 100644 --- a/encodings/fastlanes/src/for/pipeline.rs +++ b/encodings/fastlanes/src/for/pipeline.rs @@ -16,14 +16,14 @@ use vortex_array::pipeline::view::ViewMut; use vortex_array::pipeline::{ BindContext, Element, Kernel, KernelContext, PipelinedOperator, RowSelection, VectorId, }; -use vortex_array::vtable::PipelineVTable; +use vortex_array::vtable::OperatorVTable; use vortex_dtype::{DType, NativePType, PType, match_each_integer_ptype}; use vortex_error::{VortexExpect, VortexResult, vortex_bail}; use vortex_scalar::Scalar; use crate::{FoRArray, FoRVTable}; -impl PipelineVTable for FoRVTable { +impl OperatorVTable for FoRVTable { fn to_operator(array: &FoRArray) -> VortexResult> { let Some(op) = array.encoded.to_operator()? else { return Ok(None); diff --git a/encodings/fastlanes/src/rle/mod.rs b/encodings/fastlanes/src/rle/mod.rs index 88f36d13c2b..f9f713fb00d 100644 --- a/encodings/fastlanes/src/rle/mod.rs +++ b/encodings/fastlanes/src/rle/mod.rs @@ -37,7 +37,7 @@ impl VTable for RLEVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("fastlanes.rle") diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index 5aefc19cc36..7df661e604f 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -32,7 +32,7 @@ impl VTable for FSSTVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = Self; + type OperatorVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.fsst") diff --git a/encodings/fsst/src/operator.rs b/encodings/fsst/src/operator.rs index ef0f09f4202..f203fcdcb72 100644 --- a/encodings/fsst/src/operator.rs +++ b/encodings/fsst/src/operator.rs @@ -13,7 +13,7 @@ use vortex_array::operator::{ BatchBindCtx, BatchExecution, BatchExecutionRef, BatchOperator, LengthBounds, Operator, OperatorEq, OperatorHash, OperatorId, OperatorRef, }; -use vortex_array::vtable::PipelineVTable; +use vortex_array::vtable::OperatorVTable; use vortex_array::{Array, Canonical}; use vortex_dtype::DType; use vortex_error::VortexResult; @@ -21,7 +21,7 @@ use vortex_mask::Mask; use crate::{FSSTArray, FSSTVTable}; -impl PipelineVTable for FSSTVTable { +impl OperatorVTable for FSSTVTable { fn to_operator(array: &FSSTArray) -> VortexResult> { Ok(Some(Arc::new(array.clone()))) } diff --git a/encodings/pco/src/array.rs b/encodings/pco/src/array.rs index 253a43e1593..34481007ddf 100644 --- a/encodings/pco/src/array.rs +++ b/encodings/pco/src/array.rs @@ -64,7 +64,7 @@ impl VTable for PcoVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.pco") diff --git a/encodings/runend/src/array.rs b/encodings/runend/src/array.rs index c8081371c66..ca379d5f34e 100644 --- a/encodings/runend/src/array.rs +++ b/encodings/runend/src/array.rs @@ -33,7 +33,7 @@ impl VTable for RunEndVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.runend") diff --git a/encodings/sequence/src/array.rs b/encodings/sequence/src/array.rs index 744566bb617..57c90d7ec23 100644 --- a/encodings/sequence/src/array.rs +++ b/encodings/sequence/src/array.rs @@ -160,7 +160,7 @@ impl VTable for SequenceVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = Self; + type OperatorVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.sequence") diff --git a/encodings/sequence/src/operator.rs b/encodings/sequence/src/operator.rs index 7d77b99773e..0e64ab4799b 100644 --- a/encodings/sequence/src/operator.rs +++ b/encodings/sequence/src/operator.rs @@ -17,13 +17,13 @@ use vortex_array::pipeline::view::ViewMut; use vortex_array::pipeline::{ BindContext, Element, Kernel, KernelContext, N, PipelinedOperator, RowSelection, }; -use vortex_array::vtable::PipelineVTable; +use vortex_array::vtable::OperatorVTable; use vortex_dtype::{DType, IntegerPType, NativePType, match_each_integer_ptype}; use vortex_error::{VortexResult, vortex_err}; use crate::{SequenceArray, SequenceVTable}; -impl PipelineVTable for SequenceVTable { +impl OperatorVTable for SequenceVTable { fn to_operator(array: &SequenceArray) -> VortexResult> { Ok(Some(Arc::new(array.clone()))) } diff --git a/encodings/sparse/src/lib.rs b/encodings/sparse/src/lib.rs index a74e0c8232a..33a3bcaa8a5 100644 --- a/encodings/sparse/src/lib.rs +++ b/encodings/sparse/src/lib.rs @@ -40,7 +40,7 @@ impl VTable for SparseVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.sparse") diff --git a/encodings/zigzag/src/array.rs b/encodings/zigzag/src/array.rs index 66003260ef0..811480c52ef 100644 --- a/encodings/zigzag/src/array.rs +++ b/encodings/zigzag/src/array.rs @@ -35,7 +35,7 @@ impl VTable for ZigZagVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.zigzag") diff --git a/encodings/zstd/src/array.rs b/encodings/zstd/src/array.rs index ca0196d5bf5..9238c4a507a 100644 --- a/encodings/zstd/src/array.rs +++ b/encodings/zstd/src/array.rs @@ -65,7 +65,7 @@ impl VTable for ZstdVTable { type ComputeVTable = NotSupported; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.zstd") diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 18b65022171..8d15e54aa7d 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -34,6 +34,7 @@ async-trait = { workspace = true } bitvec = { workspace = true } cfg-if = { workspace = true } enum-iterator = { workspace = true } +enum-map = { workspace = true } flatbuffers = { workspace = true } futures = { workspace = true, features = ["alloc", "async-await", "std"] } getrandom_v03 = { workspace = true } @@ -60,13 +61,16 @@ tabled = { workspace = true, optional = true, default-features = false, features ] } termtree = { workspace = true } vortex-buffer = { workspace = true, features = ["arrow"] } -vortex-dtype = { workspace = true, features = ["arrow"] } +vortex-compute = { workspace = true, default-features = true } +vortex-dtype = { workspace = true, features = ["arrow", "serde"] } vortex-error = { workspace = true, features = ["prost"] } vortex-flatbuffers = { workspace = true, features = ["array"] } +vortex-io = { workspace = true } vortex-mask = { workspace = true } vortex-metrics = { workspace = true } vortex-scalar = { workspace = true } vortex-utils = { workspace = true } +vortex-vector = { workspace = true } [features] arbitrary = [ diff --git a/vortex-array/src/array/mod.rs b/vortex-array/src/array/mod.rs index ddae70c5a77..ca7a0c835ef 100644 --- a/vortex-array/src/array/mod.rs +++ b/vortex-array/src/array/mod.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors pub mod display; +mod operator; mod visitor; use std::any::Any; @@ -10,6 +11,7 @@ use std::hash::{Hash, Hasher}; use std::ops::Range; use std::sync::Arc; +pub use operator::*; pub use visitor::*; use vortex_buffer::ByteBuffer; use vortex_dtype::{DType, Nullability}; @@ -28,7 +30,7 @@ use crate::operator::OperatorRef; use crate::serde::ArrayChildren; use crate::stats::{Precision, Stat, StatsProviderExt, StatsSetRef}; use crate::vtable::{ - ArrayVTable, CanonicalVTable, ComputeVTable, OperationsVTable, PipelineVTable, SerdeVTable, + ArrayVTable, CanonicalVTable, ComputeVTable, OperationsVTable, OperatorVTable, SerdeVTable, VTable, ValidityVTable, VisitorVTable, }; use crate::{ @@ -38,7 +40,15 @@ use crate::{ /// The public API trait for all Vortex arrays. pub trait Array: - 'static + private::Sealed + Send + Sync + Debug + DynArrayEq + DynArrayHash + ArrayVisitor + 'static + + private::Sealed + + Send + + Sync + + Debug + + DynArrayEq + + DynArrayHash + + ArrayVisitor + + ArrayOperator { /// Returns the array as a reference to a generic [`Any`] trait object. fn as_any(&self) -> &dyn Any; @@ -159,7 +169,7 @@ pub trait Array: fn invoke(&self, compute_fn: &ComputeFn, args: &InvocationArgs) -> VortexResult>; - /// Convert the array to a operator operator if supported by the encoding. + /// Convert the array to an operator if supported by the encoding. /// /// Returns `None` if the encoding does not support operator operations. fn to_operator(&self) -> VortexResult>; @@ -640,7 +650,7 @@ impl Array for ArrayAdapter { } fn to_operator(&self) -> VortexResult> { - >::to_operator(&self.0) + >::to_operator(&self.0) } } diff --git a/vortex-array/src/array/operator.rs b/vortex-array/src/array/operator.rs new file mode 100644 index 00000000000..62c204e4982 --- /dev/null +++ b/vortex-array/src/array/operator.rs @@ -0,0 +1,75 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use async_trait::async_trait; +use vortex_error::VortexResult; +use vortex_vector::Vector; + +use crate::execution::{BatchKernel, BindCtx}; +use crate::vtable::{OperatorVTable, VTable}; +use crate::{Array, ArrayAdapter, ArrayRef}; + +/// Array functions as provided by the `OperatorVTable`. +/// +/// Note: the public functions such as "execute" should move onto the main `Array` trait when +/// operators is stabilized. The other functions should remain on a `pub(crate)` trait. +#[async_trait] +pub trait ArrayOperator: 'static + Send + Sync { + /// Execute the array producing a canonical vector. + async fn execute(&self) -> VortexResult { + self.execute_with_selection(None).await + } + + /// Execute the array with a selection mask, producing a canonical vector. + async fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult; + + /// Bind the array to a batch kernel. This is an internal function + fn bind( + &self, + selection: Option<&ArrayRef>, + ctx: &mut dyn BindCtx, + ) -> VortexResult; +} + +#[async_trait] +impl ArrayOperator for Arc { + async fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult { + self.as_ref().execute_with_selection(selection).await + } + + fn bind( + &self, + selection: Option<&ArrayRef>, + ctx: &mut dyn BindCtx, + ) -> VortexResult { + self.as_ref().bind(selection, ctx) + } +} + +#[async_trait] +impl ArrayOperator for ArrayAdapter { + async fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult { + self.bind(selection, &mut ())?.await + } + + fn bind( + &self, + selection: Option<&ArrayRef>, + ctx: &mut dyn BindCtx, + ) -> VortexResult { + >::bind(&self.0, selection, ctx) + } +} + +// TODO(ngates): create a smarter context in the future +impl BindCtx for () { + fn bind( + &mut self, + array: &ArrayRef, + selection: Option<&ArrayRef>, + ) -> VortexResult { + array.bind(selection, self) + } +} diff --git a/vortex-array/src/arrays/bool/array.rs b/vortex-array/src/arrays/bool/array.rs index fa05ca18c7e..67c997460be 100644 --- a/vortex-array/src/arrays/bool/array.rs +++ b/vortex-array/src/arrays/bool/array.rs @@ -10,6 +10,7 @@ use vortex_mask::Mask; use crate::arrays::bool; use crate::stats::ArrayStats; use crate::validity::Validity; +use crate::{ArrayRef, IntoArray}; /// A boolean array that stores true/false values in a compact bit-packed format. /// @@ -239,6 +240,21 @@ impl FromIterator> for BoolArray { } } +impl IntoArray for BitBuffer { + fn into_array(self) -> ArrayRef { + let len = self.len(); + BoolArray::try_new(self.into_inner(), 0, len, Validity::NonNullable) + .vortex_expect("known correct") + .into_array() + } +} + +impl IntoArray for BitBufferMut { + fn into_array(self) -> ArrayRef { + self.freeze().into_array() + } +} + #[cfg(test)] mod tests { use vortex_buffer::{BitBuffer, BitBufferMut, buffer}; diff --git a/vortex-array/src/arrays/bool/vtable/mod.rs b/vortex-array/src/arrays/bool/vtable/mod.rs index 56fda71891e..6aff70265e1 100644 --- a/vortex-array/src/arrays/bool/vtable/mod.rs +++ b/vortex-array/src/arrays/bool/vtable/mod.rs @@ -8,6 +8,7 @@ use crate::{EncodingId, EncodingRef, vtable}; mod array; mod canonical; mod operations; +mod operator; mod serde; mod validity; mod visitor; @@ -25,7 +26,7 @@ impl VTable for BoolVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = Self; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/bool/vtable/operator.rs b/vortex-array/src/arrays/bool/vtable/operator.rs new file mode 100644 index 00000000000..057ad1ce3fc --- /dev/null +++ b/vortex-array/src/arrays/bool/vtable/operator.rs @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use futures::{FutureExt, try_join}; +use vortex_compute::filter::Filter; +use vortex_error::VortexResult; +use vortex_vector::BoolVector; + +use crate::ArrayRef; +use crate::arrays::{BoolArray, BoolVTable}; +use crate::execution::{BatchKernel, BindCtx}; +use crate::vtable::{OperatorVTable, ValidityHelper}; + +impl OperatorVTable for BoolVTable { + fn bind( + array: &BoolArray, + selection: Option<&ArrayRef>, + ctx: &mut dyn BindCtx, + ) -> VortexResult { + let bits = array.buffer.clone(); + let mask = ctx.bind_selection(array.len(), selection)?; + let validity = ctx.bind_validity(array.validity(), array.len(), selection)?; + + Ok(async move { + let (mask, validity) = try_join!(mask, validity)?; + + // Note that validity already has the mask applied so we only need to apply it to bits. + let bits = bits.filter(&mask); + + Ok(BoolVector::new(bits, validity).into()) + } + .boxed()) + } +} diff --git a/vortex-array/src/arrays/chunked/vtable/mod.rs b/vortex-array/src/arrays/chunked/vtable/mod.rs index d91f92b8630..aa111c1763a 100644 --- a/vortex-array/src/arrays/chunked/vtable/mod.rs +++ b/vortex-array/src/arrays/chunked/vtable/mod.rs @@ -26,7 +26,7 @@ impl VTable for ChunkedVTable { type VisitorVTable = Self; type ComputeVTable = Self; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/constant/vtable/mod.rs b/vortex-array/src/arrays/constant/vtable/mod.rs index 9ac5a1071fb..9b1d9ecbe36 100644 --- a/vortex-array/src/arrays/constant/vtable/mod.rs +++ b/vortex-array/src/arrays/constant/vtable/mod.rs @@ -31,7 +31,7 @@ impl VTable for ConstantVTable { // TODO(ngates): implement a compute kernel for elementwise operations type ComputeVTable = NotSupported; type EncodeVTable = Self; - type PipelineVTable = Self; + type OperatorVTable = Self; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/constant/vtable/pipeline.rs b/vortex-array/src/arrays/constant/vtable/pipeline.rs index ab55e590439..b15399e1f9b 100644 --- a/vortex-array/src/arrays/constant/vtable/pipeline.rs +++ b/vortex-array/src/arrays/constant/vtable/pipeline.rs @@ -16,9 +16,9 @@ use crate::pipeline::view::ViewMut; use crate::pipeline::{ BindContext, Element, Kernel, KernelContext, N, PipelinedOperator, RowSelection, }; -use crate::vtable::PipelineVTable; +use crate::vtable::OperatorVTable; -impl PipelineVTable for ConstantVTable { +impl OperatorVTable for ConstantVTable { fn to_operator(array: &ConstantArray) -> VortexResult> { Ok(Some(Arc::new(array.clone()))) } diff --git a/vortex-array/src/arrays/decimal/vtable/mod.rs b/vortex-array/src/arrays/decimal/vtable/mod.rs index fa408626053..c57099d458b 100644 --- a/vortex-array/src/arrays/decimal/vtable/mod.rs +++ b/vortex-array/src/arrays/decimal/vtable/mod.rs @@ -25,7 +25,7 @@ impl VTable for DecimalVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/extension/vtable/mod.rs b/vortex-array/src/arrays/extension/vtable/mod.rs index 3a56ba4b1a7..0e4dfbabfc5 100644 --- a/vortex-array/src/arrays/extension/vtable/mod.rs +++ b/vortex-array/src/arrays/extension/vtable/mod.rs @@ -25,7 +25,7 @@ impl VTable for ExtensionVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/fixed_size_list/vtable/mod.rs b/vortex-array/src/arrays/fixed_size_list/vtable/mod.rs index 541536a7229..22b0f342666 100644 --- a/vortex-array/src/arrays/fixed_size_list/vtable/mod.rs +++ b/vortex-array/src/arrays/fixed_size_list/vtable/mod.rs @@ -28,7 +28,7 @@ impl VTable for FixedSizeListVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/list/vtable/mod.rs b/vortex-array/src/arrays/list/vtable/mod.rs index 00455ef270e..cdce55216ba 100644 --- a/vortex-array/src/arrays/list/vtable/mod.rs +++ b/vortex-array/src/arrays/list/vtable/mod.rs @@ -25,7 +25,7 @@ impl VTable for ListVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/listview/vtable/mod.rs b/vortex-array/src/arrays/listview/vtable/mod.rs index 3d5ba7f935b..423051a2e60 100644 --- a/vortex-array/src/arrays/listview/vtable/mod.rs +++ b/vortex-array/src/arrays/listview/vtable/mod.rs @@ -28,7 +28,7 @@ impl VTable for ListViewVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/masked/vtable/mod.rs b/vortex-array/src/arrays/masked/vtable/mod.rs index 1f40a6a4344..f2eff52a244 100644 --- a/vortex-array/src/arrays/masked/vtable/mod.rs +++ b/vortex-array/src/arrays/masked/vtable/mod.rs @@ -28,7 +28,7 @@ impl VTable for MaskedVTable { type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { EncodingId::new_ref("vortex.masked") diff --git a/vortex-array/src/arrays/null/mod.rs b/vortex-array/src/arrays/null/mod.rs index 8c12cdba46b..79e6e95dd80 100644 --- a/vortex-array/src/arrays/null/mod.rs +++ b/vortex-array/src/arrays/null/mod.rs @@ -36,7 +36,7 @@ impl VTable for NullVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/primitive/vtable/mod.rs b/vortex-array/src/arrays/primitive/vtable/mod.rs index e8b8887bfc5..ac1af8d8cdc 100644 --- a/vortex-array/src/arrays/primitive/vtable/mod.rs +++ b/vortex-array/src/arrays/primitive/vtable/mod.rs @@ -26,7 +26,7 @@ impl VTable for PrimitiveVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = Self; + type OperatorVTable = Self; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/primitive/vtable/pipeline.rs b/vortex-array/src/arrays/primitive/vtable/pipeline.rs index 80fca778b3a..cdf288f5edd 100644 --- a/vortex-array/src/arrays/primitive/vtable/pipeline.rs +++ b/vortex-array/src/arrays/primitive/vtable/pipeline.rs @@ -18,9 +18,9 @@ use crate::operator::{ BatchBindCtx, BatchExecutionRef, BatchOperator, DisplayFormat, LengthBounds, Operator, OperatorEq, OperatorHash, OperatorId, OperatorRef, }; -use crate::vtable::PipelineVTable; +use crate::vtable::OperatorVTable; -impl PipelineVTable for PrimitiveVTable { +impl OperatorVTable for PrimitiveVTable { fn to_operator(array: &PrimitiveArray) -> VortexResult> { Ok(Some(Arc::new(array.clone()))) } diff --git a/vortex-array/src/arrays/struct_/vtable/mod.rs b/vortex-array/src/arrays/struct_/vtable/mod.rs index 659cc5d9d18..3833fd69ea0 100644 --- a/vortex-array/src/arrays/struct_/vtable/mod.rs +++ b/vortex-array/src/arrays/struct_/vtable/mod.rs @@ -26,7 +26,7 @@ impl VTable for StructVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = Self; + type OperatorVTable = Self; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/struct_/vtable/pipeline.rs b/vortex-array/src/arrays/struct_/vtable/pipeline.rs index f2b4d42c9c1..9c9aa43c84d 100644 --- a/vortex-array/src/arrays/struct_/vtable/pipeline.rs +++ b/vortex-array/src/arrays/struct_/vtable/pipeline.rs @@ -17,10 +17,10 @@ use crate::operator::{ OperatorEq, OperatorHash, OperatorId, OperatorRef, }; use crate::validity::Validity; -use crate::vtable::PipelineVTable; +use crate::vtable::OperatorVTable; use crate::{Array, ArrayRef, Canonical, IntoArray}; -impl PipelineVTable for StructVTable { +impl OperatorVTable for StructVTable { fn to_operator(array: &StructArray) -> VortexResult> { let mut children = Vec::with_capacity(array.fields.len()); for field in array.fields().iter() { diff --git a/vortex-array/src/arrays/varbin/vtable/mod.rs b/vortex-array/src/arrays/varbin/vtable/mod.rs index 15295c495af..1de0c30022e 100644 --- a/vortex-array/src/arrays/varbin/vtable/mod.rs +++ b/vortex-array/src/arrays/varbin/vtable/mod.rs @@ -24,7 +24,7 @@ impl VTable for VarBinVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrays/varbinview/vtable/mod.rs b/vortex-array/src/arrays/varbinview/vtable/mod.rs index 00b21cc4b1b..c81c3357aa7 100644 --- a/vortex-array/src/arrays/varbinview/vtable/mod.rs +++ b/vortex-array/src/arrays/varbinview/vtable/mod.rs @@ -25,7 +25,7 @@ impl VTable for VarBinViewVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; type SerdeVTable = Self; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/arrow/array.rs b/vortex-array/src/arrow/array.rs index 3d727688717..a0d3253db66 100644 --- a/vortex-array/src/arrow/array.rs +++ b/vortex-array/src/arrow/array.rs @@ -35,7 +35,7 @@ impl VTable for ArrowVTable { type VisitorVTable = Self; type ComputeVTable = NotSupported; type EncodeVTable = NotSupported; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; type SerdeVTable = NotSupported; fn id(_encoding: &Self::Encoding) -> EncodingId { diff --git a/vortex-array/src/compute/arrays/logical.rs b/vortex-array/src/compute/arrays/logical.rs new file mode 100644 index 00000000000..0a859489d24 --- /dev/null +++ b/vortex-array/src/compute/arrays/logical.rs @@ -0,0 +1,251 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::hash::{Hash, Hasher}; +use std::sync::LazyLock; + +use enum_map::{Enum, EnumMap, enum_map}; +use futures::{FutureExt, try_join}; +use vortex_buffer::ByteBuffer; +use vortex_compute::logical::{ + LogicalAnd, LogicalAndKleene, LogicalAndNot, LogicalOr, LogicalOrKleene, +}; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_vector::BoolVector; + +use crate::execution::{BatchKernel, BindCtx}; +use crate::serde::ArrayChildren; +use crate::stats::{ArrayStats, StatsSetRef}; +use crate::vtable::{ + ArrayVTable, NotSupported, OperatorVTable, SerdeVTable, VTable, VisitorVTable, +}; +use crate::{ + Array, ArrayBufferVisitor, ArrayChildVisitor, ArrayEq, ArrayHash, ArrayRef, + DeserializeMetadata, EmptyMetadata, EncodingId, EncodingRef, Precision, vtable, +}; + +/// The set of operators supported by a logical array. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Enum)] +pub enum LogicalOperator { + /// Logical AND + And, + /// Logical AND with Kleene logic + AndKleene, + /// Logical OR + Or, + /// Logical OR with Kleene logic + OrKleene, + /// Logical AND NOT + AndNot, +} + +vtable!(Logical); + +#[derive(Debug, Clone)] +pub struct LogicalArray { + encoding: EncodingRef, + lhs: ArrayRef, + rhs: ArrayRef, + stats: ArrayStats, +} + +impl LogicalArray { + /// Create a new logical array. + pub fn new(lhs: ArrayRef, rhs: ArrayRef, operator: LogicalOperator) -> Self { + assert_eq!( + lhs.len(), + rhs.len(), + "Logical arrays require lhs and rhs to have the same length" + ); + + // TODO(ngates): should we automatically cast non-null to nullable if required? + assert!(matches!(lhs.dtype(), DType::Bool(_))); + assert_eq!(lhs.dtype(), rhs.dtype()); + + Self { + encoding: ENCODINGS[operator].clone(), + lhs, + rhs, + stats: ArrayStats::default(), + } + } + + /// Returns the operator of this logical array. + pub fn operator(&self) -> LogicalOperator { + self.encoding.as_::().operator + } +} + +#[derive(Debug, Clone)] +pub struct LogicalEncoding { + // We include the operator in the encoding so each operator is a different encoding ID. + // This makes it easier for plugins to construct expressions and perform pushdown + // optimizations. + operator: LogicalOperator, +} + +#[allow(clippy::mem_forget)] +static ENCODINGS: LazyLock> = LazyLock::new(|| { + enum_map! { + operator => LogicalEncoding { operator }.to_encoding(), + } +}); + +impl VTable for LogicalVTable { + type Array = LogicalArray; + type Encoding = LogicalEncoding; + type ArrayVTable = Self; + type CanonicalVTable = NotSupported; + type OperationsVTable = NotSupported; + type ValidityVTable = NotSupported; + type VisitorVTable = Self; + type ComputeVTable = NotSupported; + type EncodeVTable = NotSupported; + type SerdeVTable = Self; + type OperatorVTable = Self; + + fn id(encoding: &Self::Encoding) -> EncodingId { + match encoding.operator { + LogicalOperator::And => EncodingId::from("vortex.and"), + LogicalOperator::AndKleene => EncodingId::from("vortex.and_kleene"), + LogicalOperator::Or => EncodingId::from("vortex.or"), + LogicalOperator::OrKleene => EncodingId::from("vortex.or_kleene"), + LogicalOperator::AndNot => EncodingId::from("vortex.and_not"), + } + } + + fn encoding(array: &Self::Array) -> EncodingRef { + array.encoding.clone() + } +} + +impl ArrayVTable for LogicalVTable { + fn len(array: &LogicalArray) -> usize { + array.lhs.len() + } + + fn dtype(array: &LogicalArray) -> &DType { + array.lhs.dtype() + } + + fn stats(array: &LogicalArray) -> StatsSetRef<'_> { + array.stats.to_ref(array.as_ref()) + } + + fn array_hash(array: &LogicalArray, state: &mut H, precision: Precision) { + array.lhs.array_hash(state, precision); + array.rhs.array_hash(state, precision); + } + + fn array_eq(array: &LogicalArray, other: &LogicalArray, precision: Precision) -> bool { + array.lhs.array_eq(&other.lhs, precision) && array.rhs.array_eq(&other.rhs, precision) + } +} + +impl VisitorVTable for LogicalVTable { + fn visit_buffers(_array: &LogicalArray, _visitor: &mut dyn ArrayBufferVisitor) { + // No buffers + } + + fn visit_children(array: &LogicalArray, visitor: &mut dyn ArrayChildVisitor) { + visitor.visit_child("lhs", array.lhs.as_ref()); + visitor.visit_child("rhs", array.rhs.as_ref()); + } +} + +impl SerdeVTable for LogicalVTable { + type Metadata = EmptyMetadata; + + fn metadata(_array: &LogicalArray) -> VortexResult> { + Ok(Some(EmptyMetadata)) + } + + fn build( + encoding: &LogicalEncoding, + dtype: &DType, + len: usize, + _metadata: &::Output, + buffers: &[ByteBuffer], + children: &dyn ArrayChildren, + ) -> VortexResult { + assert!(buffers.is_empty()); + Ok(LogicalArray::new( + children.get(0, dtype, len)?, + children.get(1, dtype, len)?, + encoding.operator, + )) + } +} + +impl OperatorVTable for LogicalVTable { + fn bind( + array: &LogicalArray, + selection: Option<&ArrayRef>, + ctx: &mut dyn BindCtx, + ) -> VortexResult { + let lhs = ctx.bind(&array.lhs, selection)?; + let rhs = ctx.bind(&array.rhs, selection)?; + + Ok(match array.operator() { + LogicalOperator::And => kernel(lhs, rhs, |l, r| l.and(&r)), + LogicalOperator::AndKleene => kernel(lhs, rhs, |l, r| l.and_kleene(&r)), + LogicalOperator::Or => kernel(lhs, rhs, |l, r| l.or(&r)), + LogicalOperator::OrKleene => kernel(lhs, rhs, |l, r| l.or_kleene(&r)), + LogicalOperator::AndNot => kernel(lhs, rhs, |l, r| l.and_not(&r)), + }) + } +} + +/// Batch execution kernel for logical operations. +fn kernel(lhs: BatchKernel, rhs: BatchKernel, op: O) -> BatchKernel +where + O: Fn(BoolVector, BoolVector) -> BoolVector + Send + 'static, +{ + async move { + let (lhs, rhs) = try_join!(lhs, rhs)?; + let (lhs, rhs) = (lhs.into_bool(), rhs.into_bool()); + Ok(op(lhs, rhs).into()) + } + .boxed() +} + +#[cfg(test)] +mod tests { + use vortex_buffer::bitbuffer; + use vortex_io::runtime::single::block_on; + + use crate::compute::arrays::logical::{LogicalArray, LogicalOperator}; + use crate::{ArrayOperator, ArrayRef, IntoArray}; + + fn and_(lhs: ArrayRef, rhs: ArrayRef) -> ArrayRef { + LogicalArray::new(lhs, rhs, LogicalOperator::And).into_array() + } + + #[test] + fn test_and() { + block_on(|_| async { + let lhs = bitbuffer![0 1 0].into_array(); + let rhs = bitbuffer![0 1 1].into_array(); + let result = and_(lhs, rhs).execute().await.unwrap().into_bool(); + assert_eq!(result.bits(), &bitbuffer![0 1 0]); + }) + } + + #[test] + fn test_and_selected() { + block_on(|_| async { + let lhs = bitbuffer![0 1 0].into_array(); + let rhs = bitbuffer![0 1 1].into_array(); + + let selection = bitbuffer![0 1 1].into_array(); + + let result = and_(lhs, rhs) + .execute_with_selection(Some(&selection)) + .await + .unwrap() + .into_bool(); + assert_eq!(result.bits(), &bitbuffer![1 0]); + }) + } +} diff --git a/vortex-array/src/compute/arrays/mod.rs b/vortex-array/src/compute/arrays/mod.rs new file mode 100644 index 00000000000..97dc8731c56 --- /dev/null +++ b/vortex-array/src/compute/arrays/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod logical; diff --git a/vortex-array/src/compute/mod.rs b/vortex-array/src/compute/mod.rs index 8914bc7101e..094bf982313 100644 --- a/vortex-array/src/compute/mod.rs +++ b/vortex-array/src/compute/mod.rs @@ -43,6 +43,7 @@ use crate::{Array, ArrayRef}; #[cfg(feature = "arbitrary")] mod arbitrary; +mod arrays; mod between; mod boolean; mod cast; diff --git a/vortex-array/src/encoding.rs b/vortex-array/src/encoding.rs index 7f8057b192e..5859c7848fe 100644 --- a/vortex-array/src/encoding.rs +++ b/vortex-array/src/encoding.rs @@ -28,10 +28,6 @@ pub trait Encoding: 'static + private::Sealed + Send + Sync + Debug { fn to_encoding(&self) -> EncodingRef; - fn into_encoding(self) -> EncodingRef - where - Self: Sized; - /// Returns the ID of the encoding. fn id(&self) -> EncodingId; @@ -72,13 +68,6 @@ impl Encoding for EncodingAdapter { ArcRef::new_arc(Arc::new(EncodingAdapter::(self.0.clone()))) } - fn into_encoding(self) -> EncodingRef - where - Self: Sized, - { - todo!() - } - fn id(&self) -> EncodingId { V::id(&self.0) } diff --git a/vortex-array/src/execution/batch.rs b/vortex-array/src/execution/batch.rs new file mode 100644 index 00000000000..add37f977df --- /dev/null +++ b/vortex-array/src/execution/batch.rs @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use futures::future::BoxFuture; +use vortex_error::VortexResult; +use vortex_vector::Vector; + +use crate::ArrayRef; + +/// Type-alias for heap-allocated batch execution kernels. +pub type BatchKernel = BoxFuture<'static, VortexResult>; + +/// Context for binding batch execution kernels. +/// +/// By binding child arrays through this context, we can perform common subtree elimination and +/// share canonicalized results across multiple kernels. +pub trait BindCtx { + /// Bind the given array and optional selection to produce a batch kernel, possibly reusing + /// previously bound results from this context. + fn bind(&mut self, array: &ArrayRef, selection: Option<&ArrayRef>) + -> VortexResult; +} diff --git a/vortex-array/src/execution/mask.rs b/vortex-array/src/execution/mask.rs new file mode 100644 index 00000000000..89b9471bf94 --- /dev/null +++ b/vortex-array/src/execution/mask.rs @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::FutureExt; +use futures::future::BoxFuture; +use vortex_dtype::DType; +use vortex_dtype::Nullability::NonNullable; +use vortex_error::{VortexExpect, VortexResult, vortex_bail}; +use vortex_mask::Mask; + +use crate::ArrayRef; +use crate::execution::BindCtx; + +pub enum MaskExecution { + AllTrue(usize), + AllFalse(usize), + Future(BoxFuture<'static, VortexResult>), +} + +impl Future for MaskExecution { + type Output = VortexResult; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.get_mut() { + MaskExecution::AllTrue(len) => { + let mask = Mask::new_true(*len); + Poll::Ready(Ok(mask)) + } + MaskExecution::AllFalse(len) => { + let mask = Mask::new_false(*len); + Poll::Ready(Ok(mask)) + } + MaskExecution::Future(fut) => fut.poll_unpin(cx), + } + } +} + +impl dyn BindCtx + '_ { + /// Bind an optional selection mask into a `MaskExecution`. + /// + /// The caller must provide a mask length to handle the case where no mask is provided. + pub fn bind_selection( + &mut self, + mask_len: usize, + mask: Option<&ArrayRef>, + ) -> VortexResult { + match mask { + Some(mask) => { + assert_eq!(mask.len(), mask_len); + self.bind_mask(mask) + } + None => Ok(MaskExecution::AllTrue(mask_len)), + } + } + + /// Bind a non-nullable boolean array into a `MaskExecution`. + /// + /// This binding will optimize for constant arrays or other array types that can be more + /// efficiently converted into a `Mask`. + pub fn bind_mask(&mut self, mask: &ArrayRef) -> VortexResult { + if !matches!(mask.dtype(), DType::Bool(NonNullable)) { + vortex_bail!( + "Expected non-nullable boolean array for mask binding, got {}", + mask.dtype() + ); + } + + // Check for a constant mask + if let Some(scalar) = mask.as_constant() { + let constant = scalar + .as_bool() + .value() + .vortex_expect("checked non-nullable"); + let len = mask.len(); + if constant { + return Ok(MaskExecution::AllTrue(len)); + } else { + return Ok(MaskExecution::AllFalse(len)); + } + } + + // TODO(ngates): we may want to support creating masks from iterator of slices, in which + // case we could check for run-end encoding here? + + // If none of the above patterns match, we fall back to canonicalizing. + let execution = self.bind(mask, None)?; + Ok(MaskExecution::Future( + async move { + let mask = execution.await?.into_bool(); + Ok(Mask::from(mask.bits().clone())) + } + .boxed(), + )) + } +} diff --git a/vortex-array/src/execution/mod.rs b/vortex-array/src/execution/mod.rs new file mode 100644 index 00000000000..973c63b7660 --- /dev/null +++ b/vortex-array/src/execution/mod.rs @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod batch; +mod mask; +mod validity; + +pub use batch::*; +pub use mask::*; diff --git a/vortex-array/src/execution/validity.rs b/vortex-array/src/execution/validity.rs new file mode 100644 index 00000000000..4bac8c25b41 --- /dev/null +++ b/vortex-array/src/execution/validity.rs @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use futures::future::FutureExt; +use vortex_compute::filter::Filter; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::ArrayRef; +use crate::execution::{BindCtx, MaskExecution}; +use crate::validity::Validity; + +impl dyn BindCtx + '_ { + /// Bind a validity helper into a [`MaskExecution`]. + pub fn bind_validity( + &mut self, + validity: &Validity, + array_len: usize, + selection: Option<&ArrayRef>, + ) -> VortexResult { + match selection { + None => match validity { + Validity::NonNullable | Validity::AllValid => Ok(MaskExecution::AllTrue(array_len)), + Validity::AllInvalid => Ok(MaskExecution::AllFalse(array_len)), + Validity::Array(validity) => self.bind_mask(validity), + }, + Some(selection) => { + let selection = self.bind_mask(selection)?; + match validity { + Validity::NonNullable | Validity::AllValid => Ok(MaskExecution::Future( + async move { Ok(Mask::AllTrue(selection.await?.true_count())) }.boxed(), + )), + Validity::AllInvalid => Ok(MaskExecution::Future( + async move { Ok(Mask::AllFalse(selection.await?.true_count())) }.boxed(), + )), + Validity::Array(validity) => { + let validity = self.bind_mask(validity)?; + Ok(MaskExecution::Future( + async move { + let validity = validity.await?; + let selection = selection.await?; + // We perform a take on the validity mask using the selection mask. + Ok(validity.filter(&selection)) + } + .boxed(), + )) + } + } + } + } + } +} diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index fccec6c8610..bc54df349d5 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -16,9 +16,9 @@ use crate::Canonical; use crate::operator::{BatchBindCtx, BatchExecution, BatchExecutionRef, OperatorKey, OperatorRef}; use crate::pipeline::operator::PipelineOperator; -/// An executor that runs an operator tree. +/// An execution that runs an operator tree. /// -/// The executor performs common subtree elimination by creating BatchExecution nodes that hold +/// The execution performs common subtree elimination by creating BatchExecution nodes that hold /// shared futures to the underlying execution. /// /// It also finds sub-graphs of operator operators and executes them as a operator. @@ -71,7 +71,7 @@ impl Executor { log::info!("Executing operator: {}", operator.display_tree()); println!("Executing operator: {}", operator.display_tree()); - // For each child, create a batch execution that uses the executor to compute it. + // For each child, create a batch execution that uses the execution to compute it. let mut children: Vec<_> = operator .children() .iter() diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index e4c13d1f12f..ebe4bab407b 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -31,7 +31,7 @@ mod canonical; pub mod compute; mod context; mod encoding; -pub mod executor; +pub mod execution; mod hash; pub mod iter; mod mask_future; diff --git a/vortex-array/src/operator/mod.rs b/vortex-array/src/operator/mod.rs index e89d27aa7fa..67ef0734895 100644 --- a/vortex-array/src/operator/mod.rs +++ b/vortex-array/src/operator/mod.rs @@ -20,10 +20,13 @@ //! Vortex array to be a wrapped around an operator that _does_ have a known length, amongst other //! properties (such as non-blocking evaluation). //! -//! We also introduce the idea of an executor that can evaluate an operator tree efficiently. It +//! We also introduce the idea of an execution that can evaluate an operator tree efficiently. It //! supports common subtree elimination, as well as extracting sub-graphs for pipelined and GPU -//! execution. The executor is also responsible for managing memory and scheduling work across +//! execution. The execution is also responsible for managing memory and scheduling work across //! different execution resources. +//! + +#![allow(dead_code)] pub mod canonical; pub mod compare; diff --git a/vortex-array/src/pipeline/operator/bind.rs b/vortex-array/src/pipeline/operator/bind.rs index c55147f64ed..16dc77b6972 100644 --- a/vortex-array/src/pipeline/operator/bind.rs +++ b/vortex-array/src/pipeline/operator/bind.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +#![allow(dead_code)] + use vortex_error::{VortexExpect, VortexResult}; use crate::pipeline::operator::PipelineNode; diff --git a/vortex-array/src/pipeline/operator/buffers.rs b/vortex-array/src/pipeline/operator/buffers.rs index 25db4584586..527dc71d9cc 100644 --- a/vortex-array/src/pipeline/operator/buffers.rs +++ b/vortex-array/src/pipeline/operator/buffers.rs @@ -3,6 +3,8 @@ //! Vector allocation strategy for pipelines +#![allow(dead_code)] + use std::cell::RefCell; use vortex_error::{VortexExpect, VortexResult}; diff --git a/vortex-array/src/pipeline/operator/input.rs b/vortex-array/src/pipeline/operator/input.rs index 857ca622912..577c6b54d32 100644 --- a/vortex-array/src/pipeline/operator/input.rs +++ b/vortex-array/src/pipeline/operator/input.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +#![allow(dead_code)] + use std::any::Any; use std::hash::Hasher; use std::marker::PhantomData; diff --git a/vortex-array/src/pipeline/operator/mod.rs b/vortex-array/src/pipeline/operator/mod.rs index c26ad473273..9b13fa3320f 100644 --- a/vortex-array/src/pipeline/operator/mod.rs +++ b/vortex-array/src/pipeline/operator/mod.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +#![allow(dead_code)] + mod bind; pub mod buffers; mod input; diff --git a/vortex-array/src/pipeline/operator/toposort.rs b/vortex-array/src/pipeline/operator/toposort.rs index ae257632df0..fa2655ca0c9 100644 --- a/vortex-array/src/pipeline/operator/toposort.rs +++ b/vortex-array/src/pipeline/operator/toposort.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +#![allow(dead_code)] + use std::collections::VecDeque; use vortex_error::{VortexResult, vortex_bail}; diff --git a/vortex-array/src/vtable/decode.rs b/vortex-array/src/vtable/canonical.rs similarity index 73% rename from vortex-array/src/vtable/decode.rs rename to vortex-array/src/vtable/canonical.rs index 4a5cd2c2344..57bab935c58 100644 --- a/vortex-array/src/vtable/decode.rs +++ b/vortex-array/src/vtable/canonical.rs @@ -1,11 +1,12 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use vortex_error::vortex_panic; + use crate::Canonical; use crate::builders::ArrayBuilder; -use crate::vtable::VTable; +use crate::vtable::{NotSupported, VTable}; -// TODO(ngates): rename to `DecodeVTable`. pub trait CanonicalVTable { /// Returns the canonical representation of the array. /// @@ -24,3 +25,12 @@ pub trait CanonicalVTable { builder.extend_from_array(canonical.as_ref()) } } + +impl CanonicalVTable for NotSupported { + fn canonicalize(array: &V::Array) -> Canonical { + vortex_panic!( + "Legacy canonicalize is not supported for {} arrays", + array.encoding_id() + ) + } +} diff --git a/vortex-array/src/vtable/mod.rs b/vortex-array/src/vtable/mod.rs index 7035b1bddb1..e0507582896 100644 --- a/vortex-array/src/vtable/mod.rs +++ b/vortex-array/src/vtable/mod.rs @@ -4,8 +4,8 @@ //! This module contains the VTable definitions for a Vortex encoding. mod array; +mod canonical; mod compute; -mod decode; mod encode; mod operations; mod operator; @@ -17,8 +17,8 @@ use std::fmt::Debug; use std::ops::Deref; pub use array::*; +pub use canonical::*; pub use compute::*; -pub use decode::*; pub use encode::*; pub use operations::*; pub use operator::*; @@ -63,9 +63,9 @@ pub trait VTable: 'static + Sized + Send + Sync + Debug { /// Optionally enable serde for this encoding by implementing the [`SerdeVTable`] trait. /// Can be disabled by assigning to the [`NotSupported`] type. type SerdeVTable: SerdeVTable; - /// Optionally enable the [`PipelineVTable`] for this encoding. This allows it to partake in + /// Optionally enable the [`OperatorVTable`] for this encoding. This allows it to partake in /// operator operations. - type PipelineVTable: PipelineVTable; + type OperatorVTable: OperatorVTable; /// Returns the ID of the encoding. fn id(encoding: &Self::Encoding) -> EncodingId; diff --git a/vortex-array/src/vtable/operations.rs b/vortex-array/src/vtable/operations.rs index 285cd11dc9b..08fe7e85f5f 100644 --- a/vortex-array/src/vtable/operations.rs +++ b/vortex-array/src/vtable/operations.rs @@ -3,10 +3,11 @@ use std::ops::Range; +use vortex_error::vortex_panic; use vortex_scalar::Scalar; use crate::ArrayRef; -use crate::vtable::VTable; +use crate::vtable::{NotSupported, VTable}; pub trait OperationsVTable { /// Perform a constant-time slice of the array. @@ -30,3 +31,19 @@ pub trait OperationsVTable { /// and the index is guaranteed to be non-null. fn scalar_at(array: &V::Array, index: usize) -> Scalar; } + +impl OperationsVTable for NotSupported { + fn slice(array: &V::Array, _range: Range) -> ArrayRef { + vortex_panic!( + "Legacy slice operation is not supported for {} arrays", + array.encoding_id() + ) + } + + fn scalar_at(array: &V::Array, _index: usize) -> Scalar { + vortex_panic!( + "Legacy scalar_at operation is not supported for {} arrays", + array.encoding_id() + ) + } +} diff --git a/vortex-array/src/vtable/operator.rs b/vortex-array/src/vtable/operator.rs index 1ebd9746789..79957c58a0b 100644 --- a/vortex-array/src/vtable/operator.rs +++ b/vortex-array/src/vtable/operator.rs @@ -1,19 +1,95 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use vortex_error::VortexResult; +use vortex_error::{VortexResult, vortex_bail}; +use crate::ArrayRef; +use crate::execution::{BatchKernel, BindCtx}; use crate::operator::OperatorRef; use crate::vtable::{NotSupported, VTable}; -pub trait PipelineVTable { +/// A vtable for the new operator-based array functionality. Eventually this vtable will be +/// merged into the main `VTable`, but for now it is kept separate to allow for incremental +/// adoption of the new operator framework. +/// +/// See for the operators RFC. +pub trait OperatorVTable { /// Convert the current array into a [`OperatorRef`]. /// Returns `None` if the array cannot be converted to an operator. - fn to_operator(array: &V::Array) -> VortexResult>; + fn to_operator(_array: &V::Array) -> VortexResult> { + Ok(None) + } + + /// Attempt to optimize this array by analyzing its children. + /// + /// For example, if all the children are constant, this function should perform constant + /// folding and return a constant operator. + /// + /// This function should typically be implemented only for self-contained optimizations based + /// on child properties. + /// + /// Returns `None` if no optimization is possible. + fn reduce_children(_array: &V::Array) -> VortexResult> { + Ok(None) + } + + /// Attempt to push down a parent array through this node. + /// + /// The `child_idx` parameter indicates which child of the parent this array occupies. + /// For example, if the parent is a binary array, and this array is the left child, + /// then `child_idx` will be 0. If this array is the right child, then `child_idx` will be 1. + /// + /// The returned array will replace the parent in the tree. + /// + /// This function should typically be implemented for cross-array optimizations where the + /// child needs to adapt to the parent's requirements. + /// + /// Returns `None` if no optimization is possible. + fn reduce_parent( + _array: &V::Array, + _parent: ArrayRef, + _child_idx: usize, + ) -> VortexResult> { + Ok(None) + } + + /// Bind the array for execution in batch mode. + /// + /// This function should return a [`BatchKernel`] that can be used to execute the array in + /// batch mode. + /// + /// The selection parameter is a non-nullable boolean array that indicates which rows to + /// return. i.e. the result of the kernel should be a vector whose length is equal to the + /// true count of the selection array. + /// + /// The context should be used to bind child arrays in order to support common subtree + /// elimination. See also the utility functions on the `BindCtx` for efficiently extracting + /// common objects such as a [`vortex_mask::Mask`]. + fn bind( + array: &V::Array, + _selection: Option<&ArrayRef>, + _ctx: &mut dyn BindCtx, + ) -> VortexResult { + vortex_bail!( + "Bind is not yet implemented for {} arrays", + array.encoding_id() + ) + } } -impl PipelineVTable for NotSupported { +impl OperatorVTable for NotSupported { fn to_operator(_array: &V::Array) -> VortexResult> { Ok(None) } + + fn bind( + array: &V::Array, + _selection: Option<&ArrayRef>, + _ctx: &mut dyn BindCtx, + ) -> VortexResult { + vortex_bail!( + "Pipeline execution not supported for this encoding: {:?}", + array.encoding_id(), + ) + } } diff --git a/vortex-array/src/vtable/validity.rs b/vortex-array/src/vtable/validity.rs index a918a61edb2..d84a57342db 100644 --- a/vortex-array/src/vtable/validity.rs +++ b/vortex-array/src/vtable/validity.rs @@ -1,10 +1,11 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use vortex_error::vortex_panic; use vortex_mask::Mask; use crate::validity::Validity; -use crate::vtable::VTable; +use crate::vtable::{NotSupported, VTable}; use crate::{Array, ArrayRef}; pub trait ValidityVTable { @@ -33,6 +34,36 @@ pub trait ValidityVTable { fn validity_mask(array: &V::Array) -> Mask; } +impl ValidityVTable for NotSupported { + fn is_valid(array: &V::Array, _index: usize) -> bool { + vortex_panic!( + "Legacy is_valid is not supported for {} arrays", + array.encoding_id() + ) + } + + fn all_valid(array: &V::Array) -> bool { + vortex_panic!( + "Legacy all_valid is not supported for {} arrays", + array.encoding_id() + ) + } + + fn all_invalid(array: &V::Array) -> bool { + vortex_panic!( + "Legacy all_invalid is not supported for {} arrays", + array.encoding_id() + ) + } + + fn validity_mask(array: &V::Array) -> Mask { + vortex_panic!( + "Legacy validity_mask is not supported for {} arrays", + array.encoding_id() + ) + } +} + /// An implementation of the [`ValidityVTable`] for arrays that hold validity as a child array. pub struct ValidityVTableFromValidityHelper; diff --git a/vortex-io/src/runtime/current.rs b/vortex-io/src/runtime/current.rs index d997845cff8..cb7807738c0 100644 --- a/vortex-io/src/runtime/current.rs +++ b/vortex-io/src/runtime/current.rs @@ -68,7 +68,7 @@ impl CurrentThreadRuntime { let stream = f(self.handle()); // We create an MPMC result channel and spawn a task to drive the stream and send results. - // This allows multiple worker threads to drive the executor while all waiting for results + // This allows multiple worker threads to drive the execution while all waiting for results // on the channel. let (result_tx, result_rx) = kanal::bounded_async(1); self.executor @@ -91,7 +91,7 @@ impl CurrentThreadRuntime { } } -/// An iterator that wraps up a stream to drive it using the current thread executor. +/// An iterator that wraps up a stream to drive it using the current thread execution. pub struct CurrentThreadIterator<'a, T> { executor: Arc>, stream: BoxStream<'a, T>, diff --git a/vortex-io/src/runtime/single.rs b/vortex-io/src/runtime/single.rs index b24e09e2183..b8e16c69e05 100644 --- a/vortex-io/src/runtime/single.rs +++ b/vortex-io/src/runtime/single.rs @@ -45,8 +45,8 @@ impl Sender { let (blocking_send, blocking_recv) = kanal::unbounded::(); let (io_send, io_recv) = kanal::unbounded::(); - // We pass weak references to the local executor into the async tasks such that the task's - // reference doesn't keep the executor alive after the runtime is dropped. + // We pass weak references to the local execution into the async tasks such that the task's + // reference doesn't keep the execution alive after the runtime is dropped. let weak_local = Rc::downgrade(local); // Drive scheduling tasks. @@ -253,7 +253,7 @@ impl AbortHandle for LazyAbortHandle { } } -/// A stream that wraps up the stream with the executor that drives it. +/// A stream that wraps up the stream with the execution that drives it. pub struct SingleThreadIterator<'a, T> { executor: Rc>, stream: LocalBoxStream<'a, T>, diff --git a/vortex-io/src/runtime/smol.rs b/vortex-io/src/runtime/smol.rs index e1e3f2338c3..efde32dcba7 100644 --- a/vortex-io/src/runtime/smol.rs +++ b/vortex-io/src/runtime/smol.rs @@ -14,7 +14,7 @@ impl Executor for smol::Executor<'static> { } fn spawn_cpu(&self, task: Box) -> AbortHandleRef { - // For now, we spawn CPU work back onto the same executor. + // For now, we spawn CPU work back onto the same execution. SmolAbortHandle::new_handle(smol::Executor::spawn(self, async move { task() })) } diff --git a/vortex-layout/src/layouts/flat/reader.rs b/vortex-layout/src/layouts/flat/reader.rs index f17232a7280..b29205fe969 100644 --- a/vortex-layout/src/layouts/flat/reader.rs +++ b/vortex-layout/src/layouts/flat/reader.rs @@ -2,22 +2,17 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::collections::BTreeSet; -use std::env; use std::ops::{BitAnd, Range}; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use futures::FutureExt; use futures::future::BoxFuture; use vortex_array::compute::filter; -use vortex_array::executor::Executor; -use vortex_array::operator::OperatorRef; -use vortex_array::operator::filter::FilterOperator; -use vortex_array::operator::slice::SliceOperator; use vortex_array::serde::ArrayParts; use vortex_array::stats::Precision; -use vortex_array::{Array, ArrayRef, IntoArray, MaskFuture}; +use vortex_array::{Array, ArrayRef, MaskFuture}; use vortex_dtype::{DType, FieldMask}; -use vortex_error::{VortexExpect, VortexResult, VortexUnwrap as _, vortex_bail}; +use vortex_error::{VortexExpect, VortexResult, VortexUnwrap as _}; use vortex_expr::{ExprRef, Scope, is_root}; use vortex_mask::Mask; @@ -33,13 +28,6 @@ use crate::segments::SegmentSource; // actual expression? Perhaps all expressions are given a selection mask to decide for themselves? const EXPR_EVAL_THRESHOLD: f64 = 0.2; -/// While we develop operator-based evaluation, we can enable it via an environment variable. -static USE_OPERATOR_EVAL: LazyLock = LazyLock::new(|| { - env::var("VORTEX_USE_OPERATOR_EVAL") - .ok() - .is_some_and(|v| v == "1") -}); - pub struct FlatReader { layout: FlatLayout, name: Arc, @@ -134,14 +122,6 @@ impl LayoutReader for FlatReader { let mut array = array.clone().await?; let mask = mask.await?; - if *USE_OPERATOR_EVAL { - let array = - try_evaluate_using_operator(row_range.clone(), &array, &expr, &mask).await?; - let array_mask = array.try_to_mask_fill_null_false()?; - let mask = mask.intersect_by_rank(&array_mask); - return Ok(mask); - } - // Slice the array based on the row mask. if row_range.start > 0 || row_range.end < array.len() { array = array.slice(row_range.clone()); @@ -201,10 +181,6 @@ impl LayoutReader for FlatReader { let mut array = array.clone().await?; let mask = mask.await?; - if *USE_OPERATOR_EVAL { - return try_evaluate_using_operator(row_range.clone(), &array, &expr, &mask).await; - } - // Slice the array based on the row mask. if row_range.start > 0 || row_range.end < array.len() { array = array.slice(row_range.clone()); @@ -226,34 +202,6 @@ impl LayoutReader for FlatReader { } } -async fn try_evaluate_using_operator( - row_range: Range, - array: &ArrayRef, - expr: &ExprRef, - mask: &Mask, -) -> VortexResult { - let Some(operator) = array.to_operator()? else { - vortex_bail!( - "ArrayEvaluation: cannot convert array to operator {}", - array.display_tree() - ); - }; - let Some(operator) = expr.operator(&operator)? else { - vortex_bail!("ArrayEvaluation: cannot convert expr to operator {}", expr); - }; - - let mut operator: OperatorRef = Arc::new(SliceOperator::try_new(operator, row_range)?); - if !mask.all_true() { - operator = Arc::new(FilterOperator::new(operator, mask.clone())); - } - - // TODO(ngates): in the future we should be able to return operators from projection. - println!("Optimizing operator: {}", operator.display_tree()); - let operator = operator.optimize()?; - println!("Executing operator: {}", operator.display_tree()); - Ok(Executor::default().execute(operator).await?.into_array()) -} - #[cfg(test)] mod test { use std::sync::Arc; diff --git a/vortex-python/src/arrays/py/vtable.rs b/vortex-python/src/arrays/py/vtable.rs index ebaa58db739..82709306ae7 100644 --- a/vortex-python/src/arrays/py/vtable.rs +++ b/vortex-python/src/arrays/py/vtable.rs @@ -40,7 +40,7 @@ impl VTable for PythonVTable { type ComputeVTable = Self; type EncodeVTable = Self; type SerdeVTable = Self; - type PipelineVTable = NotSupported; + type OperatorVTable = NotSupported; fn id(encoding: &Self::Encoding) -> EncodingId { encoding.id.clone()