fix: use flag in preflush to indicate whether reorder is required #20

Closed · wants to merge 5 commits
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
@@ -50,6 +50,7 @@ generic_error = { workspace = true }
hex = { workspace = true }
hyperloglog = { workspace = true }
id_allocator = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
logger = { workspace = true }
lru = { workspace = true }
24 changes: 20 additions & 4 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -196,6 +196,10 @@ pub struct TableFlushRequest {
pub table_data: TableDataRef,
/// Max sequence number to flush (inclusive).
pub max_sequence: SequenceNumber,

/// We may suggest new primary keys in preflush. If a suggestion happens, we
/// need to ensure the data is written in the new order.
need_reorder: bool,
}

#[derive(Clone)]
@@ -287,7 +291,7 @@ impl FlushTask {
// Start flush duration timer.
let local_metrics = self.table_data.metrics.local_flush_metrics();
let _timer = local_metrics.start_flush_timer();
self.dump_memtables(request_id, &mems_to_flush)
self.dump_memtables(request_id, &mems_to_flush, flush_req.need_reorder)
.await
.box_err()
.context(FlushJobWithCause {
@@ -316,6 +320,7 @@
let mut last_sequence = table_data.last_sequence();
// Switch (freeze) all mutable memtables. And update segment duration if
// suggestion is returned.
let mut need_reorder = false;
if let Some(suggest_segment_duration) = current_version.suggest_duration() {
info!(
"Update segment duration, table:{}, table_id:{}, segment_duration:{:?}",
@@ -324,6 +329,7 @@
assert!(!suggest_segment_duration.is_zero());

if let Some(pk_idx) = current_version.suggest_primary_key() {
need_reorder = true;
let mut schema = table_data.schema();
info!(
"Update primary key, table:{}, table_id:{}, old:{:?}, new:{:?}",
@@ -388,6 +394,7 @@
Ok(TableFlushRequest {
table_data: table_data.clone(),
max_sequence: last_sequence,
need_reorder,
})
}

@@ -401,6 +408,7 @@
&self,
request_id: RequestId,
mems_to_flush: &FlushableMemTables,
need_reorder: bool,
) -> Result<()> {
let local_metrics = self.table_data.metrics.local_flush_metrics();
let mut files_to_level0 = Vec::with_capacity(mems_to_flush.memtables.len());
@@ -410,7 +418,12 @@
// process sampling memtable and frozen memtable
if let Some(sampling_mem) = &mems_to_flush.sampling_mem {
if let Some(seq) = self
.dump_sampling_memtable(request_id, sampling_mem, &mut files_to_level0)
.dump_sampling_memtable(
request_id,
sampling_mem,
&mut files_to_level0,
need_reorder,
)
.await?
{
flushed_sequence = seq;
@@ -500,6 +513,7 @@
request_id: RequestId,
sampling_mem: &SamplingMemTable,
files_to_level0: &mut Vec<AddFile>,
need_reorder: bool,
) -> Result<Option<SequenceNumber>> {
let (min_key, max_key) = match (sampling_mem.mem.min_key(), sampling_mem.mem.max_key()) {
(Some(min_key), Some(max_key)) => (min_key, max_key),
@@ -589,11 +603,13 @@ impl FlushTask {

let iter = build_mem_table_iter(sampling_mem.mem.clone(), &self.table_data)?;
let timestamp_idx = self.table_data.schema().timestamp_index();
if let Some(pk_idx) = self.table_data.current_version().suggest_primary_key() {
if need_reorder {
let schema = self.table_data.schema();
let primary_key_indexes = schema.primary_key_indexes();
let reorder = Reorder {
iter,
schema: self.table_data.schema(),
order_by_col_indexes: pk_idx,
order_by_col_indexes: primary_key_indexes.to_vec(),
};
let mut stream = reorder.into_stream().await.context(ReorderMemIter)?;
while let Some(data) = stream.next().await {
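
Note on the flow in this file: the preflush step now records whether a new primary key was suggested (need_reorder), and dump_sampling_memtable trusts that flag, sorting by the schema's already-updated primary key indexes instead of asking the current version for a suggestion again. Below is a minimal, self-contained sketch of that flow; every name in it is a simplified stand-in, not the actual engine type.

// Minimal sketch of how the flag travels, with simplified stand-in types.
struct FlushRequest {
    need_reorder: bool,
}

fn preflush(suggested_primary_key: Option<Vec<usize>>) -> FlushRequest {
    // When preflush suggests a new primary key, the schema is updated and the
    // flag records that flushed data must be re-sorted in the new order.
    FlushRequest {
        need_reorder: suggested_primary_key.is_some(),
    }
}

fn dump_sampling_memtable(rows: &mut Vec<Vec<i64>>, primary_key_indexes: &[usize], need_reorder: bool) {
    // The dump step no longer re-queries the version for a suggestion; it
    // trusts the flag and sorts by the already-updated primary key columns.
    if need_reorder {
        rows.sort_by_key(|row| {
            primary_key_indexes
                .iter()
                .map(|&i| row[i])
                .collect::<Vec<_>>()
        });
    }
}

fn main() {
    let request = preflush(Some(vec![1, 0]));
    let mut rows = vec![vec![3, 2], vec![1, 1], vec![2, 1]];
    dump_sampling_memtable(&mut rows, &[1, 0], request.need_reorder);
    assert_eq!(rows, vec![vec![1, 1], vec![2, 1], vec![3, 2]]);
}
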
3 changes: 2 additions & 1 deletion analytic_engine/src/instance/mod.rs
@@ -61,7 +61,7 @@ use crate::{
metrics::MaybeTableLevelMetrics,
},
table::data::{TableDataRef, TableShardInfo},
RecoverMode, TableOptions,
RecoverMode, TableOptions, WalEncodeConfig,
};

#[allow(clippy::enum_variant_names)]
@@ -186,6 +186,7 @@ pub struct Instance {
pub(crate) scan_options: ScanOptions,
pub(crate) iter_options: Option<IterOptions>,
pub(crate) recover_mode: RecoverMode,
pub(crate) wal_encode: WalEncodeConfig,
}

impl Instance {
1 change: 1 addition & 0 deletions analytic_engine/src/instance/open.rs
@@ -146,6 +146,7 @@ impl Instance {
iter_options,
scan_options,
recover_mode: ctx.config.recover_mode,
wal_encode: ctx.config.wal_encode,
});

Ok(instance)
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/reorder_memtable.rs
@@ -39,7 +39,7 @@ use datafusion::{
execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream as DfRecordBatchStream, SendableRecordBatchStream, Statistics,
},
prelude::{col, Expr, SessionConfig, SessionContext},
prelude::{ident, Expr, SessionConfig, SessionContext},
sql::TableReference,
};
use futures::{Stream, StreamExt};
@@ -241,7 +241,7 @@ impl Reorder {
let columns = schema.columns();
let sort_exprs = sort_by_col_idx
.iter()
.map(|i| col(&columns[*i].name).sort(true, true))
.map(|i| ident(&columns[*i].name).sort(true, true))
.collect::<Vec<_>>();
let df_plan = LogicalPlanBuilder::scan(DUMMY_TABLE_NAME, source, None)?
.sort(sort_exprs)?
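
The switch from col to ident matters because DataFusion's col parses its argument as a possibly qualified name, while ident treats it as a single raw column identifier, so a column name containing a dot (or uppercase characters) is not reinterpreted. A small sketch of the difference, assuming a DataFusion version whose prelude exports both functions:

use datafusion::prelude::{col, ident};

fn main() {
    let name = "tag.host";
    // col parses the string, splitting it into relation "tag" and column "host".
    let parsed = col(name);
    // ident keeps the whole string as one unqualified column name.
    let raw = ident(name);
    println!("col:   {parsed:?}");
    println!("ident: {raw:?}");
}
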
75 changes: 54 additions & 21 deletions analytic_engine/src/instance/wal_replayer.rs
@@ -18,10 +18,14 @@ use std::{
collections::{HashMap, VecDeque},
fmt::Display,
ops::Range,
sync::Arc,
};

use async_trait::async_trait;
use common_types::{schema::IndexInWriterSchema, table::ShardId};
use common_types::{
schema::{IndexInWriterSchema, Schema},
table::ShardId,
};
use generic_error::BoxError;
use lazy_static::lazy_static;
use logger::{debug, error, info, trace, warn};
@@ -44,7 +48,7 @@ use crate::{
serial_executor::TableOpSerialExecutor,
write::MemTableWriter,
},
payload::{ReadPayload, WalDecoder},
payload::{ReadPayload, SingleSchemaProviderAdapter, TableSchemaProvider, WalDecoder},
table::data::TableDataRef,
};

@@ -173,19 +177,19 @@ impl Replay for TableBasedReplay {
) -> Result<FailedTables> {
debug!("Replay wal logs on table mode, context:{context}, tables:{table_datas:?}",);

let mut faileds = HashMap::new();
let mut failed_tables = HashMap::new();
let read_ctx = ReadContext {
batch_size: context.wal_replay_batch_size,
..Default::default()
};
for table_data in table_datas {
let table_id = table_data.id;
if let Err(e) = Self::recover_table_logs(context, table_data, &read_ctx).await {
faileds.insert(table_id, e);
failed_tables.insert(table_id, e);
}
}

Ok(faileds)
Ok(failed_tables)
}
}

@@ -217,9 +221,14 @@ impl TableBasedReplay {
loop {
// fetch entries to log_entry_buf
let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
let decoder = WalDecoder;
let adapter = SingleSchemaProviderAdapter {
schema: table_data.schema(),
};
let decoder = WalDecoder::new(adapter);
// All the logs should belong to the table, so no need to check again.
let filter = |_| true;
log_entry_buf = log_iter
.next_log_entries(decoder, log_entry_buf)
.next_log_entries(decoder, filter, log_entry_buf)
.await
.box_err()
.context(ReplayWalWithCause { msg: None })?;
@@ -257,15 +266,26 @@ impl Replay for RegionBasedReplay {
debug!("Replay wal logs on region mode, context:{context}, tables:{table_datas:?}",);

// Init all table results to be oks, and modify to errs when failed to replay.
let mut faileds = FailedTables::new();
let mut failed_tables = FailedTables::new();
let scan_ctx = ScanContext {
batch_size: context.wal_replay_batch_size,
..Default::default()
};

Self::replay_region_logs(context, table_datas, &scan_ctx, &mut faileds).await?;
Self::replay_region_logs(context, table_datas, &scan_ctx, &mut failed_tables).await?;

Ok(faileds)
Ok(failed_tables)
}
}

#[derive(Clone)]
struct TableSchemaProviderAdapter {
table_datas: Arc<HashMap<common_types::table::TableId, TableDataRef>>,
}

impl TableSchemaProvider for TableSchemaProviderAdapter {
fn table_schema(&self, table_id: common_types::table::TableId) -> Option<Schema> {
self.table_datas.get(&table_id).map(|v| v.schema())
}
}

@@ -280,7 +300,7 @@ impl RegionBasedReplay {
context: &ReplayContext,
table_datas: &[TableDataRef],
scan_ctx: &ScanContext,
faileds: &mut FailedTables,
failed_tables: &mut FailedTables,
) -> Result<()> {
// Scan all wal logs of current shard.
let scan_req = ScanRequest {
@@ -297,21 +317,29 @@

// Lock all related tables.
let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len());
let mut table_datas_by_id = HashMap::with_capacity(table_datas.len());
for table_data in table_datas {
let serial_exec = table_data.serial_exec.lock().await;
let serial_exec_ctx = SerialExecContext {
table_data: table_data.clone(),
serial_exec,
};
serial_exec_ctxs.insert(table_data.id, serial_exec_ctx);
table_datas_by_id.insert(table_data.id.as_u64(), table_data.clone());
}

let table_datas_by_id = Arc::new(table_datas_by_id);
let schema_provider = TableSchemaProviderAdapter {
table_datas: table_datas_by_id.clone(),
};
// Split and replay logs.
loop {
let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
let decoder = WalDecoder;
let decoder = WalDecoder::new(schema_provider.clone());
let table_datas_for_filter = table_datas_by_id.clone();
let log_filter = move |log_table_id| table_datas_for_filter.contains_key(&log_table_id);
log_entry_buf = log_iter
.next_log_entries(decoder, log_entry_buf)
.next_log_entries(decoder, log_filter, log_entry_buf)
.await
.box_err()
.context(ReplayWalWithCause { msg: None })?;
Expand All @@ -321,8 +349,13 @@ impl RegionBasedReplay {
}

let _timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer();
Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds)
.await?;
Self::replay_single_batch(
context,
&log_entry_buf,
&mut serial_exec_ctxs,
failed_tables,
)
.await?;
}

Ok(())
@@ -332,7 +365,7 @@
context: &ReplayContext,
log_batch: &VecDeque<LogEntry<ReadPayload>>,
serial_exec_ctxs: &mut HashMap<TableId, SerialExecContext<'_>>,
faileds: &mut FailedTables,
failed_tables: &mut FailedTables,
) -> Result<()> {
let mut table_batches = Vec::new();
// TODO: No `group_by` method in `VecDeque`, so implement it manually here...
Expand All @@ -341,7 +374,7 @@ impl RegionBasedReplay {
// TODO: Replay logs of different tables in parallel.
for table_batch in table_batches {
// Some tables may have failed in previous replay, ignore them.
if faileds.contains_key(&table_batch.table_id) {
if failed_tables.contains_key(&table_batch.table_id) {
continue;
}

@@ -359,7 +392,7 @@

// If occur error, mark this table as failed and store the cause.
if let Err(e) = result {
faileds.insert(table_batch.table_id, e);
failed_tables.insert(table_batch.table_id, e);
}
}
}
@@ -489,16 +522,16 @@ async fn replay_table_log_entries(
let index_in_writer =
IndexInWriterSchema::for_same_schema(row_group.schema().num_columns());
let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec);
let memtable_write_ret = memtable_writer
.write(sequence, &row_group.into(), index_in_writer)
let write_res = memtable_writer
.write(sequence, row_group, index_in_writer)
.box_err()
.context(ReplayWalWithCause {
msg: Some(format!(
"table_id:{}, table_name:{}, space_id:{}",
table_data.space_id, table_data.name, table_data.id
)),
});
if let Err(e) = memtable_write_ret {
if let Err(e) = write_res {
// TODO: find a better way to match this.
if e.to_string().contains(crate::memtable::TOO_LARGE_MESSAGE) {
// ignore this error
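
The replay changes share one pattern: the WAL decoder now needs a per-table schema, so table-based replay hands it a single schema while region-based replay resolves schemas through a map that also backs the log filter. A simplified, self-contained sketch of that pattern follows; the names below are illustrative only, not the actual crate types.

use std::{collections::HashMap, sync::Arc};

type TableId = u64;

#[derive(Clone, Debug)]
struct Schema(String); // stand-in for the real schema type

trait TableSchemaProvider {
    fn table_schema(&self, table_id: TableId) -> Option<Schema>;
}

// Used when replaying the WAL of exactly one table.
struct SingleSchemaProvider {
    schema: Schema,
}

impl TableSchemaProvider for SingleSchemaProvider {
    fn table_schema(&self, _table_id: TableId) -> Option<Schema> {
        Some(self.schema.clone())
    }
}

// Used when replaying a whole shard that contains many tables.
#[derive(Clone)]
struct MapSchemaProvider {
    schemas: Arc<HashMap<TableId, Schema>>,
}

impl TableSchemaProvider for MapSchemaProvider {
    fn table_schema(&self, table_id: TableId) -> Option<Schema> {
        self.schemas.get(&table_id).cloned()
    }
}

fn main() {
    let shared = Arc::new(HashMap::from([
        (1, Schema("table_1".to_string())),
        (2, Schema("table_2".to_string())),
    ]));
    let region_provider = MapSchemaProvider { schemas: shared.clone() };
    let single_provider = SingleSchemaProvider { schema: Schema("table_1".to_string()) };

    // The same map can also back the filter that skips logs of unknown tables
    // before decoding, mirroring the closure passed to next_log_entries.
    let filter = move |table_id: TableId| shared.contains_key(&table_id);

    assert!(region_provider.table_schema(2).is_some());
    assert!(single_provider.table_schema(42).is_some());
    assert!(filter(1) && !filter(42));
}
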