From 1cefda7745bd7f6dfc32edb7d00522346e37fbb8 Mon Sep 17 00:00:00 2001
From: Jiacai Liu
Date: Fri, 19 Aug 2022 16:36:47 +0800
Subject: [PATCH] feat: write hybrid storage format (#185)

* add convert_to_hybrid
* add parquet encoder to keep schema consistent between writer and record
* use buffer to build list array
* fields support all primitive array
* support bitmap
* check tsid in try_new
* fix CR, add more docs
* fix clippy
* rename field column to non key column
* fix clippy
* fix wrong data type size
* fix reviews
* fix naming convention, add is_collapsible_column method
* fix clippy
* null bitmap init to 1 to avoid unnecessary operation
---
 analytic_engine/src/sst/parquet/builder.rs  |  69 +--
 analytic_engine/src/sst/parquet/encoding.rs | 319 +++++++++++++-
 analytic_engine/src/sst/parquet/hybrid.rs   | 447 ++++++++++++++++++++
 analytic_engine/src/sst/parquet/mod.rs      |   1 +
 analytic_engine/src/sst/parquet/reader.rs   |  12 +-
 analytic_engine/src/sst/reader.rs           |   3 +
 common_types/src/datum.rs                   |  42 ++
 common_types/src/record_batch.rs            |   2 +-
 common_types/src/schema.rs                  |  73 ++++
 9 files changed, 900 insertions(+), 68 deletions(-)
 create mode 100644 analytic_engine/src/sst/parquet/hybrid.rs

diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs
index 9ae154cec1..3347a5d375 100644
--- a/analytic_engine/src/sst/parquet/builder.rs
+++ b/analytic_engine/src/sst/parquet/builder.rs
@@ -7,11 +7,7 @@ use std::sync::{
     Arc,
 };
 
-use arrow_deps::{
-    arrow::record_batch::RecordBatch as ArrowRecordBatch,
-    datafusion::parquet::basic::Compression,
-    parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
-};
+use arrow_deps::datafusion::parquet::basic::Compression;
 use async_trait::async_trait;
 use common_types::request_id::RequestId;
 use futures::StreamExt;
@@ -23,7 +19,7 @@ use crate::sst::{
     builder::{RecordBatchStream, SstBuilder, *},
     factory::SstBuilderOptions,
     file::SstMetaData,
-    parquet::encoding,
+    parquet::encoding::ParquetEncoder,
 };
 
 /// The implementation of sst based on parquet and object storage.
@@ -54,7 +50,6 @@ impl<'a> ParquetSstBuilder<'a> {
 struct RecordBytesReader {
     request_id: RequestId,
     record_stream: RecordBatchStream,
-    encoding_buffer: Vec<u8>,
     num_rows_per_row_group: usize,
     compression: Compression,
     meta_data: SstMetaData,
@@ -63,37 +58,14 @@ struct RecordBytesReader {
     fetched_row_num: usize,
 }
 
-/// Build the write properties containing the sst meta data.
-fn build_write_properties( - num_rows_per_row_group: usize, - compression: Compression, - meta_data: &SstMetaData, -) -> Result { - let meta_data_kv = encoding::encode_sst_meta_data(meta_data.clone()) - .map_err(|e| Box::new(e) as _) - .context(EncodeMetaData)?; - - Ok(WriterProperties::builder() - .set_key_value_metadata(Some(vec![meta_data_kv])) - .set_max_row_group_size(num_rows_per_row_group) - .set_compression(compression) - .build()) -} - impl RecordBytesReader { async fn read_all(mut self) -> Result> { let mut arrow_record_batch_vec = Vec::new(); - // build writer - let write_props = build_write_properties( + let mut parquet_encoder = ParquetEncoder::try_new( self.num_rows_per_row_group, self.compression, &self.meta_data, - )?; - let mut arrow_writer = ArrowWriter::try_new( - &mut self.encoding_buffer, - self.meta_data.schema.as_arrow_schema_ref().clone(), - Some(write_props), ) .map_err(|e| Box::new(e) as _) .context(EncodeRecordBatch)?; @@ -114,7 +86,8 @@ impl RecordBytesReader { if self.fetched_row_num >= self.num_rows_per_row_group { let buf_len = arrow_record_batch_vec.len(); self.fetched_row_num = 0; - let row_num = Self::encode_record_batch(&mut arrow_writer, arrow_record_batch_vec) + let row_num = parquet_encoder + .encode_record_batch(arrow_record_batch_vec) .map_err(|e| Box::new(e) as _) .context(EncodeRecordBatch)?; arrow_record_batch_vec = Vec::with_capacity(buf_len); @@ -124,41 +97,18 @@ impl RecordBytesReader { // final check if there is any record batch left if self.fetched_row_num != 0 { - let row_num = Self::encode_record_batch(&mut arrow_writer, arrow_record_batch_vec) + let row_num = parquet_encoder + .encode_record_batch(arrow_record_batch_vec) .map_err(|e| Box::new(e) as _) .context(EncodeRecordBatch)?; self.total_row_num.fetch_add(row_num, Ordering::Relaxed); } - arrow_writer + let bytes = parquet_encoder .close() - .map_err(|e| Box::new(e) as _) - .context(EncodeMetaData)?; - - Ok(self.encoding_buffer) - } - - /// Encode the record batch with [ArrowWriter] and the encoded contents is - /// written to the buffer. - fn encode_record_batch( - arrow_writer: &mut ArrowWriter, - arrow_record_batch_vec: Vec, - ) -> Result { - if arrow_record_batch_vec.is_empty() { - return Ok(0); - } - - let arrow_schema = arrow_record_batch_vec[0].schema(); - let record_batch = ArrowRecordBatch::concat(&arrow_schema, &arrow_record_batch_vec) .map_err(|e| Box::new(e) as _) .context(EncodeRecordBatch)?; - - arrow_writer - .write(&record_batch) - .map_err(|e| Box::new(e) as _) - .context(EncodeRecordBatch)?; - - Ok(record_batch.num_rows()) + Ok(bytes) } } @@ -179,7 +129,6 @@ impl<'a> SstBuilder for ParquetSstBuilder<'a> { let reader = RecordBytesReader { request_id, record_stream, - encoding_buffer: vec![], num_rows_per_row_group: self.num_rows_per_row_group, compression: self.compression, total_row_num: total_row_num.clone(), diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index ddb916b14d..2c93187329 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -1,15 +1,33 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
-use std::convert::TryFrom; +use std::{ + convert::TryFrom, + io::Write, + sync::{Arc, Mutex}, +}; -use arrow_deps::parquet::file::metadata::KeyValue; -use common_types::bytes::{BytesMut, MemBufMut, Writer}; +use arrow_deps::{ + arrow::record_batch::RecordBatch as ArrowRecordBatch, + parquet::{ + arrow::ArrowWriter, + basic::Compression, + file::{metadata::KeyValue, properties::WriterProperties}, + }, +}; +use common_types::{ + bytes::{BytesMut, MemBufMut, Writer}, + datum::DatumKind, + schema::{ArrowSchemaRef, Schema, StorageFormat}, +}; use common_util::define_result; use proto::sst::SstMetaData as SstMetaDataPb; use protobuf::Message; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; -use crate::sst::file::SstMetaData; +use crate::sst::{ + file::SstMetaData, + parquet::hybrid::{self, IndexedType}, +}; #[derive(Debug, Snafu)] pub enum Error { @@ -92,6 +110,39 @@ pub enum Error { #[snafu(display("Failed to convert sst meta data from protobuf, err:{}", source))] ConvertSstMetaData { source: crate::sst::file::Error }, + + #[snafu(display( + "Failed to encode record batch into sst, err:{}.\nBacktrace:\n{}", + source, + backtrace + ))] + EncodeRecordBatch { + source: Box, + backtrace: Backtrace, + }, + + #[snafu(display("Tsid is required for hybrid format.\nBacktrace:\n{}", backtrace))] + TsidRequired { backtrace: Backtrace }, + + #[snafu(display( + "Key column must be string type. type:{}\nBacktrace:\n{}", + type_name, + backtrace + ))] + StringKeyColumnRequired { + type_name: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Hybrid format doesn't support variable length type, type:{}.\nBacktrace:\n{}", + type_name, + backtrace + ))] + VariableLengthType { + type_name: String, + backtrace: Backtrace, + }, } define_result!(Error); @@ -150,3 +201,263 @@ pub fn decode_sst_meta_data(kv: &KeyValue) -> Result { SstMetaData::try_from(meta_data_pb).context(ConvertSstMetaData) } + +/// RecordEncoder is used for encoding ArrowBatch. +/// +/// TODO: allow pre-allocate buffer +trait RecordEncoder { + /// Encode vector of arrow batch, return encoded row number + fn encode(&mut self, arrow_record_batch_vec: Vec) -> Result; + + /// Return encoded bytes + /// Note: trait method cannot receive `self`, so take a &mut self here to + /// indicate this encoder is already consumed + fn close(&mut self) -> Result>; +} + +/// EncodingWriter implements `Write` trait, useful when Writer need shared +/// ownership. 
+/// +/// TODO: This is a temp workaround for [ArrowWriter](https://docs.rs/parquet/20.0.0/parquet/arrow/arrow_writer/struct.ArrowWriter.html), since it has no method to get underlying Writer +/// We can fix this by add `into_inner` method to it, or just replace it with +/// parquet2, which already have this method +/// https://github.com/CeresDB/ceresdb/issues/53 +#[derive(Clone)] +struct EncodingWriter(Arc>>); + +impl EncodingWriter { + fn into_bytes(self) -> Vec { + self.0.lock().unwrap().clone() + } +} + +impl Write for EncodingWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut inner = self.0.lock().unwrap(); + inner.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +struct ColumnarRecordEncoder { + buf: EncodingWriter, + // wrap in Option so ownership can be taken out behind `&mut self` + arrow_writer: Option>, + arrow_schema: ArrowSchemaRef, +} + +impl ColumnarRecordEncoder { + fn try_new(write_props: WriterProperties, schema: &Schema) -> Result { + let arrow_schema = schema.to_arrow_schema_ref(); + + let buf = EncodingWriter(Arc::new(Mutex::new(Vec::new()))); + let arrow_writer = + ArrowWriter::try_new(buf.clone(), arrow_schema.clone(), Some(write_props)) + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch)?; + + Ok(Self { + buf, + arrow_writer: Some(arrow_writer), + arrow_schema, + }) + } +} + +impl RecordEncoder for ColumnarRecordEncoder { + fn encode(&mut self, arrow_record_batch_vec: Vec) -> Result { + assert!(self.arrow_writer.is_some()); + + let record_batch = ArrowRecordBatch::concat(&self.arrow_schema, &arrow_record_batch_vec) + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch)?; + + self.arrow_writer + .as_mut() + .unwrap() + .write(&record_batch) + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch)?; + + Ok(record_batch.num_rows()) + } + + fn close(&mut self) -> Result> { + assert!(self.arrow_writer.is_some()); + + let arrow_writer = self.arrow_writer.take().unwrap(); + arrow_writer + .close() + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch)?; + + Ok(self.buf.clone().into_bytes()) + } +} + +struct HybridRecordEncoder { + buf: EncodingWriter, + // wrap in Option so ownership can be taken out behind `&mut self` + arrow_writer: Option>, + arrow_schema: ArrowSchemaRef, + tsid_type: IndexedType, + non_collapsible_col_types: Vec, + // columns that can be collpased into list + collapsible_col_types: Vec, +} + +impl HybridRecordEncoder { + fn try_new(write_props: WriterProperties, schema: &Schema) -> Result { + // TODO: What we really want here is a unique ID, tsid is one case + // Maybe support other cases later. 
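+        //
+        // The schema is split into three groups below: the tsid column, the
+        // non-collapsible (tag) columns that stay as plain values, and the
+        // collapsible (timestamp/field) columns that are folded into lists.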
+ let tsid_idx = schema.index_of_tsid().context(TsidRequired)?; + let tsid_type = IndexedType { + idx: tsid_idx, + data_type: schema.column(tsid_idx).data_type, + }; + + let mut non_collapsible_col_types = Vec::new(); + let mut collapsible_col_types = Vec::new(); + for (idx, col) in schema.columns().iter().enumerate() { + if idx == tsid_idx { + continue; + } + + if schema.is_collapsible_column(idx) { + // TODO: support variable length type + ensure!( + col.data_type.size().is_some(), + VariableLengthType { + type_name: col.data_type.to_string(), + } + ); + + collapsible_col_types.push(IndexedType { + idx, + data_type: schema.column(idx).data_type, + }); + } else { + // TODO: support non-string key columns + ensure!( + matches!(col.data_type, DatumKind::String), + StringKeyColumnRequired { + type_name: col.data_type.to_string(), + } + ); + non_collapsible_col_types.push(IndexedType { + idx, + data_type: col.data_type, + }); + } + } + + let arrow_schema = hybrid::build_hybrid_arrow_schema(schema); + + let buf = EncodingWriter(Arc::new(Mutex::new(Vec::new()))); + let arrow_writer = + ArrowWriter::try_new(buf.clone(), arrow_schema.clone(), Some(write_props)) + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch)?; + Ok(Self { + buf, + arrow_writer: Some(arrow_writer), + arrow_schema, + tsid_type, + non_collapsible_col_types, + collapsible_col_types, + }) + } +} + +impl RecordEncoder for HybridRecordEncoder { + fn encode(&mut self, arrow_record_batch_vec: Vec) -> Result { + assert!(self.arrow_writer.is_some()); + + let record_batch = hybrid::convert_to_hybrid_record( + &self.tsid_type, + &self.non_collapsible_col_types, + &self.collapsible_col_types, + self.arrow_schema.clone(), + arrow_record_batch_vec, + ) + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch)?; + + self.arrow_writer + .as_mut() + .unwrap() + .write(&record_batch) + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch)?; + + Ok(record_batch.num_rows()) + } + + fn close(&mut self) -> Result> { + assert!(self.arrow_writer.is_some()); + + let arrow_writer = self.arrow_writer.take().unwrap(); + arrow_writer + .close() + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch)?; + Ok(self.buf.clone().into_bytes()) + } +} + +pub struct ParquetEncoder { + record_encoder: Box, +} + +impl ParquetEncoder { + pub fn try_new( + num_rows_per_row_group: usize, + compression: Compression, + meta_data: &SstMetaData, + ) -> Result { + let write_props = WriterProperties::builder() + .set_key_value_metadata(Some(vec![encode_sst_meta_data(meta_data.clone())?])) + .set_max_row_group_size(num_rows_per_row_group) + .set_compression(compression) + .build(); + let mut format = meta_data.schema.storage_format(); + + // TODO: remove this overwrite when we can set format via table options + if matches!(format, StorageFormat::Hybrid) && meta_data.schema.index_of_tsid().is_none() { + format = StorageFormat::Columnar; + } + + let record_encoder: Box = match format { + StorageFormat::Hybrid => Box::new(HybridRecordEncoder::try_new( + write_props, + &meta_data.schema, + )?), + StorageFormat::Columnar => Box::new(ColumnarRecordEncoder::try_new( + write_props, + &meta_data.schema, + )?), + }; + + Ok(ParquetEncoder { record_encoder }) + } + + /// Encode the record batch with [ArrowWriter] and the encoded contents is + /// written to the buffer. 
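+    ///
+    /// Returns the number of rows encoded; an empty input vector is a no-op
+    /// and returns 0.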
+ pub fn encode_record_batch( + &mut self, + arrow_record_batch_vec: Vec, + ) -> Result { + if arrow_record_batch_vec.is_empty() { + return Ok(0); + } + + self.record_encoder.encode(arrow_record_batch_vec) + } + + pub fn close(mut self) -> Result> { + self.record_encoder.close() + } +} diff --git a/analytic_engine/src/sst/parquet/hybrid.rs b/analytic_engine/src/sst/parquet/hybrid.rs new file mode 100644 index 0000000000..19a75eeea6 --- /dev/null +++ b/analytic_engine/src/sst/parquet/hybrid.rs @@ -0,0 +1,447 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::{collections::BTreeMap, sync::Arc}; + +use arrow_deps::arrow::{ + array::{Array, ArrayData, ArrayRef, ListArray, StringArray, UInt64Array}, + bitmap::Bitmap, + buffer::MutableBuffer, + datatypes::Schema as ArrowSchema, + record_batch::RecordBatch as ArrowRecordBatch, + util::bit_util, +}; +use common_types::{ + datum::DatumKind, + schema::{ArrowSchemaRef, DataType, Field, Schema}, +}; +use log::debug; +use snafu::ResultExt; + +use crate::sst::builder::{EncodeRecordBatch, Result}; + +// hard coded in https://github.com/apache/arrow-rs/blob/20.0.0/arrow/src/array/array_list.rs#L185 +const LIST_ITEM_NAME: &str = "item"; + +#[derive(Debug, Clone, Copy)] +struct SliceArg { + offset: usize, + length: usize, +} + +/// ArrayHandle is used to keep different offsets of array, which can be +/// concatenated together. +/// +/// Note: +/// 1. Array.slice(offset, length) don't work as expected, since the +/// underlying buffer is still shared without slice. +/// 2. Array shoule be [fixed-size primitive](https://arrow.apache.org/docs/format/Columnar.html#fixed-size-primitive-layout) +#[derive(Debug, Clone)] +struct ArrayHandle { + array: ArrayRef, + slice_args: Vec, +} + +impl ArrayHandle { + fn new(array: ArrayRef) -> Self { + Self::with_slice_args(array, Vec::new()) + } + + fn with_slice_args(array: ArrayRef, slice_args: Vec) -> Self { + Self { array, slice_args } + } + + fn append_slice_arg(&mut self, arg: SliceArg) { + self.slice_args.push(arg) + } + + fn len(&self) -> usize { + self.slice_args.iter().map(|arg| arg.length).sum() + } + + // Note: this require primitive array + fn data_slice(&self) -> &[u8] { + self.array.data().buffers()[0].as_slice() + } + + fn null_bitmap(&self) -> Option<&Bitmap> { + self.array.data().null_bitmap() + } +} + +/// `TsidBatch` is used to collect column data for the same TSID +#[derive(Debug)] +struct TsidBatch { + non_collapsible_col_values: Vec, + collapsible_col_arrays: Vec, +} + +impl TsidBatch { + fn new(non_collapsible_col_values: Vec, collapsible_col_arrays: Vec) -> Self { + Self { + non_collapsible_col_values, + collapsible_col_arrays: collapsible_col_arrays + .into_iter() + .map(|f| ArrayHandle::new(f)) + .collect(), + } + } + + fn append_slice_arg(&mut self, arg: SliceArg) { + for handle in &mut self.collapsible_col_arrays { + handle.append_slice_arg(arg); + } + } +} + +#[derive(Debug)] +pub struct IndexedType { + pub idx: usize, + pub data_type: DatumKind, +} + +struct IndexedArray { + idx: usize, + array: ArrayRef, +} + +/// Convert collapsible columns to list type +pub fn build_hybrid_arrow_schema(schema: &Schema) -> ArrowSchemaRef { + let arrow_schema = schema.to_arrow_schema_ref(); + let new_fields = arrow_schema + .fields() + .iter() + .enumerate() + .map(|(idx, field)| { + if schema.is_collapsible_column(idx) { + let field_type = DataType::List(Box::new(Field::new( + LIST_ITEM_NAME, + field.data_type().clone(), + true, + ))); + Field::new(field.name(), field_type, 
true) + } else { + field.clone() + } + }) + .collect::>(); + Arc::new(ArrowSchema::new_with_metadata( + new_fields, + arrow_schema.metadata().clone(), + )) +} + +/// ListArrayBuilder is used for concat slice of different Arrays represented by +/// ArrayHandle into one ListArray +struct ListArrayBuilder { + datum_kind: DatumKind, + list_of_arrays: Vec, +} + +impl ListArrayBuilder { + fn new(datum_kind: DatumKind, list_of_arrays: Vec) -> Self { + Self { + datum_kind, + list_of_arrays, + } + } + + fn build_child_data(&self, offsets: &mut MutableBuffer) -> Result { + let data_type_size = self + .datum_kind + .size() + .expect("checked in HybridRecordEncoder::try_new"); + let values_num = self.list_of_arrays.iter().map(|handle| handle.len()).sum(); + let mut values = MutableBuffer::new(values_num * data_type_size); + // Initialize null_buffer with all 1, so we don't need to set it when array's + // null_bitmap is None + // + // Note: bit set to 1 means value is not null. + let mut null_buffer = MutableBuffer::new_null(values_num).with_bitset(values_num, true); + let null_slice = null_buffer.as_slice_mut(); + + let mut length_so_far: i32 = 0; + offsets.push(length_so_far); + for array_handle in &self.list_of_arrays { + let shared_buffer = array_handle.data_slice(); + let null_bitmap = array_handle.null_bitmap(); + + for slice_arg in &array_handle.slice_args { + let offset = slice_arg.offset; + let length = slice_arg.length; + if let Some(bitmap) = null_bitmap { + // TODO: We now set bitmap one by one, a more complicated but efficient way is + // to operate on bitmap buffer bits directly, like what we do + // with values(slice and shift) + for i in 0..length { + if !bitmap.is_set(i + offset) { + bit_util::unset_bit(null_slice, length_so_far as usize + i); + } + } + } + length_so_far += length as i32; + values.extend_from_slice( + &shared_buffer[offset * data_type_size..(offset + length) * data_type_size], + ); + } + offsets.push(length_so_far); + } + + debug!( + "build_child_data offsets:{:?}, values:{:?}", + offsets.as_slice(), + values.as_slice() + ); + + let values_array_data = ArrayData::builder(self.datum_kind.to_arrow_data_type()) + .len(values_num) + .add_buffer(values.into()) + .null_bit_buffer(Some(null_buffer.into())) + .build() + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch)?; + + Ok(values_array_data) + } + + /// This function is a translation of [GenericListArray.from_iter_primitive](https://docs.rs/arrow/20.0.0/src/arrow/array/array_list.rs.html#151) + fn build(self) -> Result { + let array_len = self.list_of_arrays.len(); + let mut offsets = + MutableBuffer::new(self.list_of_arrays.len() * std::mem::size_of::()); + let child_data = self.build_child_data(&mut offsets)?; + let field = Box::new(Field::new( + LIST_ITEM_NAME, + self.datum_kind.to_arrow_data_type(), + true, + )); + let array_data = ArrayData::builder(DataType::List(field)) + .len(array_len) + .add_buffer(offsets.into()) + .add_child_data(child_data); + + // TODO: change to unsafe version? 
+ // https://docs.rs/arrow/20.0.0/src/arrow/array/array_list.rs.html#192 + // let array_data = unsafe { array_data.build_unchecked() }; + let array_data = array_data + .build() + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch)?; + + Ok(ListArray::from(array_data)) + } +} + +/// Builds hybrid record by concat timestamp and non key columns into +/// `ListArray` +fn build_hybrid_record( + arrow_schema: ArrowSchemaRef, + tsid_type: &IndexedType, + non_collapsible_col_types: &[IndexedType], + collapsible_col_types: &[IndexedType], + batch_by_tsid: BTreeMap, +) -> Result { + let tsid_array = UInt64Array::from_iter_values(batch_by_tsid.keys().cloned()); + let mut collapsible_col_arrays = vec![Vec::new(); collapsible_col_types.len()]; + let mut non_collapsible_col_arrays = vec![Vec::new(); non_collapsible_col_types.len()]; + + for batch in batch_by_tsid.into_values() { + for (idx, arr) in batch.collapsible_col_arrays.into_iter().enumerate() { + collapsible_col_arrays[idx].push(arr); + } + for (idx, arr) in batch.non_collapsible_col_values.into_iter().enumerate() { + non_collapsible_col_arrays[idx].push(arr); + } + } + let tsid_array = IndexedArray { + idx: tsid_type.idx, + array: Arc::new(tsid_array), + }; + let non_collapsible_col_arrays = non_collapsible_col_arrays + .into_iter() + .zip(non_collapsible_col_types.iter().map(|n| n.idx)) + .map(|(c, idx)| IndexedArray { + idx, + array: Arc::new(StringArray::from(c)) as ArrayRef, + }) + .collect::>(); + let collapsible_col_arrays = collapsible_col_arrays + .into_iter() + .zip(collapsible_col_types.iter().map(|n| (n.idx, n.data_type))) + .map(|(handle, (idx, datum_type))| { + Ok(IndexedArray { + idx, + array: Arc::new(ListArrayBuilder::new(datum_type, handle).build()?), + }) + }) + .collect::>>()?; + let all_columns = [ + vec![tsid_array], + non_collapsible_col_arrays, + collapsible_col_arrays, + ] + .into_iter() + .flatten() + .map(|indexed_array| (indexed_array.idx, indexed_array.array)) + .collect::>() + .into_values() + .collect::>(); + + ArrowRecordBatch::try_new(arrow_schema, all_columns) + .map_err(|e| Box::new(e) as _) + .context(EncodeRecordBatch) +} + +/// Converts arrow record batch into hybrid record format describe in +/// `StorageFormat::Hybrid` +pub fn convert_to_hybrid_record( + tsid_type: &IndexedType, + non_collapsible_col_types: &[IndexedType], + collapsible_col_types: &[IndexedType], + hybrid_arrow_schema: ArrowSchemaRef, + arrow_record_batchs: Vec, +) -> Result { + // TODO: should keep tsid ordering here? 
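+    // Rows are grouped by tsid below: scan the tsid column of each batch,
+    // record a (tsid, offset) pair whenever the value changes, then slice the
+    // collapsible columns with those offsets. For example, a tsid column of
+    // [1, 1, 2, 2, 2] yields duplicated_tsids = [(1, 0), (2, 2)], so tsid 1
+    // covers (offset 0, length 2) and tsid 2 covers (offset 2, length 3).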
+ let mut batch_by_tsid = BTreeMap::new(); + for record_batch in arrow_record_batchs { + let tsid_array = record_batch + .column(tsid_type.idx) + .as_any() + .downcast_ref::() + .expect("checked when create table"); + + if tsid_array.is_empty() { + continue; + } + + let non_collapsible_col_values = non_collapsible_col_types + .iter() + .map(|col| { + record_batch + .column(col.idx) + .as_any() + .downcast_ref::() + .expect("checked in HybridRecordEncoder::try_new") + }) + .collect::>(); + let mut previous_tsid = tsid_array.value(0); + // duplicated_tsids is an array of every tsid's offset in origin array + // the length of each tsid can be calculated with + // tsid_n = duplicated_tsids[n+1].offset - duplicated_tsids[n].offset + let mut duplicated_tsids = vec![(previous_tsid, 0)]; // (tsid, offset) + for row_idx in 1..tsid_array.len() { + let tsid = tsid_array.value(row_idx); + if tsid != previous_tsid { + previous_tsid = tsid; + duplicated_tsids.push((tsid, row_idx)); + } + } + for i in 0..duplicated_tsids.len() { + let (tsid, offset) = duplicated_tsids[i]; + let length = if i == duplicated_tsids.len() - 1 { + tsid_array.len() - offset + } else { + duplicated_tsids[i + 1].1 - offset + }; + + let batch = batch_by_tsid.entry(tsid).or_insert_with(|| { + TsidBatch::new( + non_collapsible_col_values + .iter() + .map(|col| col.value(offset).to_string()) + .collect(), + collapsible_col_types + .iter() + .map(|col| record_batch.column(col.idx).clone()) + .collect(), + ) + }); + batch.append_slice_arg(SliceArg { offset, length }); + } + } + + build_hybrid_record( + hybrid_arrow_schema, + tsid_type, + non_collapsible_col_types, + collapsible_col_types, + batch_by_tsid, + ) +} + +#[cfg(test)] +mod tests { + use arrow_deps::arrow::{ + array::{TimestampMillisecondArray, UInt16Array}, + datatypes::{TimestampMillisecondType, UInt16Type}, + }; + + use super::*; + + impl From<(usize, usize)> for SliceArg { + fn from(offset_length: (usize, usize)) -> Self { + Self { + offset: offset_length.0, + length: offset_length.1, + } + } + } + + fn timestamp_array(start: i64, end: i64) -> ArrayRef { + Arc::new(TimestampMillisecondArray::from_iter_values(start..end)) + } + + fn uint16_array(values: Vec>) -> ArrayRef { + let arr: UInt16Array = values.into_iter().collect(); + + Arc::new(arr) + } + + #[test] + fn merge_timestamp_array_to_list() { + let list_of_arrays = vec![ + ArrayHandle::with_slice_args( + timestamp_array(1, 20), + vec![(1, 2).into(), (10, 3).into()], + ), + ArrayHandle::with_slice_args( + timestamp_array(100, 120), + vec![(1, 2).into(), (10, 3).into()], + ), + ]; + + let data = vec![ + Some(vec![Some(2), Some(3), Some(11), Some(12), Some(13)]), + Some(vec![Some(101), Some(102), Some(110), Some(111), Some(112)]), + ]; + let expected = ListArray::from_iter_primitive::(data); + let list_array = ListArrayBuilder::new(DatumKind::Timestamp, list_of_arrays) + .build() + .unwrap(); + + assert_eq!(list_array, expected); + } + + #[test] + fn merge_u16_array_with_none_to_list() { + let list_of_arrays = vec![ArrayHandle::with_slice_args( + uint16_array(vec![ + Some(1), + Some(2), + None, + Some(3), + Some(4), + Some(5), + Some(6), + ]), + vec![(1, 3).into(), (4, 1).into()], + )]; + + let data = vec![Some(vec![Some(2), None, Some(3), Some(4)])]; + let expected = ListArray::from_iter_primitive::(data); + let list_array = ListArrayBuilder::new(DatumKind::UInt16, list_of_arrays) + .build() + .unwrap(); + + assert_eq!(list_array, expected); + } +} diff --git a/analytic_engine/src/sst/parquet/mod.rs 
b/analytic_engine/src/sst/parquet/mod.rs
index aaf82e4671..98771e812f 100644
--- a/analytic_engine/src/sst/parquet/mod.rs
+++ b/analytic_engine/src/sst/parquet/mod.rs
@@ -4,4 +4,5 @@
 
 pub mod builder;
 pub mod encoding;
+mod hybrid;
 pub mod reader;
diff --git a/analytic_engine/src/sst/parquet/reader.rs b/analytic_engine/src/sst/parquet/reader.rs
index 935b171590..7f82927801 100644
--- a/analytic_engine/src/sst/parquet/reader.rs
+++ b/analytic_engine/src/sst/parquet/reader.rs
@@ -3,6 +3,7 @@
 //! Sst reader implementation based on parquet.
 
 use std::{
+    convert::TryFrom,
     pin::Pin,
     sync::Arc,
     task::{Context, Poll},
@@ -22,7 +23,7 @@ use async_trait::async_trait;
 use common_types::{
     projected_schema::{ProjectedSchema, RowProjector},
     record_batch::{ArrowRecordBatchProjector, RecordBatchWithKey},
-    schema::Schema,
+    schema::{Schema, StorageFormat},
 };
 use common_util::runtime::Runtime;
 use futures::Stream;
@@ -332,13 +333,18 @@ impl ProjectAndFilterReader {
                 .context(DecodeRecordBatch)
             {
                 Ok(record_batch) => {
-                    row_num += record_batch.num_rows();
+                    let arrow_schema = record_batch.schema();
+                    let schema = Schema::try_from(arrow_schema).context(InvalidSchema)?;
+                    let record_batch = match schema.storage_format() {
+                        StorageFormat::Hybrid => todo!("Will implement this in PR 207"),
+                        StorageFormat::Columnar => record_batch,
+                    };
 
+                    row_num += record_batch.num_rows();
                     let record_batch_with_key = arrow_record_batch_projector
                         .project_to_record_batch_with_key(record_batch)
                         .map_err(|e| Box::new(e) as _)
                         .context(DecodeRecordBatch);
-
                     send(record_batch_with_key)?;
                 }
                 Err(e) => {
diff --git a/analytic_engine/src/sst/reader.rs b/analytic_engine/src/sst/reader.rs
index ab76c9a044..cdc9aa57e8 100644
--- a/analytic_engine/src/sst/reader.rs
+++ b/analytic_engine/src/sst/reader.rs
@@ -45,6 +45,9 @@ pub mod error {
         #[snafu(display("Sst meta data is empty.\nBacktrace:\n{}", backtrace))]
         EmptySstMeta { backtrace: Backtrace },
 
+        #[snafu(display("Invalid schema, err:{}", source))]
+        InvalidSchema { source: common_types::schema::Error },
+
         #[snafu(display("Other kind of error:{}", source))]
         Other {
             source: Box<dyn std::error::Error + Send + Sync>,
diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs
index dc0d8fa47b..144aae4eea 100644
--- a/common_types/src/datum.rs
+++ b/common_types/src/datum.rs
@@ -176,6 +176,28 @@ impl DatumKind {
     pub fn into_u8(self) -> u8 {
         self as u8
     }
+
+    /// Return None for variable-length types
+    pub fn size(&self) -> Option<usize> {
+        let size = match self {
+            DatumKind::Null => 0,
+            DatumKind::Timestamp => 8,
+            DatumKind::Double => 8,
+            DatumKind::Float => 4,
+            DatumKind::Varbinary => return None,
+            DatumKind::String => return None,
+            DatumKind::UInt64 => 8,
+            DatumKind::UInt32 => 4,
+            DatumKind::UInt16 => 2,
+            DatumKind::UInt8 => 1,
+            DatumKind::Int64 => 8,
+            DatumKind::Int32 => 4,
+            DatumKind::Int16 => 2,
+            DatumKind::Int8 => 1,
+            DatumKind::Boolean => 1,
+        };
+        Some(size)
+    }
 }
 
 impl TryFrom<&SqlDataType> for DatumKind {
@@ -755,6 +777,26 @@ pub mod arrow_convert {
                 | DataType::Map(_, _) => None,
             }
         }
+
+        pub fn to_arrow_data_type(&self) -> DataType {
+            match self {
+                DatumKind::Null => DataType::Null,
+                DatumKind::Timestamp => DataType::Timestamp(TimeUnit::Millisecond, None),
+                DatumKind::Double => DataType::Float64,
+                DatumKind::Float => DataType::Float32,
+                DatumKind::Varbinary => DataType::Binary,
+                DatumKind::String => DataType::Utf8,
+                DatumKind::UInt64 => DataType::UInt64,
+                DatumKind::UInt32 => DataType::UInt32,
+                DatumKind::UInt16 => DataType::UInt16,
+                DatumKind::UInt8 => DataType::UInt8,
+                DatumKind::Int64 => DataType::Int64,
+                DatumKind::Int32 => DataType::Int32,
+                DatumKind::Int16 => DataType::Int16,
+                DatumKind::Int8 => DataType::Int8,
+                DatumKind::Boolean => DataType::Boolean,
+            }
+        }
     }
 
     impl Datum {
diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs
index 1b7ca99d98..eb8184bd6a 100644
--- a/common_types/src/record_batch.rs
+++ b/common_types/src/record_batch.rs
@@ -558,7 +558,7 @@ impl From<RowProjector> for ArrowRecordBatchProjector {
 
 impl ArrowRecordBatchProjector {
     /// Project the [arrow::RecordBatch] to [RecordBatchWithKey] and these
-    /// things is to be done:
+    /// things are to be done:
     /// - Insert the null column if the projected column does not appear in the
     ///   source schema.
    /// - Convert the [arrow::RecordBatch] to [RecordBatchWithKey].
diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs
index 4ea3792e92..5e7dfd5683 100644
--- a/common_types/src/schema.rs
+++ b/common_types/src/schema.rs
@@ -445,6 +445,44 @@ pub fn compare_row(
     Ordering::Equal
 }
 
+/// StorageFormat specifies how records are saved in persistent storage
+pub enum StorageFormat {
+    /// Traditional columnar format, every column is saved in exactly one
+    /// column, for example:
+    ///
+    ///```plaintext
+    /// | Timestamp | Device ID | Status Code | Tag 1 | Tag 2 |
+    /// | --------- | --------- | ----------- | ----- | ----- |
+    /// | 12:01     | A         | 0           | v1    | v1    |
+    /// | 12:01     | B         | 0           | v2    | v2    |
+    /// | 12:02     | A         | 0           | v1    | v1    |
+    /// | 12:02     | B         | 1           | v2    | v2    |
+    /// | 12:03     | A         | 0           | v1    | v1    |
+    /// | 12:03     | B         | 0           | v2    | v2    |
+    /// | .....     |           |             |       |       |
+    /// ```
+    Columnar,
+
+    /// Designed for time-series data.
+    /// Collapsible columns within the same primary key are collapsed
+    /// into a list; other columns keep the same format as in the columnar layout.
+    ///
+    /// Whether a column is collapsible is decided by
+    /// `Schema::is_collapsible_column`
+    ///
+    /// Note: minTime/maxTime are optional and not implemented yet, mainly used
+    /// for time-range pushdown filters
+    ///
+    ///```plaintext
+    /// | Device ID | Timestamp           | Status Code | Tag 1 | Tag 2 | minTime | maxTime |
+    /// |-----------|---------------------|-------------|-------|-------|---------|---------|
+    /// | A         | [12:01,12:02,12:03] | [0,0,0]     | v1    | v1    | 12:01   | 12:03   |
+    /// | B         | [12:01,12:02,12:03] | [0,1,0]     | v2    | v2    | 12:01   | 12:03   |
+    /// | ...       |                     |             |       |       |         |         |
+    /// ```
+    Hybrid,
+}
+
 // TODO(yingwen): Maybe rename to TableSchema.
 /// Schema of a table
 ///
@@ -595,6 +633,26 @@ impl Schema {
         self.timestamp_index
     }
 
+    /// Whether the i-th column is a tag column
+    pub fn is_tag_column(&self, i: usize) -> bool {
+        self.column(i).is_tag
+    }
+
+    /// Whether the i-th column can be collapsed into the `List` described in
+    /// `StorageFormat::Hybrid`
+    pub fn is_collapsible_column(&self, i: usize) -> bool {
+        if self.timestamp_index == i {
+            return true;
+        }
+
+        if self.is_tag_column(i) {
+            return false;
+        }
+
+        self.tsid_index
+            .map_or_else(|| true, |tsid_idx| tsid_idx != i)
+    }
+
     /// Get the version of this schema
     #[inline]
     pub fn version(&self) -> Version {
@@ -745,6 +803,21 @@ impl Schema {
     pub fn string_buffer_offset(&self) -> usize {
         self.column_schemas.string_buffer_offset
     }
+
+    /// Data format in storage
+    pub fn storage_format(&self) -> StorageFormat {
+        // TODO: parse it from table options
+        match std::env::var("CERESDB_TABLE_FORMAT") {
+            Ok(format) => {
+                if format == "HYBRID" {
+                    StorageFormat::Hybrid
+                } else {
+                    StorageFormat::Columnar
+                }
+            }
+            Err(_) => StorageFormat::Columnar,
+        }
+    }
 }
 
 impl TryFrom for Schema {
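For reference, below is a minimal standalone sketch (plain Rust with a hypothetical `is_collapsible` helper, no CeresDB types) of the classification rule that `Schema::is_collapsible_column` introduces above: the timestamp column collapses into a list, tag columns never collapse, and any other non-tsid column collapses.

```rust
// Sketch only: mirrors the logic of Schema::is_collapsible_column without the Schema type.
fn is_collapsible(idx: usize, timestamp_idx: usize, tsid_idx: Option<usize>, is_tag: bool) -> bool {
    if idx == timestamp_idx {
        return true;
    }
    if is_tag {
        return false;
    }
    tsid_idx.map_or(true, |tsid| tsid != idx)
}

fn main() {
    // Columns: 0 = tsid, 1 = timestamp, 2 = status code (field), 3 = tag.
    let tsid_idx = Some(0);
    assert!(!is_collapsible(0, 1, tsid_idx, false)); // tsid stays a plain column
    assert!(is_collapsible(1, 1, tsid_idx, false)); // timestamp becomes a ListArray
    assert!(is_collapsible(2, 1, tsid_idx, false)); // field becomes a ListArray
    assert!(!is_collapsible(3, 1, tsid_idx, true)); // tag stays a plain value
}
```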