From 3f8f124478b9da98ba0c4c3a3d9e65853896975a Mon Sep 17 00:00:00 2001
From: jiacai2050
Date: Wed, 24 Aug 2022 22:11:20 +0800
Subject: [PATCH] support variable length decode

---
 analytic_engine/src/sst/parquet/encoding.rs | 150 ++++++++++++++++----
 1 file changed, 121 insertions(+), 29 deletions(-)

diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs
index 9f9127f6fb..e1b181915a 100644
--- a/analytic_engine/src/sst/parquet/encoding.rs
+++ b/analytic_engine/src/sst/parquet/encoding.rs
@@ -8,7 +8,7 @@ use std::{
 
 use arrow_deps::{
     arrow::{
-        array::{Array, ArrayData},
+        array::{Array, ArrayData, ArrayRef},
         buffer::{Buffer, MutableBuffer},
         record_batch::RecordBatch as ArrowRecordBatch,
     },
@@ -34,6 +34,8 @@ use crate::sst::{
     parquet::hybrid::{self, IndexedType},
 };
 
+const OFFSET_SIZE: usize = std::mem::size_of::<i32>();
+
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display(
@@ -126,6 +128,22 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
+    #[snafu(display(
+        "Failed to decode hybrid record batch, err:{}.\nBacktrace:\n{}",
+        source,
+        backtrace
+    ))]
+    DecodeRecordBatch {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display(
+        "At least one ListArray(such as Timestamp) is required to decode hybrid record batch.\nBacktrace:\n{}",
+        backtrace
+    ))]
+    ListArrayRequired { backtrace: Backtrace },
+
     #[snafu(display("Tsid is required for hybrid format.\nBacktrace:\n{}", backtrace))]
     TsidRequired { backtrace: Backtrace },
 
@@ -491,63 +509,137 @@ impl ParquetDecoder {
         ))
     }
 
+    /// Stretch hybrid collapsed column into columnar column.
+    /// `value_offsets` specifies the end offset of each value, which means
+    /// the row count of `value[n]` is `value_offsets[n + 1] - value_offsets[n]`.
+    ///
+    /// Example:
+    ///
+    /// If `array_ref` is `a b c` and `value_offsets` is `[0, 3, 5, 6]`, then
+    /// the output array is `a a a b b c`.
+    fn stretch_variable_length_column(
+        array_ref: &ArrayRef,
+        value_offsets: &[i32],
+        values_num: usize,
+    ) -> Result<ArrayRef> {
+        let offset_slices = array_ref.data().buffers()[0].as_slice();
+        let value_slices = array_ref.data().buffers()[1].as_slice();
+        debug!(
+            "raw buffer slice, offset:{:#02x?}, value:{:#02x?}",
+            offset_slices, value_slices
+        );
+
+        let mut offsets = Vec::with_capacity(offset_slices.len() / OFFSET_SIZE);
+        for i in (0..offset_slices.len()).step_by(OFFSET_SIZE) {
+            let offset = i32::from_le_bytes(offset_slices[i..i + OFFSET_SIZE].try_into().unwrap());
+            offsets.push(offset);
+        }
+
+        let mut value_bytes = 0;
+        for (idx, (current, prev)) in offsets[1..].iter().zip(&offsets).enumerate() {
+            let value_len = current - prev;
+            let value_num = value_offsets[idx + 1] - value_offsets[idx];
+            value_bytes += value_len * value_num;
+        }
+
+        let mut new_offsets = MutableBuffer::new(OFFSET_SIZE * values_num);
+        let mut new_values = MutableBuffer::new(value_bytes as usize);
+        let mut length_so_far: i32 = 0;
+        new_offsets.push(length_so_far);
+
+        for (idx, (current, prev)) in offsets[1..].iter().zip(&offsets).enumerate() {
+            let value_len = current - prev;
+            let value_num = value_offsets[idx + 1] - value_offsets[idx];
+            new_values
+                .extend(value_slices[*prev as usize..*current as usize].repeat(value_num as usize));
+            for _ in 0..value_num {
+                length_so_far += value_len;
+                new_offsets.push(length_so_far);
+            }
+        }
+        let array_data = ArrayData::builder(array_ref.data_type().clone())
+            .len(values_num)
+            .add_buffer(new_offsets.into())
+            .add_buffer(new_values.into())
+            .build()
+            .map_err(|e| Box::new(e) as _)
+            .context(DecodeRecordBatch)?;
+
+        Ok(array_data.into())
+    }
+
+    /// Like `stretch_variable_length_column`, but each value is fixed-size.
+    fn stretch_fixed_length_column(
+        array_ref: &ArrayRef,
+        value_size: usize,
+        value_offsets: &[i32],
+        values_num: usize,
+    ) -> Result<ArrayRef> {
+        let mut new_buffer = MutableBuffer::new(value_size * values_num);
+        let raw_buffer = array_ref.data().buffers()[0].as_slice();
+        for (idx, offset) in (0..raw_buffer.len()).step_by(value_size).enumerate() {
+            let value_num = value_offsets[idx + 1] - value_offsets[idx];
+            new_buffer.extend(raw_buffer[offset..offset + value_size].repeat(value_num as usize))
+        }
+        let array_data = ArrayData::builder(array_ref.data_type().clone())
+            .add_buffer(new_buffer.into())
+            .len(values_num)
+            .build()
+            .map_err(|e| Box::new(e) as _)
+            .context(DecodeRecordBatch)?;
+
+        Ok(array_data.into())
+    }
+
     /// Decode records from hybrid to columnar format
     pub fn decode(self) -> Result<ArrowRecordBatch> {
         let new_arrow_schema = Self::convert_schema(self.record_batch.schema());
         let columns = self.record_batch.columns();
 
-        let mut offsets = None;
-        let mut num_values = 0;
+        let mut value_offsets = None;
+        let mut values_num = 0;
+        // Find value offsets from the first `ListArray` column
         for col in columns {
             if matches!(col.data_type(), DataType::List(_)) {
                 let offset_slices = col.data().buffers()[0].as_slice();
-                num_values = col.data().child_data()[0].len();
+                values_num = col.data().child_data()[0].len();
                 let mut i32_offsets = Vec::with_capacity(offset_slices.len() / 4);
                 for i in (0..offset_slices.len()).step_by(4) {
                     let offset = i32::from_le_bytes(offset_slices[i..i + 4].try_into().unwrap());
                     i32_offsets.push(offset);
                 }
-                offsets = Some(i32_offsets);
+                value_offsets = Some(i32_offsets);
                 break;
             }
         }
 
-        debug!("offsets:{:?}, num_values:{}", offsets, num_values);
-        let offsets = offsets.unwrap();
+        ensure!(value_offsets.is_some(), ListArrayRequired {});
+
+        let value_offsets = value_offsets.unwrap();
         let new_cols = columns
             .iter()
             .map(|col| {
                 let data_type = col.data_type();
                 match data_type {
-                    DataType::List(_nested_field) => col.data().child_data()[0].clone().into(),
+                    DataType::List(_nested_field) => Ok(col.data().child_data()[0].clone().into()),
                     _ => {
                         let datum_kind = DatumKind::from_data_type(data_type).unwrap();
-                        let value_size = datum_kind.size().unwrap();
-                        let mut new_buffer = MutableBuffer::new(value_size * num_values);
-                        let raw_buffer = col.data().buffers()[0].as_slice();
-                        for (idx, offset) in (0..raw_buffer.len()).step_by(value_size).enumerate() {
-                            debug!("xx {idx}-{offset}-{}-{value_size}", offsets[idx]);
-                            let value_num = offsets[idx + 1] - offsets[idx];
-                            new_buffer.extend(
-                                raw_buffer[offset..offset + value_size].repeat(value_num as usize),
-                            )
+                        match datum_kind.size() {
+                            None => Self::stretch_variable_length_column(
+                                col,
+                                &value_offsets,
+                                values_num,
+                            ),
+                            Some(value_size) => Self::stretch_fixed_length_column(
+                                col,
+                                value_size,
+                                &value_offsets,
+                                values_num,
+                            ),
                         }
-                        let buf: Buffer = new_buffer.into();
-                        debug!("11_{:?}, new_buffer:{:#02x?}", data_type, buf.as_slice());
-
-                        let array_data = ArrayData::builder(data_type.clone())
-                            .add_buffer(buf)
-                            .len(num_values)
-                            .build()
-                            .unwrap();
-
-                        array_data.into()
                     }
                 }
             })
-            .collect::<Vec<_>>();
+            .collect::<Result<Vec<_>>>()?;
 
-        println!("try_new:{:?}, cols:{:?}", new_arrow_schema, new_cols);
         ArrowRecordBatch::try_new(new_arrow_schema, new_cols)
             .map_err(|e| Box::new(e) as _)
             .context(EncodeRecordBatch)
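
A note for reviewers, outside the patch itself: the variable-length stretch is easiest to see with plain `Vec`s instead of arrow's offset/value buffers. The following is a minimal standalone sketch under that simplification (the `stretch` helper name is made up for illustration, not part of the patch); it mirrors the doc-comment example where `value_offsets` `[0, 3, 5, 6]` expand `a b c` into `a a a b b c`.

    // Sketch only: a plain-Rust model of stretch_variable_length_column.
    // `value_offsets` are list offsets, so value[n] expands into
    // `value_offsets[n + 1] - value_offsets[n]` output rows.
    fn stretch(values: &[&str], value_offsets: &[i32]) -> Vec<String> {
        let total = *value_offsets.last().unwrap() as usize;
        let mut out = Vec::with_capacity(total);
        for (idx, v) in values.iter().enumerate() {
            let value_num = (value_offsets[idx + 1] - value_offsets[idx]) as usize;
            out.extend(std::iter::repeat(v.to_string()).take(value_num));
        }
        out
    }

    fn main() {
        // Mirrors the doc-comment example: offsets [0, 3, 5, 6] expand
        // ["a", "b", "c"] into ["a", "a", "a", "b", "b", "c"].
        assert_eq!(
            stretch(&["a", "b", "c"], &[0, 3, 5, 6]),
            ["a", "a", "a", "b", "b", "c"]
        );
    }

The real helper does the same expansion directly on the Utf8 column's raw buffers, which is why it must also rebuild the output's own offsets buffer (`new_offsets`) while it repeats the value bytes.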
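
The fixed-size path is the same idea at the byte level: each value occupies exactly `value_size` bytes, so stretching reduces to repeating byte chunks and no offsets buffer is needed in the output. Again a sketch with an invented helper name (`stretch_fixed`), not the patch's code:

    // Sketch only: a plain-Rust model of stretch_fixed_length_column.
    // `raw` holds fixed-width values back to back; each `value_size`-byte
    // chunk is repeated by its row count from `value_offsets`.
    fn stretch_fixed(raw: &[u8], value_size: usize, value_offsets: &[i32]) -> Vec<u8> {
        let values_num = *value_offsets.last().unwrap() as usize;
        let mut out = Vec::with_capacity(value_size * values_num);
        for (idx, chunk) in raw.chunks(value_size).enumerate() {
            let value_num = (value_offsets[idx + 1] - value_offsets[idx]) as usize;
            out.extend(chunk.repeat(value_num));
        }
        out
    }

    fn main() {
        // Two little-endian u16 values (1 and 2); offsets [0, 2, 3] mean the
        // first value covers two rows and the second covers one.
        let raw = [1u8, 0, 2, 0];
        assert_eq!(stretch_fixed(&raw, 2, &[0, 2, 3]), [1, 0, 1, 0, 2, 0]);
    }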