diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs
index e309772967..7669d02204 100644
--- a/analytic_engine/src/compaction/picker.rs
+++ b/analytic_engine/src/compaction/picker.rs
@@ -617,15 +617,13 @@ mod tests {
         },
     };
 
-    fn build_sst_meta_data(time_range: TimeRange, size: u64) -> SstMetaData {
+    fn build_sst_meta_data(time_range: TimeRange, _size: u64) -> SstMetaData {
         SstMetaData {
             min_key: Bytes::from_static(b"100"),
             max_key: Bytes::from_static(b"200"),
             time_range,
             max_sequence: 200,
             schema: build_schema(),
-            size,
-            row_num: 2,
             storage_format_opts: Default::default(),
             bloom_filter: Default::default(),
         }
@@ -769,6 +767,8 @@ mod tests {
             .map(|size| {
                 let file_meta = FileMeta {
                     id: 1,
+                    size,
+                    row_num: 0,
                     meta: build_sst_meta_data(TimeRange::empty(), size),
                 };
                 let queue = FilePurgeQueue::new(1, 1.into(), tx.clone());
diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index 5a9b8df21b..1a5a04f604 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -502,7 +502,7 @@ impl Instance {
                 flushed_sequence = seq;
                 sst_num += files_to_level0.len();
                 for add_file in &files_to_level0 {
-                    local_metrics.observe_sst_size(add_file.file.meta.size);
+                    local_metrics.observe_sst_size(add_file.file.size);
                 }
             }
         }
@@ -511,7 +511,7 @@ impl Instance {
            .dump_normal_memtable(table_data, request_id, mem)
            .await?;
        if let Some(file) = file {
-            let sst_size = file.meta.size;
+            let sst_size = file.size;
            files_to_level0.push(AddFile { level: 0, file });
 
            // Set flushed sequence to max of the last_sequence of memtables.
@@ -619,14 +619,12 @@ impl Instance {
            let sst_file_path = table_data.set_sst_file_path(file_id);
 
            // TODO: min_key max_key set in sst_builder build
-            let mut sst_meta = SstMetaData {
+            let sst_meta = SstMetaData {
                min_key: min_key.clone(),
                max_key: max_key.clone(),
                time_range: *time_range,
                max_sequence,
                schema: table_data.schema(),
-                size: 0,
-                row_num: 0,
                storage_format_opts: StorageFormatOptions::new(
                    table_data.table_options().storage_format,
                ),
@@ -663,10 +661,7 @@ impl Instance {
                        path: sst_file_path.to_string(),
                    })?;
 
-                // update sst metadata by built info.
-                sst_meta.row_num = sst_info.row_num as u64;
-                sst_meta.size = sst_info.file_size as u64;
-                Ok(sst_meta)
+                Ok((sst_info, sst_meta))
            });
 
            batch_record_senders.push(batch_record_sender);
@@ -698,12 +693,15 @@ impl Instance {
        batch_record_senders.clear();
 
        let ret = try_join_all(sst_handlers).await;
-        for (idx, sst_meta) in ret.context(RuntimeJoin)?.into_iter().enumerate() {
+        for (idx, info_and_meta) in ret.context(RuntimeJoin)?.into_iter().enumerate() {
+            let (sst_info, sst_meta) = info_and_meta?;
            files_to_level0.push(AddFile {
                level: 0,
                file: FileMeta {
                    id: file_ids[idx],
-                    meta: sst_meta?,
+                    size: sst_info.file_size as u64,
+                    row_num: sst_info.row_num as u64,
+                    meta: sst_meta,
                },
            })
        }
@@ -727,14 +725,12 @@ impl Instance {
            }
        };
        let max_sequence = memtable_state.last_sequence();
-        let mut sst_meta = SstMetaData {
+        let sst_meta = SstMetaData {
            min_key,
            max_key,
            time_range: memtable_state.time_range,
            max_sequence,
            schema: table_data.schema(),
-            size: 0,
-            row_num: 0,
            storage_format_opts: StorageFormatOptions::new(table_data.storage_format()),
            bloom_filter: Default::default(),
        };
@@ -774,11 +770,11 @@ impl Instance {
            })?;
 
        // update sst metadata by built info.
-        sst_meta.row_num = sst_info.row_num as u64;
-        sst_meta.size = sst_info.file_size as u64;
 
        Ok(Some(FileMeta {
            id: file_id,
+            row_num: sst_info.row_num as u64,
+            size: sst_info.file_size as u64,
            meta: sst_meta,
        }))
    }
@@ -953,7 +949,7 @@ impl SpaceStore {
            row_iter::record_batch_with_key_iter_to_stream(merge_iter, &runtime)
        };
 
-        let mut sst_meta = file::merge_sst_meta(&input.files, schema);
+        let sst_meta = file::merge_sst_meta(&input.files, schema);
 
        // Alloc file id for the merged sst.
        let file_id = table_data.alloc_file_id();
@@ -979,16 +975,12 @@ impl SpaceStore {
                path: sst_file_path.to_string(),
            })?;
 
-        // update sst metadata by built info.
-        sst_meta.row_num = sst_info.row_num as u64;
-        sst_meta.size = sst_info.file_size as u64;
-
        table_data
            .metrics
-            .compaction_observe_output_sst_size(sst_meta.size);
+            .compaction_observe_output_sst_size(sst_info.file_size as u64);
        table_data
            .metrics
-            .compaction_observe_output_sst_row_num(sst_meta.row_num);
+            .compaction_observe_output_sst_row_num(sst_info.row_num as u64);
 
        info!(
            "Instance files compacted, table:{}, table_id:{}, request_id:{}, output_path:{}, input_files:{:?}, sst_meta:{:?}",
@@ -1014,6 +1006,8 @@ impl SpaceStore {
            level: input.output_level,
            file: FileMeta {
                id: file_id,
+                size: sst_info.file_size as u64,
+                row_num: sst_info.row_num as u64,
                meta: sst_meta,
            },
        });
diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs
index 37b92e0289..09cac771c3 100644
--- a/analytic_engine/src/sst/file.rs
+++ b/analytic_engine/src/sst/file.rs
@@ -185,7 +185,7 @@ impl FileHandle {
 
    #[inline]
    pub fn row_num(&self) -> u64 {
-        self.inner.meta.meta.row_num
+        self.inner.meta.row_num
    }
 
    #[inline]
@@ -235,7 +235,7 @@ impl FileHandle {
 
    #[inline]
    pub fn size(&self) -> u64 {
-        self.inner.meta.meta.size
+        self.inner.meta.size
    }
 
    #[inline]
@@ -424,6 +424,10 @@ impl FileHandleSet {
 pub struct FileMeta {
     /// Id of the sst file
     pub id: FileId,
+    /// file size in bytes
+    pub size: u64,
+    // total row number
+    pub row_num: u64,
     pub meta: SstMetaData,
 }
 
@@ -508,10 +512,6 @@ pub struct SstMetaData {
     /// Max sequence number in the sst
     pub max_sequence: SequenceNumber,
     pub schema: Schema,
-    /// file size in bytes
-    pub size: u64,
-    // total row number
-    pub row_num: u64,
     pub storage_format_opts: StorageFormatOptions,
     pub bloom_filter: Option<BloomFilter>,
 }
@@ -532,8 +532,6 @@ impl From<SstMetaData> for sst_pb::SstMetaData {
            max_sequence: src.max_sequence,
            time_range: Some(src.time_range.into()),
            schema: Some(common_pb::TableSchema::from(&src.schema)),
-            size: src.size,
-            row_num: src.row_num,
            storage_format_opts: Some(src.storage_format_opts.into()),
            bloom_filter: src.bloom_filter.map(|v| v.into()),
        }
@@ -564,8 +562,6 @@ impl TryFrom<sst_pb::SstMetaData> for SstMetaData {
            time_range,
            max_sequence: src.max_sequence,
            schema,
-            size: src.size,
-            row_num: src.row_num,
            storage_format_opts,
            bloom_filter,
        })
@@ -726,9 +722,6 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData {
    let mut time_range_start = files[0].time_range().inclusive_start();
    let mut time_range_end = files[0].time_range().exclusive_end();
    let mut max_sequence = files[0].max_sequence();
-    // TODO(jiacai2050): what if format of different file is different?
-    // pick first now
-    let storage_format = files[0].storage_format();
 
    if files.len() > 1 {
        for file in &files[1..] {
@@ -746,11 +739,8 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData {
        time_range: TimeRange::new(time_range_start, time_range_end).unwrap(),
        max_sequence,
        schema,
-        // we don't know file size and total row number yet
-        size: 0,
-        row_num: 0,
-        storage_format_opts: StorageFormatOptions::new(storage_format),
-        // bloom filter is rebuilt when write sst, so use default here
+        // we don't know those info yet
+        storage_format_opts: Default::default(),
        bloom_filter: Default::default(),
    }
 }
@@ -805,8 +795,6 @@ pub mod tests {
                time_range: self.time_range,
                max_sequence: self.max_sequence,
                schema: self.schema.clone(),
-                row_num: 0,
-                size: 0,
                storage_format_opts: Default::default(),
                bloom_filter: Default::default(),
            }
diff --git a/analytic_engine/src/sst/manager.rs b/analytic_engine/src/sst/manager.rs
index 2d64a8fafb..59f8945e94 100644
--- a/analytic_engine/src/sst/manager.rs
+++ b/analytic_engine/src/sst/manager.rs
@@ -149,6 +149,8 @@ pub mod tests {
                0,
                FileMeta {
                    id: id as FileId,
+                    size: 0,
+                    row_num: 0,
                    meta: sst_meta,
                },
            );
diff --git a/analytic_engine/src/sst/meta_cache.rs b/analytic_engine/src/sst/meta_cache.rs
index f81fdf339c..a347dd08b3 100644
--- a/analytic_engine/src/sst/meta_cache.rs
+++ b/analytic_engine/src/sst/meta_cache.rs
@@ -48,11 +48,7 @@ impl MetaData {
    ///
    /// After the building, a new parquet meta data will be generated which
    /// contains no extended custom information.
-    pub fn try_new(
-        parquet_meta_data: &ParquetMetaData,
-        sst_size: usize,
-        ignore_bloom_filter: bool,
-    ) -> Result<Self> {
+    pub fn try_new(parquet_meta_data: &ParquetMetaData, ignore_bloom_filter: bool) -> Result<Self> {
        let file_meta_data = parquet_meta_data.file_metadata();
        let kv_metas = file_meta_data
            .key_value_metadata()
@@ -66,11 +62,6 @@ impl MetaData {
                sst_meta.bloom_filter = None;
            }
 
-            // FIXME: After the issue fixed, let's remove the `sst_size` parameter.
-            // The size in sst_meta is always 0, so overwrite it here.
-            // Refer to https://github.com/CeresDB/ceresdb/issues/321
-            sst_meta.size = sst_size as u64;
-
            Arc::new(sst_meta)
        };
 
diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index c552c65060..6148ca4f63 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -335,8 +335,7 @@ impl<'a> Reader<'a> {
            let parquet_meta_data = self.load_meta_data_from_storage().await?;
 
            let ignore_bloom_filter = avoid_update_cache && empty_predicate;
-            let file_size = self.file_reader.file_size().await.context(DecodeSstMeta)?;
-            MetaData::try_new(&parquet_meta_data, file_size, ignore_bloom_filter)
+            MetaData::try_new(&parquet_meta_data, ignore_bloom_filter)
                .map_err(|e| Box::new(e) as _)
                .context(DecodeSstMeta)?
        };
diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs
index eda9008d9d..73039da19f 100644
--- a/analytic_engine/src/sst/parquet/builder.rs
+++ b/analytic_engine/src/sst/parquet/builder.rs
@@ -311,8 +311,6 @@ mod tests {
            time_range: TimeRange::new_unchecked(Timestamp::new(1), Timestamp::new(2)),
            max_sequence: 200,
            schema: schema.clone(),
-            size: 10,
-            row_num: 2,
            storage_format_opts: Default::default(),
            bloom_filter: Default::default(),
        };
@@ -368,13 +366,7 @@ mod tests {
            Arc::new(file_reader),
            &sst_reader_options,
        );
-        let mut sst_meta_readback = {
-            // FIXME: size of SstMetaData is not what this file's size, so overwrite it
-            // https://github.com/CeresDB/ceresdb/issues/321
-            let mut meta = reader.meta_data().await.unwrap().clone();
-            meta.size = sst_meta.size;
-            meta
-        };
+        let mut sst_meta_readback = { reader.meta_data().await.unwrap().clone() };
        // bloom filter is built insider sst writer, so overwrite to default for
        // comparsion
        sst_meta_readback.bloom_filter = Default::default();
@@ -461,8 +453,6 @@ mod tests {
                time_range: Default::default(),
                max_sequence: 1,
                schema,
-                size: 0,
-                row_num: 0,
                storage_format_opts: Default::default(),
                bloom_filter: Default::default(),
            },
diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs
index 1db6f7c5b5..c836b1f9bf 100644
--- a/analytic_engine/src/sst/parquet/encoding.rs
+++ b/analytic_engine/src/sst/parquet/encoding.rs
@@ -871,8 +871,6 @@ mod tests {
            time_range: TimeRange::new_unchecked(Timestamp::new(100), Timestamp::new(101)),
            max_sequence: 200,
            schema: schema.clone(),
-            size: 10,
-            row_num: 4,
            storage_format_opts,
            bloom_filter: Default::default(),
        };
@@ -1006,8 +1004,6 @@ mod tests {
            time_range: TimeRange::new_unchecked(Timestamp::new(100), Timestamp::new(101)),
            max_sequence: 200,
            schema: schema.clone(),
-            size: 10,
-            row_num: 4,
            storage_format_opts,
            bloom_filter: Default::default(),
        };
diff --git a/analytic_engine/src/table/version_edit.rs b/analytic_engine/src/table/version_edit.rs
index c0a10071aa..e5ee836f25 100644
--- a/analytic_engine/src/table/version_edit.rs
+++ b/analytic_engine/src/table/version_edit.rs
@@ -62,8 +62,8 @@ impl From<AddFile> for meta_pb::AddFileMeta {
            time_range: Some(v.file.meta.time_range.into()),
            max_seq: v.file.meta.max_sequence,
            schema: Some(common_pb::TableSchema::from(&v.file.meta.schema)),
-            size: v.file.meta.size,
-            row_num: v.file.meta.row_num,
+            size: v.file.size,
+            row_num: v.file.row_num,
            storage_format: analytic_common_pb::StorageFormat::from(v.file.meta.storage_format())
                as i32,
        }
@@ -91,14 +91,14 @@ impl TryFrom<meta_pb::AddFileMeta> for AddFile {
                .context(InvalidLevel { level: src.level })?,
            file: FileMeta {
                id: src.file_id,
+                size: src.size,
+                row_num: src.row_num,
                meta: SstMetaData {
                    min_key: Bytes::from(src.min_key),
                    max_key: Bytes::from(src.max_key),
                    time_range,
                    max_sequence: src.max_seq,
                    schema,
-                    size: src.size,
-                    row_num: src.row_num,
                    storage_format_opts: StorageFormatOptions::new(storage_format.into()),
                    bloom_filter: Default::default(),
                },
@@ -186,6 +186,8 @@ pub mod tests {
            level: 0,
            file: FileMeta {
                id: self.file_id,
+                size: 0,
+                row_num: 0,
                meta: self.sst_meta.clone(),
            },
        }
diff --git a/benchmarks/src/util.rs b/benchmarks/src/util.rs
index d2071a66c6..9c9256cd81 100644
--- a/benchmarks/src/util.rs
+++ b/benchmarks/src/util.rs
@@ -150,6 +150,8 @@ pub async fn file_handles_from_ssts(
        let sst_meta = meta_from_sst(store, &path, meta_cache).await;
        let file_meta = FileMeta {
            id: *file_id,
+            size: 0,
+            row_num: 0,
            meta: sst_meta,
        };
 
diff --git a/proto/protos/sst.proto b/proto/protos/sst.proto
index 32b136e447..6fb5305334 100644
--- a/proto/protos/sst.proto
+++ b/proto/protos/sst.proto
@@ -25,8 +25,6 @@ message SstMetaData {
   // The time range of the sst
   common.TimeRange time_range = 4;
   common.TableSchema schema = 5;
-  uint64 size = 6;
-  uint64 row_num = 7;
-  analytic_common.StorageFormatOptions storage_format_opts = 8;
-  SstBloomFilter bloom_filter = 9;
+  analytic_common.StorageFormatOptions storage_format_opts = 6;
+  SstBloomFilter bloom_filter = 7;
 }
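
Note on the overall change: the patch moves `size` and `row_num` out of `SstMetaData` (which is persisted inside the sst file and in the `sst.proto` manifest message) and onto `FileMeta`, filling them from the `SstInfo` that the sst builder returns once the file has actually been written. The following is a minimal, self-contained Rust sketch of the resulting shape; the struct bodies are trimmed-down stand-ins for illustration only, not the real definitions from the crate.

// Sketch only: simplified stand-ins for SstInfo / SstMetaData / FileMeta that show
// where `size` and `row_num` live after this change. The real structs carry more
// fields (min/max key, time range, schema, storage format options, bloom filter).

/// Statistics reported by the sst builder after the file has been written.
struct SstInfo {
    file_size: usize,
    row_num: usize,
}

/// Logical sst metadata; it no longer carries the physical size/row_num.
struct SstMetaData {
    max_sequence: u64,
}

/// Manifest entry for one sst file: the id plus the physical stats that are
/// only known once the build has finished.
struct FileMeta {
    id: u64,
    size: u64,
    row_num: u64,
    meta: SstMetaData,
}

fn main() {
    // Metadata is prepared before the build and never patched afterwards...
    let sst_meta = SstMetaData { max_sequence: 200 };
    // ...while the builder's output supplies the physical statistics.
    let sst_info = SstInfo {
        file_size: 4096,
        row_num: 128,
    };

    let file_meta = FileMeta {
        id: 1,
        size: sst_info.file_size as u64,
        row_num: sst_info.row_num as u64,
        meta: sst_meta,
    };
    println!(
        "file {}: {} bytes, {} rows, max_sequence {}",
        file_meta.id, file_meta.size, file_meta.row_num, file_meta.meta.max_sequence
    );
}

The effect is visible throughout the diff: the `size: 0, row_num: 0` placeholders and the "update sst metadata by built info" fix-ups disappear, and the `SstMetaData` proto message shrinks accordingly.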