feat: add new table_option storage format (#218)
* add new table_option: storage format

* fix clippy

* add integration tests

* add storage format in table schema proto

* move storage format to table options module

* fix clippy

* fix CR comments
jiacai2050 authored Aug 26, 2022
1 parent 10ed85d commit 02a7e31
Showing 16 changed files with 295 additions and 168 deletions.
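
The heart of the change is a `StorageFormat` enum that now lives in the `table_options` module and is persisted in every SST file's metadata. Its definition is not among the hunks shown here, so the following is only a sketch of its assumed shape, inferred from how the diff matches on two variants and calls `StorageFormat::default()`:

```rust
/// Sketch of the new table option (assumed shape; the real definition lives
/// in analytic_engine/src/table_options.rs, which is not shown on this page).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum StorageFormat {
    /// Plain column-per-column parquet layout; assumed to be the default.
    #[default]
    Columnar,
    /// Layout that packs rows sharing a tsid into nested columns.
    Hybrid,
}
```
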
2 changes: 2 additions & 0 deletions analytic_engine/src/compaction/picker.rs
@@ -594,6 +594,7 @@ mod tests {
file::SstMetaData,
manager::{tests::LevelsControllerMockBuilder, LevelsController},
},
table_options::StorageFormat,
};

fn build_sst_meta_data(time_range: TimeRange, size: u64) -> SstMetaData {
@@ -605,6 +606,7 @@
schema: build_schema(),
size,
row_num: 2,
storage_format: StorageFormat::default(),
}
}

2 changes: 2 additions & 0 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -607,6 +607,7 @@ impl Instance {
schema: table_data.schema(),
size: 0,
row_num: 0,
storage_format: table_data.table_options().storage_format,
};

let store = self.space_store.clone();
Expand Down Expand Up @@ -711,6 +712,7 @@ impl Instance {
schema: table_data.schema(),
size: 0,
row_num: 0,
storage_format: table_data.storage_format(),
};

// Alloc file id for next sst file
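
Both flush paths above stamp the table's configured format onto the `SstMetaData` they are about to write, so every SST records how it was encoded. A stubbed, self-contained sketch of that flow (types and field set trimmed to what this diff touches):

```rust
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum StorageFormat {
    #[default]
    Columnar,
    Hybrid,
}

#[derive(Debug, Default)]
struct TableOptions {
    storage_format: StorageFormat,
}

#[derive(Debug)]
struct SstMetaData {
    size: u64,
    row_num: u64,
    storage_format: StorageFormat,
}

// At flush time the size and row count are not known yet, but the storage
// format is: it is copied straight out of the table's options.
fn new_flush_meta(opts: &TableOptions) -> SstMetaData {
    SstMetaData {
        size: 0,
        row_num: 0,
        storage_format: opts.storage_format,
    }
}

fn main() {
    let meta = new_flush_meta(&TableOptions::default());
    assert_eq!(meta.storage_format, StorageFormat::Columnar);
}
```
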
15 changes: 14 additions & 1 deletion analytic_engine/src/sst/file.rs
@@ -37,7 +37,7 @@ use tokio::sync::{
Mutex,
};

use crate::{space::SpaceId, sst::manager::FileId, table::sst_util};
use crate::{space::SpaceId, sst::manager::FileId, table::sst_util, table_options::StorageFormat};

/// Error of sst file.
#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -220,6 +220,11 @@ impl FileHandle {
pub fn set_being_compacted(&self, value: bool) {
self.inner.being_compacted.store(value, Ordering::Relaxed);
}

#[inline]
pub fn storage_format(&self) -> StorageFormat {
self.inner.meta.meta.storage_format
}
}

impl fmt::Debug for FileHandle {
Expand Down Expand Up @@ -420,6 +425,7 @@ pub struct SstMetaData {
pub size: u64,
// total row number
pub row_num: u64,
pub storage_format: StorageFormat,
}

impl From<SstMetaData> for SstMetaDataPb {
@@ -433,6 +439,7 @@
target.set_schema(src.schema.into());
target.set_size(src.size);
target.set_row_num(src.row_num);
target.set_storage_format(src.storage_format.into());

target
}
@@ -452,6 +459,7 @@ impl TryFrom<SstMetaDataPb> for SstMetaData {
schema,
size: src.size,
row_num: src.row_num,
storage_format: src.storage_format.into(),
})
}
}
Expand Down Expand Up @@ -610,6 +618,9 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData {
let mut time_range_start = files[0].time_range().inclusive_start();
let mut time_range_end = files[0].time_range().exclusive_end();
let mut max_sequence = files[0].max_sequence();
// TODO(jiacai2050): what if the storage formats of the input files differ?
// For now, pick the first file's format.
let storage_format = files[0].storage_format();

if files.len() > 1 {
for file in &files[1..] {
@@ -630,6 +641,7 @@
// we don't know file size and total row number yet
size: 0,
row_num: 0,
storage_format,
}
}

Expand Down Expand Up @@ -685,6 +697,7 @@ pub mod tests {
schema: self.schema.clone(),
size: 0,
row_num: 0,
storage_format: StorageFormat::default(),
}
}
}
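
`merge_sst_meta` resolves the output format of a compaction by taking the first input file's format, and the TODO above leaves mixed-format inputs as an open question. One way to make that assumption explicit is a small guard along these lines (`pick_storage_format` is a hypothetical helper, not part of this commit):

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StorageFormat {
    Columnar,
    Hybrid,
}

/// Hypothetical helper mirroring the merge rule above: take the first file's
/// format, but assert in debug builds that the inputs do not diverge.
fn pick_storage_format(formats: &[StorageFormat]) -> StorageFormat {
    debug_assert!(
        formats.windows(2).all(|pair| pair[0] == pair[1]),
        "compaction inputs are expected to share one storage format"
    );
    formats[0]
}

fn main() {
    let inputs = [StorageFormat::Hybrid, StorageFormat::Hybrid];
    assert_eq!(pick_storage_format(&inputs), StorageFormat::Hybrid);
}
```
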
3 changes: 2 additions & 1 deletion analytic_engine/src/sst/parquet/builder.rs
@@ -176,7 +176,7 @@ mod tests {
parquet::reader::ParquetSstReader,
reader::{tests::check_stream, SstReader},
},
table_options,
table_options::{self, StorageFormat},
};

// TODO(xikai): add test for reverse reader
Expand Down Expand Up @@ -217,6 +217,7 @@ mod tests {
schema: schema.clone(),
size: 10,
row_num: 2,
storage_format: StorageFormat::default(),
};

let mut counter = 10;
36 changes: 11 additions & 25 deletions analytic_engine/src/sst/parquet/encoding.rs
@@ -22,19 +22,20 @@ use arrow_deps::{
use common_types::{
bytes::{BytesMut, MemBufMut, Writer},
datum::DatumKind,
schema::{
ArrowSchema, ArrowSchemaMeta, ArrowSchemaRef, DataType, Field, Schema, StorageFormat,
},
schema::{ArrowSchema, ArrowSchemaRef, DataType, Field, Schema},
};
use common_util::define_result;
use log::trace;
use proto::sst::SstMetaData as SstMetaDataPb;
use protobuf::Message;
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};

use crate::sst::{
file::SstMetaData,
parquet::hybrid::{self, IndexedType},
use crate::{
sst::{
file::SstMetaData,
parquet::hybrid::{self, IndexedType},
},
table_options::StorageFormat,
};

// TODO: Only support i32 offset now, consider i64 here?
Expand Down Expand Up @@ -450,14 +451,8 @@ impl ParquetEncoder {
.set_max_row_group_size(num_rows_per_row_group)
.set_compression(compression)
.build();
let mut format = meta_data.schema.storage_format();

// TODO: remove this overwrite when we can set format via table options
if matches!(format, StorageFormat::Hybrid) && meta_data.schema.index_of_tsid().is_none() {
format = StorageFormat::Columnar;
}

let record_encoder: Box<dyn RecordEncoder + Send> = match format {
let record_encoder: Box<dyn RecordEncoder + Send> = match meta_data.storage_format {
StorageFormat::Hybrid => Box::new(HybridRecordEncoder::try_new(
write_props,
&meta_data.schema,
Expand Down Expand Up @@ -723,22 +718,13 @@ pub struct ParquetDecoder {
}

impl ParquetDecoder {
pub fn try_new(arrow_schema: ArrowSchemaRef) -> Result<Self> {
let arrow_schema_meta = ArrowSchemaMeta::try_from(arrow_schema.metadata())
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?;
let mut format = arrow_schema_meta.storage_format();
// TODO: remove this overwrite when we can set format via table options
if matches!(format, StorageFormat::Hybrid) && !arrow_schema_meta.enable_tsid_primary_key() {
format = StorageFormat::Columnar;
}

let record_decoder: Box<dyn RecordDecoder> = match format {
pub fn new(storage_format: StorageFormat) -> Self {
let record_decoder: Box<dyn RecordDecoder> = match storage_format {
StorageFormat::Hybrid => Box::new(HybridRecordDecoder {}),
StorageFormat::Columnar => Box::new(ColumnarRecordDecoder {}),
};

Ok(Self { record_decoder })
Self { record_decoder }
}

pub fn decode_record_batch(
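
With this change the encoder and decoder dispatch on the format carried in `SstMetaData` instead of re-deriving it from arrow schema metadata; the deleted overwrite block was the old silent fallback from `Hybrid` to `Columnar` when the schema had no tsid column. The dispatch itself reduces to a plain match over boxed trait objects, sketched self-contained here (decoder bodies stubbed):

```rust
#[derive(Clone, Copy)]
enum StorageFormat {
    Columnar,
    Hybrid,
}

trait RecordDecoder {
    fn name(&self) -> &'static str;
}

struct ColumnarRecordDecoder;
struct HybridRecordDecoder;

impl RecordDecoder for ColumnarRecordDecoder {
    fn name(&self) -> &'static str {
        "columnar"
    }
}

impl RecordDecoder for HybridRecordDecoder {
    fn name(&self) -> &'static str {
        "hybrid"
    }
}

// Mirrors ParquetDecoder::new above: construction is infallible because the
// format is passed in rather than parsed out of schema metadata.
fn make_decoder(format: StorageFormat) -> Box<dyn RecordDecoder> {
    match format {
        StorageFormat::Hybrid => Box::new(HybridRecordDecoder),
        StorageFormat::Columnar => Box::new(ColumnarRecordDecoder),
    }
}

fn main() {
    assert_eq!(make_decoder(StorageFormat::Hybrid).name(), "hybrid");
}
```
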
24 changes: 13 additions & 11 deletions analytic_engine/src/sst/parquet/reader.rs
@@ -36,11 +36,14 @@ use snafu::{ensure, OptionExt, ResultExt};
use table_engine::predicate::PredicateRef;
use tokio::sync::mpsc::{self, Receiver, Sender};

use crate::sst::{
factory::SstReaderOptions,
file::SstMetaData,
parquet::encoding::{self, ParquetDecoder},
reader::{error::*, SstReader},
use crate::{
sst::{
factory::SstReaderOptions,
file::SstMetaData,
parquet::encoding::{self, ParquetDecoder},
reader::{error::*, SstReader},
},
table_options::StorageFormat,
};

const DEFAULT_CHANNEL_CAP: usize = 1000;
Expand Down Expand Up @@ -164,9 +167,9 @@ impl<'a> ParquetSstReader<'a> {

let file_reader = self.file_reader.take().unwrap();
let batch_size = self.batch_size;
let schema = {
let (schema, storage_format) = {
let meta_data = self.meta_data.as_ref().unwrap();
meta_data.schema.clone()
(meta_data.schema.clone(), meta_data.storage_format)
};
let projected_schema = self.projected_schema.clone();
let row_projector = projected_schema
Expand Down Expand Up @@ -202,6 +205,7 @@ impl<'a> ParquetSstReader<'a> {
predicate,
batch_size,
reverse,
storage_format,
};

let start_fetch = Instant::now();
Expand Down Expand Up @@ -248,6 +252,7 @@ struct ProjectAndFilterReader {
predicate: PredicateRef,
batch_size: usize,
reverse: bool,
storage_format: StorageFormat,
}

impl ProjectAndFilterReader {
Expand Down Expand Up @@ -319,10 +324,7 @@ impl ProjectAndFilterReader {
let reader = self.project_and_filter_reader()?;

let arrow_record_batch_projector = ArrowRecordBatchProjector::from(self.row_projector);
let arrow_schema = self.projected_schema.to_projected_arrow_schema();
let parquet_decoder = ParquetDecoder::try_new(arrow_schema)
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?;
let parquet_decoder = ParquetDecoder::new(self.storage_format);
let mut row_num = 0;
for record_batch in reader {
trace!(
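
On the read side, the format recovered from the SST footer now travels with the per-scan state down to where decoding happens, and decoder construction can no longer fail — which is what eliminates the `map_err`/`context` boilerplate in the deleted hunk. A stubbed sketch of that threading (names mirror the diff; everything unrelated is elided):

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StorageFormat {
    Columnar,
    Hybrid,
}

struct ParquetDecoder {
    format: StorageFormat,
}

impl ParquetDecoder {
    // Infallible constructor, as in the encoding.rs hunk above.
    fn new(storage_format: StorageFormat) -> Self {
        Self {
            format: storage_format,
        }
    }
}

// The reader carries the format alongside its other per-scan state.
struct ProjectAndFilterReader {
    batch_size: usize,
    reverse: bool,
    storage_format: StorageFormat,
}

impl ProjectAndFilterReader {
    fn build_decoder(&self) -> ParquetDecoder {
        ParquetDecoder::new(self.storage_format)
    }
}

fn main() {
    let reader = ProjectAndFilterReader {
        batch_size: 1000,
        reverse: false,
        storage_format: StorageFormat::Columnar,
    };
    assert_eq!(reader.build_decoder().format, StorageFormat::Columnar);
}
```
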
5 changes: 5 additions & 0 deletions analytic_engine/src/table/data.rs
@@ -42,6 +42,7 @@ use crate::{
sst_util,
version::{MemTableForWrite, MemTableState, SamplingMemTable, TableVersion},
},
table_options::StorageFormat,
TableOptions,
};

Expand Down Expand Up @@ -466,6 +467,10 @@ impl TableData {
pub fn is_expired(&self, timestamp: Timestamp) -> bool {
self.table_options().is_expired(timestamp)
}

pub fn storage_format(&self) -> StorageFormat {
self.table_options().storage_format
}
}

/// Table data reference
1 change: 1 addition & 0 deletions analytic_engine/src/table/version_edit.rs
@@ -83,6 +83,7 @@ impl TryFrom<meta_pb::AddFileMeta> for AddFile {
schema,
size: src.size,
row_num: src.row_num,
storage_format: src.storage_format.into(),
},
},
})
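
The `.into()` calls on `storage_format` here and in `file.rs` imply `From` conversions between the protobuf enum and the domain enum. A sketch of their assumed shape — the protobuf type name `StorageFormatPb` is a guess, and per this commit the real impls would sit in the `table_options` module:

```rust
// Guessed name for the generated protobuf enum; the real one lives in the
// proto crate's table schema definitions.
#[derive(Clone, Copy, Debug)]
enum StorageFormatPb {
    Columnar,
    Hybrid,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StorageFormat {
    Columnar,
    Hybrid,
}

impl From<StorageFormatPb> for StorageFormat {
    fn from(src: StorageFormatPb) -> Self {
        match src {
            StorageFormatPb::Columnar => StorageFormat::Columnar,
            StorageFormatPb::Hybrid => StorageFormat::Hybrid,
        }
    }
}

impl From<StorageFormat> for StorageFormatPb {
    fn from(src: StorageFormat) -> Self {
        match src {
            StorageFormat::Columnar => StorageFormatPb::Columnar,
            StorageFormat::Hybrid => StorageFormatPb::Hybrid,
        }
    }
}

fn main() {
    let domain: StorageFormat = StorageFormatPb::Hybrid.into();
    assert_eq!(domain, StorageFormat::Hybrid);
}
```
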
… 8 more of the 16 changed files are not shown.
