Skip to content

Commit

Permalink
fix: remove size from sst metadata (#513)
Browse files Browse the repository at this point in the history
* remove size from sst metadata

* pass file size from file handler

* run CI for breaking-changes branch

* fix CR
  • Loading branch information
jiacai2050 authored Jan 3, 2023
1 parent 5819fa5 commit 1195831
Show file tree
Hide file tree
Showing 17 changed files with 154 additions and 175 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
push:
branches:
- main
- breaking-changes
paths-ignore:
- 'docs/**'
- 'etc/**'
Expand All @@ -17,6 +18,7 @@ on:
pull_request:
branches:
- main
- breaking-changes
paths-ignore:
- 'docs/**'
- 'etc/**'
Expand Down
120 changes: 60 additions & 60 deletions analytic_engine/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,15 +617,13 @@ mod tests {
},
};

fn build_sst_meta_data(time_range: TimeRange, size: u64) -> SstMetaData {
fn build_sst_meta_data(time_range: TimeRange) -> SstMetaData {
SstMetaData {
min_key: Bytes::from_static(b"100"),
max_key: Bytes::from_static(b"200"),
time_range,
max_sequence: 200,
schema: build_schema(),
size,
row_num: 2,
storage_format_opts: Default::default(),
bloom_filter: Default::default(),
}
Expand All @@ -635,22 +633,22 @@ mod tests {
/// Builds a `LevelsController` whose SSTs fall into "old" time buckets
/// relative to `now` (two files ~14s old, one ~4s old, one at the epoch-ish
/// range [100, 200]). Used to drive the time-window compaction picker.
fn build_old_bucket_case(now: i64) -> LevelsController {
    let builder = LevelsControllerMockBuilder::default();
    // Post-commit one-argument form: file size/row_num now live on
    // `FileMeta`, not on the SST metadata built here.
    let sst_meta_vec = vec![
        build_sst_meta_data(TimeRange::new_unchecked(
            Timestamp::new(now - 14000),
            Timestamp::new(now - 13000),
        )),
        build_sst_meta_data(TimeRange::new_unchecked(
            Timestamp::new(now - 14000),
            Timestamp::new(now - 13000),
        )),
        build_sst_meta_data(TimeRange::new_unchecked(
            Timestamp::new(now - 4000),
            Timestamp::new(now - 3000),
        )),
        build_sst_meta_data(TimeRange::new_unchecked(
            Timestamp::new(100),
            Timestamp::new(200),
        )),
    ];
    builder.add_sst(sst_meta_vec).build()
}
Expand All @@ -660,30 +658,30 @@ mod tests {
fn build_newest_bucket_case(now: i64) -> LevelsController {
let builder = LevelsControllerMockBuilder::default();
let sst_meta_vec = vec![
build_sst_meta_data(
TimeRange::new_unchecked(Timestamp::new(now - 14000), Timestamp::new(now - 13000)),
2,
),
build_sst_meta_data(
TimeRange::new_unchecked(Timestamp::new(now - 14000), Timestamp::new(now - 13000)),
2,
),
build_sst_meta_data(
TimeRange::new_unchecked(Timestamp::new(now - 4000), Timestamp::new(now - 3000)),
2,
),
build_sst_meta_data(
TimeRange::new_unchecked(Timestamp::new(now - 4000), Timestamp::new(now - 3000)),
2,
),
build_sst_meta_data(
TimeRange::new_unchecked(Timestamp::new(now - 4000), Timestamp::new(now - 3000)),
2,
),
build_sst_meta_data(
TimeRange::new_unchecked(Timestamp::new(now - 4000), Timestamp::new(now - 3000)),
2,
),
build_sst_meta_data(TimeRange::new_unchecked(
Timestamp::new(now - 14000),
Timestamp::new(now - 13000),
)),
build_sst_meta_data(TimeRange::new_unchecked(
Timestamp::new(now - 14000),
Timestamp::new(now - 13000),
)),
build_sst_meta_data(TimeRange::new_unchecked(
Timestamp::new(now - 4000),
Timestamp::new(now - 3000),
)),
build_sst_meta_data(TimeRange::new_unchecked(
Timestamp::new(now - 4000),
Timestamp::new(now - 3000),
)),
build_sst_meta_data(TimeRange::new_unchecked(
Timestamp::new(now - 4000),
Timestamp::new(now - 3000),
)),
build_sst_meta_data(TimeRange::new_unchecked(
Timestamp::new(now - 4000),
Timestamp::new(now - 3000),
)),
];
builder.add_sst(sst_meta_vec).build()
}
Expand All @@ -693,22 +691,22 @@ mod tests {
/// Builds a `LevelsController` whose newest bucket holds too few files for
/// the picker's threshold (one ~14s-old file, three ~4s-old files), so no
/// compaction candidate should be produced from the newest bucket.
fn build_newest_bucket_no_match_case(now: i64) -> LevelsController {
    let builder = LevelsControllerMockBuilder::default();
    // Post-commit one-argument form: per-file size/row_num now belong to
    // `FileMeta`, not to the SST metadata.
    let sst_meta_vec = vec![
        build_sst_meta_data(TimeRange::new_unchecked(
            Timestamp::new(now - 14000),
            Timestamp::new(now - 13000),
        )),
        build_sst_meta_data(TimeRange::new_unchecked(
            Timestamp::new(now - 4000),
            Timestamp::new(now - 3000),
        )),
        build_sst_meta_data(TimeRange::new_unchecked(
            Timestamp::new(now - 4000),
            Timestamp::new(now - 3000),
        )),
        build_sst_meta_data(TimeRange::new_unchecked(
            Timestamp::new(now - 4000),
            Timestamp::new(now - 3000),
        )),
    ];
    builder.add_sst(sst_meta_vec).build()
}
Expand Down Expand Up @@ -769,7 +767,9 @@ mod tests {
.map(|size| {
let file_meta = FileMeta {
id: 1,
meta: build_sst_meta_data(TimeRange::empty(), size),
size,
row_num: 0,
meta: build_sst_meta_data(TimeRange::empty()),
};
let queue = FilePurgeQueue::new(1, 1.into(), tx.clone());
FileHandle::new(file_meta, queue)
Expand Down
44 changes: 20 additions & 24 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl Instance {
flushed_sequence = seq;
sst_num += files_to_level0.len();
for add_file in &files_to_level0 {
local_metrics.observe_sst_size(add_file.file.meta.size);
local_metrics.observe_sst_size(add_file.file.size);
}
}
}
Expand All @@ -511,7 +511,7 @@ impl Instance {
.dump_normal_memtable(table_data, request_id, mem)
.await?;
if let Some(file) = file {
let sst_size = file.meta.size;
let sst_size = file.size;
files_to_level0.push(AddFile { level: 0, file });

// Set flushed sequence to max of the last_sequence of memtables.
Expand Down Expand Up @@ -619,14 +619,12 @@ impl Instance {
let sst_file_path = table_data.set_sst_file_path(file_id);

// TODO: min_key max_key set in sst_builder build
let mut sst_meta = SstMetaData {
let sst_meta = SstMetaData {
min_key: min_key.clone(),
max_key: max_key.clone(),
time_range: *time_range,
max_sequence,
schema: table_data.schema(),
size: 0,
row_num: 0,
storage_format_opts: StorageFormatOptions::new(
table_data.table_options().storage_format,
),
Expand Down Expand Up @@ -663,10 +661,7 @@ impl Instance {
path: sst_file_path.to_string(),
})?;

// update sst metadata by built info.
sst_meta.row_num = sst_info.row_num as u64;
sst_meta.size = sst_info.file_size as u64;
Ok(sst_meta)
Ok((sst_info, sst_meta))
});

batch_record_senders.push(batch_record_sender);
Expand Down Expand Up @@ -697,13 +692,16 @@ impl Instance {
}
batch_record_senders.clear();

let ret = try_join_all(sst_handlers).await;
for (idx, sst_meta) in ret.context(RuntimeJoin)?.into_iter().enumerate() {
let info_and_metas = try_join_all(sst_handlers).await.context(RuntimeJoin)?;
for (idx, info_and_meta) in info_and_metas.into_iter().enumerate() {
let (sst_info, sst_meta) = info_and_meta?;
files_to_level0.push(AddFile {
level: 0,
file: FileMeta {
id: file_ids[idx],
meta: sst_meta?,
size: sst_info.file_size as u64,
row_num: sst_info.row_num as u64,
meta: sst_meta,
},
})
}
Expand All @@ -727,14 +725,12 @@ impl Instance {
}
};
let max_sequence = memtable_state.last_sequence();
let mut sst_meta = SstMetaData {
let sst_meta = SstMetaData {
min_key,
max_key,
time_range: memtable_state.time_range,
max_sequence,
schema: table_data.schema(),
size: 0,
row_num: 0,
storage_format_opts: StorageFormatOptions::new(table_data.storage_format()),
bloom_filter: Default::default(),
};
Expand Down Expand Up @@ -774,11 +770,11 @@ impl Instance {
})?;

// update sst metadata by built info.
sst_meta.row_num = sst_info.row_num as u64;
sst_meta.size = sst_info.file_size as u64;

Ok(Some(FileMeta {
id: file_id,
row_num: sst_info.row_num as u64,
size: sst_info.file_size as u64,
meta: sst_meta,
}))
}
Expand Down Expand Up @@ -953,7 +949,7 @@ impl SpaceStore {
row_iter::record_batch_with_key_iter_to_stream(merge_iter, &runtime)
};

let mut sst_meta = file::merge_sst_meta(&input.files, schema);
let sst_meta = file::merge_sst_meta(&input.files, schema);

// Alloc file id for the merged sst.
let file_id = table_data.alloc_file_id();
Expand All @@ -979,16 +975,14 @@ impl SpaceStore {
path: sst_file_path.to_string(),
})?;

// update sst metadata by built info.
sst_meta.row_num = sst_info.row_num as u64;
sst_meta.size = sst_info.file_size as u64;

let sst_file_size = sst_info.file_size as u64;
let sst_row_num = sst_info.row_num as u64;
table_data
.metrics
.compaction_observe_output_sst_size(sst_meta.size);
.compaction_observe_output_sst_size(sst_file_size);
table_data
.metrics
.compaction_observe_output_sst_row_num(sst_meta.row_num);
.compaction_observe_output_sst_row_num(sst_row_num);

info!(
"Instance files compacted, table:{}, table_id:{}, request_id:{}, output_path:{}, input_files:{:?}, sst_meta:{:?}",
Expand All @@ -1014,6 +1008,8 @@ impl SpaceStore {
level: input.output_level,
file: FileMeta {
id: file_id,
size: sst_file_size,
row_num: sst_row_num,
meta: sst_meta,
},
});
Expand Down
3 changes: 2 additions & 1 deletion analytic_engine/src/row_iter/record_batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,9 @@ pub async fn stream_from_sst_file(
) -> Result<SequencedRecordBatchStream> {
sst_file.read_meter().mark();
let path = sst_util::new_sst_file_path(space_id, table_id, sst_file.id());

let mut sst_reader = sst_factory
.new_sst_reader(sst_reader_options, &path, store_picker)
.new_sst_reader(sst_reader_options, &path, store_picker, sst_file.size())
.with_context(|| SstReaderNotFound {
options: sst_reader_options.clone(),
})?;
Expand Down
Loading

0 comments on commit 1195831

Please sign in to comment.