feat: add new table_option storage format #218

Merged · 8 commits · Aug 26, 2022
Changes from 7 commits
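The `StorageFormat` threaded through this diff is the new table option itself. As a rough sketch of the type being plumbed around (the variant names `Columnar` and `Hybrid` come from the diff below, but the actual definition, default, and option parsing live in `table_options.rs` and may differ):

```rust
// Minimal sketch only; not the PR's actual definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageFormat {
    /// One physical parquet column per logical column.
    Columnar,
    /// Several logical rows packed into one physical row.
    Hybrid,
}

impl Default for StorageFormat {
    // The test helpers below call `StorageFormat::default()`; columnar is
    // the assumed default here.
    fn default() -> Self {
        StorageFormat::Columnar
    }
}

impl std::str::FromStr for StorageFormat {
    type Err = String;

    // Hypothetical parsing for a `storage_format = '...'` table option.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "columnar" => Ok(StorageFormat::Columnar),
            "hybrid" => Ok(StorageFormat::Hybrid),
            other => Err(format!("unknown storage format: {other}")),
        }
    }
}
```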
2 changes: 2 additions & 0 deletions analytic_engine/src/compaction/picker.rs
@@ -594,6 +594,7 @@ mod tests {
file::SstMetaData,
manager::{tests::LevelsControllerMockBuilder, LevelsController},
},
+ table_options::StorageFormat,
};

fn build_sst_meta_data(time_range: TimeRange, size: u64) -> SstMetaData {
@@ -605,6 +606,7 @@
schema: build_schema(),
size,
row_num: 2,
+ storage_format: StorageFormat::default(),
}
}

2 changes: 2 additions & 0 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -607,6 +607,7 @@ impl Instance {
schema: table_data.schema(),
size: 0,
row_num: 0,
+ storage_format: table_data.table_options().storage_format,
};

let store = self.space_store.clone();
@@ -711,6 +712,7 @@ impl Instance {
schema: table_data.schema(),
size: 0,
row_num: 0,
+ storage_format: table_data.storage_format(),
};

// Alloc file id for next sst file
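Worth noting in the two hunks above: every flush and compaction output stamps the table's current format into the SST meta before the file is written. A stand-in sketch of that stamping (the real `SstMetaData` also carries the schema, time range, and sequence numbers):

```rust
#[derive(Clone, Copy, Debug, PartialEq, Default)]
enum StorageFormat {
    #[default]
    Columnar,
    Hybrid,
}

struct TableOptions {
    storage_format: StorageFormat,
}

struct SstMetaData {
    size: u64,
    row_num: u64,
    storage_format: StorageFormat,
}

fn new_sst_meta(opts: &TableOptions) -> SstMetaData {
    SstMetaData {
        // Size and row count are filled in after the SST is written,
        // matching the zeros in the diff above.
        size: 0,
        row_num: 0,
        storage_format: opts.storage_format,
    }
}

fn main() {
    let opts = TableOptions { storage_format: StorageFormat::Hybrid };
    assert_eq!(new_sst_meta(&opts).storage_format, StorageFormat::Hybrid);
}
```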
15 changes: 14 additions & 1 deletion analytic_engine/src/sst/file.rs
@@ -37,7 +37,7 @@ use tokio::sync::{
Mutex,
};

- use crate::{space::SpaceId, sst::manager::FileId, table::sst_util};
+ use crate::{space::SpaceId, sst::manager::FileId, table::sst_util, table_options::StorageFormat};

/// Error of sst file.
#[derive(Debug, Snafu)]
@@ -220,6 +220,11 @@ impl FileHandle {
pub fn set_being_compacted(&self, value: bool) {
self.inner.being_compacted.store(value, Ordering::Relaxed);
}

+ #[inline]
+ pub fn storage_format(&self) -> StorageFormat {
+ self.inner.meta.meta.storage_format
+ }
}

impl fmt::Debug for FileHandle {
@@ -420,6 +425,7 @@ pub struct SstMetaData {
pub size: u64,
// total row number
pub row_num: u64,
+ pub storage_format: StorageFormat,
}

impl From<SstMetaData> for SstMetaDataPb {
@@ -433,6 +439,7 @@ impl From<SstMetaData> for SstMetaDataPb {
target.set_schema(src.schema.into());
target.set_size(src.size);
target.set_row_num(src.row_num);
+ target.set_storage_format(src.storage_format.into());

target
}
@@ -452,6 +459,7 @@ impl TryFrom<SstMetaDataPb> for SstMetaData {
schema,
size: src.size,
row_num: src.row_num,
+ storage_format: src.storage_format.into(),
})
}
}
@@ -610,6 +618,9 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData {
let mut time_range_start = files[0].time_range().inclusive_start();
let mut time_range_end = files[0].time_range().exclusive_end();
let mut max_sequence = files[0].max_sequence();
+ // TODO: what if the formats of different files are different?
+ // Pick the first one for now.
+ let storage_format = files[0].storage_format();

if files.len() > 1 {
for file in &files[1..] {
@@ -630,6 +641,7 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData {
// we don't know file size and total row number yet
size: 0,
row_num: 0,
+ storage_format,
}
}

@@ -685,6 +697,7 @@ pub mod tests {
schema: self.schema.clone(),
size: 0,
row_num: 0,
+ storage_format: StorageFormat::default(),
}
}
}
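The TODO in `merge_sst_meta` above is worth pausing on: when SSTs are merged for compaction, the merged metadata simply takes the first file's format. A toy model of that policy, with a stand-in enum rather than the engine's real types:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum StorageFormat {
    Columnar,
    Hybrid,
}

// Mirrors the `files[0].storage_format()` choice in `merge_sst_meta`:
// no reconciliation across inputs; the first file's format wins.
fn pick_merged_format(file_formats: &[StorageFormat]) -> Option<StorageFormat> {
    file_formats.first().copied()
}

fn main() {
    let mixed = [StorageFormat::Hybrid, StorageFormat::Columnar];
    // A mixed input silently collapses to Hybrid, which is exactly the case
    // the TODO asks about.
    assert_eq!(pick_merged_format(&mixed), Some(StorageFormat::Hybrid));
}
```

Presumably a mixed set can only appear if the table option changes between writes, which may be why taking the first file is acceptable as an interim answer.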
3 changes: 2 additions & 1 deletion analytic_engine/src/sst/parquet/builder.rs
@@ -176,7 +176,7 @@ mod tests {
parquet::reader::ParquetSstReader,
reader::{tests::check_stream, SstReader},
},
- table_options,
+ table_options::{self, StorageFormat},
};

// TODO(xikai): add test for reverse reader
@@ -217,6 +217,7 @@
schema: schema.clone(),
size: 10,
row_num: 2,
+ storage_format: StorageFormat::default(),
};

let mut counter = 10;
36 changes: 11 additions & 25 deletions analytic_engine/src/sst/parquet/encoding.rs
@@ -22,19 +22,20 @@ use arrow_deps::{
use common_types::{
bytes::{BytesMut, MemBufMut, Writer},
datum::DatumKind,
- schema::{
- ArrowSchema, ArrowSchemaMeta, ArrowSchemaRef, DataType, Field, Schema, StorageFormat,
- },
+ schema::{ArrowSchema, ArrowSchemaRef, DataType, Field, Schema},
};
use common_util::define_result;
use log::trace;
use proto::sst::SstMetaData as SstMetaDataPb;
use protobuf::Message;
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};

- use crate::sst::{
- file::SstMetaData,
- parquet::hybrid::{self, IndexedType},
+ use crate::{
+ sst::{
+ file::SstMetaData,
+ parquet::hybrid::{self, IndexedType},
+ },
+ table_options::StorageFormat,
};

// TODO: Only support i32 offset now, consider i64 here?
@@ -450,14 +451,8 @@ impl ParquetEncoder {
.set_max_row_group_size(num_rows_per_row_group)
.set_compression(compression)
.build();
- let mut format = meta_data.schema.storage_format();
-
- // TODO: remove this overwrite when we can set format via table options
- if matches!(format, StorageFormat::Hybrid) && meta_data.schema.index_of_tsid().is_none() {
- format = StorageFormat::Columnar;
- }

- let record_encoder: Box<dyn RecordEncoder + Send> = match format {
+ let record_encoder: Box<dyn RecordEncoder + Send> = match meta_data.storage_format {
StorageFormat::Hybrid => Box::new(HybridRecordEncoder::try_new(
write_props,
&meta_data.schema,
@@ -723,22 +718,13 @@ pub struct ParquetDecoder {
}

impl ParquetDecoder {
- pub fn try_new(arrow_schema: ArrowSchemaRef) -> Result<Self> {
- let arrow_schema_meta = ArrowSchemaMeta::try_from(arrow_schema.metadata())
- .map_err(|e| Box::new(e) as _)
- .context(DecodeRecordBatch)?;
- let mut format = arrow_schema_meta.storage_format();
- // TODO: remove this overwrite when we can set format via table options
- if matches!(format, StorageFormat::Hybrid) && !arrow_schema_meta.enable_tsid_primary_key() {
- format = StorageFormat::Columnar;
- }
-
- let record_decoder: Box<dyn RecordDecoder> = match format {
+ pub fn new(storage_format: StorageFormat) -> Self {
+ let record_decoder: Box<dyn RecordDecoder> = match storage_format {
StorageFormat::Hybrid => Box::new(HybridRecordDecoder {}),
StorageFormat::Columnar => Box::new(ColumnarRecordDecoder {}),
};

- Ok(Self { record_decoder })
+ Self { record_decoder }
}

pub fn decode_record_batch(
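The net effect in `encoding.rs` is that both `ParquetEncoder` and `ParquetDecoder` now receive the format explicitly and use it exactly once, at construction time, to pick a concrete codec. A pleasant side effect is that `new` is infallible: the old `try_new` could fail while parsing Arrow schema metadata, and its tsid-based fallback to `Columnar` disappears along with that parsing. A self-contained sketch of the dispatch pattern (simplified stand-in types, not the engine's real trait signatures):

```rust
#[derive(Clone, Copy)]
enum StorageFormat {
    Columnar,
    Hybrid,
}

// Stand-in for the crate's RecordDecoder trait.
trait RecordDecoder {
    fn decode(&self, batch: Vec<u8>) -> Vec<u8>;
}

struct ColumnarRecordDecoder;
struct HybridRecordDecoder;

impl RecordDecoder for ColumnarRecordDecoder {
    fn decode(&self, batch: Vec<u8>) -> Vec<u8> {
        batch // columnar batches pass through as-is
    }
}

impl RecordDecoder for HybridRecordDecoder {
    fn decode(&self, batch: Vec<u8>) -> Vec<u8> {
        batch // a real decoder would unpack hybrid-encoded rows here
    }
}

struct ParquetDecoder {
    record_decoder: Box<dyn RecordDecoder>,
}

impl ParquetDecoder {
    // The format picks the codec once; callers never re-derive it from
    // schema metadata, which is what the removed `try_new` used to do.
    fn new(storage_format: StorageFormat) -> Self {
        let record_decoder: Box<dyn RecordDecoder> = match storage_format {
            StorageFormat::Hybrid => Box::new(HybridRecordDecoder),
            StorageFormat::Columnar => Box::new(ColumnarRecordDecoder),
        };
        Self { record_decoder }
    }
}

fn main() {
    let decoder = ParquetDecoder::new(StorageFormat::Hybrid);
    let decoded = decoder.record_decoder.decode(vec![1, 2, 3]);
    assert_eq!(decoded, vec![1, 2, 3]);
}
```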
24 changes: 13 additions & 11 deletions analytic_engine/src/sst/parquet/reader.rs
@@ -36,11 +36,14 @@ use snafu::{ensure, OptionExt, ResultExt};
use table_engine::predicate::PredicateRef;
use tokio::sync::mpsc::{self, Receiver, Sender};

- use crate::sst::{
- factory::SstReaderOptions,
- file::SstMetaData,
- parquet::encoding::{self, ParquetDecoder},
- reader::{error::*, SstReader},
+ use crate::{
+ sst::{
+ factory::SstReaderOptions,
+ file::SstMetaData,
+ parquet::encoding::{self, ParquetDecoder},
+ reader::{error::*, SstReader},
+ },
+ table_options::StorageFormat,
};

const DEFAULT_CHANNEL_CAP: usize = 1000;
@@ -164,9 +167,9 @@ impl<'a> ParquetSstReader<'a> {

let file_reader = self.file_reader.take().unwrap();
let batch_size = self.batch_size;
- let schema = {
+ let (schema, storage_format) = {
let meta_data = self.meta_data.as_ref().unwrap();
- meta_data.schema.clone()
+ (meta_data.schema.clone(), meta_data.storage_format)
};
let projected_schema = self.projected_schema.clone();
let row_projector = projected_schema
@@ -202,6 +205,7 @@ impl<'a> ParquetSstReader<'a> {
predicate,
batch_size,
reverse,
+ storage_format,
};

let start_fetch = Instant::now();
@@ -248,6 +252,7 @@ struct ProjectAndFilterReader {
predicate: PredicateRef,
batch_size: usize,
reverse: bool,
+ storage_format: StorageFormat,
}

impl ProjectAndFilterReader {
@@ -319,10 +324,7 @@ impl ProjectAndFilterReader {
let reader = self.project_and_filter_reader()?;

let arrow_record_batch_projector = ArrowRecordBatchProjector::from(self.row_projector);
- let arrow_schema = self.projected_schema.to_projected_arrow_schema();
- let parquet_decoder = ParquetDecoder::try_new(arrow_schema)
- .map_err(|e| Box::new(e) as _)
- .context(DecodeRecordBatch)?;
+ let parquet_decoder = ParquetDecoder::new(self.storage_format);
let mut row_num = 0;
for record_batch in reader {
trace!(
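One way to read the `reader.rs` changes: the decoder's configuration now travels with the data it describes. The format is read once from the SST's own metadata and carried through `ProjectAndFilterReader`, so the decode loop no longer consults Arrow schema metadata at all. A compressed sketch of that flow with stand-in types (the real reader wires many more fields through):

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum StorageFormat {
    Columnar,
    Hybrid,
}

struct SstMetaData {
    storage_format: StorageFormat,
    // schema, time range, row counts, ... elided
}

struct ProjectAndFilterReader {
    storage_format: StorageFormat,
}

impl ProjectAndFilterReader {
    fn new(meta_data: &SstMetaData) -> Self {
        // Capture the format up front, alongside the schema, so later
        // stages can build their decoder without schema metadata.
        Self {
            storage_format: meta_data.storage_format,
        }
    }
}

fn main() {
    let meta_data = SstMetaData {
        storage_format: StorageFormat::Hybrid,
    };
    let reader = ProjectAndFilterReader::new(&meta_data);
    assert_eq!(reader.storage_format, StorageFormat::Hybrid);
}
```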
5 changes: 5 additions & 0 deletions analytic_engine/src/table/data.rs
@@ -42,6 +42,7 @@ use crate::{
sst_util,
version::{MemTableForWrite, MemTableState, SamplingMemTable, TableVersion},
},
+ table_options::StorageFormat,
TableOptions,
};

@@ -466,6 +467,10 @@ impl TableData {
pub fn is_expired(&self, timestamp: Timestamp) -> bool {
self.table_options().is_expired(timestamp)
}

+ pub fn storage_format(&self) -> StorageFormat {
+ self.table_options().storage_format
+ }
}

/// Table data reference
1 change: 1 addition & 0 deletions analytic_engine/src/table/version_edit.rs
@@ -83,6 +83,7 @@ impl TryFrom<meta_pb::AddFileMeta> for AddFile {
schema,
size: src.size,
row_num: src.row_num,
+ storage_format: src.storage_format.into(),
},
},
})