diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs
index 2c93187329..85e6dca733 100644
--- a/analytic_engine/src/sst/parquet/encoding.rs
+++ b/analytic_engine/src/sst/parquet/encoding.rs
@@ -7,7 +7,12 @@ use std::{
 };
 
 use arrow_deps::{
-    arrow::record_batch::RecordBatch as ArrowRecordBatch,
+    arrow::{
+        array::{Array, ArrayData, ArrayRef},
+        buffer::MutableBuffer,
+        record_batch::RecordBatch as ArrowRecordBatch,
+        util::bit_util,
+    },
     parquet::{
         arrow::ArrowWriter,
         basic::Compression,
@@ -17,9 +22,12 @@ use arrow_deps::{
 use common_types::{
     bytes::{BytesMut, MemBufMut, Writer},
     datum::DatumKind,
-    schema::{ArrowSchemaRef, Schema, StorageFormat},
+    schema::{
+        ArrowSchema, ArrowSchemaMeta, ArrowSchemaRef, DataType, Field, Schema, StorageFormat,
+    },
 };
 use common_util::define_result;
+use log::trace;
 use proto::sst::SstMetaData as SstMetaDataPb;
 use protobuf::Message;
 use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
@@ -29,6 +37,9 @@ use crate::sst::{
     parquet::hybrid::{self, IndexedType},
 };
 
+// TODO: Only i32 offsets are supported now, consider i64 here?
+const OFFSET_SIZE: usize = std::mem::size_of::<i32>();
+
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display(
@@ -121,6 +132,22 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
+    #[snafu(display(
+        "Failed to decode hybrid record batch, err:{}.\nBacktrace:\n{}",
+        source,
+        backtrace
+    ))]
+    DecodeRecordBatch {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display(
+        "At least one ListArray(such as Timestamp) is required to decode hybrid record batch.\nBacktrace:\n{}",
+        backtrace
+    ))]
+    ListArrayRequired { backtrace: Backtrace },
+
     #[snafu(display("Tsid is required for hybrid format.\nBacktrace:\n{}", backtrace))]
     TsidRequired { backtrace: Backtrace },
 
@@ -461,3 +488,456 @@ impl ParquetEncoder {
         self.record_encoder.close()
     }
 }
+
+/// RecordDecoder is used for decoding ArrowRecordBatch based on
+/// `schema.StorageFormat`
+trait RecordDecoder {
+    fn decode(&self, arrow_record_batch: ArrowRecordBatch) -> Result<ArrowRecordBatch>;
+}
+
+struct ColumnarRecordDecoder {}
+
+impl RecordDecoder for ColumnarRecordDecoder {
+    fn decode(&self, arrow_record_batch: ArrowRecordBatch) -> Result<ArrowRecordBatch> {
+        Ok(arrow_record_batch)
+    }
+}
+
+struct HybridRecordDecoder {}
+
+impl HybridRecordDecoder {
+    /// Convert `ListArray` fields to their underlying data type
+    fn convert_schema(arrow_schema: ArrowSchemaRef) -> ArrowSchemaRef {
+        let new_fields: Vec<_> = arrow_schema
+            .fields()
+            .iter()
+            .map(|f| {
+                if let DataType::List(nested_field) = f.data_type() {
+                    Field::new(f.name(), nested_field.data_type().clone(), true)
+                } else {
+                    f.clone()
+                }
+            })
+            .collect();
+        Arc::new(ArrowSchema::new_with_metadata(
+            new_fields,
+            arrow_schema.metadata().clone(),
+        ))
+    }
+
+    /// Stretch a hybrid collapsed column into a columnar column.
+    /// `value_offsets` specifies the offsets each value occupies, which means
+    /// the number of repetitions of `value[n]` is `value_offsets[n] - value_offsets[n-1]`.
+    ///
+    /// Example:
+    ///
+    /// If `array_ref` is `a b c` and `value_offsets` is `[0, 3, 5, 6]`, then the
+    /// output array is `a a a b b c`.
+    ///
+    /// Note: the caller should ensure `value_offsets` is not empty.
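+    ///
+    /// A null `value[n]` is expanded into `value_offsets[n] - value_offsets[n-1]` null
+    /// slots in the output.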
+    fn stretch_variable_length_column(
+        array_ref: &ArrayRef,
+        value_offsets: &[i32],
+    ) -> Result<ArrayRef> {
+        assert_eq!(array_ref.len() + 1, value_offsets.len());
+
+        let values_num = *value_offsets.last().unwrap() as usize;
+        let offset_slices = array_ref.data().buffers()[0].as_slice();
+        let value_slices = array_ref.data().buffers()[1].as_slice();
+        let null_bitmap = array_ref.data().null_bitmap();
+        trace!(
+            "raw buffer slice, offsets:{:#02x?}, values:{:#02x?}, bitmap:{:#02x?}",
+            offset_slices,
+            value_slices,
+            null_bitmap.map(|v| v.buffer_ref().as_slice())
+        );
+
+        let i32_offsets = Self::get_array_offsets(offset_slices);
+        let mut value_bytes = 0;
+        for (idx, (current, prev)) in i32_offsets[1..].iter().zip(&i32_offsets).enumerate() {
+            let value_len = current - prev;
+            let value_num = value_offsets[idx + 1] - value_offsets[idx];
+            value_bytes += value_len * value_num;
+        }
+
+        // Construct the new expanded array.
+        let mut new_offsets_buffer = MutableBuffer::new(OFFSET_SIZE * values_num);
+        let mut new_values_buffer = MutableBuffer::new(value_bytes as usize);
+        let mut new_null_buffer = MutableBuffer::new(values_num);
+        new_null_buffer = new_null_buffer.with_bitset(values_num, true);
+        let null_slice = new_null_buffer.as_slice_mut();
+        let mut value_length_so_far: i32 = 0;
+        new_offsets_buffer.push(value_length_so_far);
+        let mut bitmap_length_so_far: usize = 0;
+
+        for (idx, (current, prev)) in i32_offsets[1..].iter().zip(&i32_offsets).enumerate() {
+            let value_len = current - prev;
+            let value_num = value_offsets[idx + 1] - value_offsets[idx];
+
+            if let Some(bitmap) = null_bitmap {
+                if !bitmap.is_set(idx) {
+                    for i in 0..value_num {
+                        bit_util::unset_bit(null_slice, bitmap_length_so_far + i as usize);
+                    }
+                }
+            }
+            bitmap_length_so_far += value_num as usize;
+            new_values_buffer
+                .extend(value_slices[*prev as usize..*current as usize].repeat(value_num as usize));
+            for _ in 0..value_num {
+                value_length_so_far += value_len;
+                new_offsets_buffer.push(value_length_so_far);
+            }
+        }
+        trace!(
+            "new buffer slice, offsets:{:#02x?}, values:{:#02x?}, bitmap:{:#02x?}",
+            new_offsets_buffer.as_slice(),
+            new_values_buffer.as_slice(),
+            new_null_buffer.as_slice(),
+        );
+
+        let array_data = ArrayData::builder(array_ref.data_type().clone())
+            .len(values_num)
+            .add_buffer(new_offsets_buffer.into())
+            .add_buffer(new_values_buffer.into())
+            .null_bit_buffer(Some(new_null_buffer.into()))
+            .build()
+            .map_err(|e| Box::new(e) as _)
+            .context(DecodeRecordBatch)?;
+
+        Ok(array_data.into())
+    }
+
+    /// Like `stretch_variable_length_column`, but the array values are of a
+    /// fixed-size type.
+    ///
+    /// Note: the caller should ensure `value_offsets` is not empty.
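+    ///
+    /// Example:
+    ///
+    /// If `array_ref` is `1 2` and `value_offsets` is `[0, 2, 3]`, then the output
+    /// array is `1 1 2`.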
+    fn stretch_fixed_length_column(
+        array_ref: &ArrayRef,
+        value_size: usize,
+        value_offsets: &[i32],
+    ) -> Result<ArrayRef> {
+        assert!(!value_offsets.is_empty());
+
+        let values_num = *value_offsets.last().unwrap() as usize;
+        let old_values_buffer = array_ref.data().buffers()[0].as_slice();
+        let old_null_bitmap = array_ref.data().null_bitmap();
+
+        let mut new_values_buffer = MutableBuffer::new(value_size * values_num);
+        let mut new_null_buffer = MutableBuffer::new(values_num);
+        new_null_buffer = new_null_buffer.with_bitset(values_num, true);
+        let null_slice = new_null_buffer.as_slice_mut();
+        let mut length_so_far = 0;
+
+        for (idx, offset) in (0..old_values_buffer.len()).step_by(value_size).enumerate() {
+            let value_num = (value_offsets[idx + 1] - value_offsets[idx]) as usize;
+            if let Some(bitmap) = old_null_bitmap {
+                if !bitmap.is_set(idx) {
+                    for i in 0..value_num {
+                        bit_util::unset_bit(null_slice, length_so_far + i as usize);
+                    }
+                }
+            }
+            length_so_far += value_num;
+            new_values_buffer
+                .extend(old_values_buffer[offset..offset + value_size].repeat(value_num))
+        }
+        let array_data = ArrayData::builder(array_ref.data_type().clone())
+            .add_buffer(new_values_buffer.into())
+            .null_bit_buffer(Some(new_null_buffer.into()))
+            .len(values_num)
+            .build()
+            .map_err(|e| Box::new(e) as _)
+            .context(DecodeRecordBatch)?;
+
+        Ok(array_data.into())
+    }
+
+    /// Decode offset slices into `Vec<i32>`
+    fn get_array_offsets(offset_slices: &[u8]) -> Vec<i32> {
+        let mut i32_offsets = Vec::with_capacity(offset_slices.len() / OFFSET_SIZE);
+        for i in (0..offset_slices.len()).step_by(OFFSET_SIZE) {
+            let offset = i32::from_le_bytes(offset_slices[i..i + OFFSET_SIZE].try_into().unwrap());
+            i32_offsets.push(offset);
+        }
+
+        i32_offsets
+    }
+}
+
+impl RecordDecoder for HybridRecordDecoder {
+    /// Decode records from hybrid to columnar format
+    fn decode(&self, arrow_record_batch: ArrowRecordBatch) -> Result<ArrowRecordBatch> {
+        let new_arrow_schema = Self::convert_schema(arrow_record_batch.schema());
+        let arrays = arrow_record_batch.columns();
+        let mut value_offsets = None;
+        // Find value offsets from the first `ListArray` column.
+        for array_ref in arrays {
+            if matches!(array_ref.data_type(), DataType::List(_)) {
+                let offset_slices = array_ref.data().buffers()[0].as_slice();
+                value_offsets = Some(Self::get_array_offsets(offset_slices));
+                break;
+            }
+        }
+
+        ensure!(value_offsets.is_some(), ListArrayRequired {});
+
+        let value_offsets = value_offsets.unwrap();
+        let arrays = arrays
+            .iter()
+            .map(|array_ref| {
+                let data_type = array_ref.data_type();
+                match data_type {
+                    // TODO:
+                    // 1. We assume the datatype inside the List is primitive for now;
+                    //    ensure this when creating the table.
+                    // 2. Although nested structures aren't supported now, they may be
+                    //    supported someday in the future, so we should keep metadata about
+                    //    which columns are collapsed by the hybrid storage format, to
+                    //    differentiate them from List columns in the original records.
+                    DataType::List(_nested_field) => {
+                        Ok(array_ref.data().child_data()[0].clone().into())
+                    }
+                    _ => {
+                        let datum_kind = DatumKind::from_data_type(data_type).unwrap();
+                        match datum_kind.size() {
+                            None => Self::stretch_variable_length_column(array_ref, &value_offsets),
+                            Some(value_size) => Self::stretch_fixed_length_column(
+                                array_ref,
+                                value_size,
+                                &value_offsets,
+                            ),
+                        }
+                    }
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        ArrowRecordBatch::try_new(new_arrow_schema, arrays)
+            .map_err(|e| Box::new(e) as _)
+            .context(EncodeRecordBatch)
+    }
+}
+
+pub struct ParquetDecoder {
+    record_decoder: Box<dyn RecordDecoder>,
+}
+
+impl ParquetDecoder {
+    pub fn try_new(arrow_schema: ArrowSchemaRef) -> Result<Self> {
+        let arrow_schema_meta = ArrowSchemaMeta::try_from(arrow_schema.metadata())
+            .map_err(|e| Box::new(e) as _)
+            .context(DecodeRecordBatch)?;
+        let mut format = arrow_schema_meta.storage_format();
+        // TODO: remove this override when the format can be set via table options.
+        if matches!(format, StorageFormat::Hybrid) && !arrow_schema_meta.enable_tsid_primary_key() {
+            format = StorageFormat::Columnar;
+        }
+
+        let record_decoder: Box<dyn RecordDecoder> = match format {
+            StorageFormat::Hybrid => Box::new(HybridRecordDecoder {}),
+            StorageFormat::Columnar => Box::new(ColumnarRecordDecoder {}),
+        };
+
+        Ok(Self { record_decoder })
+    }
+
+    pub fn decode_record_batch(
+        &self,
+        arrow_record_batch: ArrowRecordBatch,
+    ) -> Result<ArrowRecordBatch> {
+        self.record_decoder.decode(arrow_record_batch)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use arrow_deps::{
+        arrow::array::{Int32Array, StringArray, TimestampMillisecondArray, UInt64Array},
+        parquet::{
+            arrow::{ArrowReader, ParquetFileArrowReader},
+            file::serialized_reader::{SerializedFileReader, SliceableCursor},
+        },
+    };
+    use common_types::{
+        column_schema,
+        schema::{Builder, TSID_COLUMN},
+    };
+
+    use super::*;
+
+    fn build_schema() -> Schema {
+        Builder::new()
+            .auto_increment_column_id(true)
+            .enable_tsid_primary_key(true)
+            .add_key_column(
+                column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_key_column(
+                column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("host".to_string(), DatumKind::String)
+                    .is_tag(true)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("region".to_string(), DatumKind::String)
+                    .is_tag(true)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .add_normal_column(
+                column_schema::Builder::new("value".to_string(), DatumKind::Int32)
+                    .build()
+                    .unwrap(),
+            )
+            .unwrap()
+            .build()
+            .unwrap()
+    }
+
+    fn string_array(values: Vec<Option<&str>>) -> ArrayRef {
+        Arc::new(StringArray::from(values))
+    }
+
+    fn int32_array(values: Vec<Option<i32>>) -> ArrayRef {
+        Arc::new(Int32Array::from(values))
+    }
+
+    fn timestamp_array(values: Vec<i64>) -> ArrayRef {
+        Arc::new(TimestampMillisecondArray::from(values))
+    }
+
+    #[test]
+    fn stretch_int32_column() {
+        let testcases = [
+            // (input, value_offsets, expected)
+            (
+                vec![Some(1), Some(2)],
+                vec![0, 2, 4],
+                vec![Some(1), Some(1), Some(2), Some(2)],
+            ),
+            (
+                vec![Some(1), None, Some(2)],
+                vec![0, 2, 4, 5],
+                vec![Some(1), Some(1), None, None, Some(2)],
+            ),
+        ];
+
+        for (input, value_offsets, expected) in testcases {
+            let input = int32_array(input);
+            let expected = int32_array(expected);
+            let actual =
+                HybridRecordDecoder::stretch_fixed_length_column(
+                    &input,
+                    std::mem::size_of::<i32>(),
+                    &value_offsets,
+                )
+                .unwrap();
+            assert_eq!(
+                actual.as_any().downcast_ref::<Int32Array>().unwrap(),
+                expected.as_any().downcast_ref::<Int32Array>().unwrap(),
+            );
+        }
+    }
+
+    #[test]
+    fn stretch_string_column() {
+        let testcases = [
+            // (input, value_offsets, expected)
+            //
+            // values with the same length
+            (
+                vec![Some("a"), Some("b"), Some("c")],
+                vec![0, 3, 5, 6],
+                vec![
+                    Some("a"),
+                    Some("a"),
+                    Some("a"),
+                    Some("b"),
+                    Some("b"),
+                    Some("c"),
+                ],
+            ),
+            // values with different lengths
+            (
+                vec![Some("hello"), Some("ceresdb")],
+                vec![0, 1, 3],
+                vec![Some("hello"), Some("ceresdb"), Some("ceresdb")],
+            ),
+            // values with None
+            (
+                vec![None, None, Some("hello"), None],
+                vec![0, 1, 3, 4, 5],
+                vec![None, None, None, Some("hello"), None],
+            ),
+        ];
+
+        for (input, value_offsets, expected) in testcases {
+            let input = string_array(input);
+            let expected = string_array(expected);
+            let actual =
+                HybridRecordDecoder::stretch_variable_length_column(&input, &value_offsets)
+                    .unwrap();
+            assert_eq!(
+                actual.as_any().downcast_ref::<StringArray>().unwrap(),
+                expected.as_any().downcast_ref::<StringArray>().unwrap(),
+            );
+        }
+    }
+
+    #[test]
+    fn encode_hybrid_record_and_decode_back() {
+        let write_props = WriterProperties::builder().build();
+        let schema = build_schema();
+        let mut encoder = HybridRecordEncoder::try_new(write_props, &schema).unwrap();
+
+        let columns = vec![
+            Arc::new(UInt64Array::from(vec![1, 1, 2, 2])) as ArrayRef,
+            timestamp_array(vec![100, 101, 100, 101]),
+            string_array(vec![
+                Some("host1"),
+                Some("host1"),
+                Some("host2"),
+                Some("host2"),
+            ]),
+            string_array(vec![
+                Some("region1"),
+                Some("region1"),
+                Some("region2"),
+                Some("region2"),
+            ]),
+            int32_array(vec![Some(1), Some(2), Some(11), Some(12)]),
+        ];
+
+        let input_record_batch =
+            ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns).unwrap();
+        let row_nums = encoder.encode(vec![input_record_batch.clone()]).unwrap();
+        assert_eq!(2, row_nums);
+
+        // Read the encoded records back, then compare them with the input records.
+        let encoded_bytes = encoder.close().unwrap();
+        let reader =
+            SerializedFileReader::new(SliceableCursor::new(Arc::new(encoded_bytes))).unwrap();
+        let mut reader = ParquetFileArrowReader::new(Arc::new(reader));
+        let mut reader = reader.get_record_reader(2048).unwrap();
+        let hybrid_record_batch = reader.next().unwrap().unwrap();
+
+        let decoder = HybridRecordDecoder {};
+        let decoded_record_batch = decoder.decode(hybrid_record_batch).unwrap();
+
+        // Note: the decoded record batch's schema doesn't carry the metadata;
+        // it is encoded in the metadata of every field.
+        // assert_eq!(decoded_record_batch.schema(), input_record_batch.schema());
+        assert_eq!(decoded_record_batch.columns(), input_record_batch.columns());
+    }
+}
diff --git a/analytic_engine/src/sst/parquet/reader.rs b/analytic_engine/src/sst/parquet/reader.rs
index 7f82927801..4960a95d38 100644
--- a/analytic_engine/src/sst/parquet/reader.rs
+++ b/analytic_engine/src/sst/parquet/reader.rs
@@ -3,7 +3,6 @@
 //! Sst reader implementation based on parquet.
 use std::{
-    convert::TryFrom,
     pin::Pin,
     sync::Arc,
     task::{Context, Poll},
@@ -23,7 +22,7 @@ use async_trait::async_trait;
 use common_types::{
     projected_schema::{ProjectedSchema, RowProjector},
     record_batch::{ArrowRecordBatchProjector, RecordBatchWithKey},
-    schema::{Schema, StorageFormat},
+    schema::Schema,
 };
 use common_util::runtime::Runtime;
 use futures::Stream;
@@ -40,7 +39,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
 use crate::sst::{
     factory::SstReaderOptions,
     file::SstMetaData,
-    parquet::encoding,
+    parquet::encoding::{self, ParquetDecoder},
     reader::{error::*, SstReader},
 };
@@ -320,6 +319,10 @@ impl ProjectAndFilterReader {
         let reader = self.project_and_filter_reader()?;
         let arrow_record_batch_projector = ArrowRecordBatchProjector::from(self.row_projector);
 
+        let arrow_schema = self.projected_schema.to_projected_arrow_schema();
+        let parquet_decoder = ParquetDecoder::try_new(arrow_schema)
+            .map_err(|e| Box::new(e) as _)
+            .context(DecodeRecordBatch)?;
         let mut row_num = 0;
         for record_batch in reader {
             trace!(
@@ -333,13 +336,10 @@ impl ProjectAndFilterReader {
                 .context(DecodeRecordBatch)
             {
                 Ok(record_batch) => {
-                    let arrow_schema = record_batch.schema();
-                    let schema = Schema::try_from(arrow_schema).context(InvalidSchema)?;
-                    let record_batch = match schema.storage_format() {
-                        StorageFormat::Hybrid => todo!("Will implement this in PR 207"),
-                        StorageFormat::Columnar => record_batch,
-                    };
-
+                    let record_batch = parquet_decoder
+                        .decode_record_batch(record_batch)
+                        .map_err(|e| Box::new(e) as _)
+                        .context(DecodeRecordBatch)?;
                     row_num += record_batch.num_rows();
                     let record_batch_with_key = arrow_record_batch_projector
                         .project_to_record_batch_with_key(record_batch)
diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs
index 1a135fec2a..05d313be64 100644
--- a/common_types/src/schema.rs
+++ b/common_types/src/schema.rs
@@ -177,13 +177,72 @@ pub enum CompatError {
 }
 
 /// Meta data of the arrow schema
-struct ArrowSchemaMeta {
+pub struct ArrowSchemaMeta {
     num_key_columns: usize,
     timestamp_index: usize,
     enable_tsid_primary_key: bool,
     version: u32,
 }
 
+impl ArrowSchemaMeta {
+    pub fn storage_format(&self) -> StorageFormat {
+        // TODO: parse it from table options
+        match std::env::var("CERESDB_TABLE_FORMAT") {
+            Ok(format) => {
+                if format == "HYBRID" {
+                    StorageFormat::Hybrid
+                } else {
+                    StorageFormat::Columnar
+                }
+            }
+            Err(_) => StorageFormat::Columnar,
+        }
+    }
+
+    pub fn enable_tsid_primary_key(&self) -> bool {
+        self.enable_tsid_primary_key
+    }
+
+    fn parse_arrow_schema_meta_value<T>(
+        meta: &HashMap<String, String>,
+        key: ArrowSchemaMetaKey,
+    ) -> Result<T>
+    where
+        T: FromStr,
+        T::Err: std::error::Error + Send + Sync + 'static,
+    {
+        let raw_value = meta
+            .get(key.as_str())
+            .context(ArrowSchemaMetaKeyNotFound { key })?;
+        T::from_str(raw_value.as_str())
+            .map_err(|e| Box::new(e) as _)
+            .context(InvalidArrowSchemaMetaValue { key, raw_value })
+    }
+}
+
+/// Parse the necessary meta information from the arrow schema's meta data.
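+///
+/// The expected keys are defined by `ArrowSchemaMetaKey`: `NumKeyColumns`,
+/// `TimestampIndex`, `EnableTsidPrimaryKey` and `Version`.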
+impl TryFrom<&HashMap<String, String>> for ArrowSchemaMeta {
+    type Error = Error;
+
+    fn try_from(meta: &HashMap<String, String>) -> Result<Self> {
+        Ok(ArrowSchemaMeta {
+            num_key_columns: Self::parse_arrow_schema_meta_value(
+                meta,
+                ArrowSchemaMetaKey::NumKeyColumns,
+            )?,
+            timestamp_index: Self::parse_arrow_schema_meta_value(
+                meta,
+                ArrowSchemaMetaKey::TimestampIndex,
+            )?,
+            enable_tsid_primary_key: Self::parse_arrow_schema_meta_value(
+                meta,
+                ArrowSchemaMetaKey::EnableTsidPrimaryKey,
+            )?,
+            version: Self::parse_arrow_schema_meta_value(meta, ArrowSchemaMetaKey::Version)?,
+        })
+    }
+}
+
 #[derive(Copy, Clone, Debug)]
 pub enum ArrowSchemaMetaKey {
     NumKeyColumns,
@@ -448,6 +507,7 @@ pub fn compare_row(
 }
 
 /// StorageFormat specify how records are saved in persistent storage
+#[derive(Debug)]
 pub enum StorageFormat {
     /// Traditional columnar format, every column is saved in one exact one
     /// column, for example:
@@ -1053,27 +1113,11 @@ impl Builder {
         })
     }
 
-    fn parse_arrow_schema_meta_value<T>(
-        meta: &HashMap<String, String>,
-        key: ArrowSchemaMetaKey,
-    ) -> Result<T>
-    where
-        T: FromStr,
-        T::Err: std::error::Error + Send + Sync + 'static,
-    {
-        let raw_value = meta
-            .get(key.as_str())
-            .context(ArrowSchemaMetaKeyNotFound { key })?;
-        T::from_str(raw_value.as_str())
-            .map_err(|e| Box::new(e) as _)
-            .context(InvalidArrowSchemaMetaValue { key, raw_value })
-    }
-
     /// Parse the necessary meta information from the arrow schema's meta data.
     fn parse_arrow_schema_meta_or_default(
         meta: &HashMap<String, String>,
    ) -> Result<ArrowSchemaMeta> {
-        match Self::parse_arrow_schema_meta(meta) {
+        match ArrowSchemaMeta::try_from(meta) {
             Ok(v) => Ok(v),
             Err(Error::ArrowSchemaMetaKeyNotFound { .. }) => Ok(ArrowSchemaMeta {
                 num_key_columns: 0,
@@ -1085,25 +1129,6 @@ impl Builder {
         }
     }
 
-    /// Parse the necessary meta information from the arrow schema's meta data.
-    fn parse_arrow_schema_meta(meta: &HashMap<String, String>) -> Result<ArrowSchemaMeta> {
-        Ok(ArrowSchemaMeta {
-            num_key_columns: Self::parse_arrow_schema_meta_value(
-                meta,
-                ArrowSchemaMetaKey::NumKeyColumns,
-            )?,
-            timestamp_index: Self::parse_arrow_schema_meta_value(
-                meta,
-                ArrowSchemaMetaKey::TimestampIndex,
-            )?,
-            enable_tsid_primary_key: Self::parse_arrow_schema_meta_value(
-                meta,
-                ArrowSchemaMetaKey::EnableTsidPrimaryKey,
-            )?,
-            version: Self::parse_arrow_schema_meta_value(meta, ArrowSchemaMetaKey::Version)?,
-        })
-    }
-
     /// Build arrow schema meta data.
     ///
     /// Requires: the timestamp index is not None.