Skip to content

Commit

Permalink
remove size from sst metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Dec 27, 2022
1 parent eb6af7a commit 30a1d7b
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 81 deletions.
6 changes: 3 additions & 3 deletions analytic_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,15 +617,13 @@ mod tests {
},
};

fn build_sst_meta_data(time_range: TimeRange, size: u64) -> SstMetaData {
fn build_sst_meta_data(time_range: TimeRange, _size: u64) -> SstMetaData {
SstMetaData {
min_key: Bytes::from_static(b"100"),
max_key: Bytes::from_static(b"200"),
time_range,
max_sequence: 200,
schema: build_schema(),
size,
row_num: 2,
storage_format_opts: Default::default(),
bloom_filter: Default::default(),
}
Expand Down Expand Up @@ -769,6 +767,8 @@ mod tests {
.map(|size| {
let file_meta = FileMeta {
id: 1,
size,
row_num: 0,
meta: build_sst_meta_data(TimeRange::empty(), size),
};
let queue = FilePurgeQueue::new(1, 1.into(), tx.clone());
Expand Down
40 changes: 17 additions & 23 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl Instance {
flushed_sequence = seq;
sst_num += files_to_level0.len();
for add_file in &files_to_level0 {
local_metrics.observe_sst_size(add_file.file.meta.size);
local_metrics.observe_sst_size(add_file.file.size);
}
}
}
Expand All @@ -511,7 +511,7 @@ impl Instance {
.dump_normal_memtable(table_data, request_id, mem)
.await?;
if let Some(file) = file {
let sst_size = file.meta.size;
let sst_size = file.size;
files_to_level0.push(AddFile { level: 0, file });

// Set flushed sequence to max of the last_sequence of memtables.
Expand Down Expand Up @@ -619,14 +619,12 @@ impl Instance {
let sst_file_path = table_data.set_sst_file_path(file_id);

// TODO: min_key max_key set in sst_builder build
let mut sst_meta = SstMetaData {
let sst_meta = SstMetaData {
min_key: min_key.clone(),
max_key: max_key.clone(),
time_range: *time_range,
max_sequence,
schema: table_data.schema(),
size: 0,
row_num: 0,
storage_format_opts: StorageFormatOptions::new(
table_data.table_options().storage_format,
),
Expand Down Expand Up @@ -663,10 +661,7 @@ impl Instance {
path: sst_file_path.to_string(),
})?;

// update sst metadata by built info.
sst_meta.row_num = sst_info.row_num as u64;
sst_meta.size = sst_info.file_size as u64;
Ok(sst_meta)
Ok((sst_info, sst_meta))
});

batch_record_senders.push(batch_record_sender);
Expand Down Expand Up @@ -698,12 +693,15 @@ impl Instance {
batch_record_senders.clear();

let ret = try_join_all(sst_handlers).await;
for (idx, sst_meta) in ret.context(RuntimeJoin)?.into_iter().enumerate() {
for (idx, info_and_meta) in ret.context(RuntimeJoin)?.into_iter().enumerate() {
let (sst_info, sst_meta) = info_and_meta?;
files_to_level0.push(AddFile {
level: 0,
file: FileMeta {
id: file_ids[idx],
meta: sst_meta?,
size: sst_info.file_size as u64,
row_num: sst_info.row_num as u64,
meta: sst_meta,
},
})
}
Expand All @@ -727,14 +725,12 @@ impl Instance {
}
};
let max_sequence = memtable_state.last_sequence();
let mut sst_meta = SstMetaData {
let sst_meta = SstMetaData {
min_key,
max_key,
time_range: memtable_state.time_range,
max_sequence,
schema: table_data.schema(),
size: 0,
row_num: 0,
storage_format_opts: StorageFormatOptions::new(table_data.storage_format()),
bloom_filter: Default::default(),
};
Expand Down Expand Up @@ -774,11 +770,11 @@ impl Instance {
})?;

// update sst metadata by built info.
sst_meta.row_num = sst_info.row_num as u64;
sst_meta.size = sst_info.file_size as u64;

Ok(Some(FileMeta {
id: file_id,
row_num: sst_info.row_num as u64,
size: sst_info.file_size as u64,
meta: sst_meta,
}))
}
Expand Down Expand Up @@ -953,7 +949,7 @@ impl SpaceStore {
row_iter::record_batch_with_key_iter_to_stream(merge_iter, &runtime)
};

let mut sst_meta = file::merge_sst_meta(&input.files, schema);
let sst_meta = file::merge_sst_meta(&input.files, schema);

// Alloc file id for the merged sst.
let file_id = table_data.alloc_file_id();
Expand All @@ -979,16 +975,12 @@ impl SpaceStore {
path: sst_file_path.to_string(),
})?;

// update sst metadata by built info.
sst_meta.row_num = sst_info.row_num as u64;
sst_meta.size = sst_info.file_size as u64;

table_data
.metrics
.compaction_observe_output_sst_size(sst_meta.size);
.compaction_observe_output_sst_size(sst_info.file_size as u64);
table_data
.metrics
.compaction_observe_output_sst_row_num(sst_meta.row_num);
.compaction_observe_output_sst_row_num(sst_info.row_num as u64);

info!(
"Instance files compacted, table:{}, table_id:{}, request_id:{}, output_path:{}, input_files:{:?}, sst_meta:{:?}",
Expand All @@ -1014,6 +1006,8 @@ impl SpaceStore {
level: input.output_level,
file: FileMeta {
id: file_id,
size: sst_info.file_size as u64,
row_num: sst_info.row_num as u64,
meta: sst_meta,
},
});
Expand Down
28 changes: 8 additions & 20 deletions analytic_engine/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl FileHandle {

#[inline]
pub fn row_num(&self) -> u64 {
self.inner.meta.meta.row_num
self.inner.meta.row_num
}

#[inline]
Expand Down Expand Up @@ -235,7 +235,7 @@ impl FileHandle {

#[inline]
pub fn size(&self) -> u64 {
self.inner.meta.meta.size
self.inner.meta.size
}

#[inline]
Expand Down Expand Up @@ -424,6 +424,10 @@ impl FileHandleSet {
pub struct FileMeta {
/// Id of the sst file
pub id: FileId,
/// File size in bytes
pub size: u64,
/// Total row number
pub row_num: u64,
/// Metadata of the sst (an `SstMetaData`)
pub meta: SstMetaData,
}

Expand Down Expand Up @@ -508,10 +512,6 @@ pub struct SstMetaData {
/// Max sequence number in the sst
pub max_sequence: SequenceNumber,
pub schema: Schema,
/// file size in bytes
pub size: u64,
// total row number
pub row_num: u64,
pub storage_format_opts: StorageFormatOptions,
pub bloom_filter: Option<BloomFilter>,
}
Expand All @@ -532,8 +532,6 @@ impl From<SstMetaData> for sst_pb::SstMetaData {
max_sequence: src.max_sequence,
time_range: Some(src.time_range.into()),
schema: Some(common_pb::TableSchema::from(&src.schema)),
size: src.size,
row_num: src.row_num,
storage_format_opts: Some(src.storage_format_opts.into()),
bloom_filter: src.bloom_filter.map(|v| v.into()),
}
Expand Down Expand Up @@ -564,8 +562,6 @@ impl TryFrom<sst_pb::SstMetaData> for SstMetaData {
time_range,
max_sequence: src.max_sequence,
schema,
size: src.size,
row_num: src.row_num,
storage_format_opts,
bloom_filter,
})
Expand Down Expand Up @@ -726,9 +722,6 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData {
let mut time_range_start = files[0].time_range().inclusive_start();
let mut time_range_end = files[0].time_range().exclusive_end();
let mut max_sequence = files[0].max_sequence();
// TODO(jiacai2050): what if format of different file is different?
// pick first now
let storage_format = files[0].storage_format();

if files.len() > 1 {
for file in &files[1..] {
Expand All @@ -746,11 +739,8 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData {
time_range: TimeRange::new(time_range_start, time_range_end).unwrap(),
max_sequence,
schema,
// we don't know file size and total row number yet
size: 0,
row_num: 0,
storage_format_opts: StorageFormatOptions::new(storage_format),
// bloom filter is rebuilt when write sst, so use default here
// we don't know these values yet
storage_format_opts: Default::default(),
bloom_filter: Default::default(),
}
}
Expand Down Expand Up @@ -805,8 +795,6 @@ pub mod tests {
time_range: self.time_range,
max_sequence: self.max_sequence,
schema: self.schema.clone(),
row_num: 0,
size: 0,
storage_format_opts: Default::default(),
bloom_filter: Default::default(),
}
Expand Down
2 changes: 2 additions & 0 deletions analytic_engine/src/sst/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ pub mod tests {
0,
FileMeta {
id: id as FileId,
size: 0,
row_num: 0,
meta: sst_meta,
},
);
Expand Down
11 changes: 1 addition & 10 deletions analytic_engine/src/sst/meta_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ impl MetaData {
///
/// After the building, a new parquet meta data will be generated which
/// contains no extended custom information.
pub fn try_new(
parquet_meta_data: &ParquetMetaData,
sst_size: usize,
ignore_bloom_filter: bool,
) -> Result<Self> {
pub fn try_new(parquet_meta_data: &ParquetMetaData, ignore_bloom_filter: bool) -> Result<Self> {
let file_meta_data = parquet_meta_data.file_metadata();
let kv_metas = file_meta_data
.key_value_metadata()
Expand All @@ -66,11 +62,6 @@ impl MetaData {
sst_meta.bloom_filter = None;
}

// FIXME: After the issue fixed, let's remove the `sst_size` parameter.
// The size in sst_meta is always 0, so overwrite it here.
// Refer to https://github.com/CeresDB/ceresdb/issues/321
sst_meta.size = sst_size as u64;

Arc::new(sst_meta)
};

Expand Down
3 changes: 1 addition & 2 deletions analytic_engine/src/sst/parquet/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,7 @@ impl<'a> Reader<'a> {
let parquet_meta_data = self.load_meta_data_from_storage().await?;

let ignore_bloom_filter = avoid_update_cache && empty_predicate;
let file_size = self.file_reader.file_size().await.context(DecodeSstMeta)?;
MetaData::try_new(&parquet_meta_data, file_size, ignore_bloom_filter)
MetaData::try_new(&parquet_meta_data, ignore_bloom_filter)
.map_err(|e| Box::new(e) as _)
.context(DecodeSstMeta)?
};
Expand Down
12 changes: 1 addition & 11 deletions analytic_engine/src/sst/parquet/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ mod tests {
time_range: TimeRange::new_unchecked(Timestamp::new(1), Timestamp::new(2)),
max_sequence: 200,
schema: schema.clone(),
size: 10,
row_num: 2,
storage_format_opts: Default::default(),
bloom_filter: Default::default(),
};
Expand Down Expand Up @@ -368,13 +366,7 @@ mod tests {
Arc::new(file_reader),
&sst_reader_options,
);
let mut sst_meta_readback = {
// FIXME: size of SstMetaData is not what this file's size, so overwrite it
// https://github.com/CeresDB/ceresdb/issues/321
let mut meta = reader.meta_data().await.unwrap().clone();
meta.size = sst_meta.size;
meta
};
let mut sst_meta_readback = { reader.meta_data().await.unwrap().clone() };
// bloom filter is built inside sst writer, so overwrite to default for
// comparison
sst_meta_readback.bloom_filter = Default::default();
Expand Down Expand Up @@ -461,8 +453,6 @@ mod tests {
time_range: Default::default(),
max_sequence: 1,
schema,
size: 0,
row_num: 0,
storage_format_opts: Default::default(),
bloom_filter: Default::default(),
},
Expand Down
4 changes: 0 additions & 4 deletions analytic_engine/src/sst/parquet/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,8 +871,6 @@ mod tests {
time_range: TimeRange::new_unchecked(Timestamp::new(100), Timestamp::new(101)),
max_sequence: 200,
schema: schema.clone(),
size: 10,
row_num: 4,
storage_format_opts,
bloom_filter: Default::default(),
};
Expand Down Expand Up @@ -1006,8 +1004,6 @@ mod tests {
time_range: TimeRange::new_unchecked(Timestamp::new(100), Timestamp::new(101)),
max_sequence: 200,
schema: schema.clone(),
size: 10,
row_num: 4,
storage_format_opts,
bloom_filter: Default::default(),
};
Expand Down
10 changes: 6 additions & 4 deletions analytic_engine/src/table/version_edit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ impl From<AddFile> for meta_pb::AddFileMeta {
time_range: Some(v.file.meta.time_range.into()),
max_seq: v.file.meta.max_sequence,
schema: Some(common_pb::TableSchema::from(&v.file.meta.schema)),
size: v.file.meta.size,
row_num: v.file.meta.row_num,
size: v.file.size,
row_num: v.file.row_num,
storage_format: analytic_common_pb::StorageFormat::from(v.file.meta.storage_format())
as i32,
}
Expand Down Expand Up @@ -91,14 +91,14 @@ impl TryFrom<meta_pb::AddFileMeta> for AddFile {
.context(InvalidLevel { level: src.level })?,
file: FileMeta {
id: src.file_id,
size: src.size,
row_num: src.row_num,
meta: SstMetaData {
min_key: Bytes::from(src.min_key),
max_key: Bytes::from(src.max_key),
time_range,
max_sequence: src.max_seq,
schema,
size: src.size,
row_num: src.row_num,
storage_format_opts: StorageFormatOptions::new(storage_format.into()),
bloom_filter: Default::default(),
},
Expand Down Expand Up @@ -186,6 +186,8 @@ pub mod tests {
level: 0,
file: FileMeta {
id: self.file_id,
size: 0,
row_num: 0,
meta: self.sst_meta.clone(),
},
}
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ pub async fn file_handles_from_ssts(
let sst_meta = meta_from_sst(store, &path, meta_cache).await;
let file_meta = FileMeta {
id: *file_id,
size: 0,
row_num: 0,
meta: sst_meta,
};

Expand Down
6 changes: 2 additions & 4 deletions proto/protos/sst.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ message SstMetaData {
// The time range of the sst
common.TimeRange time_range = 4;
common.TableSchema schema = 5;
uint64 size = 6;
uint64 row_num = 7;
analytic_common.StorageFormatOptions storage_format_opts = 8;
SstBloomFilter bloom_filter = 9;
analytic_common.StorageFormatOptions storage_format_opts = 6;
SstBloomFilter bloom_filter = 7;
}

0 comments on commit 30a1d7b

Please sign in to comment.