Skip to content

Commit

Permalink
fix: make ut works
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Apr 9, 2023
1 parent b085aa2 commit fe59ce2
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 89 deletions.
2 changes: 1 addition & 1 deletion analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ impl ScheduleWorker {
let flush_scheduler = serializer.flush_scheduler();
// Instance flush the table asynchronously.
if let Err(e) = flusher
.schedule_flush(flush_scheduler, &table_data, TableFlushOptions::default())
.schedule_flush(flush_scheduler, table_data, TableFlushOptions::default())
.await
{
error!("Failed to flush table, err:{}", e);
Expand Down
14 changes: 3 additions & 11 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub enum Error {
define_result!(Error);

/// Options to flush single table.
#[derive(Default)]
pub struct TableFlushOptions {
/// Flush result sender.
///
Expand All @@ -152,15 +153,6 @@ impl fmt::Debug for TableFlushOptions {
}
}

impl Default for TableFlushOptions {
fn default() -> Self {
Self {
res_sender: None,
compact_after_flush: None,
}
}
}

/// Request to flush single table.
pub struct TableFlushRequest {
/// Table to flush.
Expand Down Expand Up @@ -311,7 +303,7 @@ impl Flusher {
flush_job,
on_flush_success,
block_on,
&*self.runtime,
&self.runtime,
opts.res_sender,
)
.await
Expand All @@ -323,7 +315,7 @@ impl Flusher {
flush_job,
async {},
block_on,
&*self.runtime,
&self.runtime,
opts.res_sender,
)
.await
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl Instance {
let mut serializer = table_data.serializer.lock().await;
let flush_scheduler = serializer.flush_scheduler();
flusher
.schedule_flush(flush_scheduler, &table_data, flush_opts)
.schedule_flush(flush_scheduler, table_data, flush_opts)
.await
.box_err()
.context(ManualFlush {
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl Instance {
let flusher = self.make_flusher();
let flush_scheduler = serializer.flush_scheduler();
flusher
.schedule_flush(flush_scheduler, &table_data, opts)
.schedule_flush(flush_scheduler, table_data, opts)
.await
.context(FlushTable {
space_id: table_data.space_id,
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/instance/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl TableFlushScheduler {
/// sequential.
///
/// REQUIRE: should only be called by the write thread.
#[allow(clippy::too_many_arguments)]
pub async fn flush_sequentially<F, T>(
&mut self,
table: String,
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,12 +600,12 @@ impl<'a> Writer<'a> {
Ok(mut serializer) => {
let flush_scheduler = serializer.flush_scheduler();
// Set `block_on_write_thread` to false and let flush do in background.
return flusher
flusher
.schedule_flush(flush_scheduler, table_data, opts)
.await
.context(FlushTable {
table: &table_data.name,
});
})
}
Err(_) => {
warn!(
Expand Down
45 changes: 7 additions & 38 deletions analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,6 @@ pub mod tests {
table_id: TableId,
table_name: String,
shard_id: ShardId,
write_handle: Option<WriteHandle>,
}

impl TableDataMocker {
Expand All @@ -646,11 +645,6 @@ pub mod tests {
self
}

pub fn write_handle(mut self, write_handle: WriteHandle) -> Self {
self.write_handle = Some(write_handle);
self
}

pub fn build(self) -> TableData {
let space_id = DEFAULT_SPACE_ID;
let table_schema = default_schema();
Expand All @@ -668,23 +662,11 @@ pub mod tests {
partition_info: None,
};

let write_handle = self.write_handle.unwrap_or_else(|| {
let mocked_write_handle = WriteHandleMocker::default().space_id(space_id).build();
mocked_write_handle.write_handle
});
let table_opts = TableOptions::default();
let purger = FilePurgerMocker::mock();
let collector = Arc::new(NoopCollector);

TableData::new(
space_id,
create_request,
write_handle,
table_opts,
&purger,
collector,
)
.unwrap()
TableData::new(space_id, create_request, table_opts, &purger, collector).unwrap()
}
}

Expand All @@ -694,7 +676,6 @@ pub mod tests {
table_id: table::new_table_id(2, 1),
table_name: "mocked_table".to_string(),
shard_id: DEFAULT_SHARD_ID,
write_handle: None,
}
}
}
Expand Down Expand Up @@ -722,30 +703,20 @@ pub mod tests {

#[test]
fn test_find_or_create_mutable() {
let mocked_write_handle = WriteHandleMocker::default()
.space_id(DEFAULT_SPACE_ID)
.build();
let table_data = TableDataMocker::default()
.write_handle(mocked_write_handle.write_handle)
.build();
let worker_local = mocked_write_handle.worker_local;
let table_data = TableDataMocker::default().build();
let schema = table_data.schema();

// Create sampling memtable.
let zero_ts = Timestamp::new(0);
let mutable = table_data
.find_or_create_mutable(&worker_local, zero_ts, &schema)
.unwrap();
let mutable = table_data.find_or_create_mutable(zero_ts, &schema).unwrap();
assert!(mutable.accept_timestamp(zero_ts));
let sampling_mem = mutable.as_sampling();
let sampling_id = sampling_mem.id;
assert_eq!(1, sampling_id);

// Test memtable is reused.
let now_ts = Timestamp::now();
let mutable = table_data
.find_or_create_mutable(&worker_local, now_ts, &schema)
.unwrap();
let mutable = table_data.find_or_create_mutable(now_ts, &schema).unwrap();
assert!(mutable.accept_timestamp(now_ts));
let sampling_mem = mutable.as_sampling();
// Use same sampling memtable.
Expand All @@ -756,14 +727,12 @@ pub mod tests {
let mut table_opts = (*table_data.table_options()).clone();
table_opts.segment_duration =
Some(ReadableDuration(table_options::DEFAULT_SEGMENT_DURATION));
table_data.set_table_options(&worker_local, table_opts);
table_data.set_table_options(table_opts);
// Freeze sampling memtable.
current_version.freeze_sampling(&worker_local);
current_version.freeze_sampling();

// A new mutable memtable should be created.
let mutable = table_data
.find_or_create_mutable(&worker_local, now_ts, &schema)
.unwrap();
let mutable = table_data.find_or_create_mutable(now_ts, &schema).unwrap();
assert!(mutable.accept_timestamp(now_ts));
let mem_state = mutable.as_normal();
assert_eq!(2, mem_state.id);
Expand Down
50 changes: 15 additions & 35 deletions analytic_engine/src/table/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,6 @@ impl TableVersionMeta {
mod tests {
use super::*;
use crate::{
instance::write_worker::tests::WriteHandleMocker,
sst::file::tests::FilePurgerMocker,
table::{data::tests::MemTableMocker, version_edit::tests::AddFileMocker},
table_options,
Expand All @@ -802,8 +801,6 @@ mod tests {

#[test]
fn test_empty_table_version() {
let mocked_write_handle = WriteHandleMocker::default().build();
let worker_local = mocked_write_handle.worker_local;
let version = new_table_version();

let ts = Timestamp::now();
Expand Down Expand Up @@ -835,13 +832,11 @@ mod tests {
}

let now = Timestamp::now();
let mutable = version.memtable_for_write(&worker_local, now, 1).unwrap();
let mutable = version.memtable_for_write(now, 1).unwrap();
assert!(mutable.is_none());

// Nothing to switch.
assert!(version
.switch_memtables_or_suggest_duration(&worker_local)
.is_none());
assert!(version.switch_memtables_or_suggest_duration().is_none());
}

fn check_flushable_mem_with_sampling(
Expand All @@ -858,8 +853,6 @@ mod tests {

#[test]
fn test_table_version_sampling() {
let mocked_write_handle = WriteHandleMocker::default().build();
let worker_local = mocked_write_handle.worker_local;
let version = new_table_version();

let memtable = MemTableMocker::default().build();
Expand All @@ -873,14 +866,14 @@ mod tests {
// Should write to sampling memtable.
let now = Timestamp::now();
let mutable = version
.memtable_for_write(&worker_local, now, schema.version())
.memtable_for_write(now, schema.version())
.unwrap()
.unwrap();
let actual_memtable = mutable.as_sampling();
assert_eq!(memtable_id, actual_memtable.id);

let mutable = version
.memtable_for_write(&worker_local, Timestamp::new(1234), schema.version())
.memtable_for_write(Timestamp::new(1234), schema.version())
.unwrap()
.unwrap();
let actual_memtable = mutable.as_sampling();
Expand All @@ -898,7 +891,6 @@ mod tests {

#[test]
fn test_table_version_sampling_switch() {
let worker_local = WriteHandleMocker::default().build().worker_local;
let version = new_table_version();

let memtable = MemTableMocker::default().build();
Expand All @@ -910,9 +902,7 @@ mod tests {

version.set_sampling(sampling_mem);

let duration = version
.switch_memtables_or_suggest_duration(&worker_local)
.unwrap();
let duration = version.switch_memtables_or_suggest_duration().unwrap();
assert_eq!(table_options::DEFAULT_SEGMENT_DURATION, duration);

// Flushable memtables only contains sampling memtable.
Expand All @@ -922,17 +912,15 @@ mod tests {
// Write to memtable after switch and before freezed.
let now = Timestamp::now();
let mutable = version
.memtable_for_write(&worker_local, now, schema.version())
.memtable_for_write(now, schema.version())
.unwrap()
.unwrap();
// Still write to sampling memtable.
let actual_memtable = mutable.as_sampling();
assert_eq!(memtable_id, actual_memtable.id);

// Switch still return duration before freezed.
let duration = version
.switch_memtables_or_suggest_duration(&worker_local)
.unwrap();
let duration = version.switch_memtables_or_suggest_duration().unwrap();
assert_eq!(table_options::DEFAULT_SEGMENT_DURATION, duration);

// Flushable memtables only contains sampling memtable before sampling
Expand All @@ -943,7 +931,6 @@ mod tests {

#[test]
fn test_table_version_sampling_freeze() {
let worker_local = WriteHandleMocker::default().build().worker_local;
let version = new_table_version();

let memtable = MemTableMocker::default().build();
Expand All @@ -956,18 +943,16 @@ mod tests {
version.set_sampling(sampling_mem);
assert_eq!(
table_options::DEFAULT_SEGMENT_DURATION,
version
.switch_memtables_or_suggest_duration(&worker_local)
.unwrap()
version.switch_memtables_or_suggest_duration().unwrap()
);

// Freeze the sampling memtable.
version.freeze_sampling(&worker_local);
version.freeze_sampling();

// No memtable after switch and freezed.
let now = Timestamp::now();
assert!(version
.memtable_for_write(&worker_local, now, schema.version())
.memtable_for_write(now, schema.version())
.unwrap()
.is_none());

Expand Down Expand Up @@ -996,7 +981,7 @@ mod tests {

// Write to mutable memtable.
let mutable = version
.memtable_for_write(&worker_local, now, schema.version())
.memtable_for_write(now, schema.version())
.unwrap()
.unwrap();
let mutable = mutable.as_normal();
Expand All @@ -1010,13 +995,11 @@ mod tests {
assert_eq!(memtable_id2, read_view.memtables[0].id);

// Switch mutable memtable.
assert!(version
.switch_memtables_or_suggest_duration(&worker_local)
.is_none());
assert!(version.switch_memtables_or_suggest_duration().is_none());
// No memtable after switch.
let now = Timestamp::now();
assert!(version
.memtable_for_write(&worker_local, now, schema.version())
.memtable_for_write(now, schema.version())
.unwrap()
.is_none());

Expand All @@ -1029,7 +1012,6 @@ mod tests {

#[test]
fn test_table_version_sampling_apply_edit() {
let worker_local = WriteHandleMocker::default().build().worker_local;
let version = new_table_version();

let memtable = MemTableMocker::default().build();
Expand All @@ -1039,7 +1021,7 @@ mod tests {

// Prepare sampling memtable.
version.set_sampling(sampling_mem);
version.freeze_sampling(&worker_local);
version.freeze_sampling();

let now = Timestamp::now();
let time_range =
Expand All @@ -1057,9 +1039,7 @@ mod tests {
version.insert_mutable(mem_state);

// Switch memtable.
assert!(version
.switch_memtables_or_suggest_duration(&worker_local)
.is_none());
assert!(version.switch_memtables_or_suggest_duration().is_none());

let max_sequence = 120;
let file_id = 13;
Expand Down

0 comments on commit fe59ce2

Please sign in to comment.