From fc7d78c9abb628755fc2b7706380e0e6561a5ac0 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 26 Jul 2024 21:46:39 -0700 Subject: [PATCH] refactor: new buffer abstractions in decoders (#2648) Introduce LanceBuffer, copy-on-write version designed for interoperability between arrow_buffer::Buffer, bytes::Bytes, and Vec (and their mutable counterparts). Use this to remove some copies on the decode path (the value decoder is now truly zero-copy). Introduce the DataBlock and a concept of "data layouts". Use this to simplify decoding pathways. --- rust/lance-encoding-datafusion/src/zone.rs | 6 +- rust/lance-encoding/src/buffer.rs | 91 ++++++ rust/lance-encoding/src/data.rs | 234 +++++++++++++ rust/lance-encoding/src/decoder.rs | 26 +- rust/lance-encoding/src/encodings.rs | 1 - .../src/encodings/logical/primitive.rs | 39 ++- .../src/encodings/physical/basic.rs | 64 +--- .../src/encodings/physical/binary.rs | 56 ++-- .../src/encodings/physical/bitmap.rs | 44 +-- .../src/encodings/physical/bitpack.rs | 34 +- .../src/encodings/physical/dictionary.rs | 70 ++-- .../src/encodings/physical/fixed_size_list.rs | 21 +- .../src/encodings/physical/fsst.rs | 72 ++-- .../src/encodings/physical/packed_struct.rs | 26 +- .../src/encodings/physical/value.rs | 94 +++--- rust/lance-encoding/src/encodings/utils.rs | 308 ------------------ rust/lance-encoding/src/lib.rs | 2 + rust/lance-encoding/src/testing.rs | 10 +- 18 files changed, 565 insertions(+), 633 deletions(-) create mode 100644 rust/lance-encoding/src/buffer.rs create mode 100644 rust/lance-encoding/src/data.rs delete mode 100644 rust/lance-encoding/src/encodings/utils.rs diff --git a/rust/lance-encoding-datafusion/src/zone.rs b/rust/lance-encoding-datafusion/src/zone.rs index 4bac25f73e..b69928d4c6 100644 --- a/rust/lance-encoding-datafusion/src/zone.rs +++ b/rust/lance-encoding-datafusion/src/zone.rs @@ -604,7 +604,7 @@ mod tests { let decoder_middleware = DecoderMiddlewareChain::new() .add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new(schema.clone()))) - .add_strategy(Arc::new(CoreFieldDecoderStrategy)); + .add_strategy(Arc::new(CoreFieldDecoderStrategy::default())); let num_rows = data.iter().map(|rb| rb.num_rows()).sum::(); @@ -618,7 +618,7 @@ mod tests { let decoder_middleware = DecoderMiddlewareChain::new() .add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new(schema.clone()))) - .add_strategy(Arc::new(CoreFieldDecoderStrategy)); + .add_strategy(Arc::new(CoreFieldDecoderStrategy::default())); let result = count_lance_file( &fs, @@ -638,7 +638,7 @@ mod tests { let decoder_middleware = DecoderMiddlewareChain::new() .add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new(schema.clone()))) - .add_strategy(Arc::new(CoreFieldDecoderStrategy)); + .add_strategy(Arc::new(CoreFieldDecoderStrategy::default())); let result = count_lance_file( &fs, diff --git a/rust/lance-encoding/src/buffer.rs b/rust/lance-encoding/src/buffer.rs new file mode 100644 index 0000000000..1112b0c4d4 --- /dev/null +++ b/rust/lance-encoding/src/buffer.rs @@ -0,0 +1,91 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Utilities for byte arrays + +use std::{ops::Deref, ptr::NonNull, sync::Arc}; + +use arrow_buffer::Buffer; + +/// A copy-on-write byte buffer +/// +/// It can be created from read-only buffers (e.g. bytes::Bytes or arrow_buffer::Buffer), e.g. "borrowed" +/// or from writeable buffers (e.g. Vec), e.g. "owned" +#[derive(Debug)] +pub enum LanceBuffer { + Borrowed(Buffer), + Owned(Vec), +} + +impl LanceBuffer { + /// Convert into a mutable buffer. If this is a borrowed buffer, the data will be copied. + pub fn into_owned(self) -> Vec { + match self { + LanceBuffer::Borrowed(buffer) => buffer.to_vec(), + LanceBuffer::Owned(buffer) => buffer, + } + } + + /// Convert into an Arrow buffer. Never copies data. + pub fn into_buffer(self) -> Buffer { + match self { + LanceBuffer::Borrowed(buffer) => buffer, + LanceBuffer::Owned(buffer) => Buffer::from_vec(buffer), + } + } + + /// Create a LanceBuffer from a bytes::Bytes object + /// + /// The alignment must be specified (as `bytes_per_value`) since we want to make + /// sure we can safely reinterpret the buffer. + /// + /// If the buffer is properly aligned this will be zero-copy. If not, a copy + /// will be made and an owned buffer returned. + pub fn from_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> LanceBuffer { + if bytes.as_ptr().align_offset(bytes_per_value as usize) != 0 { + // The original buffer is not aligned, cannot zero-copy + let mut buf = Vec::with_capacity(bytes.len()); + buf.extend_from_slice(&bytes); + LanceBuffer::Owned(buf) + } else { + // The original buffer is aligned, can zero-copy + // SAFETY: the alignment is correct we can make this conversion + unsafe { + LanceBuffer::Borrowed(Buffer::from_custom_allocation( + NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"), + bytes.len(), + Arc::new(bytes), + )) + } + } + } +} + +impl AsRef<[u8]> for LanceBuffer { + fn as_ref(&self) -> &[u8] { + match self { + LanceBuffer::Borrowed(buffer) => buffer.as_slice(), + LanceBuffer::Owned(buffer) => buffer.as_slice(), + } + } +} + +impl Deref for LanceBuffer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.as_ref() + } +} + +impl From> for LanceBuffer { + fn from(buffer: Vec) -> Self { + LanceBuffer::Owned(buffer) + } +} + +impl From for LanceBuffer { + fn from(buffer: Buffer) -> Self { + LanceBuffer::Borrowed(buffer) + } +} diff --git a/rust/lance-encoding/src/data.rs b/rust/lance-encoding/src/data.rs new file mode 100644 index 0000000000..59afc863ba --- /dev/null +++ b/rust/lance-encoding/src/data.rs @@ -0,0 +1,234 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Data layouts to represent encoded data in a sub-Arrow format + +use std::any::Any; + +use arrow::array::{ArrayData, ArrayDataBuilder}; +use arrow_schema::DataType; +use snafu::{location, Location}; + +use lance_core::{Error, Result}; + +use crate::buffer::LanceBuffer; + +/// A DataBlock is a collection of buffers that represents an "array" of data in very generic terms +/// +/// The output of each decoder is a DataBlock. Decoders can be chained together to transform +/// one DataBlock into a different kind of DataBlock. +/// +/// The DataBlock is somewhere in between Arrow's ArrayData and Array and represents a physical +/// layout of the data. +/// +/// A DataBlock can be converted into an Arrow ArrayData (and then Array) for a given array type. +/// For example, a FixedWidthDataBlock can be converted into any primitive type or a fixed size +/// list of a primitive type. +pub trait DataBlock: Any { + /// Get a reference to the Any trait object + fn as_any(&self) -> &dyn Any; + /// Convert self into a Box + fn as_any_box(self: Box) -> Box; + /// Convert self into an Arrow ArrayData + fn into_arrow(self: Box, data_type: DataType, validate: bool) -> Result; +} + +/// Extension trait for DataBlock +pub trait DataBlockExt { + /// Try to convert a DataBlock into a specific layout + fn try_into_layout(self) -> Result>; +} + +impl DataBlockExt for Box { + fn try_into_layout(self) -> Result> { + self.as_any_box() + .downcast::() + .map_err(|_| Error::Internal { + message: "Couldn't convert to expected layout".to_string(), + location: location!(), + }) + } +} + +/// A data block with no buffers where everything is null +pub struct AllNullDataBlock { + /// The number of values represented by this block + pub num_values: u64, +} + +impl DataBlock for AllNullDataBlock { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_box(self: Box) -> Box { + self + } + + fn into_arrow(self: Box, data_type: DataType, _validate: bool) -> Result { + Ok(ArrayData::new_null(&data_type, self.num_values as usize)) + } +} + +/// Wraps a data block and adds nullability information to it +pub struct NullableDataBlock { + /// The underlying data + pub data: Box, + /// A bitmap of validity for each value + pub nulls: LanceBuffer, +} + +impl DataBlock for NullableDataBlock { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_box(self: Box) -> Box { + self + } + + fn into_arrow(self: Box, data_type: DataType, validate: bool) -> Result { + let nulls = self.nulls.into_buffer(); + let data = self.data.into_arrow(data_type, validate)?.into_builder(); + let data = data.null_bit_buffer(Some(nulls)); + if validate { + Ok(data.build()?) + } else { + Ok(unsafe { data.build_unchecked() }) + } + } +} + +/// A data block for a single buffer of data where each element has a fixed number of bits +pub struct FixedWidthDataBlock { + /// The data buffer + pub data: LanceBuffer, + /// The number of bits per value + pub bits_per_value: u64, + /// The number of values represented by this block + pub num_values: u64, +} + +impl FixedWidthDataBlock { + fn do_into_arrow( + self: Box, + data_type: DataType, + num_values: u64, + validate: bool, + ) -> Result { + let builder = match &data_type { + DataType::FixedSizeList(child_field, dim) => { + let child_len = num_values * *dim as u64; + let child_data = + self.do_into_arrow(child_field.data_type().clone(), child_len, validate)?; + ArrayDataBuilder::new(data_type) + .add_child_data(child_data) + .len(num_values as usize) + .null_count(0) + } + _ => { + let data_buffer = self.data.into_buffer(); + ArrayDataBuilder::new(data_type) + .add_buffer(data_buffer) + .len(num_values as usize) + .null_count(0) + } + }; + if validate { + Ok(builder.build()?) + } else { + Ok(unsafe { builder.build_unchecked() }) + } + } +} + +impl DataBlock for FixedWidthDataBlock { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_box(self: Box) -> Box { + self + } + + fn into_arrow(self: Box, data_type: DataType, validate: bool) -> Result { + let root_num_values = self.num_values; + self.do_into_arrow(data_type, root_num_values, validate) + } +} + +/// A data block for variable-width data (e.g. strings, packed rows, etc.) +pub struct VariableWidthBlock { + /// The data buffer + pub data: LanceBuffer, + /// The offsets buffer (contains num_values + 1 offsets) + pub offsets: LanceBuffer, + /// The number of bits per offset + pub bits_per_offset: u8, + /// The number of values represented by this block + pub num_values: u64, +} + +impl DataBlock for VariableWidthBlock { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_box(self: Box) -> Box { + self + } + + fn into_arrow(self: Box, data_type: DataType, validate: bool) -> Result { + let data_buffer = self.data.into_buffer(); + let offsets_buffer = self.offsets.into_buffer(); + let builder = ArrayDataBuilder::new(data_type) + .add_buffer(offsets_buffer) + .add_buffer(data_buffer) + .len(self.num_values as usize) + .null_count(0); + if validate { + Ok(builder.build()?) + } else { + Ok(unsafe { builder.build_unchecked() }) + } + } +} + +/// A data block representing a struct +pub struct StructDataBlock { + /// The child arrays + pub children: Vec>, +} + +impl DataBlock for StructDataBlock { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_box(self: Box) -> Box { + self + } + + fn into_arrow(self: Box, data_type: DataType, validate: bool) -> Result { + if let DataType::Struct(fields) = &data_type { + let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone())); + let mut num_rows = 0; + for (field, child) in fields.iter().zip(self.children) { + let child_data = child.into_arrow(field.data_type().clone(), validate)?; + num_rows = child_data.len(); + builder = builder.add_child_data(child_data); + } + let builder = builder.null_count(0).len(num_rows); + if validate { + Ok(builder.build()?) + } else { + Ok(unsafe { builder.build_unchecked() }) + } + } else { + Err(Error::Internal { + message: format!("Expected Struct, got {:?}", data_type), + location: location!(), + }) + } + } +} diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index db08c9b228..fe88f7941b 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -218,7 +218,7 @@ use std::{ops::Range, sync::Arc}; use arrow_array::cast::AsArray; use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::{DataType, Field as ArrowField, Fields, Schema as ArrowSchema}; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{FutureExt, StreamExt}; @@ -231,6 +231,7 @@ use tokio::sync::mpsc::{self, unbounded_channel}; use lance_core::{Error, Result}; use tracing::instrument; +use crate::data::DataBlock; use crate::encoder::{values_column_encoding, EncodedBatch}; use crate::encodings::logical::list::{ListFieldScheduler, OffsetPageInfo}; use crate::encodings::logical::primitive::PrimitiveFieldScheduler; @@ -323,7 +324,7 @@ impl Default for DecoderMiddlewareChain { Self { chain: Default::default(), } - .add_strategy(Arc::new(CoreFieldDecoderStrategy)) + .add_strategy(Arc::new(CoreFieldDecoderStrategy::default())) } } @@ -565,7 +566,9 @@ pub trait FieldDecoderStrategy: Send + Sync + std::fmt::Debug { /// The core decoder strategy handles all the various Arrow types #[derive(Debug, Default)] -pub struct CoreFieldDecoderStrategy; +pub struct CoreFieldDecoderStrategy { + pub validate_data: bool, +} impl CoreFieldDecoderStrategy { /// This is just a sanity check to ensure there is no "wrapped encodings" @@ -608,6 +611,7 @@ impl CoreFieldDecoderStrategy { } fn create_primitive_scheduler( + &self, data_type: &DataType, path: &VecDeque, column: &ColumnInfo, @@ -623,6 +627,7 @@ impl CoreFieldDecoderStrategy { data_type.clone(), column.page_infos.clone(), column_buffers, + self.validate_data, ))) } @@ -659,7 +664,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy { let data_type = field.data_type(); if Self::is_primitive(&data_type) { let primitive_col = column_infos.next().unwrap(); - let scheduler = Self::create_primitive_scheduler( + let scheduler = self.create_primitive_scheduler( &data_type, chain.current_path(), primitive_col, @@ -673,7 +678,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy { // depending on the child data type. if Self::is_primitive(inner.data_type()) { let primitive_col = column_infos.next().unwrap(); - let scheduler = Self::create_primitive_scheduler( + let scheduler = self.create_primitive_scheduler( &data_type, chain.current_path(), primitive_col, @@ -732,6 +737,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy { DataType::UInt64, Arc::from(inner_infos.into_boxed_slice()), offsets_column_buffers, + self.validate_data, )) as Arc; let offset_type = if matches!(data_type, DataType::List(_)) { DataType::Int32 @@ -754,7 +760,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy { if Self::check_packed_struct(column_info) { // use packed struct encoding - let scheduler = Self::create_primitive_scheduler( + let scheduler = self.create_primitive_scheduler( &data_type, chain.current_path(), column_info, @@ -1217,13 +1223,7 @@ pub trait PrimitivePageDecoder: Send + Sync { /// * `rows_to_skip` - how many rows to skip (within the page) before decoding /// * `num_rows` - how many rows to decode /// * `all_null` - A mutable bool, set to true if a decoder determines all values are null - fn decode( - &self, - rows_to_skip: u64, - num_rows: u64, - all_null: &mut bool, - ) -> Result>; - fn num_buffers(&self) -> u32; + fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result>; } /// A scheduler for single-column encodings of primitive data diff --git a/rust/lance-encoding/src/encodings.rs b/rust/lance-encoding/src/encodings.rs index 808cf421b8..22fc5ac875 100644 --- a/rust/lance-encoding/src/encodings.rs +++ b/rust/lance-encoding/src/encodings.rs @@ -3,4 +3,3 @@ pub mod logical; pub mod physical; -pub mod utils; diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 2c81908651..777ad03bea 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, ops::Range, sync::Arc}; -use arrow_array::{new_null_array, ArrayRef}; +use arrow_array::{make_array, ArrayRef}; use arrow_schema::DataType; use futures::{future::BoxFuture, FutureExt}; use lance_arrow::deepcopy::deep_copy_array; @@ -21,8 +21,6 @@ use crate::{ encodings::physical::{decoder_from_array_encoding, ColumnBuffers, PageBuffers}, }; -use crate::encodings::utils::primitive_array_from_buffers; - #[derive(Debug)] struct PrimitivePage { scheduler: Box, @@ -43,10 +41,16 @@ pub struct PrimitiveFieldScheduler { data_type: DataType, page_schedulers: Vec, num_rows: u64, + should_validate: bool, } impl PrimitiveFieldScheduler { - pub fn new(data_type: DataType, pages: Arc<[PageInfo]>, buffers: ColumnBuffers) -> Self { + pub fn new( + data_type: DataType, + pages: Arc<[PageInfo]>, + buffers: ColumnBuffers, + should_validate: bool, + ) -> Self { let page_schedulers = pages .iter() .map(|page| { @@ -67,6 +71,7 @@ impl PrimitiveFieldScheduler { data_type, page_schedulers, num_rows, + should_validate, } } } @@ -164,6 +169,7 @@ impl<'a> SchedulingJob for PrimitiveFieldSchedulingJob<'a> { physical_decoder: None, rows_drained: 0, num_rows: num_rows_in_next, + should_validate: self.scheduler.should_validate, }; let decoder = Box::new(logical_decoder); @@ -201,6 +207,7 @@ pub struct PrimitiveFieldDecoder { data_type: DataType, unloaded_physical_decoder: Option>>>, physical_decoder: Option>, + should_validate: bool, num_rows: u64, rows_drained: u64, } @@ -210,11 +217,13 @@ impl PrimitiveFieldDecoder { physical_decoder: Arc, data_type: DataType, num_rows: u64, + should_validate: bool, ) -> Self { Self { data_type, unloaded_physical_decoder: None, physical_decoder: Some(physical_decoder), + should_validate, num_rows, rows_drained: 0, } @@ -234,27 +243,20 @@ impl Debug for PrimitiveFieldDecoder { struct PrimitiveFieldDecodeTask { rows_to_skip: u64, rows_to_take: u64, + should_validate: bool, physical_decoder: Arc, data_type: DataType, } impl DecodeArrayTask for PrimitiveFieldDecodeTask { fn decode(self: Box) -> Result { - let mut all_null = false; - - // The number of buffers needed is based on the data type. - // Most data types need two buffers but each layer of fixed-size-list, for - // example, adds another validity buffer. - let bufs = - self.physical_decoder - .decode(self.rows_to_skip, self.rows_to_take, &mut all_null)?; - - if all_null { - return Ok(new_null_array(&self.data_type, self.rows_to_take as usize)); - } + let block = self + .physical_decoder + .decode(self.rows_to_skip, self.rows_to_take)?; - // Convert the buffers into an Arrow array - primitive_array_from_buffers(&self.data_type, bufs, self.rows_to_take) + Ok(make_array( + block.into_arrow(self.data_type, self.should_validate)?, + )) } } @@ -279,6 +281,7 @@ impl LogicalPageDecoder for PrimitiveFieldDecoder { let task = Box::new(PrimitiveFieldDecodeTask { rows_to_skip, rows_to_take, + should_validate: self.should_validate, physical_decoder: self.physical_decoder.as_ref().unwrap().clone(), data_type: self.data_type.clone(), }); diff --git a/rust/lance-encoding/src/encodings/physical/basic.rs b/rust/lance-encoding/src/encodings/physical/basic.rs index dd48a9c438..4e09de268b 100644 --- a/rust/lance-encoding/src/encodings/physical/basic.rs +++ b/rust/lance-encoding/src/encodings/physical/basic.rs @@ -5,11 +5,11 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BooleanArray}; use arrow_buffer::BooleanBuffer; -use bytes::BytesMut; use futures::{future::BoxFuture, FutureExt}; use log::trace; use crate::{ + data::{AllNullDataBlock, DataBlock, DataBlockExt, FixedWidthDataBlock, NullableDataBlock}, decoder::{PageScheduler, PrimitivePageDecoder}, encoder::{ArrayEncoder, BufferEncoder, EncodedArray, EncodedArrayBuffer}, format::pb, @@ -34,16 +34,6 @@ enum DataNullStatus { Some(DataDecoders), } -impl DataNullStatus { - fn values_decoder(&self) -> Option<&dyn PrimitivePageDecoder> { - match self { - Self::All => None, - Self::Some(decoders) => Some(decoders.values.as_ref()), - Self::None(values) => Some(values.as_ref()), - } - } -} - #[derive(Debug)] struct DataSchedulers { validity: Box, @@ -169,46 +159,22 @@ struct BasicPageDecoder { } impl PrimitivePageDecoder for BasicPageDecoder { - fn decode( - &self, - rows_to_skip: u64, - num_rows: u64, - all_null: &mut bool, - ) -> Result> { - let dest_buffers = match &self.mode { + fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result> { + match &self.mode { DataNullStatus::Some(decoders) => { - let mut buffers = decoders.validity.decode(rows_to_skip, num_rows, all_null)?; // buffer 0 - let mut values_bytesmut = - decoders.values.decode(rows_to_skip, num_rows, all_null)?; // buffer 1 onwards - - buffers.append(&mut values_bytesmut); - buffers + let validity = decoders.validity.decode(rows_to_skip, num_rows)?; + let validity = validity.try_into_layout::()?; + let values = decoders.values.decode(rows_to_skip, num_rows)?; + Ok(Box::new(NullableDataBlock { + data: values, + nulls: validity.data, + })) } - // Either dest_buffers[0] is empty, in which case these are no-ops, or one of the - // other pages needed the buffer, in which case we need to fill our section - DataNullStatus::All => { - let buffers = vec![BytesMut::default()]; - *all_null = true; - buffers - } - DataNullStatus::None(values) => { - let mut dest_buffers = vec![BytesMut::default()]; - - let mut values_bytesmut = values.decode(rows_to_skip, num_rows, all_null)?; - dest_buffers.append(&mut values_bytesmut); - dest_buffers - } - }; - - Ok(dest_buffers) - } - - fn num_buffers(&self) -> u32 { - 1 + self - .mode - .values_decoder() - .map(|val| val.num_buffers()) - .unwrap_or(0) + DataNullStatus::All => Ok(Box::new(AllNullDataBlock { + num_values: num_rows, + })), + DataNullStatus::None(values) => values.decode(rows_to_skip, num_rows), + } } } diff --git a/rust/lance-encoding/src/encodings/physical/binary.rs b/rust/lance-encoding/src/encodings/physical/binary.rs index c39e3b2d39..700573a59c 100644 --- a/rust/lance-encoding/src/encodings/physical/binary.rs +++ b/rust/lance-encoding/src/encodings/physical/binary.rs @@ -8,10 +8,13 @@ use arrow_array::cast::AsArray; use arrow_array::types::UInt64Type; use arrow_array::{Array, ArrayRef}; use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, ScalarBuffer}; -use bytes::BytesMut; use futures::stream::StreamExt; use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt}; +use crate::buffer::LanceBuffer; +use crate::data::{ + DataBlock, DataBlockExt, FixedWidthDataBlock, NullableDataBlock, VariableWidthBlock, +}; use crate::{ decoder::{PageScheduler, PrimitivePageDecoder}, encoder::{ArrayEncoder, EncodedArray}, @@ -102,7 +105,7 @@ impl BinaryPageScheduler { impl BinaryPageScheduler { fn decode_indices(decoder: Arc, num_rows: u64) -> Result { let mut primitive_wrapper = - PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows); + PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows, false); let drained_task = primitive_wrapper.drain(num_rows)?; let indices_decode_task = drained_task.task; indices_decode_task.decode() @@ -235,12 +238,7 @@ impl PrimitivePageDecoder for BinaryPageDecoder { // We only need [8, 13] to decode in this case. // These need to be normalized in order to build the string later // So return [0, 5] - fn decode( - &self, - rows_to_skip: u64, - num_rows: u64, - all_null: &mut bool, - ) -> Result> { + fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result> { // Buffers[0] == validity buffer // Buffers[1] == offsets buffer // Buffers[2] == null buffer // TODO: Micro-optimization, can we get rid of this? Doesn't hurt much though @@ -255,7 +253,7 @@ impl PrimitivePageDecoder for BinaryPageDecoder { let validity_buffer = if has_nulls { let num_validity_bits = arrow_buffer::bit_util::ceil(num_rows as usize, 8); - let mut validity_buffer = BytesMut::with_capacity(num_validity_bits); + let mut validity_buffer = Vec::with_capacity(num_validity_bits); if rows_to_skip == 0 { validity_buffer.extend_from_slice(target_validity.inner().as_slice()); @@ -264,9 +262,9 @@ impl PrimitivePageDecoder for BinaryPageDecoder { let target_validity = BooleanBuffer::from_iter(target_validity.iter()); validity_buffer.extend_from_slice(target_validity.inner().as_slice()); } - validity_buffer + Some(validity_buffer) } else { - BytesMut::new() + None }; // STEP 2: offsets buffer @@ -294,8 +292,6 @@ impl PrimitivePageDecoder for BinaryPageDecoder { .into_inner(), _ => panic!("Unsupported offsets type"), }; - // TODO: This forces a second copy, which is unfortunate, try and remove in the future - let offsets_buf = BytesMut::from(offsets_buffer.as_slice()); let bytes_to_skip = self.decoded_indices.value(rows_to_skip as usize); let num_bytes = self @@ -303,22 +299,24 @@ impl PrimitivePageDecoder for BinaryPageDecoder { .value((rows_to_skip + num_rows) as usize) - bytes_to_skip; - let mut output_buffers = vec![validity_buffer, offsets_buf]; - - // Add decoded bytes into output_buffers[2..] - // Currently an empty null buffer is the first one - // The actual bytes are in the second buffer - // Including the indices this results in 4 buffers in total - output_buffers.extend( - self.bytes_decoder - .decode(bytes_to_skip, num_bytes, all_null)?, - ); - - Ok(output_buffers) - } - - fn num_buffers(&self) -> u32 { - self.bytes_decoder.num_buffers() + 2 + let bytes = self.bytes_decoder.decode(bytes_to_skip, num_bytes)?; + let bytes = bytes.try_into_layout::()?; + debug_assert_eq!(bytes.bits_per_value, 8); + + let string_data = Box::new(VariableWidthBlock { + bits_per_offset: bytes_per_offset * 8, + data: bytes.data, + num_values: num_rows, + offsets: LanceBuffer::from(offsets_buffer), + }); + if let Some(validity) = validity_buffer { + Ok(Box::new(NullableDataBlock { + data: string_data, + nulls: LanceBuffer::from(validity), + })) + } else { + Ok(string_data) + } } } diff --git a/rust/lance-encoding/src/encodings/physical/bitmap.rs b/rust/lance-encoding/src/encodings/physical/bitmap.rs index 80ea8b479a..2e5f627534 100644 --- a/rust/lance-encoding/src/encodings/physical/bitmap.rs +++ b/rust/lance-encoding/src/encodings/physical/bitmap.rs @@ -4,13 +4,15 @@ use std::{ops::Range, sync::Arc}; use arrow_buffer::BooleanBufferBuilder; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use futures::{future::BoxFuture, FutureExt}; use lance_core::Result; use log::trace; use crate::{ + buffer::LanceBuffer, + data::{DataBlock, FixedWidthDataBlock}, decoder::{PageScheduler, PrimitivePageDecoder}, EncodingsIo, }; @@ -93,17 +95,8 @@ struct BitmapDecoder { } impl PrimitivePageDecoder for BitmapDecoder { - fn decode( - &self, - rows_to_skip: u64, - num_rows: u64, - _all_null: &mut bool, - ) -> Result> { - let num_bytes = arrow_buffer::bit_util::ceil(num_rows as usize, 8); - let mut dest_buffers = vec![BytesMut::with_capacity(num_bytes)]; - + fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result> { let mut rows_to_skip = rows_to_skip; - let mut dest_builder = BooleanBufferBuilder::new(num_rows as usize); let mut rows_remaining = num_rows; @@ -121,28 +114,11 @@ impl PrimitivePageDecoder for BitmapDecoder { } let bool_buffer = dest_builder.finish().into_inner(); - unsafe { dest_buffers[0].set_len(bool_buffer.len()) } - // TODO: This requires an extra copy. First we copy the data from the read buffer(s) - // into dest_builder (one copy is inevitable). Then we copy the data from dest_builder - // into dest_buffers. This second copy could be avoided (e.g. BooleanBufferBuilder - // has a new_from_buffer but that requires MutableBuffer and we can't easily get there - // from BytesMut [or can we?]) - // - // Worst case, we vendor our own copy of BooleanBufferBuilder based on BytesMut. We could - // also use MutableBuffer ourselves instead of BytesMut but arrow-rs claims MutableBuffer may - // be deprecated in the future (though that discussion seems to have died) - - // TODO: Will this work at the boundaries? If we have to skip 3 bits for example then the first - // bytes of bool_buffer.as_slice will be 000XXXXX and if we copy it on top of YYY00000 then the YYY - // will be clobbered. - // - // It's a moot point at the moment since we don't support page bridging - dest_buffers[0].copy_from_slice(bool_buffer.as_slice()); - Ok(dest_buffers) - } - - fn num_buffers(&self) -> u32 { - 1 + Ok(Box::new(FixedWidthDataBlock { + data: LanceBuffer::from(bool_buffer), + bits_per_value: 1, + num_values: num_rows, + })) } } @@ -184,7 +160,7 @@ mod tests { ], }; - let result = decoder.decode(5, 1, &mut false); + let result = decoder.decode(5, 1); assert!(result.is_ok()); } } diff --git a/rust/lance-encoding/src/encodings/physical/bitpack.rs b/rust/lance-encoding/src/encodings/physical/bitpack.rs index 1acdc0be34..6eb953d179 100644 --- a/rust/lance-encoding/src/encodings/physical/bitpack.rs +++ b/rust/lance-encoding/src/encodings/physical/bitpack.rs @@ -8,7 +8,7 @@ use arrow::datatypes::{ArrowPrimitiveType, UInt16Type, UInt32Type, UInt64Type, U use arrow::util::bit_util::ceil; use arrow_array::{cast::AsArray, Array, ArrayRef, PrimitiveArray}; use arrow_schema::DataType; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use futures::future::{BoxFuture, FutureExt}; use log::trace; use num_traits::{AsPrimitive, PrimInt}; @@ -17,6 +17,8 @@ use snafu::{location, Location}; use lance_arrow::DataTypeExt; use lance_core::{Error, Result}; +use crate::buffer::LanceBuffer; +use crate::data::{DataBlock, FixedWidthDataBlock}; use crate::encoder::EncodedBufferMeta; use crate::{ decoder::{PageScheduler, PrimitivePageDecoder}, @@ -310,14 +312,9 @@ struct BitpackedPageDecoder { } impl PrimitivePageDecoder for BitpackedPageDecoder { - fn decode( - &self, - rows_to_skip: u64, - num_rows: u64, - _all_null: &mut bool, - ) -> Result> { + fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result> { let num_bytes = self.uncompressed_bits_per_value / 8 * num_rows; - let mut dest_buffers = vec![BytesMut::with_capacity(num_bytes as usize)]; + let mut dest = vec![0; num_bytes as usize]; // current maximum supported bits per value = 64 debug_assert!(self.bits_per_value <= 64); @@ -325,8 +322,7 @@ impl PrimitivePageDecoder for BitpackedPageDecoder { let mut rows_to_skip = rows_to_skip; let mut rows_taken = 0; let byte_len = self.uncompressed_bits_per_value / 8; - let dst = &mut dest_buffers[0]; - let mut dst_idx = dst.len(); // index for current byte being written to destination buffer + let mut dst_idx = 0; // index for current byte being written to destination buffer // create bit mask for source bits let mask = u64::MAX >> (64 - self.bits_per_value); @@ -364,11 +360,8 @@ impl PrimitivePageDecoder for BitpackedPageDecoder { let mut dst_offset = 0; while src_bits_written < self.bits_per_value { - // add extra byte to buffer to hold next location - dst.extend([0].repeat(dst_idx + 1 - dst.len())); - // write bits from current source byte into destination - dst[dst_idx] += (curr_src >> src_offset) << dst_offset; + dest[dst_idx] += (curr_src >> src_offset) << dst_offset; let bits_written = (self.bits_per_value - src_bits_written) .min(8 - src_offset) .min(8 - dst_offset); @@ -427,14 +420,11 @@ impl PrimitivePageDecoder for BitpackedPageDecoder { } } - // add pad any extra needed 0s onto end of buffer - dst.extend([0].repeat(dst_idx + 1 - dst.len())); - - Ok(dest_buffers) - } - - fn num_buffers(&self) -> u32 { - 1 + Ok(Box::new(FixedWidthDataBlock { + data: LanceBuffer::from(dest), + bits_per_value: self.uncompressed_bits_per_value, + num_values: num_rows, + })) } } diff --git a/rust/lance-encoding/src/encodings/physical/dictionary.rs b/rust/lance-encoding/src/encodings/physical/dictionary.rs index 296ca745b6..520afaccbb 100644 --- a/rust/lance-encoding/src/encodings/physical/dictionary.rs +++ b/rust/lance-encoding/src/encodings/physical/dictionary.rs @@ -5,9 +5,11 @@ use std::sync::Arc; use arrow_array::builder::{ArrayBuilder, StringBuilder}; use arrow_array::types::UInt8Type; -use arrow_array::{Array, ArrayRef, DictionaryArray, StringArray, UInt8Array}; +use arrow_array::{make_array, Array, ArrayRef, DictionaryArray, StringArray, UInt8Array}; use futures::{future::BoxFuture, FutureExt}; +use crate::buffer::LanceBuffer; +use crate::data::{DataBlock, NullableDataBlock, VariableWidthBlock}; use crate::{ decoder::{PageScheduler, PrimitivePageDecoder}, encoder::{ArrayEncoder, EncodedArray}, @@ -19,11 +21,9 @@ use crate::decoder::LogicalPageDecoder; use crate::encodings::logical::primitive::PrimitiveFieldDecoder; use arrow_schema::DataType; -use bytes::BytesMut; use lance_core::Result; use std::collections::HashMap; -use crate::encodings::utils::new_primitive_array; use arrow_array::cast::AsArray; #[derive(Debug)] @@ -80,9 +80,10 @@ impl PageScheduler for DictionaryPageScheduler { let items_decoder: Arc = Arc::from(items_page_decoder.await?); let mut primitive_wrapper = PrimitiveFieldDecoder::new_from_data( - items_decoder.clone(), + items_decoder, DataType::Utf8, copy_size, + false, ); // Decode all items @@ -95,7 +96,6 @@ impl PageScheduler for DictionaryPageScheduler { Ok(Box::new(DictionaryPageDecoder { decoded_dict, indices_decoder, - items_decoder, }) as Box) }) .map(|join_handle| join_handle.unwrap()) @@ -106,25 +106,16 @@ impl PageScheduler for DictionaryPageScheduler { struct DictionaryPageDecoder { decoded_dict: Arc, indices_decoder: Box, - items_decoder: Arc, } impl PrimitivePageDecoder for DictionaryPageDecoder { - fn decode( - &self, - rows_to_skip: u64, - num_rows: u64, - all_null: &mut bool, - ) -> Result> { + fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result> { // Decode the indices - let indices_buffers = self - .indices_decoder - .decode(rows_to_skip, num_rows, all_null)?; + let indices_data = self.indices_decoder.decode(rows_to_skip, num_rows)?; - let indices_array = - new_primitive_array::(indices_buffers.clone(), num_rows, &DataType::UInt8); + let indices_array = make_array(indices_data.into_arrow(DataType::UInt8, false)?); + let indices_array = indices_array.as_primitive::(); - let indices_array = indices_array.as_primitive::().clone(); let dictionary = self.decoded_dict.clone(); let adjusted_indices: UInt8Array = indices_array @@ -142,31 +133,24 @@ impl PrimitivePageDecoder for DictionaryPageDecoder { let string_array = arrow_cast::cast(&dict_array, &DataType::Utf8).unwrap(); let string_array = string_array.as_any().downcast_ref::().unwrap(); - // This workflow is not ideal, since we go from DictionaryArray -> StringArray -> nulls, offsets, and bytes buffers (BytesMut) - // and later in primitive_array_from_buffers() we will go from nulls, offsets, and bytes buffers -> StringArray again. - // Creating the BytesMut is an unnecessary copy. But it is the best we can do in the current structure - let null_buffer = string_array - .nulls() - .map(|n| BytesMut::from(n.buffer().as_slice())) - .unwrap_or_else(BytesMut::new); - - let offsets_buffer = BytesMut::from(string_array.offsets().inner().inner().as_slice()); - - // Empty buffer for nulls of bytes - let empty_buffer = BytesMut::new(); - - let bytes_buffer = BytesMut::from_iter(string_array.values().iter().copied()); - - Ok(vec![ - null_buffer, - offsets_buffer, - empty_buffer, - bytes_buffer, - ]) - } - - fn num_buffers(&self) -> u32 { - self.items_decoder.num_buffers() + 2 + let null_buffer = string_array.nulls().map(|n| n.buffer().clone()); + let offsets_buffer = string_array.offsets().inner().inner().clone(); + let bytes_buffer = string_array.values().clone(); + + let string_data = Box::new(VariableWidthBlock { + bits_per_offset: 32, + data: LanceBuffer::from(bytes_buffer), + offsets: LanceBuffer::from(offsets_buffer), + num_values: num_rows, + }); + if let Some(nulls) = null_buffer { + Ok(Box::new(NullableDataBlock { + data: string_data, + nulls: LanceBuffer::from(nulls), + })) + } else { + Ok(string_data) + } } } diff --git a/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs b/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs index f1c2a36717..a1b8ef4ecb 100644 --- a/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs +++ b/rust/lance-encoding/src/encodings/physical/fixed_size_list.rs @@ -4,12 +4,12 @@ use std::sync::Arc; use arrow_array::{cast::AsArray, ArrayRef}; -use bytes::BytesMut; use futures::{future::BoxFuture, FutureExt}; use lance_core::Result; use log::trace; use crate::{ + data::{DataBlock, DataBlockExt, FixedWidthDataBlock}, decoder::{PageScheduler, PrimitivePageDecoder}, encoder::{ArrayEncoder, EncodedArray}, format::pb, @@ -74,19 +74,14 @@ pub struct FixedListDecoder { } impl PrimitivePageDecoder for FixedListDecoder { - fn decode( - &self, - rows_to_skip: u64, - num_rows: u64, - all_null: &mut bool, - ) -> Result> { + fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result> { let rows_to_skip = rows_to_skip * self.dimension; - let num_rows = num_rows * self.dimension; - self.items_decoder.decode(rows_to_skip, num_rows, all_null) - } - - fn num_buffers(&self) -> u32 { - self.items_decoder.num_buffers() + let num_child_rows = num_rows * self.dimension; + let child_data = self.items_decoder.decode(rows_to_skip, num_child_rows)?; + let mut child_data = child_data.try_into_layout::()?; + child_data.num_values = num_rows; + child_data.bits_per_value *= self.dimension; + Ok(child_data) } } diff --git a/rust/lance-encoding/src/encodings/physical/fsst.rs b/rust/lance-encoding/src/encodings/physical/fsst.rs index edd89aacc8..8955d397ea 100644 --- a/rust/lance-encoding/src/encodings/physical/fsst.rs +++ b/rust/lance-encoding/src/encodings/physical/fsst.rs @@ -7,12 +7,13 @@ use arrow_array::{cast::AsArray, Array, BinaryArray}; use arrow_buffer::{Buffer, OffsetBuffer, ScalarBuffer}; use arrow_schema::DataType; use arrow_select::concat::concat; -use bytes::BytesMut; use futures::{future::BoxFuture, FutureExt}; use lance_core::Result; use crate::{ + buffer::LanceBuffer, + data::{DataBlock, DataBlockExt, NullableDataBlock, VariableWidthBlock}, decoder::{PageScheduler, PrimitivePageDecoder}, encoder::{ArrayEncoder, EncodedArray}, format::pb, @@ -63,30 +64,22 @@ struct FsstPageDecoder { } impl PrimitivePageDecoder for FsstPageDecoder { - fn decode( - &self, - rows_to_skip: u64, - num_rows: u64, - all_null: &mut bool, - ) -> Result> { - let buffers = self - .inner_decoder - .decode(rows_to_skip, num_rows, all_null)?; - - let mut buffers_iter = buffers.into_iter(); - - // Buffer order expected from inner binary decoder - let validity = buffers_iter.next().unwrap(); - let offsets = buffers_iter.next().unwrap(); - let dummy = buffers_iter.next().unwrap(); - let bytes = buffers_iter.next().unwrap(); - - // Reinterpret offsets as i32 - let offsets = ScalarBuffer::::new( - Buffer::from_bytes(offsets.freeze().into()), - 0, - num_rows as usize + 1, - ); + fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result> { + let compressed_data = self.inner_decoder.decode(rows_to_skip, num_rows)?; + let compressed_data = compressed_data.as_any_box(); + let (string_data, nulls) = match compressed_data.downcast::() { + Ok(nullable) => { + let data = nullable.data.try_into_layout::()?; + Result::Ok((data, Some(nullable.nulls))) + } + Err(data) => { + let data = data.downcast::().unwrap(); + Ok((data, None)) + } + }?; + + let offsets = ScalarBuffer::::from(string_data.offsets.into_buffer()); + let bytes = string_data.data.into_buffer(); let mut decompressed_offsets = vec![0_i32; offsets.len()]; let mut decompressed_bytes = vec![0_u8; bytes.len() * 8]; @@ -105,23 +98,28 @@ impl PrimitivePageDecoder for FsstPageDecoder { // TODO: Change PrimitivePageDecoder to use Vec instead of BytesMut // since there is no way to get BytesMut from Vec but these copies should be avoidable // This is not the first time this has happened - let mut offsets_as_bytes_mut = BytesMut::with_capacity(decompressed_offsets.len()); + let mut offsets_as_bytes_mut = Vec::with_capacity(decompressed_offsets.len()); let decompressed_offsets = ScalarBuffer::::from(decompressed_offsets); offsets_as_bytes_mut.extend_from_slice(decompressed_offsets.inner().as_slice()); - let mut bytes_as_bytes_mut = BytesMut::with_capacity(decompressed_bytes.len()); + let mut bytes_as_bytes_mut = Vec::with_capacity(decompressed_bytes.len()); bytes_as_bytes_mut.extend_from_slice(&decompressed_bytes); - Ok(vec![ - validity, - offsets_as_bytes_mut, - dummy, - bytes_as_bytes_mut, - ]) - } - - fn num_buffers(&self) -> u32 { - self.inner_decoder.num_buffers() + let new_string_data = Box::new(VariableWidthBlock { + bits_per_offset: 32, + data: LanceBuffer::from(bytes_as_bytes_mut), + num_values: num_rows, + offsets: LanceBuffer::from(offsets_as_bytes_mut), + }); + + if let Some(nulls) = nulls { + Ok(Box::new(NullableDataBlock { + data: new_string_data, + nulls, + })) + } else { + Ok(new_string_data) + } } } diff --git a/rust/lance-encoding/src/encodings/physical/packed_struct.rs b/rust/lance-encoding/src/encodings/physical/packed_struct.rs index 49e1f7bcfb..b9ef3423b1 100644 --- a/rust/lance-encoding/src/encodings/physical/packed_struct.rs +++ b/rust/lance-encoding/src/encodings/physical/packed_struct.rs @@ -8,6 +8,8 @@ use futures::{future::BoxFuture, FutureExt}; use lance_arrow::DataTypeExt; use crate::{ + buffer::LanceBuffer, + data::{DataBlock, FixedWidthDataBlock, StructDataBlock}, decoder::{PageScheduler, PrimitivePageDecoder}, encoder::{ArrayEncoder, EncodedArray, EncodedArrayBuffer}, format::pb::{self}, @@ -106,12 +108,7 @@ struct PackedStructPageDecoder { } impl PrimitivePageDecoder for PackedStructPageDecoder { - fn decode( - &self, - rows_to_skip: u64, - num_rows: u64, - _all_null: &mut bool, - ) -> Result> { + fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result> { // Decoding workflow: // rows 0-2: {x: [1, 2, 3], y: [4, 5, 6], z: [7, 8, 9]} // rows 3-5: {x: [10, 11, 12], y: [13, 14, 15], z: [16, 17, 18]} @@ -128,13 +125,13 @@ impl PrimitivePageDecoder for PackedStructPageDecoder { let bytes_to_skip = (rows_to_skip as usize) * self.total_bytes_per_row; - let mut struct_bytes = Vec::new(); + let mut children = Vec::with_capacity(self.fields.len()); let mut start_index = 0; for field in &self.fields { let bytes_per_field = field.data_type().byte_width(); - let mut field_bytes = BytesMut::default(); + let mut field_bytes = Vec::with_capacity(bytes_per_field * num_rows as usize); let mut byte_index = start_index; @@ -145,14 +142,13 @@ impl PrimitivePageDecoder for PackedStructPageDecoder { } start_index += bytes_per_field; - struct_bytes.push(field_bytes); + children.push(Box::new(FixedWidthDataBlock { + data: LanceBuffer::from(field_bytes), + bits_per_value: bytes_per_field as u64 * 8, + num_values: num_rows, + }) as Box); } - - Ok(struct_bytes) - } - - fn num_buffers(&self) -> u32 { - self.fields.len() as u32 + Ok(Box::new(StructDataBlock { children })) } } diff --git a/rust/lance-encoding/src/encodings/physical/value.rs b/rust/lance-encoding/src/encodings/physical/value.rs index 1ac7d051c0..e416562e94 100644 --- a/rust/lance-encoding/src/encodings/physical/value.rs +++ b/rust/lance-encoding/src/encodings/physical/value.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use arrow_array::ArrayRef; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use futures::{future::BoxFuture, FutureExt}; use log::trace; use snafu::{location, Location}; @@ -10,6 +10,8 @@ use std::fmt; use std::ops::Range; use std::sync::{Arc, Mutex}; +use crate::buffer::LanceBuffer; +use crate::data::{DataBlock, FixedWidthDataBlock}; use crate::encoder::BufferEncodingStrategy; use crate::{ decoder::{PageScheduler, PrimitivePageDecoder}, @@ -179,58 +181,60 @@ impl ValuePageDecoder { !self.uncompressed_range_offsets.is_empty() } - fn decode_buffer( - &self, - buf: &Bytes, - bytes_to_skip: &mut u64, - bytes_to_take: &mut u64, - dest: &mut bytes::BytesMut, - ) { - let buf_len = buf.len() as u64; - if *bytes_to_skip > buf_len { - *bytes_to_skip -= buf_len; - } else { - let bytes_to_take_here = (buf_len - *bytes_to_skip).min(*bytes_to_take); - *bytes_to_take -= bytes_to_take_here; - let start = *bytes_to_skip as usize; - let end = start + bytes_to_take_here as usize; - dest.extend_from_slice(&buf.slice(start..end)); - *bytes_to_skip = 0; + fn decode_buffers<'a>( + &'a self, + buffers: impl IntoIterator, + mut bytes_to_skip: u64, + mut bytes_to_take: u64, + ) -> LanceBuffer { + let mut dest: Option> = None; + + for buf in buffers.into_iter() { + let buf_len = buf.len() as u64; + if bytes_to_skip > buf_len { + bytes_to_skip -= buf_len; + } else { + let bytes_to_take_here = (buf_len - bytes_to_skip).min(bytes_to_take); + bytes_to_take -= bytes_to_take_here; + let start = bytes_to_skip as usize; + let end = start + bytes_to_take_here as usize; + let slice = buf.slice(start..end); + match (&mut dest, bytes_to_take) { + (None, 0) => { + // The entire request is contained in one buffer so we can maybe zero-copy + // if the slice is aligned properly + return LanceBuffer::from_bytes(slice, self.bytes_per_value); + } + (None, _) => { + dest.replace(Vec::with_capacity(bytes_to_take as usize)); + } + _ => {} + } + dest.as_mut().unwrap().extend_from_slice(&slice); + bytes_to_skip = 0; + } } + LanceBuffer::from(dest.unwrap_or_default()) } } impl PrimitivePageDecoder for ValuePageDecoder { - fn decode( - &self, - rows_to_skip: u64, - num_rows: u64, - _all_null: &mut bool, - ) -> Result> { - let mut bytes_to_skip = rows_to_skip * self.bytes_per_value; - let mut bytes_to_take = num_rows * self.bytes_per_value; - - let mut dest_buffers = vec![BytesMut::with_capacity(bytes_to_take as usize)]; - - let dest = &mut dest_buffers[0]; + fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result> { + let bytes_to_skip = rows_to_skip * self.bytes_per_value; + let bytes_to_take = num_rows * self.bytes_per_value; - debug_assert!(dest.capacity() as u64 >= bytes_to_take); - - if self.is_compressed() { + let data_buffer = if self.is_compressed() { let decoding_data = self.get_uncompressed_bytes()?; - for buf in decoding_data.lock().unwrap().as_ref().unwrap() { - self.decode_buffer(buf, &mut bytes_to_skip, &mut bytes_to_take, dest); - } + let buffers = decoding_data.lock().unwrap(); + self.decode_buffers(buffers.as_ref().unwrap(), bytes_to_skip, bytes_to_take) } else { - for buf in &self.data { - self.decode_buffer(buf, &mut bytes_to_skip, &mut bytes_to_take, dest); - } - } - Ok(dest_buffers) - } - - fn num_buffers(&self) -> u32 { - 1 + self.decode_buffers(&self.data, bytes_to_skip, bytes_to_take) + }; + Ok(Box::new(FixedWidthDataBlock { + bits_per_value: self.bytes_per_value * 8, + data: data_buffer, + num_values: num_rows, + })) } } diff --git a/rust/lance-encoding/src/encodings/utils.rs b/rust/lance-encoding/src/encodings/utils.rs deleted file mode 100644 index 8118bc88b4..0000000000 --- a/rust/lance-encoding/src/encodings/utils.rs +++ /dev/null @@ -1,308 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The Lance Authors - -use std::sync::Arc; - -use arrow_array::{ - new_null_array, - types::{ - ArrowPrimitiveType, ByteArrayType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, - DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, - DurationSecondType, Float16Type, Float32Type, Float64Type, GenericBinaryType, - GenericStringType, Int16Type, Int32Type, Int64Type, Int8Type, IntervalDayTimeType, - IntervalMonthDayNanoType, IntervalYearMonthType, Time32MillisecondType, Time32SecondType, - Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType, - TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, - UInt32Type, UInt64Type, UInt8Type, - }, - ArrayRef, BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, GenericByteArray, - PrimitiveArray, StructArray, -}; -use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; -use arrow_schema::{DataType, IntervalUnit, TimeUnit}; -use bytes::BytesMut; -use snafu::{location, Location}; - -use lance_core::{Error, Result}; - -pub fn new_primitive_array( - buffers: Vec, - num_rows: u64, - data_type: &DataType, -) -> ArrayRef { - let mut buffer_iter = buffers.into_iter(); - let null_buffer = buffer_iter.next().unwrap(); - let null_buffer = if null_buffer.is_empty() { - None - } else { - let null_buffer = null_buffer.freeze().into(); - Some(NullBuffer::new(BooleanBuffer::new( - Buffer::from_bytes(null_buffer), - 0, - num_rows as usize, - ))) - }; - - let data_buffer = buffer_iter.next().unwrap().freeze(); - let data_buffer = Buffer::from_bytes(data_buffer.into()); - let data_buffer = ScalarBuffer::::new(data_buffer, 0, num_rows as usize); - - // The with_data_type is needed here to recover the parameters for types like Decimal/Timestamp - Arc::new(PrimitiveArray::::new(data_buffer, null_buffer).with_data_type(data_type.clone())) -} - -pub fn new_generic_byte_array(buffers: Vec, num_rows: u64) -> ArrayRef { - // iterate over buffers to get offsets and then bytes - let mut buffer_iter = buffers.into_iter(); - - let null_buffer = buffer_iter.next().unwrap(); - let null_buffer = if null_buffer.is_empty() { - None - } else { - let null_buffer = null_buffer.freeze().into(); - Some(NullBuffer::new(BooleanBuffer::new( - Buffer::from_bytes(null_buffer), - 0, - num_rows as usize, - ))) - }; - - let indices_bytes = buffer_iter.next().unwrap().freeze(); - let indices_buffer = Buffer::from_bytes(indices_bytes.into()); - let indices_buffer = ScalarBuffer::::new(indices_buffer, 0, num_rows as usize + 1); - - let offsets = OffsetBuffer::new(indices_buffer.clone()); - - // Decoding the bytes creates 2 buffers, the first one is empty since - // validity is stored in an earlier buffer - buffer_iter.next().unwrap(); - - let bytes_buffer = buffer_iter.next().unwrap().freeze(); - let bytes_buffer = Buffer::from_bytes(bytes_buffer.into()); - let bytes_buffer_len = bytes_buffer.len(); - let bytes_buffer = ScalarBuffer::::new(bytes_buffer, 0, bytes_buffer_len); - - let bytes_array = Arc::new( - PrimitiveArray::::new(bytes_buffer, None).with_data_type(DataType::UInt8), - ); - - Arc::new(GenericByteArray::::new( - offsets, - bytes_array.values().into(), - null_buffer, - )) -} - -pub fn bytes_to_validity(bytes: BytesMut, num_rows: u64) -> Option { - if bytes.is_empty() { - None - } else { - let null_buffer = bytes.freeze().into(); - Some(NullBuffer::new(BooleanBuffer::new( - Buffer::from_bytes(null_buffer), - 0, - num_rows as usize, - ))) - } -} - -pub fn primitive_array_from_buffers( - data_type: &DataType, - buffers: Vec, - num_rows: u64, -) -> Result { - match data_type { - DataType::Boolean => { - let mut buffer_iter = buffers.into_iter(); - let null_buffer = buffer_iter.next().unwrap(); - let null_buffer = bytes_to_validity(null_buffer, num_rows); - - let data_buffer = buffer_iter.next().unwrap().freeze(); - let data_buffer = Buffer::from(data_buffer); - let data_buffer = BooleanBuffer::new(data_buffer, 0, num_rows as usize); - - Ok(Arc::new(BooleanArray::new(data_buffer, null_buffer))) - } - DataType::Date32 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Date64 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Decimal128(_, _) => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Decimal256(_, _) => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Duration(units) => Ok(match units { - TimeUnit::Second => { - new_primitive_array::(buffers, num_rows, data_type) - } - TimeUnit::Microsecond => { - new_primitive_array::(buffers, num_rows, data_type) - } - TimeUnit::Millisecond => { - new_primitive_array::(buffers, num_rows, data_type) - } - TimeUnit::Nanosecond => { - new_primitive_array::(buffers, num_rows, data_type) - } - }), - DataType::Float16 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Float32 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Float64 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Int16 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Int32 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Int64 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Int8 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::Interval(unit) => Ok(match unit { - IntervalUnit::DayTime => { - new_primitive_array::(buffers, num_rows, data_type) - } - IntervalUnit::MonthDayNano => { - new_primitive_array::(buffers, num_rows, data_type) - } - IntervalUnit::YearMonth => { - new_primitive_array::(buffers, num_rows, data_type) - } - }), - DataType::Null => Ok(new_null_array(data_type, num_rows as usize)), - DataType::Time32(unit) => match unit { - TimeUnit::Millisecond => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - TimeUnit::Second => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - _ => Err(Error::io( - format!("invalid time unit {:?} for 32-bit time type", unit), - location!(), - )), - }, - DataType::Time64(unit) => match unit { - TimeUnit::Microsecond => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - TimeUnit::Nanosecond => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - _ => Err(Error::io( - format!("invalid time unit {:?} for 64-bit time type", unit), - location!(), - )), - }, - DataType::Timestamp(unit, _) => Ok(match unit { - TimeUnit::Microsecond => { - new_primitive_array::(buffers, num_rows, data_type) - } - TimeUnit::Millisecond => { - new_primitive_array::(buffers, num_rows, data_type) - } - TimeUnit::Nanosecond => { - new_primitive_array::(buffers, num_rows, data_type) - } - TimeUnit::Second => { - new_primitive_array::(buffers, num_rows, data_type) - } - }), - DataType::UInt16 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::UInt32 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::UInt64 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::UInt8 => Ok(new_primitive_array::( - buffers, num_rows, data_type, - )), - DataType::FixedSizeBinary(dimension) => { - let mut buffers_iter = buffers.into_iter(); - let fsb_validity = buffers_iter.next().unwrap(); - let fsb_nulls = bytes_to_validity(fsb_validity, num_rows); - - let fsb_values = buffers_iter.next().unwrap(); - let fsb_values = Buffer::from_bytes(fsb_values.freeze().into()); - Ok(Arc::new(FixedSizeBinaryArray::new( - *dimension, fsb_values, fsb_nulls, - ))) - } - DataType::FixedSizeList(items, dimension) => { - let mut buffers_iter = buffers.into_iter(); - let fsl_validity = buffers_iter.next().unwrap(); - let fsl_nulls = bytes_to_validity(fsl_validity, num_rows); - - let remaining_buffers = buffers_iter.collect::>(); - let items_array = primitive_array_from_buffers( - items.data_type(), - remaining_buffers, - num_rows * (*dimension as u64), - )?; - Ok(Arc::new(FixedSizeListArray::new( - items.clone(), - *dimension, - items_array, - fsl_nulls, - ))) - } - DataType::Utf8 => Ok(new_generic_byte_array::>( - buffers, num_rows, - )), - DataType::LargeUtf8 => Ok(new_generic_byte_array::>( - buffers, num_rows, - )), - DataType::Binary => Ok(new_generic_byte_array::>( - buffers, num_rows, - )), - DataType::LargeBinary => Ok(new_generic_byte_array::>( - buffers, num_rows, - )), - DataType::Struct(fields) => { - let mut field_arrays = Vec::new(); - - for (field_index, field) in fields.iter().enumerate() { - let null_bytes = BytesMut::default(); - let mut final_buffers = vec![null_bytes]; - - // Pushes a null buffer for inner field of the FSL - // Right now this works only if inner fields of the FSL are nullable - if matches!(field.data_type(), DataType::FixedSizeList(_, _)) { - final_buffers.push(BytesMut::default()); - } - - final_buffers.push(buffers[field_index].clone()); - - let field_array = - primitive_array_from_buffers(field.data_type(), final_buffers, num_rows)?; - - field_arrays.push(field_array); - } - - let struct_array = StructArray::try_new(fields.clone(), field_arrays, None).unwrap(); - Ok(Arc::new(struct_array)) - } - _ => Err(Error::io( - format!( - "The data type {} cannot be decoded from a primitive encoding", - data_type - ), - location!(), - )), - } -} diff --git a/rust/lance-encoding/src/lib.rs b/rust/lance-encoding/src/lib.rs index a102c1bb32..73f6d3f764 100644 --- a/rust/lance-encoding/src/lib.rs +++ b/rust/lance-encoding/src/lib.rs @@ -8,6 +8,8 @@ use futures::{future::BoxFuture, FutureExt, TryFutureExt}; use lance_core::Result; +pub mod buffer; +pub mod data; pub mod decoder; pub mod encoder; pub mod encodings; diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 9882e8bbfa..85927468f0 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -16,8 +16,8 @@ use lance_datagen::{array, gen, ArrayGenerator, RowCount, Seed}; use crate::{ decoder::{ - BatchDecodeStream, ColumnInfo, DecodeBatchScheduler, DecoderMessage, - DecoderMiddlewareChain, FilterExpression, PageInfo, + BatchDecodeStream, ColumnInfo, CoreFieldDecoderStrategy, DecodeBatchScheduler, + DecoderMessage, DecoderMiddlewareChain, FilterExpression, PageInfo, }, encoder::{ ColumnIndexSequence, CoreFieldEncodingStrategy, EncodedBuffer, EncodedPage, FieldEncoder, @@ -74,13 +74,17 @@ async fn test_decode( ) -> (SimpleStructDecoder, BoxFuture<'static, ()>), ) { let lance_schema = lance_core::datatypes::Schema::try_from(schema).unwrap(); + let decode_and_validate = + DecoderMiddlewareChain::new().add_strategy(Arc::new(CoreFieldDecoderStrategy { + validate_data: true, + })); let decode_scheduler = DecodeBatchScheduler::try_new( &lance_schema, column_indices, column_infos, &Vec::new(), num_rows, - &DecoderMiddlewareChain::default(), + &decode_and_validate, io, ) .unwrap();