From 765afa2ca4b50b2f52d9299591d5d199f6d5fb71 Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 5 Jun 2023 21:57:02 +0800 Subject: [PATCH 01/15] draft of wal replayer. --- analytic_engine/src/instance/engine.rs | 16 +- analytic_engine/src/instance/wal_replayer.rs | 500 +++++++++++++++++++ 2 files changed, 515 insertions(+), 1 deletion(-) create mode 100644 analytic_engine/src/instance/wal_replayer.rs diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index 00b6ba8745..e32fa064d5 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -218,6 +218,18 @@ pub enum Error { #[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))] OpenTablesOfShard { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))] + ReplayWalWithCause { + msg: Option, + source: GenericError, + }, + + #[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))] + ReplayWalNoCause { + msg: Option, + backtrace: Backtrace, + }, } define_result!(Error); @@ -250,7 +262,9 @@ impl From for table_engine::engine::Error { | Error::DoManifestSnapshot { .. } | Error::OpenManifest { .. } | Error::TableNotExist { .. } - | Error::OpenTablesOfShard { .. } => Self::Unexpected { + | Error::OpenTablesOfShard { .. } + | Error::ReplayWalNoCause { .. } + | Error::ReplayWalWithCause { .. } => Self::Unexpected { source: Box::new(err), }, } diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs new file mode 100644 index 0000000000..e7f8947958 --- /dev/null +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -0,0 +1,500 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::{collections::{VecDeque, HashMap}, fmt::Display}; + +use async_trait::async_trait; +use common_types::{table::ShardId, schema::IndexInWriterSchema, SequenceNumber}; +use common_util::error::{GenericError, GenericResult, BoxError}; +use log::{debug, info, trace, error}; +use snafu::ResultExt; +use table_engine::table::TableId; +use wal::{manager::{WalManager, ReadContext, ReadRequest, WalManagerRef, ReadBoundary, ScanRequest, RegionId, ScanContext}, log_batch::LogEntry}; +use crate::{instance::{engine::{Result, ReplayWalNoCause, ReplayWalWithCause}, +flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, self, write::MemTableWriter}, +table::data::TableDataRef, payload::{ReadPayload, WalDecoder}}; + +/// Wal replayer +pub struct WalReplayer { + stages: HashMap, + context: ReplayContext, + mode: ReplayMode, +} + +impl WalReplayer { + fn build_core(mode: ReplayMode) -> Box { + info!("Replay wal in mode:{mode:?}"); + + match mode { + ReplayMode::RegionBased => Box::new(TableBasedCore), + ReplayMode::TableBased => Box::new(RegionBasedCore), + } + } + + pub async fn replay(&mut self) -> Result>> { + // Build core according to mode. + let core = Self::build_core(self.mode); + info!( + "Replay wal logs begin, context:{}, states:{:?}", self.context, self.stages + ); + core.replay(&self.context, &mut self.stages).await?; + info!( + "Replay wal logs finish, context:{}, states:{:?}", self.context, self.stages, + ); + + // Return the replay results. 
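+        // Every table must have reached a terminal stage (`Success` or `Failed`) by now;
+        // finding one still in `Replay` is treated as an internal error below.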
+ let stages = std::mem::take(&mut self.stages); + let mut results = HashMap::with_capacity(self.stages.len()); + for (table_id, stage) in stages { + let result = match stage { + ReplayStage::Failed(e) => Err(e), + ReplayStage::Success(_) => Ok(()), + ReplayStage::Replay(_) => return ReplayWalNoCause { + msg: Some(format!("invalid stage, stage:{stage:?}, table_id:{table_id}, context:{}, mode:{:?}", self.context, self.mode)), + }.fail(), + }; + results.insert(table_id, result); + } + + Ok(results) + } +} + +struct ReplayContext { + pub shard_id: ShardId, + pub wal_manager: WalManagerRef, + pub wal_replay_batch_size: usize, + pub flusher: Flusher, + pub max_retry_flush_limit: usize, +} + +impl Display for ReplayContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReplayContext") + .field("shard_id", &self.shard_id) + .field("replay_batch_size", &self.wal_replay_batch_size) + .field("max_retry_flush_limit", &self.max_retry_flush_limit) + .finish() + } +} + +#[derive(Debug)] +enum ReplayStage { + Replay(TableDataRef), + Failed(crate::instance::engine::Error), + Success(()), +} + +#[derive(Debug, Clone, Copy)] +enum ReplayMode { + RegionBased, + TableBased, +} + +/// Replay core, the abstract of different replay strategies +#[async_trait] +trait ReplayCore { + async fn replay(&self, context: &ReplayContext, states: &mut HashMap) -> Result<()>; +} + +/// Table based wal replay core +struct TableBasedCore; + +#[async_trait] +impl ReplayCore for TableBasedCore { + async fn replay(&self, context: &ReplayContext, states: &mut HashMap) -> Result<()> { + debug!( + "Replay wal logs on table mode, context:{context}, states:{states:?}", + ); + + for (table_id, stage) in states.iter_mut() { + match stage { + ReplayStage::Replay(table_data) => { + let read_ctx = ReadContext { + batch_size: context.wal_replay_batch_size, + ..Default::default() + }; + + let result = Self::recover_table_logs( + &context.flusher, + context.max_retry_flush_limit, + context.wal_manager.as_ref(), + table_data.clone(), + context.wal_replay_batch_size, + &read_ctx, + ).await; + + match result { + Ok(()) => { + *stage = ReplayStage::Success(()); + }, + Err(e) => { + *stage = ReplayStage::Failed(e); + }, + } + } + + ReplayStage::Failed(_) | ReplayStage::Success(_) => { + return ReplayWalNoCause { + msg: Some(format!("invalid stage, stage:{stage:?}, table_id:{}, shard_id:{}", table_id, context.shard_id)) + }.fail(); + } + } + } + + Ok(()) + } +} + +impl TableBasedCore { + /// Recover table data from wal. + /// + /// Called by write worker + async fn recover_table_logs( + flusher: &Flusher, + max_retry_flush_limit: usize, + wal_manager: &dyn WalManager, + table_data: TableDataRef, + replay_batch_size: usize, + read_ctx: &ReadContext, + ) -> Result<()> { + let table_location = table_data.table_location(); + let wal_location = + instance::create_wal_location(table_location.id, table_location.shard_info); + let read_req = ReadRequest { + location: wal_location, + start: ReadBoundary::Excluded(table_data.current_version().flushed_sequence()), + end: ReadBoundary::Max, + }; + + // Read all wal of current table. 
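+        // Only log entries after the table's flushed sequence are requested, so rows that
+        // were already persisted in SST files are not applied to the memtable again.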
+ let mut log_iter = wal_manager + .read_batch(read_ctx, &read_req) + .await + .box_err() + .context(ReplayWalWithCause { + msg: None, + })?; + + let mut serial_exec = table_data.serial_exec.lock().await; + let mut log_entry_buf = VecDeque::with_capacity(replay_batch_size); + loop { + // fetch entries to log_entry_buf + let decoder = WalDecoder::default(); + log_entry_buf = log_iter + .next_log_entries(decoder, log_entry_buf) + .await + .box_err() + .context(ReplayWalWithCause { + msg: None, + })?; + + if log_entry_buf.is_empty() { + break; + } + + // Replay all log entries of current table + let last_sequence = log_entry_buf.back().unwrap().sequence; + replay_table_log_entries( + flusher, + max_retry_flush_limit, + &mut serial_exec, + &table_data, + log_entry_buf.iter(), + last_sequence + ) + .await?; + } + + Ok(()) + } +} + +/// Region based wal replay core +struct RegionBasedCore; + +#[async_trait] +impl ReplayCore for RegionBasedCore { + async fn replay(&self, context: &ReplayContext, states: &mut HashMap) -> Result<()> { + debug!( + "Replay wal logs on region mode, context:{context}, states:{states:?}", + ); + + for (table_id, state) in states.iter_mut() { + match state { + ReplayStage::Replay(table_data) => { + let read_ctx = ReadContext { + batch_size: context.wal_replay_batch_size, + ..Default::default() + }; + + let result = Self::replay_region_logs( + &context.flusher, + context.max_retry_flush_limit, + context.wal_manager.as_ref(), + table_data.clone(), + context.wal_replay_batch_size, + &read_ctx, + states, + ).await; + + match result { + Ok(()) => { + *state = ReplayStage::Success(()); + }, + Err(e) => { + *state = ReplayStage::Failed(e); + }, + } + }, + ReplayStage::Failed(_) | ReplayStage::Success(_) => { + return ReplayWalNoCause { + msg: Some(format!("table_id:{}, shard_id:{}", table_id, context.shard_id)) + }.fail(); + } + } + } + + Ok(()) + } +} + +impl RegionBasedCore { + /// Replay logs in same region. + /// + /// Steps: + /// + Scan all logs of region. + /// + Split logs according to table ids. + /// + Replay logs to recover data of tables. + async fn replay_region_logs( + shard_id: ShardId, + flusher: &Flusher, + max_retry_flush_limit: usize, + wal_manager: &dyn WalManager, + replay_batch_size: usize, + scan_ctx: &ScanContext, + states: &mut HashMap + ) -> Result<()> { + // Scan all wal logs of current shard. + let scan_req = ScanRequest { + region_id: shard_id as RegionId, + }; + + let mut log_iter = wal_manager + .scan(scan_ctx, &scan_req) + .await + .box_err() + .context(ReplayWalWithCause { + msg: None, + })?; + let mut log_entry_buf = VecDeque::with_capacity(replay_batch_size); + + // Lock all related tables. + let mut table_serial_execs = HashMap::with_capacity(states.len()); + for (table_id, state) in states { + match state { + ReplayStage::Replay(table_data) => { + let mut serial_exec = table_data.serial_exec.lock().await; + table_serial_execs.insert(*table_id, &mut *serial_exec); + } + ReplayStage::Failed(_) | ReplayStage::Success(_) => { + return ReplayWalNoCause { + msg: Some(format!("table_id:{table_id}, shard_id:{shard_id}")) + }.fail(); + } + } + } + + // Split and replay logs. 
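+        // Entries of different tables are interleaved in the region scan, so every fetched
+        // batch is split by table id before its entries are applied.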
+ loop { + let decoder = WalDecoder::default(); + log_entry_buf = log_iter + .next_log_entries(decoder, log_entry_buf) + .await + .box_err() + .context(ReplayWalWithCause { + msg: None, + })?; + + if log_entry_buf.is_empty() { + break; + } + + Self::replay_single_batch(&log_entry_buf, flusher, max_retry_flush_limit, wal_manager, replay_batch_size, read_ctx, &table_serial_execs, states).await; + } + + Ok(()) + } + + async fn replay_single_batch( + log_batch: &VecDeque>, + flusher: &Flusher, + max_retry_flush_limit: usize, + wal_manager: &dyn WalManager, + replay_batch_size: usize, + read_ctx: &ReadContext, + table_serial_execs: &HashMap, + stages: &mut HashMap, + ) -> Result<()> + { + let mut table_batches = Vec::with_capacity(stages.len()); + Self::split_log_batch_by_table(&log_batch, &mut table_batches); + for table_batch in table_batches { + // Replay all log entries of current table + let last_sequence = table_batch.last_sequence; + replay_table_log_entries( + flusher, + max_retry_flush_limit, + &mut serial_exec, + &table_data, + log_batch.range(table_batch.start_log_idx..table_batch.end_log_idx), + last_sequence + ) + .await?; + } + + Ok(()) + } + + fn split_log_batch_by_table
<P>
(log_batch: &VecDeque>, table_batches: &mut Vec) { + table_batches.clear(); + + if log_batch.is_empty() { + return; + } + + // Split log batch by table id, for example: + // input batch: + // |1|1|2|2|2|3|3|3|3| + // + // output batches: + // |1|1|, |2|2|2|, |3|3|3|3| + let mut start_log_idx = 0usize; + for log_idx in 0..log_batch.len() { + let last_round = log_idx + 1 == log_batch.len(); + + let start_log_entry = log_batch.get(start_log_idx).unwrap(); + let current_log_entry = log_batch.get(log_idx).unwrap(); + + let start_table_id = start_log_entry.table_id; + let current_table_id = current_log_entry.table_id; + let current_sequence = current_log_entry.sequence; + + if (current_table_id != start_table_id) || last_round { + let end_log_idx = if last_round { + log_batch.len() + } else { + start_log_idx = log_idx; + log_idx + }; + + table_batches.push(TableBatch { + table_id: TableId::new(start_table_id), + last_sequence: current_sequence, + start_log_idx, + end_log_idx, + }); + } + } + } +} + +struct TableBatch { + table_id: TableId, + last_sequence: SequenceNumber, + start_log_idx: usize, + end_log_idx: usize, +} + +/// Replay all log entries into memtable and flush if necessary. +async fn replay_table_log_entries( + flusher: &Flusher, + max_retry_flush_limit: usize, + serial_exec: &mut TableOpSerialExecutor, + table_data: &TableDataRef, + log_entries: impl Iterator>, + last_sequence: SequenceNumber, +) -> Result<()> { + debug!( + "Replay table log entries begin, table:{}, table_id:{:?}, sequence:{}", + table_data.name, table_data.id, last_sequence + ); + + for log_entry in log_entries { + let (sequence, payload) = (log_entry.sequence, &log_entry.payload); + + // Apply to memtable + match payload { + ReadPayload::Write { row_group } => { + trace!( + "Instance replay row_group, table:{}, row_group:{:?}", + table_data.name, + row_group + ); + + let table_schema_version = table_data.schema_version(); + if table_schema_version != row_group.schema().version() { + // Data with old schema should already been flushed, but we avoid panic + // here. + error!( + "Ignore data with mismatch schema version during replaying, \ + table:{}, \ + table_id:{:?}, \ + expect:{}, \ + actual:{}, \ + last_sequence:{}, \ + sequence:{}", + table_data.name, + table_data.id, + table_schema_version, + row_group.schema().version(), + last_sequence, + sequence, + ); + + continue; + } + + let index_in_writer = + IndexInWriterSchema::for_same_schema(row_group.schema().num_columns()); + let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec); + memtable_writer + .write(sequence, &row_group.into(), index_in_writer) + .box_err() + .context(ReplayWalWithCause { + msg: Some(format!("table_id:{}, table_name:{}, space_id:{}", table_data.space_id, table_data.name, table_data.id)), + })?; + + // Flush the table if necessary. + if table_data.should_flush_table(serial_exec) { + let opts = TableFlushOptions { + res_sender: None, + max_retry_flush_limit, + }; + let flush_scheduler = serial_exec.flush_scheduler(); + flusher + .schedule_flush(flush_scheduler, table_data, opts) + .await + .box_err() + .context(ReplayWalWithCause { + msg: Some(format!("table_id:{}, table_name:{}, space_id:{}", table_data.space_id, table_data.name, table_data.id)), + })?; + } + } + ReadPayload::AlterSchema { .. } | ReadPayload::AlterOptions { .. } => { + // Ignore records except Data. + // + // - DDL (AlterSchema and AlterOptions) should be recovered + // from Manifest on start. 
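+                //   Replaying them here would only repeat what the manifest
+                //   recovery has already applied.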
+ } + } + } + + debug!( + "Replay table log entries finish, table:{}, table_id:{:?}, last_sequence:{}", + table_data.name, table_data.id, last_sequence + ); + + table_data.set_last_sequence(last_sequence); + + Ok(()) +} From a08696726668267f96fe66d24524b4d97e9602a3 Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 7 Jun 2023 11:08:27 +0800 Subject: [PATCH 02/15] add test for `split_log_batch_by_table` in `WalReplayer`. --- analytic_engine/src/instance/mod.rs | 1 + analytic_engine/src/instance/wal_replayer.rs | 500 ++++++++++++------- table_engine/src/table.rs | 1 + 3 files changed, 308 insertions(+), 194 deletions(-) diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 89a71ba2d6..e58ad92f73 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -15,6 +15,7 @@ pub(crate) mod mem_collector; pub mod open; mod read; pub(crate) mod serial_executor; +pub mod wal_replayer; pub(crate) mod write; use std::sync::Arc; diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index e7f8947958..555648f561 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -1,21 +1,41 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. -use std::{collections::{VecDeque, HashMap}, fmt::Display}; +use std::{ + collections::{HashMap, VecDeque}, + fmt::Display, + sync::Arc, +}; use async_trait::async_trait; -use common_types::{table::ShardId, schema::IndexInWriterSchema, SequenceNumber}; -use common_util::error::{GenericError, GenericResult, BoxError}; -use log::{debug, info, trace, error}; +use common_types::{schema::IndexInWriterSchema, table::ShardId, SequenceNumber}; +use common_util::error::{BoxError, GenericError, GenericResult}; +use log::{debug, error, info, trace}; use snafu::ResultExt; -use table_engine::table::TableId; -use wal::{manager::{WalManager, ReadContext, ReadRequest, WalManagerRef, ReadBoundary, ScanRequest, RegionId, ScanContext}, log_batch::LogEntry}; -use crate::{instance::{engine::{Result, ReplayWalNoCause, ReplayWalWithCause}, -flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, self, write::MemTableWriter}, -table::data::TableDataRef, payload::{ReadPayload, WalDecoder}}; +use table_engine::table::{TableId, TableRef}; +use tokio::sync::{Mutex, MutexGuard}; +use wal::{ + log_batch::LogEntry, + manager::{ + ReadBoundary, ReadContext, ReadRequest, RegionId, ScanContext, ScanRequest, WalManager, + WalManagerRef, + }, +}; + +use crate::{ + instance::{ + self, + engine::{ReplayWalNoCause, ReplayWalWithCause, Result}, + flush_compaction::{Flusher, TableFlushOptions}, + serial_executor::TableOpSerialExecutor, + write::MemTableWriter, + }, + payload::{ReadPayload, WalDecoder}, + table::data::TableDataRef, +}; /// Wal replayer pub struct WalReplayer { - stages: HashMap, + results: HashMap>, context: ReplayContext, mode: ReplayMode, } @@ -30,35 +50,26 @@ impl WalReplayer { } } - pub async fn replay(&mut self) -> Result>> { + pub async fn replay( + &mut self, + table_datas: Vec, + ) -> Result>> { // Build core according to mode. 
- let core = Self::build_core(self.mode); + let core = Self::build_core(self.mode); info!( - "Replay wal logs begin, context:{}, states:{:?}", self.context, self.stages + "Replay wal logs begin, context:{}, tables:{:?}", + self.context, table_datas ); - core.replay(&self.context, &mut self.stages).await?; + let result = core.replay(&self.context, &table_datas).await; info!( - "Replay wal logs finish, context:{}, states:{:?}", self.context, self.stages, - ); - - // Return the replay results. - let stages = std::mem::take(&mut self.stages); - let mut results = HashMap::with_capacity(self.stages.len()); - for (table_id, stage) in stages { - let result = match stage { - ReplayStage::Failed(e) => Err(e), - ReplayStage::Success(_) => Ok(()), - ReplayStage::Replay(_) => return ReplayWalNoCause { - msg: Some(format!("invalid stage, stage:{stage:?}, table_id:{table_id}, context:{}, mode:{:?}", self.context, self.mode)), - }.fail(), - }; - results.insert(table_id, result); - } + "Replay wal logs finish, context:{}, tables:{:?}", + self.context, table_datas, + ); - Ok(results) + result } } - + struct ReplayContext { pub shard_id: ShardId, pub wal_manager: WalManagerRef, @@ -70,20 +81,13 @@ struct ReplayContext { impl Display for ReplayContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ReplayContext") - .field("shard_id", &self.shard_id) - .field("replay_batch_size", &self.wal_replay_batch_size) - .field("max_retry_flush_limit", &self.max_retry_flush_limit) - .finish() + .field("shard_id", &self.shard_id) + .field("replay_batch_size", &self.wal_replay_batch_size) + .field("max_retry_flush_limit", &self.max_retry_flush_limit) + .finish() } } -#[derive(Debug)] -enum ReplayStage { - Replay(TableDataRef), - Failed(crate::instance::engine::Error), - Success(()), -} - #[derive(Debug, Clone, Copy)] enum ReplayMode { RegionBased, @@ -93,7 +97,11 @@ enum ReplayMode { /// Replay core, the abstract of different replay strategies #[async_trait] trait ReplayCore { - async fn replay(&self, context: &ReplayContext, states: &mut HashMap) -> Result<()>; + async fn replay( + &self, + context: &ReplayContext, + table_datas: &[TableDataRef], + ) -> Result>>; } /// Table based wal replay core @@ -101,47 +109,34 @@ struct TableBasedCore; #[async_trait] impl ReplayCore for TableBasedCore { - async fn replay(&self, context: &ReplayContext, states: &mut HashMap) -> Result<()> { - debug!( - "Replay wal logs on table mode, context:{context}, states:{states:?}", - ); - - for (table_id, stage) in states.iter_mut() { - match stage { - ReplayStage::Replay(table_data) => { - let read_ctx = ReadContext { - batch_size: context.wal_replay_batch_size, - ..Default::default() - }; - - let result = Self::recover_table_logs( - &context.flusher, - context.max_retry_flush_limit, - context.wal_manager.as_ref(), - table_data.clone(), - context.wal_replay_batch_size, - &read_ctx, - ).await; - - match result { - Ok(()) => { - *stage = ReplayStage::Success(()); - }, - Err(e) => { - *stage = ReplayStage::Failed(e); - }, - } - } + async fn replay( + &self, + context: &ReplayContext, + table_datas: &[TableDataRef], + ) -> Result>> { + debug!("Replay wal logs on table mode, context:{context}, tables:{table_datas:?}",); + + let mut results = HashMap::with_capacity(table_datas.len()); + let read_ctx = ReadContext { + batch_size: context.wal_replay_batch_size, + ..Default::default() + }; + for table_data in table_datas { + let table_id = table_data.id; + let result = Self::recover_table_logs( + 
&context.flusher, + context.max_retry_flush_limit, + context.wal_manager.as_ref(), + table_data, + context.wal_replay_batch_size, + &read_ctx, + ) + .await; - ReplayStage::Failed(_) | ReplayStage::Success(_) => { - return ReplayWalNoCause { - msg: Some(format!("invalid stage, stage:{stage:?}, table_id:{}, shard_id:{}", table_id, context.shard_id)) - }.fail(); - } - } + results.insert(table_id, result); } - Ok(()) + Ok(results) } } @@ -153,7 +148,7 @@ impl TableBasedCore { flusher: &Flusher, max_retry_flush_limit: usize, wal_manager: &dyn WalManager, - table_data: TableDataRef, + table_data: &TableDataRef, replay_batch_size: usize, read_ctx: &ReadContext, ) -> Result<()> { @@ -171,9 +166,7 @@ impl TableBasedCore { .read_batch(read_ctx, &read_req) .await .box_err() - .context(ReplayWalWithCause { - msg: None, - })?; + .context(ReplayWalWithCause { msg: None })?; let mut serial_exec = table_data.serial_exec.lock().await; let mut log_entry_buf = VecDeque::with_capacity(replay_batch_size); @@ -184,23 +177,21 @@ impl TableBasedCore { .next_log_entries(decoder, log_entry_buf) .await .box_err() - .context(ReplayWalWithCause { - msg: None, - })?; + .context(ReplayWalWithCause { msg: None })?; if log_entry_buf.is_empty() { break; } - + // Replay all log entries of current table let last_sequence = log_entry_buf.back().unwrap().sequence; replay_table_log_entries( flusher, max_retry_flush_limit, &mut serial_exec, - &table_data, + table_data, log_entry_buf.iter(), - last_sequence + last_sequence, ) .await?; } @@ -214,65 +205,51 @@ struct RegionBasedCore; #[async_trait] impl ReplayCore for RegionBasedCore { - async fn replay(&self, context: &ReplayContext, states: &mut HashMap) -> Result<()> { - debug!( - "Replay wal logs on region mode, context:{context}, states:{states:?}", - ); + async fn replay( + &self, + context: &ReplayContext, + table_datas: &[TableDataRef], + ) -> Result>> { + debug!("Replay wal logs on region mode, context:{context}, states:{table_datas:?}",); + + let mut results = HashMap::with_capacity(table_datas.len()); + let scan_ctx = ScanContext { + batch_size: context.wal_replay_batch_size, + ..Default::default() + }; - for (table_id, state) in states.iter_mut() { - match state { - ReplayStage::Replay(table_data) => { - let read_ctx = ReadContext { - batch_size: context.wal_replay_batch_size, - ..Default::default() - }; - - let result = Self::replay_region_logs( - &context.flusher, - context.max_retry_flush_limit, - context.wal_manager.as_ref(), - table_data.clone(), - context.wal_replay_batch_size, - &read_ctx, - states, - ).await; - - match result { - Ok(()) => { - *state = ReplayStage::Success(()); - }, - Err(e) => { - *state = ReplayStage::Failed(e); - }, - } - }, - ReplayStage::Failed(_) | ReplayStage::Success(_) => { - return ReplayWalNoCause { - msg: Some(format!("table_id:{}, shard_id:{}", table_id, context.shard_id)) - }.fail(); - } - } - } + Self::replay_region_logs( + &table_datas, + context.shard_id, + &context.flusher, + context.max_retry_flush_limit, + context.wal_manager.as_ref(), + context.wal_replay_batch_size, + &scan_ctx, + &mut results, + ) + .await?; - Ok(()) + Ok(results) } } impl RegionBasedCore { /// Replay logs in same region. - /// + /// /// Steps: /// + Scan all logs of region. /// + Split logs according to table ids. /// + Replay logs to recover data of tables. 
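    ///
    /// The outcome of every table is recorded separately in `table_results` instead of
    /// collapsing the whole shard into a single result.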
async fn replay_region_logs( + table_datas: &[TableDataRef], shard_id: ShardId, flusher: &Flusher, max_retry_flush_limit: usize, wal_manager: &dyn WalManager, replay_batch_size: usize, scan_ctx: &ScanContext, - states: &mut HashMap + table_results: &mut HashMap>, ) -> Result<()> { // Scan all wal logs of current shard. let scan_req = ScanRequest { @@ -283,25 +260,18 @@ impl RegionBasedCore { .scan(scan_ctx, &scan_req) .await .box_err() - .context(ReplayWalWithCause { - msg: None, - })?; + .context(ReplayWalWithCause { msg: None })?; let mut log_entry_buf = VecDeque::with_capacity(replay_batch_size); - + // Lock all related tables. - let mut table_serial_execs = HashMap::with_capacity(states.len()); - for (table_id, state) in states { - match state { - ReplayStage::Replay(table_data) => { - let mut serial_exec = table_data.serial_exec.lock().await; - table_serial_execs.insert(*table_id, &mut *serial_exec); - } - ReplayStage::Failed(_) | ReplayStage::Success(_) => { - return ReplayWalNoCause { - msg: Some(format!("table_id:{table_id}, shard_id:{shard_id}")) - }.fail(); - } - } + let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len()); + for table_data in table_datas { + let serial_exec = table_data.serial_exec.lock().await; + let serial_exec_ctx = SerialExecContext { + table_data: table_data.clone(), + serial_exec, + }; + serial_exec_ctxs.insert(table_data.id, serial_exec_ctx); } // Split and replay logs. @@ -311,51 +281,69 @@ impl RegionBasedCore { .next_log_entries(decoder, log_entry_buf) .await .box_err() - .context(ReplayWalWithCause { - msg: None, - })?; + .context(ReplayWalWithCause { msg: None })?; if log_entry_buf.is_empty() { break; } - Self::replay_single_batch(&log_entry_buf, flusher, max_retry_flush_limit, wal_manager, replay_batch_size, read_ctx, &table_serial_execs, states).await; + Self::replay_single_batch( + &log_entry_buf, + flusher, + max_retry_flush_limit, + wal_manager, + replay_batch_size, + scan_ctx, + &mut serial_exec_ctxs, + table_results, + ) + .await?; } Ok(()) } async fn replay_single_batch( - log_batch: &VecDeque>, - flusher: &Flusher, - max_retry_flush_limit: usize, - wal_manager: &dyn WalManager, - replay_batch_size: usize, - read_ctx: &ReadContext, - table_serial_execs: &HashMap, - stages: &mut HashMap, - ) -> Result<()> - { - let mut table_batches = Vec::with_capacity(stages.len()); + log_batch: &VecDeque>, + flusher: &Flusher, + max_retry_flush_limit: usize, + wal_manager: &dyn WalManager, + replay_batch_size: usize, + read_ctx: &ReadContext, + serial_exec_ctxs: &mut HashMap>, + table_results: &mut HashMap>, + ) -> Result<()> { + let mut table_batches = Vec::with_capacity(serial_exec_ctxs.len()); Self::split_log_batch_by_table(&log_batch, &mut table_batches); for table_batch in table_batches { + let table_result = table_results.get(&table_batch.table_id); + if let Some(Err(_)) = table_result { + return Ok(()); + } + // Replay all log entries of current table + let serial_exec_ctx = serial_exec_ctxs.get_mut(&table_batch.table_id).unwrap(); let last_sequence = table_batch.last_sequence; - replay_table_log_entries( + let result = replay_table_log_entries( flusher, max_retry_flush_limit, - &mut serial_exec, - &table_data, + &mut serial_exec_ctx.serial_exec, + &serial_exec_ctx.table_data, log_batch.range(table_batch.start_log_idx..table_batch.end_log_idx), - last_sequence + last_sequence, ) - .await?; + .await; + + table_results.insert(table_batch.table_id, result); } Ok(()) } - fn split_log_batch_by_table
<P>
(log_batch: &VecDeque>, table_batches: &mut Vec) { + fn split_log_batch_by_table
<P>
( + log_batch: &VecDeque>, + table_batches: &mut Vec, + ) { table_batches.clear(); if log_batch.is_empty() { @@ -369,35 +357,48 @@ impl RegionBasedCore { // output batches: // |1|1|, |2|2|2|, |3|3|3|3| let mut start_log_idx = 0usize; - for log_idx in 0..log_batch.len() { - let last_round = log_idx + 1 == log_batch.len(); - - let start_log_entry = log_batch.get(start_log_idx).unwrap(); - let current_log_entry = log_batch.get(log_idx).unwrap(); - - let start_table_id = start_log_entry.table_id; - let current_table_id = current_log_entry.table_id; - let current_sequence = current_log_entry.sequence; - - if (current_table_id != start_table_id) || last_round { - let end_log_idx = if last_round { - log_batch.len() + let mut curr_log_idx = 0usize; + let mut start_table_id = log_batch.get(start_log_idx).unwrap().table_id; + loop { + let time_to_break = curr_log_idx == log_batch.len(); + let found_end_idx = if time_to_break { + true + } else { + let current_table_id = log_batch.get(curr_log_idx).unwrap().table_id; + if current_table_id != start_table_id { + true } else { - start_log_idx = log_idx; - log_idx - }; - + false + } + }; + + if found_end_idx { + let last_sequence = log_batch.get(curr_log_idx - 1).unwrap().sequence; table_batches.push(TableBatch { table_id: TableId::new(start_table_id), - last_sequence: current_sequence, + last_sequence, start_log_idx, - end_log_idx, + end_log_idx: curr_log_idx, }); + + // Step to next start idx. + start_log_idx = curr_log_idx; + start_table_id = if time_to_break { + u64::MAX + } else { + log_batch.get(start_log_idx).unwrap().table_id + }; + } + + if time_to_break { + break; } + curr_log_idx += 1; } } } +#[derive(Debug, Eq, PartialEq)] struct TableBatch { table_id: TableId, last_sequence: SequenceNumber, @@ -405,6 +406,11 @@ struct TableBatch { end_log_idx: usize, } +struct SerialExecContext<'a> { + table_data: TableDataRef, + serial_exec: MutexGuard<'a, TableOpSerialExecutor>, +} + /// Replay all log entries into memtable and flush if necessary. async fn replay_table_log_entries( flusher: &Flusher, @@ -461,7 +467,10 @@ async fn replay_table_log_entries( .write(sequence, &row_group.into(), index_in_writer) .box_err() .context(ReplayWalWithCause { - msg: Some(format!("table_id:{}, table_name:{}, space_id:{}", table_data.space_id, table_data.name, table_data.id)), + msg: Some(format!( + "table_id:{}, table_name:{}, space_id:{}", + table_data.space_id, table_data.name, table_data.id + )), })?; // Flush the table if necessary. @@ -476,15 +485,18 @@ async fn replay_table_log_entries( .await .box_err() .context(ReplayWalWithCause { - msg: Some(format!("table_id:{}, table_name:{}, space_id:{}", table_data.space_id, table_data.name, table_data.id)), + msg: Some(format!( + "table_id:{}, table_name:{}, space_id:{}", + table_data.space_id, table_data.name, table_data.id + )), })?; } } ReadPayload::AlterSchema { .. } | ReadPayload::AlterOptions { .. } => { // Ignore records except Data. // - // - DDL (AlterSchema and AlterOptions) should be recovered - // from Manifest on start. + // - DDL (AlterSchema and AlterOptions) should be recovered from + // Manifest on start. 
} } } @@ -498,3 +510,103 @@ async fn replay_table_log_entries( Ok(()) } + +#[cfg(test)] +mod tests { + use std::collections::VecDeque; + + use table_engine::table::TableId; + use wal::log_batch::LogEntry; + + use crate::instance::wal_replayer::{RegionBasedCore, TableBatch}; + + #[test] + fn test_split_log_batch_by_table() { + let test_set = test_set(); + for (test_batch, expected) in test_set { + check_split_result(&test_batch, &expected); + } + } + + fn test_set() -> Vec<(VecDeque>, Vec)> { + let test_log_batch1: VecDeque> = VecDeque::from([ + LogEntry { + table_id: 0, + sequence: 1, + payload: 0, + }, + LogEntry { + table_id: 0, + sequence: 2, + payload: 0, + }, + LogEntry { + table_id: 0, + sequence: 3, + payload: 0, + }, + LogEntry { + table_id: 1, + sequence: 1, + payload: 0, + }, + LogEntry { + table_id: 1, + sequence: 2, + payload: 0, + }, + LogEntry { + table_id: 2, + sequence: 1, + payload: 0, + }, + ]); + let expected1 = vec![ + TableBatch { + table_id: TableId::new(0), + last_sequence: 3, + start_log_idx: 0, + end_log_idx: 3, + }, + TableBatch { + table_id: TableId::new(1), + last_sequence: 2, + start_log_idx: 3, + end_log_idx: 5, + }, + TableBatch { + table_id: TableId::new(2), + last_sequence: 1, + start_log_idx: 5, + end_log_idx: 6, + }, + ]; + + let test_log_batch2: VecDeque> = VecDeque::from([LogEntry { + table_id: 0, + sequence: 1, + payload: 0, + }]); + let expected2 = vec![TableBatch { + table_id: TableId::new(0), + last_sequence: 1, + start_log_idx: 0, + end_log_idx: 1, + }]; + + let test_log_batch3: VecDeque> = VecDeque::default(); + let expected3 = vec![]; + + vec![ + (test_log_batch1, expected1), + (test_log_batch2, expected2), + (test_log_batch3, expected3), + ] + } + + fn check_split_result(batch: &VecDeque>, expected: &[TableBatch]) { + let mut table_batches = Vec::new(); + RegionBasedCore::split_log_batch_by_table(batch, &mut table_batches); + assert_eq!(&table_batches, expected); + } +} diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 8d7638b685..821bdb6195 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -249,6 +249,7 @@ impl From for TableSeq { pub struct TableId(u64); impl TableId { + pub const MAX: TableId = TableId(u64::MAX); /// Min table id. pub const MIN: TableId = TableId(0); From 257c091f4b07c69d09721644fc600b9d1dbf4bef Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 7 Jun 2023 13:46:31 +0800 Subject: [PATCH 03/15] make use of `WalReplayer`. --- .../src/instance/flush_compaction.rs | 1 + analytic_engine/src/instance/open.rs | 261 ++++-------------- analytic_engine/src/instance/wal_replayer.rs | 129 ++++----- 3 files changed, 106 insertions(+), 285 deletions(-) diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 0dc4c6208c..c2ca13bb9a 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -163,6 +163,7 @@ pub struct TableFlushRequest { pub max_sequence: SequenceNumber, } +#[derive(Clone)] pub struct Flusher { pub space_store: SpaceStoreRef, diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 6c4d178fe2..df63157971 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -3,19 +3,16 @@ //! 
Open logic of instance use std::{ - collections::{HashMap, VecDeque}, + collections::HashMap, sync::{Arc, RwLock}, }; -use common_types::{schema::IndexInWriterSchema, table::ShardId}; -use log::{debug, error, info, trace}; +use common_types::table::ShardId; +use log::info; use object_store::ObjectStoreRef; use snafu::ResultExt; use table_engine::{engine::TableDef, table::TableId}; -use wal::{ - log_batch::LogEntry, - manager::{ReadBoundary, ReadContext, ReadRequest, WalManager, WalManagerRef}, -}; +use wal::manager::WalManagerRef; use super::{engine::OpenTablesOfShard, flush_compaction::Flusher}; use crate::{ @@ -23,16 +20,12 @@ use crate::{ context::OpenContext, engine, instance::{ - self, - engine::{ApplyMemTable, FlushTable, OpenManifest, ReadMetaUpdate, ReadWal, Result}, - flush_compaction::TableFlushOptions, + engine::{OpenManifest, ReadMetaUpdate, Result}, mem_collector::MemUsageCollector, - serial_executor::TableOpSerialExecutor, - write::MemTableWriter, + wal_replayer::WalReplayer, Instance, SpaceStore, }, manifest::{details::ManifestImpl, LoadRequest, Manifest, ManifestRef}, - payload::{ReadPayload, WalDecoder}, row_iter::IterOptions, space::{SpaceAndTable, SpaceRef, Spaces}, sst::{ @@ -319,46 +312,57 @@ impl ShardOpener { /// Recover table data based on shard. async fn recover_table_datas(&mut self) -> Result<()> { - for state in self.states.values_mut() { - match state { + // Replay wal logs of tables. + let mut replay_table_datas = Vec::with_capacity(self.states.len()); + for (table_id, stage) in self.states.iter_mut() { + match stage { // Only do the wal recovery work in `RecoverTableData` state. TableOpenStage::RecoverTableData(ctx) => { - let table_data = ctx.table_data.clone(); - let read_ctx = ReadContext { - batch_size: self.wal_replay_batch_size, - ..Default::default() - }; - - let result = match Self::recover_single_table_data( - &self.flusher, - self.max_retry_flush_limit, - self.wal_manager.as_ref(), - table_data.clone(), - self.wal_replay_batch_size, - &read_ctx, - ) - .await - { - Ok(()) => Ok((table_data, ctx.space.clone())), - Err(e) => Err(e), - }; - - match result { - Ok((table_data, space)) => { - *state = TableOpenStage::Success(Some(SpaceAndTable::new( - space, table_data, - ))); - } - Err(e) => *state = TableOpenStage::Failed(e), - } + replay_table_datas.push(ctx.table_data.clone()); } // Table was found opened, or failed in meta recovery stage. TableOpenStage::Failed(_) | TableOpenStage::Success(_) => {} TableOpenStage::RecoverTableMeta(_) => { return OpenTablesOfShard { - msg: format!("unexpected table state:{state:?}"), + msg: format!( + "unexpected stage, stage:{stage:?}, table_id:{table_id}, shard_id:{}", + self.shard_id + ), } - .fail() + .fail(); + } + } + } + let mut wal_replayer = WalReplayer::new( + &replay_table_datas, + self.shard_id, + self.wal_manager.clone(), + self.wal_replay_batch_size, + self.flusher.clone(), + self.max_retry_flush_limit, + ); + let mut table_results = wal_replayer.replay().await?; + + // Process the replay results. 
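+        // A table whose replay succeeded transitions to `Success` together with its space,
+        // while a failed one keeps the error so the caller can see which tables failed to open.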
+ for table_data in replay_table_datas { + let table_id = table_data.id; + let stage = self.states.get_mut(&table_id).unwrap(); + let table_result = table_results.remove(&table_id).unwrap(); + + match (&stage, table_result) { + (TableOpenStage::RecoverTableData(ctx), Ok(())) => { + let space_table = SpaceAndTable::new(ctx.space.clone(), ctx.table_data.clone()); + *stage = TableOpenStage::Success(Some(space_table)); + } + + (TableOpenStage::RecoverTableData(_), Err(e)) => { + *stage = TableOpenStage::Failed(e); + } + + (other_stage, _) => { + return OpenTablesOfShard { + msg: format!("unexpected stage, stage:{other_stage:?}, table_id:{table_id}, shard_id:{}", self.shard_id), + }.fail(); } } } @@ -398,171 +402,4 @@ impl ShardOpener { Ok(()) } - - /// Recover table data from wal. - /// - /// Called by write worker - pub(crate) async fn recover_single_table_data( - flusher: &Flusher, - max_retry_flush_limit: usize, - wal_manager: &dyn WalManager, - table_data: TableDataRef, - replay_batch_size: usize, - read_ctx: &ReadContext, - ) -> Result<()> { - debug!( - "Instance recover table from wal, replay batch size:{}, table id:{}, shard info:{:?}", - replay_batch_size, table_data.id, table_data.shard_info - ); - - let table_location = table_data.table_location(); - let wal_location = - instance::create_wal_location(table_location.id, table_location.shard_info); - let read_req = ReadRequest { - location: wal_location, - start: ReadBoundary::Excluded(table_data.current_version().flushed_sequence()), - end: ReadBoundary::Max, - }; - - // Read all wal of current table. - let mut log_iter = wal_manager - .read_batch(read_ctx, &read_req) - .await - .context(ReadWal)?; - - let mut serial_exec = table_data.serial_exec.lock().await; - let mut log_entry_buf = VecDeque::with_capacity(replay_batch_size); - loop { - // fetch entries to log_entry_buf - let decoder = WalDecoder::default(); - log_entry_buf = log_iter - .next_log_entries(decoder, log_entry_buf) - .await - .context(ReadWal)?; - - // Replay all log entries of current table - Self::replay_table_log_entries( - flusher, - max_retry_flush_limit, - &mut serial_exec, - &table_data, - &log_entry_buf, - ) - .await?; - - // No more entries. - if log_entry_buf.is_empty() { - break; - } - } - - Ok(()) - } - - /// Replay all log entries into memtable and flush if necessary. - async fn replay_table_log_entries( - flusher: &Flusher, - max_retry_flush_limit: usize, - serial_exec: &mut TableOpSerialExecutor, - table_data: &TableDataRef, - log_entries: &VecDeque>, - ) -> Result<()> { - if log_entries.is_empty() { - info!( - "Instance replay an empty table log entries, table:{}, table_id:{:?}", - table_data.name, table_data.id - ); - - // No data in wal - return Ok(()); - } - - let last_sequence = log_entries.back().unwrap().sequence; - - debug!( - "Instance replay table log entries begin, table:{}, table_id:{:?}, sequence:{}", - table_data.name, table_data.id, last_sequence - ); - - for log_entry in log_entries { - let (sequence, payload) = (log_entry.sequence, &log_entry.payload); - - // Apply to memtable - match payload { - ReadPayload::Write { row_group } => { - trace!( - "Instance replay row_group, table:{}, row_group:{:?}", - table_data.name, - row_group - ); - - let table_schema_version = table_data.schema_version(); - if table_schema_version != row_group.schema().version() { - // Data with old schema should already been flushed, but we avoid panic - // here. 
- error!( - "Ignore data with mismatch schema version during replaying, \ - table:{}, \ - table_id:{:?}, \ - expect:{}, \ - actual:{}, \ - last_sequence:{}, \ - sequence:{}", - table_data.name, - table_data.id, - table_schema_version, - row_group.schema().version(), - last_sequence, - sequence, - ); - - continue; - } - - let index_in_writer = - IndexInWriterSchema::for_same_schema(row_group.schema().num_columns()); - let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec); - memtable_writer - .write(sequence, &row_group.into(), index_in_writer) - .context(ApplyMemTable { - space_id: table_data.space_id, - table: &table_data.name, - table_id: table_data.id, - })?; - - // Flush the table if necessary. - if table_data.should_flush_table(serial_exec) { - let opts = TableFlushOptions { - res_sender: None, - max_retry_flush_limit, - }; - let flush_scheduler = serial_exec.flush_scheduler(); - flusher - .schedule_flush(flush_scheduler, table_data, opts) - .await - .context(FlushTable { - space_id: table_data.space_id, - table: &table_data.name, - table_id: table_data.id, - })?; - } - } - ReadPayload::AlterSchema { .. } | ReadPayload::AlterOptions { .. } => { - // Ignore records except Data. - // - // - DDL (AlterSchema and AlterOptions) should be recovered - // from Manifest on start. - } - } - } - - debug!( - "Instance replay table log entries end, table:{}, table_id:{:?}, last_sequence:{}", - table_data.name, table_data.id, last_sequence - ); - - table_data.set_last_sequence(last_sequence); - - Ok(()) - } } diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 555648f561..72b145e990 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -3,28 +3,26 @@ use std::{ collections::{HashMap, VecDeque}, fmt::Display, - sync::Arc, }; use async_trait::async_trait; use common_types::{schema::IndexInWriterSchema, table::ShardId, SequenceNumber}; -use common_util::error::{BoxError, GenericError, GenericResult}; +use common_util::error::BoxError; use log::{debug, error, info, trace}; use snafu::ResultExt; -use table_engine::table::{TableId, TableRef}; -use tokio::sync::{Mutex, MutexGuard}; +use table_engine::table::TableId; +use tokio::sync::MutexGuard; use wal::{ log_batch::LogEntry, manager::{ - ReadBoundary, ReadContext, ReadRequest, RegionId, ScanContext, ScanRequest, WalManager, - WalManagerRef, + ReadBoundary, ReadContext, ReadRequest, RegionId, ScanContext, ScanRequest, WalManagerRef, }, }; use crate::{ instance::{ self, - engine::{ReplayWalNoCause, ReplayWalWithCause, Result}, + engine::{ReplayWalWithCause, Result}, flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, write::MemTableWriter, @@ -34,13 +32,36 @@ use crate::{ }; /// Wal replayer -pub struct WalReplayer { - results: HashMap>, +pub struct WalReplayer<'a> { context: ReplayContext, mode: ReplayMode, + table_datas: &'a [TableDataRef], } -impl WalReplayer { +impl<'a> WalReplayer<'a> { + pub fn new( + table_datas: &'a [TableDataRef], + shard_id: ShardId, + wal_manager: WalManagerRef, + wal_replay_batch_size: usize, + flusher: Flusher, + max_retry_flush_limit: usize, + ) -> Self { + let context = ReplayContext { + shard_id, + wal_manager, + wal_replay_batch_size, + flusher, + max_retry_flush_limit, + }; + + Self { + mode: ReplayMode::RegionBased, + context, + table_datas, + } + } + fn build_core(mode: ReplayMode) -> Box { info!("Replay wal in mode:{mode:?}"); @@ -50,27 +71,24 @@ 
impl WalReplayer { } } - pub async fn replay( - &mut self, - table_datas: Vec, - ) -> Result>> { + pub async fn replay(&mut self) -> Result>> { // Build core according to mode. let core = Self::build_core(self.mode); info!( "Replay wal logs begin, context:{}, tables:{:?}", - self.context, table_datas + self.context, self.table_datas ); - let result = core.replay(&self.context, &table_datas).await; + let result = core.replay(&self.context, self.table_datas).await; info!( "Replay wal logs finish, context:{}, tables:{:?}", - self.context, table_datas, + self.context, self.table_datas, ); result } } -struct ReplayContext { +pub struct ReplayContext { pub shard_id: ShardId, pub wal_manager: WalManagerRef, pub wal_replay_batch_size: usize, @@ -96,7 +114,7 @@ enum ReplayMode { /// Replay core, the abstract of different replay strategies #[async_trait] -trait ReplayCore { +trait ReplayCore: Send + Sync + 'static { async fn replay( &self, context: &ReplayContext, @@ -123,15 +141,7 @@ impl ReplayCore for TableBasedCore { }; for table_data in table_datas { let table_id = table_data.id; - let result = Self::recover_table_logs( - &context.flusher, - context.max_retry_flush_limit, - context.wal_manager.as_ref(), - table_data, - context.wal_replay_batch_size, - &read_ctx, - ) - .await; + let result = Self::recover_table_logs(context, table_data, &read_ctx).await; results.insert(table_id, result); } @@ -145,11 +155,8 @@ impl TableBasedCore { /// /// Called by write worker async fn recover_table_logs( - flusher: &Flusher, - max_retry_flush_limit: usize, - wal_manager: &dyn WalManager, + context: &ReplayContext, table_data: &TableDataRef, - replay_batch_size: usize, read_ctx: &ReadContext, ) -> Result<()> { let table_location = table_data.table_location(); @@ -162,14 +169,15 @@ impl TableBasedCore { }; // Read all wal of current table. - let mut log_iter = wal_manager + let mut log_iter = context + .wal_manager .read_batch(read_ctx, &read_req) .await .box_err() .context(ReplayWalWithCause { msg: None })?; let mut serial_exec = table_data.serial_exec.lock().await; - let mut log_entry_buf = VecDeque::with_capacity(replay_batch_size); + let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); loop { // fetch entries to log_entry_buf let decoder = WalDecoder::default(); @@ -186,8 +194,8 @@ impl TableBasedCore { // Replay all log entries of current table let last_sequence = log_entry_buf.back().unwrap().sequence; replay_table_log_entries( - flusher, - max_retry_flush_limit, + &context.flusher, + context.max_retry_flush_limit, &mut serial_exec, table_data, log_entry_buf.iter(), @@ -218,17 +226,7 @@ impl ReplayCore for RegionBasedCore { ..Default::default() }; - Self::replay_region_logs( - &table_datas, - context.shard_id, - &context.flusher, - context.max_retry_flush_limit, - context.wal_manager.as_ref(), - context.wal_replay_batch_size, - &scan_ctx, - &mut results, - ) - .await?; + Self::replay_region_logs(context, table_datas, &scan_ctx, &mut results).await?; Ok(results) } @@ -242,26 +240,23 @@ impl RegionBasedCore { /// + Split logs according to table ids. /// + Replay logs to recover data of tables. async fn replay_region_logs( + context: &ReplayContext, table_datas: &[TableDataRef], - shard_id: ShardId, - flusher: &Flusher, - max_retry_flush_limit: usize, - wal_manager: &dyn WalManager, - replay_batch_size: usize, scan_ctx: &ScanContext, table_results: &mut HashMap>, ) -> Result<()> { // Scan all wal logs of current shard. 
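        // The shard id is used as the wal region id, so a single scan covers the logs of
        // every table on the shard.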
let scan_req = ScanRequest { - region_id: shard_id as RegionId, + region_id: context.shard_id as RegionId, }; - let mut log_iter = wal_manager + let mut log_iter = context + .wal_manager .scan(scan_ctx, &scan_req) .await .box_err() .context(ReplayWalWithCause { msg: None })?; - let mut log_entry_buf = VecDeque::with_capacity(replay_batch_size); + let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); // Lock all related tables. let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len()); @@ -288,12 +283,8 @@ impl RegionBasedCore { } Self::replay_single_batch( + context, &log_entry_buf, - flusher, - max_retry_flush_limit, - wal_manager, - replay_batch_size, - scan_ctx, &mut serial_exec_ctxs, table_results, ) @@ -304,17 +295,13 @@ impl RegionBasedCore { } async fn replay_single_batch( + context: &ReplayContext, log_batch: &VecDeque>, - flusher: &Flusher, - max_retry_flush_limit: usize, - wal_manager: &dyn WalManager, - replay_batch_size: usize, - read_ctx: &ReadContext, serial_exec_ctxs: &mut HashMap>, table_results: &mut HashMap>, ) -> Result<()> { let mut table_batches = Vec::with_capacity(serial_exec_ctxs.len()); - Self::split_log_batch_by_table(&log_batch, &mut table_batches); + Self::split_log_batch_by_table(log_batch, &mut table_batches); for table_batch in table_batches { let table_result = table_results.get(&table_batch.table_id); if let Some(Err(_)) = table_result { @@ -325,8 +312,8 @@ impl RegionBasedCore { let serial_exec_ctx = serial_exec_ctxs.get_mut(&table_batch.table_id).unwrap(); let last_sequence = table_batch.last_sequence; let result = replay_table_log_entries( - flusher, - max_retry_flush_limit, + &context.flusher, + context.max_retry_flush_limit, &mut serial_exec_ctx.serial_exec, &serial_exec_ctx.table_data, log_batch.range(table_batch.start_log_idx..table_batch.end_log_idx), @@ -365,11 +352,7 @@ impl RegionBasedCore { true } else { let current_table_id = log_batch.get(curr_log_idx).unwrap().table_id; - if current_table_id != start_table_id { - true - } else { - false - } + current_table_id != start_table_id }; if found_end_idx { From a98fc429b92fc66e3d799f86a6877dbc1b7541e0 Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 7 Jun 2023 15:02:29 +0800 Subject: [PATCH 04/15] expose configs to choose replay wal in `TableBased` or `RegionBased` mode. 
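
The replay strategy is now chosen through a new `recover_mode` option in the analytic
engine config: `TableBased` (the default) recovers tables one by one, while `ShardBased`
recovers all tables of a shard together via the region based replay. As a rough sketch
only (assuming the `Config` and `RecoverMode` types added below are used directly from
Rust code), enabling shard based recovery looks like:

    // Sketch: build an analytic engine config that replays a whole shard at once.
    use analytic_engine::{Config, RecoverMode};

    fn shard_based_config() -> Config {
        Config {
            // Recover every table of a shard from a single region scan of the wal.
            recover_mode: RecoverMode::ShardBased,
            // Leave all other options at their defaults.
            ..Default::default()
        }
    }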
--- analytic_engine/src/instance/mod.rs | 3 ++- analytic_engine/src/instance/open.rs | 14 +++++++++++++- analytic_engine/src/instance/wal_replayer.rs | 10 +++++++--- analytic_engine/src/lib.rs | 13 +++++++++++++ 4 files changed, 35 insertions(+), 5 deletions(-) diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index e58ad92f73..1faf254f08 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -45,7 +45,7 @@ use crate::{ meta_data::cache::MetaCacheRef, }, table::data::{TableDataRef, TableShardInfo}, - TableOptions, + RecoverMode, TableOptions, }; #[allow(clippy::enum_variant_names)] @@ -160,6 +160,7 @@ pub struct Instance { /// Options for scanning sst pub(crate) scan_options: ScanOptions, pub(crate) iter_options: Option, + pub(crate) recover_mode: RecoverMode, } impl Instance { diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index df63157971..6abd2271a2 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -22,7 +22,7 @@ use crate::{ instance::{ engine::{OpenManifest, ReadMetaUpdate, Result}, mem_collector::MemUsageCollector, - wal_replayer::WalReplayer, + wal_replayer::{ReplayMode, WalReplayer}, Instance, SpaceStore, }, manifest::{details::ManifestImpl, LoadRequest, Manifest, ManifestRef}, @@ -34,6 +34,7 @@ use crate::{ }, table::data::TableDataRef, table_meta_set_impl::TableMetaSetImpl, + RecoverMode, }; const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64; @@ -126,6 +127,7 @@ impl Instance { .map(|v| v.as_byte() as usize), iter_options, scan_options, + recover_mode: ctx.config.recover_mode, }); Ok(instance) @@ -143,6 +145,7 @@ impl Instance { self.replay_batch_size, self.make_flusher(), self.max_retry_flush_limit, + self.recover_mode, )?; shard_opener.open().await @@ -194,6 +197,7 @@ struct ShardOpener { wal_replay_batch_size: usize, flusher: Flusher, max_retry_flush_limit: usize, + recover_mode: RecoverMode, } impl ShardOpener { @@ -204,6 +208,7 @@ impl ShardOpener { wal_replay_batch_size: usize, flusher: Flusher, max_retry_flush_limit: usize, + recover_mode: RecoverMode, ) -> Result { let mut states = HashMap::with_capacity(shard_context.table_ctxs.len()); for table_ctx in shard_context.table_ctxs { @@ -230,6 +235,7 @@ impl ShardOpener { wal_replay_batch_size, flusher, max_retry_flush_limit, + recover_mode, }) } @@ -333,6 +339,11 @@ impl ShardOpener { } } } + + let replay_mode = match self.recover_mode { + RecoverMode::TableBased => ReplayMode::TableBased, + RecoverMode::ShardBased => ReplayMode::RegionBased, + }; let mut wal_replayer = WalReplayer::new( &replay_table_datas, self.shard_id, @@ -340,6 +351,7 @@ impl ShardOpener { self.wal_replay_batch_size, self.flusher.clone(), self.max_retry_flush_limit, + replay_mode, ); let mut table_results = wal_replayer.replay().await?; diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 72b145e990..53dcbc6d6e 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -1,5 +1,7 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. +//! Wal replayer + use std::{ collections::{HashMap, VecDeque}, fmt::Display, @@ -31,7 +33,8 @@ use crate::{ table::data::TableDataRef, }; -/// Wal replayer +/// Wal replayer supporting both table based and region based +// TODO: limit the memory usage in `RegionBased` mode. 
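+//
+// `TableBased` mode reads each table's own wal range one by one, while `RegionBased` mode
+// scans the shard's region once and dispatches the interleaved entries to their tables.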
pub struct WalReplayer<'a> { context: ReplayContext, mode: ReplayMode, @@ -46,6 +49,7 @@ impl<'a> WalReplayer<'a> { wal_replay_batch_size: usize, flusher: Flusher, max_retry_flush_limit: usize, + replay_mode: ReplayMode, ) -> Self { let context = ReplayContext { shard_id, @@ -56,7 +60,7 @@ impl<'a> WalReplayer<'a> { }; Self { - mode: ReplayMode::RegionBased, + mode: replay_mode, context, table_datas, } @@ -107,7 +111,7 @@ impl Display for ReplayContext { } #[derive(Debug, Clone, Copy)] -enum ReplayMode { +pub enum ReplayMode { RegionBased, TableBased, } diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index 9e95d8e97b..025845afbe 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -97,9 +97,21 @@ pub struct Config { /// + Kafka pub wal: WalStorageConfig, + /// Recover mode + /// + /// + TableBased, tables on same shard will be recovered table by table. + /// + ShardBased, tables on same shard will be recovered together. + pub recover_mode: RecoverMode, + pub remote_engine_client: remote_engine_client::config::Config, } +#[derive(Debug, Clone, Copy, Deserialize, Serialize)] +pub enum RecoverMode { + TableBased, + ShardBased, +} + impl Default for Config { fn default() -> Self { Self { @@ -127,6 +139,7 @@ impl Default for Config { max_bytes_per_write_batch: None, wal: WalStorageConfig::RocksDB(Box::default()), remote_engine_client: remote_engine_client::config::Config::default(), + recover_mode: RecoverMode::TableBased, } } } From 62901916a6c07311a368151bb312594b66267351 Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 8 Jun 2023 00:24:13 +0800 Subject: [PATCH 05/15] add tests about `RegionBased` wal replay. --- analytic_engine/src/instance/wal_replayer.rs | 45 +++--- analytic_engine/src/tests/alter_test.rs | 29 ++-- analytic_engine/src/tests/drop_test.rs | 39 +++-- analytic_engine/src/tests/read_write_test.rs | 121 ++++++++++------ analytic_engine/src/tests/util.rs | 145 +++++++++++++++++-- 5 files changed, 279 insertions(+), 100 deletions(-) diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 53dcbc6d6e..7ca4ed588f 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -70,8 +70,8 @@ impl<'a> WalReplayer<'a> { info!("Replay wal in mode:{mode:?}"); match mode { - ReplayMode::RegionBased => Box::new(TableBasedCore), - ReplayMode::TableBased => Box::new(RegionBasedCore), + ReplayMode::RegionBased => Box::new(RegionBasedCore), + ReplayMode::TableBased => Box::new(TableBasedCore), } } @@ -155,9 +155,6 @@ impl ReplayCore for TableBasedCore { } impl TableBasedCore { - /// Recover table data from wal. - /// - /// Called by write worker async fn recover_table_logs( context: &ReplayContext, table_data: &TableDataRef, @@ -224,7 +221,11 @@ impl ReplayCore for RegionBasedCore { ) -> Result>> { debug!("Replay wal logs on region mode, context:{context}, states:{table_datas:?}",); - let mut results = HashMap::with_capacity(table_datas.len()); + // Init all table results to be oks, and modify to errs when failed to replay. 
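+        // A table that has no log entries left in the wal therefore still ends up with a
+        // successful (empty) replay result.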
+ let mut results = table_datas + .iter() + .map(|table_data| (table_data.id, Ok(()))) + .collect(); let scan_ctx = ScanContext { batch_size: context.wal_replay_batch_size, ..Default::default() @@ -307,25 +308,27 @@ impl RegionBasedCore { let mut table_batches = Vec::with_capacity(serial_exec_ctxs.len()); Self::split_log_batch_by_table(log_batch, &mut table_batches); for table_batch in table_batches { + // Some tables may have failed in previous replay, ignore them. let table_result = table_results.get(&table_batch.table_id); if let Some(Err(_)) = table_result { return Ok(()); } - // Replay all log entries of current table - let serial_exec_ctx = serial_exec_ctxs.get_mut(&table_batch.table_id).unwrap(); - let last_sequence = table_batch.last_sequence; - let result = replay_table_log_entries( - &context.flusher, - context.max_retry_flush_limit, - &mut serial_exec_ctx.serial_exec, - &serial_exec_ctx.table_data, - log_batch.range(table_batch.start_log_idx..table_batch.end_log_idx), - last_sequence, - ) - .await; - - table_results.insert(table_batch.table_id, result); + // Replay all log entries of current table. + // Some tables may have been moved to other shards or dropped, ignore such logs. + if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) { + let last_sequence = table_batch.last_sequence; + let result = replay_table_log_entries( + &context.flusher, + context.max_retry_flush_limit, + &mut ctx.serial_exec, + &ctx.table_data, + log_batch.range(table_batch.start_log_idx..table_batch.end_log_idx), + last_sequence, + ) + .await; + table_results.insert(table_batch.table_id, result); + } } Ok(()) @@ -398,7 +401,7 @@ struct SerialExecContext<'a> { serial_exec: MutexGuard<'a, TableOpSerialExecutor>, } -/// Replay all log entries into memtable and flush if necessary. 
+/// Replay all log entries into memtable and flush if necessary async fn replay_table_log_entries( flusher: &Flusher, max_retry_flush_limit: usize, diff --git a/analytic_engine/src/tests/alter_test.rs b/analytic_engine/src/tests/alter_test.rs index 614cab7541..c6f4b08eec 100644 --- a/analytic_engine/src/tests/alter_test.rs +++ b/analytic_engine/src/tests/alter_test.rs @@ -20,24 +20,25 @@ use crate::{ tests::{ row_util, table::{self, FixedSchemaTable}, - util::{ - EngineBuildContext, MemoryEngineBuildContext, Null, RocksDBEngineBuildContext, - TestContext, TestEnv, - }, + util::{memory_ctxs, rocksdb_ctxs, EngineBuildContext, Null, TestContext, TestEnv}, }, }; #[test] fn test_alter_table_add_column_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_alter_table_add_column(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_alter_table_add_column(ctx); + } } #[ignore = "Enable this test when manifest use another snapshot implementation"] #[test] fn test_alter_table_add_column_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_alter_table_add_column(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_alter_table_add_column(ctx); + } } fn test_alter_table_add_column(engine_context: T) { @@ -370,15 +371,19 @@ async fn check_read_row_group( #[test] fn test_alter_table_options_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_alter_table_options(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_alter_table_options(ctx); + } } #[ignore = "Enable this test when manifest use another snapshot implementation"] #[test] fn test_alter_table_options_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_alter_table_options(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_alter_table_options(ctx); + } } fn test_alter_table_options(engine_context: T) { diff --git a/analytic_engine/src/tests/drop_test.rs b/analytic_engine/src/tests/drop_test.rs index 5dd0be033a..c915ae1482 100644 --- a/analytic_engine/src/tests/drop_test.rs +++ b/analytic_engine/src/tests/drop_test.rs @@ -10,7 +10,8 @@ use table_engine::table::AlterSchemaRequest; use crate::tests::{ table::FixedSchemaTable, util::{ - self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineBuildContext, TestEnv, + self, memory_ctxs, rocksdb_ctxs, EngineBuildContext, MemoryEngineBuildContext, + RocksDBEngineBuildContext, TestEnv, }, }; @@ -209,14 +210,18 @@ fn test_drop_create_same_table_case(flush: bool, engine_c #[test] fn test_drop_create_same_table_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_drop_create_same_table(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_drop_create_same_table(ctx); + } } #[test] fn test_drop_create_same_table_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_drop_create_same_table(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_drop_create_same_table(ctx); + } } fn test_drop_create_same_table(engine_context: T) { @@ -227,14 +232,18 @@ fn test_drop_create_same_table(engine_context: T) { #[test] fn test_alter_schema_drop_create_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_alter_schema_drop_create(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_alter_schema_drop_create(ctx); + } } #[test] fn 
test_alter_schema_drop_create_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_alter_schema_drop_create(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_alter_schema_drop_create(ctx); + } } fn test_alter_schema_drop_create(engine_context: T) { @@ -284,14 +293,18 @@ fn test_alter_schema_drop_create(engine_context: T) { #[test] fn test_alter_options_drop_create_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_alter_options_drop_create(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_alter_options_drop_create(ctx); + } } #[test] fn test_alter_options_drop_create_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_alter_options_drop_create(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_alter_options_drop_create(ctx); + } } fn test_alter_options_drop_create(engine_context: T) { diff --git a/analytic_engine/src/tests/read_write_test.rs b/analytic_engine/src/tests/read_write_test.rs index 783f46aa42..7092fd98c6 100644 --- a/analytic_engine/src/tests/read_write_test.rs +++ b/analytic_engine/src/tests/read_write_test.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Read write test. @@ -11,22 +11,23 @@ use table_engine::table::ReadOrder; use crate::{ setup::WalsOpener, table_options, - tests::util::{ - self, EngineBuildContext, MemoryEngineBuildContext, RocksDBEngineBuildContext, TestContext, - TestEnv, - }, + tests::util::{self, memory_ctxs, rocksdb_ctxs, EngineBuildContext, TestContext, TestEnv}, }; #[test] fn test_multi_table_read_write_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_multi_table_read_write(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_multi_table_read_write(ctx); + } } #[test] fn test_multi_table_read_write_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_multi_table_read_write(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_multi_table_read_write(ctx); + } } fn test_multi_table_read_write(engine_context: T) { @@ -171,14 +172,18 @@ fn test_multi_table_read_write(engine_context: T) { #[test] fn test_table_write_read_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_table_write_read(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_table_write_read(ctx); + } } #[test] fn test_table_write_read_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_table_write_read(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_table_write_read(ctx); + } } fn test_table_write_read(engine_context: T) { @@ -192,7 +197,7 @@ fn test_table_write_read(engine_context: T) { let fixed_schema_table = test_ctx.create_fixed_schema_table(test_table1).await; let start_ms = test_ctx.start_ms(); - let rows = [ + let rows: [(&str, Timestamp, &str, f64, f64, &str); 3] = [ ( "key1", Timestamp::new(start_ms), @@ -250,14 +255,18 @@ fn test_table_write_read(engine_context: T) { #[test] fn test_table_write_get_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_table_write_get(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_table_write_get(ctx); + } } #[test] fn test_table_write_get_mem_wal() { - let memory_ctx = 
MemoryEngineBuildContext::default(); - test_table_write_get(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_table_write_get(ctx); + } } fn test_table_write_get(engine_context: T) { @@ -327,22 +336,28 @@ fn test_table_write_get(engine_context: T) { #[test] fn test_table_write_get_override_rocks() { - test_table_write_get_override::(); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_table_write_get_override(ctx); + } } #[test] fn test_table_write_get_override_mem_wal() { - test_table_write_get_override::(); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_table_write_get_override(ctx); + } } -fn test_table_write_get_override() { - test_table_write_get_override_case::(FlushPoint::NoFlush, T::default()); +fn test_table_write_get_override(engine_context: T) { + test_table_write_get_override_case::(FlushPoint::NoFlush, engine_context.clone()); - test_table_write_get_override_case::(FlushPoint::AfterFirstWrite, T::default()); + test_table_write_get_override_case::(FlushPoint::AfterFirstWrite, engine_context.clone()); - test_table_write_get_override_case::(FlushPoint::AfterOverwrite, T::default()); + test_table_write_get_override_case::(FlushPoint::AfterOverwrite, engine_context.clone()); - test_table_write_get_override_case::(FlushPoint::FirstAndOverwrite, T::default()); + test_table_write_get_override_case::(FlushPoint::FirstAndOverwrite, engine_context); } #[derive(Debug)] @@ -506,16 +521,20 @@ fn test_table_write_get_override_case( #[test] fn test_db_write_buffer_size_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - // Use different table name to avoid metrics collision. - test_db_write_buffer_size("test_db_write_buffer_size_rocks", rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + // Use different table name to avoid metrics collision. + test_db_write_buffer_size("test_db_write_buffer_size_rocks", ctx); + } } #[test] fn test_db_write_buffer_size_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - // Use different table name to avoid metrics collision. - test_db_write_buffer_size("test_db_write_buffer_size_mem_wal", memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + // Use different table name to avoid metrics collision. + test_db_write_buffer_size("test_db_write_buffer_size_mem_wal", ctx); + } } fn test_db_write_buffer_size(table_name: &str, engine_context: T) { @@ -527,16 +546,20 @@ fn test_db_write_buffer_size(table_name: &str, engine_con #[test] fn test_space_write_buffer_size_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - // Use different table name to avoid metrics collision. - test_space_write_buffer_size("test_space_write_buffer_size_rocks", rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + // Use different table name to avoid metrics collision. + test_space_write_buffer_size("test_space_write_buffer_size_rocks", ctx); + } } #[test] fn test_space_write_buffer_size_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - // Use different table name to avoid metrics collision. - test_space_write_buffer_size("test_space_write_buffer_size_mem_wal", memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + // Use different table name to avoid metrics collision. 
+ test_space_write_buffer_size("test_space_write_buffer_size_mem_wal", ctx); + } } fn test_space_write_buffer_size(table_name: &str, engine_context: T) { @@ -660,14 +683,18 @@ fn test_write_buffer_size_overflow( #[test] fn test_table_write_read_reverse_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_table_write_read_reverse(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_table_write_read_reverse(ctx); + } } #[test] fn test_table_write_read_reverse_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_table_write_read_reverse(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_table_write_read_reverse(ctx); + } } fn test_table_write_read_reverse(engine_context: T) { @@ -746,15 +773,19 @@ fn test_table_write_read_reverse(engine_context: T) { #[test] #[ignore = "https://github.com/CeresDB/ceresdb/issues/313"] fn test_table_write_read_reverse_after_flush_rocks() { - let rocksdb_ctx = RocksDBEngineBuildContext::default(); - test_table_write_read_reverse_after_flush(rocksdb_ctx); + let rocksdb_ctxs = rocksdb_ctxs(); + for ctx in rocksdb_ctxs { + test_table_write_read_reverse_after_flush(ctx); + } } #[test] #[ignore = "https://github.com/CeresDB/ceresdb/issues/313"] fn test_table_write_read_reverse_after_flush_mem_wal() { - let memory_ctx = MemoryEngineBuildContext::default(); - test_table_write_read_reverse_after_flush(memory_ctx); + let memory_ctxs = memory_ctxs(); + for ctx in memory_ctxs { + test_table_write_read_reverse_after_flush(ctx); + } } fn test_table_write_read_reverse_after_flush(engine_context: T) { diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index 0e2c897ecc..0cc8fb94e3 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -8,7 +8,7 @@ use common_types::{ datum::Datum, record_batch::RecordBatch, row::{Row, RowGroup}, - table::DEFAULT_SHARD_ID, + table::{ShardId, DEFAULT_SHARD_ID}, time::Timestamp, }; use common_util::{ @@ -20,8 +20,8 @@ use log::info; use object_store::config::{LocalOptions, ObjectStoreOptions, StorageOptions}; use table_engine::{ engine::{ - CreateTableRequest, DropTableRequest, EngineRuntimes, OpenTableRequest, - Result as EngineResult, TableEngineRef, + CreateTableRequest, DropTableRequest, EngineRuntimes, OpenShardRequest, OpenTableRequest, + Result as EngineResult, TableDef, TableEngineRef, }, table::{ AlterSchemaRequest, FlushRequest, GetRequest, ReadOrder, ReadRequest, Result, SchemaId, @@ -33,7 +33,7 @@ use tempfile::TempDir; use crate::{ setup::{EngineBuilder, MemWalsOpener, OpenedWals, RocksDBWalsOpener, WalsOpener}, tests::table::{self, FixedSchemaTable, RowTuple}, - Config, RocksDBConfig, WalStorageConfig, + Config, RecoverMode, RocksDBConfig, WalStorageConfig, }; const DAY_MS: i64 = 24 * 60 * 60 * 1000; @@ -113,6 +113,7 @@ pub struct TestContext { opened_wals: Option, schema_id: SchemaId, last_table_seq: u32, + open_method: OpenTablesMethod, name_to_tables: HashMap, } @@ -169,8 +170,69 @@ impl TestContext { self.open().await; - for (id, name) in table_infos { - self.open_table(id, name).await; + match self.open_method { + OpenTablesMethod::WithOpenTable => { + for (id, name) in table_infos { + self.open_table(id, name).await; + } + } + OpenTablesMethod::WithOpenShard => { + self.open_tables_of_shard(table_infos, DEFAULT_SHARD_ID) + .await; + } + } + } + + pub async fn reopen_with_tables_of_shard(&mut self, tables: &[&str], shard_id: ShardId) { + let table_infos: 
Vec<_> = tables + .iter() + .map(|name| { + let table_id = self.name_to_tables.get(*name).unwrap().id(); + (table_id, *name) + }) + .collect(); + { + // Close all tables. + self.name_to_tables.clear(); + + // Close engine. + let engine = self.engine.take().unwrap(); + engine.close().await.unwrap(); + } + + self.open().await; + + self.open_tables_of_shard(table_infos, shard_id).await + } + + async fn open_tables_of_shard(&mut self, table_infos: Vec<(TableId, &str)>, shard_id: ShardId) { + let table_defs = table_infos + .into_iter() + .map(|table| TableDef { + catalog_name: "ceresdb".to_string(), + schema_name: "public".to_string(), + schema_id: self.schema_id, + id: table.0, + name: table.1.to_string(), + }) + .collect(); + + let open_shard_request = OpenShardRequest { + shard_id, + table_defs, + engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(), + }; + + let tables = self + .engine() + .open_shard(open_shard_request) + .await + .unwrap() + .into_values() + .map(|result| result.unwrap().unwrap()); + + for table in tables { + self.name_to_tables.insert(table.name().to_string(), table); } } @@ -368,6 +430,12 @@ impl TestContext { } } +#[derive(Clone, Copy)] +pub enum OpenTablesMethod { + WithOpenTable, + WithOpenShard, +} + impl TestContext { pub fn config_mut(&mut self) -> &mut Config { &mut self.config @@ -405,6 +473,7 @@ impl TestEnv { schema_id: SchemaId::from_u32(100), last_table_seq: 1, name_to_tables: HashMap::new(), + open_method: build_context.open_method(), } } @@ -474,10 +543,22 @@ pub trait EngineBuildContext: Clone + Default { fn wals_opener(&self) -> Self::WalsOpener; fn config(&self) -> Config; + fn open_method(&self) -> OpenTablesMethod; } pub struct RocksDBEngineBuildContext { config: Config, + open_method: OpenTablesMethod, +} + +impl RocksDBEngineBuildContext { + pub fn new(mode: RecoverMode, open_method: OpenTablesMethod) -> Self { + let mut context = Self::default(); + context.config.recover_mode = mode; + context.open_method = open_method; + + context + } } impl Default for RocksDBEngineBuildContext { @@ -504,7 +585,10 @@ impl Default for RocksDBEngineBuildContext { ..Default::default() }; - Self { config } + Self { + config, + open_method: OpenTablesMethod::WithOpenTable, + } } } @@ -531,7 +615,10 @@ impl Clone for RocksDBEngineBuildContext { ..Default::default() })); - Self { config } + Self { + config, + open_method: self.open_method, + } } } @@ -545,11 +632,26 @@ impl EngineBuildContext for RocksDBEngineBuildContext { fn config(&self) -> Config { self.config.clone() } + + fn open_method(&self) -> OpenTablesMethod { + self.open_method + } } #[derive(Clone)] pub struct MemoryEngineBuildContext { config: Config, + open_method: OpenTablesMethod, +} + +impl MemoryEngineBuildContext { + pub fn new(mode: RecoverMode, open_method: OpenTablesMethod) -> Self { + let mut context = Self::default(); + context.config.recover_mode = mode; + context.open_method = open_method; + + context + } } impl Default for MemoryEngineBuildContext { @@ -572,7 +674,10 @@ impl Default for MemoryEngineBuildContext { ..Default::default() }; - Self { config } + Self { + config, + open_method: OpenTablesMethod::WithOpenTable, + } } } @@ -586,4 +691,26 @@ impl EngineBuildContext for MemoryEngineBuildContext { fn config(&self) -> Config { self.config.clone() } + + fn open_method(&self) -> OpenTablesMethod { + self.open_method + } +} + +pub fn rocksdb_ctxs() -> Vec { + vec![ + RocksDBEngineBuildContext::new(RecoverMode::TableBased, OpenTablesMethod::WithOpenTable), + 
RocksDBEngineBuildContext::new(RecoverMode::ShardBased, OpenTablesMethod::WithOpenTable), + RocksDBEngineBuildContext::new(RecoverMode::TableBased, OpenTablesMethod::WithOpenShard), + RocksDBEngineBuildContext::new(RecoverMode::ShardBased, OpenTablesMethod::WithOpenShard), + ] +} + +pub fn memory_ctxs() -> Vec { + vec![ + MemoryEngineBuildContext::new(RecoverMode::TableBased, OpenTablesMethod::WithOpenTable), + MemoryEngineBuildContext::new(RecoverMode::ShardBased, OpenTablesMethod::WithOpenTable), + MemoryEngineBuildContext::new(RecoverMode::TableBased, OpenTablesMethod::WithOpenShard), + MemoryEngineBuildContext::new(RecoverMode::ShardBased, OpenTablesMethod::WithOpenShard), + ] } From 00202c563630c44dd7126debd6c706f9413b6e15 Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 12 Jun 2023 14:41:16 +0800 Subject: [PATCH 06/15] ignore the too old wal logs when replaying. --- analytic_engine/src/instance/wal_replayer.rs | 36 ++++++++------------ 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 7ca4ed588f..c770474701 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -8,7 +8,7 @@ use std::{ }; use async_trait::async_trait; -use common_types::{schema::IndexInWriterSchema, table::ShardId, SequenceNumber}; +use common_types::{schema::IndexInWriterSchema, table::ShardId}; use common_util::error::BoxError; use log::{debug, error, info, trace}; use snafu::ResultExt; @@ -193,14 +193,12 @@ impl TableBasedCore { } // Replay all log entries of current table - let last_sequence = log_entry_buf.back().unwrap().sequence; replay_table_log_entries( &context.flusher, context.max_retry_flush_limit, &mut serial_exec, table_data, log_entry_buf.iter(), - last_sequence, ) .await?; } @@ -317,14 +315,12 @@ impl RegionBasedCore { // Replay all log entries of current table. // Some tables may have been moved to other shards or dropped, ignore such logs. 
if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id) { - let last_sequence = table_batch.last_sequence; let result = replay_table_log_entries( &context.flusher, context.max_retry_flush_limit, &mut ctx.serial_exec, &ctx.table_data, log_batch.range(table_batch.start_log_idx..table_batch.end_log_idx), - last_sequence, ) .await; table_results.insert(table_batch.table_id, result); @@ -363,10 +359,8 @@ impl RegionBasedCore { }; if found_end_idx { - let last_sequence = log_batch.get(curr_log_idx - 1).unwrap().sequence; table_batches.push(TableBatch { table_id: TableId::new(start_table_id), - last_sequence, start_log_idx, end_log_idx: curr_log_idx, }); @@ -391,7 +385,6 @@ impl RegionBasedCore { #[derive(Debug, Eq, PartialEq)] struct TableBatch { table_id: TableId, - last_sequence: SequenceNumber, start_log_idx: usize, end_log_idx: usize, } @@ -408,17 +401,22 @@ async fn replay_table_log_entries( serial_exec: &mut TableOpSerialExecutor, table_data: &TableDataRef, log_entries: impl Iterator>, - last_sequence: SequenceNumber, ) -> Result<()> { + let flushed_sequence = table_data.current_version().flushed_sequence(); debug!( - "Replay table log entries begin, table:{}, table_id:{:?}, sequence:{}", - table_data.name, table_data.id, last_sequence + "Replay table log entries begin, table:{}, table_id:{:?}, last_sequence:{}, flushed_sequence:{flushed_sequence}", + table_data.name, table_data.id, table_data.last_sequence(), ); for log_entry in log_entries { let (sequence, payload) = (log_entry.sequence, &log_entry.payload); - // Apply to memtable + // Ignore too old logs(sequence <= `flushed_sequence`). + if sequence <= flushed_sequence { + continue; + } + + // Apply logs to memtable. match payload { ReadPayload::Write { row_group } => { trace!( @@ -443,7 +441,7 @@ async fn replay_table_log_entries( table_data.id, table_schema_version, row_group.schema().version(), - last_sequence, + table_data.last_sequence(), sequence, ); @@ -489,15 +487,15 @@ async fn replay_table_log_entries( // Manifest on start. } } + + table_data.set_last_sequence(sequence); } debug!( - "Replay table log entries finish, table:{}, table_id:{:?}, last_sequence:{}", - table_data.name, table_data.id, last_sequence + "Replay table log entries finish, table:{}, table_id:{:?}, last_sequence:{}, flushed_sequence:{}", + table_data.name, table_data.id, table_data.last_sequence(), table_data.current_version().flushed_sequence() ); - table_data.set_last_sequence(last_sequence); - Ok(()) } @@ -554,19 +552,16 @@ mod tests { let expected1 = vec![ TableBatch { table_id: TableId::new(0), - last_sequence: 3, start_log_idx: 0, end_log_idx: 3, }, TableBatch { table_id: TableId::new(1), - last_sequence: 2, start_log_idx: 3, end_log_idx: 5, }, TableBatch { table_id: TableId::new(2), - last_sequence: 1, start_log_idx: 5, end_log_idx: 6, }, @@ -579,7 +574,6 @@ mod tests { }]); let expected2 = vec![TableBatch { table_id: TableId::new(0), - last_sequence: 1, start_log_idx: 0, end_log_idx: 1, }]; From ed3a30391131520f6ec9562c28d0a1426800bbf0 Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 12 Jun 2023 15:08:17 +0800 Subject: [PATCH 07/15] remove unwrap in path of kafka based wal deleting. 
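The kafka based wal called `unwrap()` on the result of `mark_delete_to`, so a
transient failure while marking a table's logs as deleted would panic the whole
process. Propagate the error with `?` instead and let the caller of the
deleting path decide how to handle it.

The shape of the change, for reference (surrounding code trimmed; only the call
itself is taken from `region.rs`, the comments are explanatory):

    // Before: a transient failure here aborts the process.
    //     inner.mark_delete_to(table_id, sequence_num).await.unwrap();
    // After: the error is simply propagated to the caller.
    inner.mark_delete_to(table_id, sequence_num).await?;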
--- wal/src/message_queue_impl/region.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wal/src/message_queue_impl/region.rs b/wal/src/message_queue_impl/region.rs index 49f331dfac..c38f87b4bb 100644 --- a/wal/src/message_queue_impl/region.rs +++ b/wal/src/message_queue_impl/region.rs @@ -586,7 +586,7 @@ impl Region { table_id ); - inner.mark_delete_to(table_id, sequence_num).await.unwrap(); + inner.mark_delete_to(table_id, sequence_num).await?; ( inner.make_meta_snapshot().await, From 182c49d2217c4a69a660f428d78fb2ce978feee5 Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 12 Jun 2023 15:09:13 +0800 Subject: [PATCH 08/15] add error log when background flush procedure failed. --- analytic_engine/src/instance/flush_compaction.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index c2ca13bb9a..34c254980f 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -283,7 +283,16 @@ impl Flusher { runtime: self.runtime.clone(), write_sst_max_buffer_size: self.write_sst_max_buffer_size, }; - let flush_job = async move { flush_task.run().await }; + let flush_job = async move { + let table_data = &flush_task.table_data; + flush_task.run().await.map_err(|e| { + error!( + "Instance flush memtables failed, table:{}, table_id:{}, err{e}", + table_data.name, table_data.id + ); + e + }) + }; flush_scheduler .flush_sequentially(flush_job, block_on, opts, &self.runtime, table_data.clone()) From b16634fce84462acaa94d28a641f4ea052aa6cce Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 14 Jun 2023 14:54:50 +0800 Subject: [PATCH 09/15] add `core` field to `WalReplayer`. --- analytic_engine/src/instance/wal_replayer.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index c770474701..a0844605f3 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -37,7 +37,7 @@ use crate::{ // TODO: limit the memory usage in `RegionBased` mode. pub struct WalReplayer<'a> { context: ReplayContext, - mode: ReplayMode, + core: Box, table_datas: &'a [TableDataRef], } @@ -59,8 +59,10 @@ impl<'a> WalReplayer<'a> { max_retry_flush_limit, }; + let core = Self::build_core(replay_mode); + Self { - mode: replay_mode, + core, context, table_datas, } @@ -77,12 +79,11 @@ impl<'a> WalReplayer<'a> { pub async fn replay(&mut self) -> Result>> { // Build core according to mode. - let core = Self::build_core(self.mode); info!( "Replay wal logs begin, context:{}, tables:{:?}", self.context, self.table_datas ); - let result = core.replay(&self.context, self.table_datas).await; + let result = self.core.replay(&self.context, self.table_datas).await; info!( "Replay wal logs finish, context:{}, tables:{:?}", self.context, self.table_datas, From 83e3e1fcf6579643cc418e1ea76a6ac671aba386 Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 14 Jun 2023 21:00:41 +0800 Subject: [PATCH 10/15] refactor the error log in flush. 
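Instead of logging inside the spawned flush job closure and then returning the
raw error, attach the failing table's identity to the error itself. Two new
variants, `FlushJobWithCause` and `FlushJobNoCause`, are added to the flush
error type, and `FlushTask::run` wraps the error coming out of
`dump_memtables` with the table name, table id and request id, so whoever
finally logs the failure can see which table it was.

A minimal sketch of the wrapping (the fallible call and the `table_data`
binding are placeholders here; in the patch this sits inside `FlushTask::run`
around `self.dump_memtables(request_id, &mems_to_flush)`):

    // Box the concrete error and attach a human readable message carrying
    // the table info; consumers only need the Display of the result.
    fallible_flush_step()
        .await
        .box_err()
        .context(FlushJobWithCause {
            msg: Some(format!(
                "table:{}, table_id:{}, request_id:{request_id}",
                table_data.name, table_data.id
            )),
        })?;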
--- .../src/instance/flush_compaction.rs | 30 ++++++++++++++----- analytic_engine/src/instance/write.rs | 2 +- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 34c254980f..c2f5a32b8f 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -130,6 +130,18 @@ pub enum Error { #[snafu(display("Other failure, msg:{}.\nBacktrace:\n{:?}", msg, backtrace))] Other { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to run flush job, msg:{:?}, err:{}", msg, source))] + FlushJobWithCause { + msg: Option, + source: GenericError, + }, + + #[snafu(display("Failed to run flush job, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))] + FlushJobNoCause { + msg: Option, + backtrace: Backtrace, + }, } define_result!(Error); @@ -285,13 +297,7 @@ impl Flusher { }; let flush_job = async move { let table_data = &flush_task.table_data; - flush_task.run().await.map_err(|e| { - error!( - "Instance flush memtables failed, table:{}, table_id:{}, err{e}", - table_data.name, table_data.id - ); - e - }) + flush_task.run().await }; flush_scheduler @@ -321,7 +327,15 @@ impl FlushTask { // Start flush duration timer. let local_metrics = self.table_data.metrics.local_flush_metrics(); let _timer = local_metrics.start_flush_timer(); - self.dump_memtables(request_id, &mems_to_flush).await?; + self.dump_memtables(request_id, &mems_to_flush) + .await + .box_err() + .context(FlushJobWithCause { + msg: Some(format!( + "table:{}, table_id:{}, request_id:{request_id}", + self.table_data.name, self.table_data.id + )), + })?; self.table_data .set_last_flush_time(time::current_time_millis()); diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index 219e86102c..072de5b860 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -8,7 +8,7 @@ use common_types::{ row::{RowGroup, RowGroupSlicer}, schema::{IndexInWriterSchema, Schema}, }; -use common_util::{codec::row, define_result}; +use common_util::{codec::row, define_result, error::GenericError}; use log::{debug, error, info, trace, warn}; use smallvec::SmallVec; use snafu::{ensure, Backtrace, ResultExt, Snafu}; From 72a2e44d0a2d2ec4994830b53a9fd54155d2a89f Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 14 Jun 2023 21:01:43 +0800 Subject: [PATCH 11/15] add comments. 
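Besides the comments in the replay path, `on_flush_finished` now emits
`error!("Failed to run flush task, err:{e}")` right where the flush state is
flipped to `Failed`, pairing with the error context attached in the previous
patch.

The new TODO in `replay_single_batch` refers to grouping consecutive log
entries that belong to the same table; `VecDeque` offers no such adaptor, hence
the hand-rolled index arithmetic in `split_log_batch_by_table`. Just to
illustrate what the TODO means (assuming an adaptor such as itertools'
`group_by` were available here; this is not a change made by this patch):

    use itertools::Itertools;

    // Conceptually what `split_log_batch_by_table` computes by hand:
    // consecutive runs of log entries sharing a table_id.
    for (table_id, entries) in &log_batch.iter().group_by(|entry| entry.table_id) {
        // ...replay `entries` against the serial executor of `table_id`.
    }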
--- analytic_engine/src/instance/flush_compaction.rs | 2 +- analytic_engine/src/instance/serial_executor.rs | 2 ++ analytic_engine/src/instance/wal_replayer.rs | 5 ++++- analytic_engine/src/instance/write.rs | 2 +- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index c2f5a32b8f..1225370aec 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -296,7 +296,7 @@ impl Flusher { write_sst_max_buffer_size: self.write_sst_max_buffer_size, }; let flush_job = async move { - let table_data = &flush_task.table_data; + let _table_data = &flush_task.table_data; flush_task.run().await }; diff --git a/analytic_engine/src/instance/serial_executor.rs b/analytic_engine/src/instance/serial_executor.rs index 0e48ce5f18..0608f78c2e 100644 --- a/analytic_engine/src/instance/serial_executor.rs +++ b/analytic_engine/src/instance/serial_executor.rs @@ -223,6 +223,8 @@ fn on_flush_finished(schedule_sync: ScheduleSyncRef, res: &Result<()>) { *flush_state = FlushState::Ready; } Err(e) => { + error!("Failed to run flush task, err:{e}"); + schedule_sync.inc_flush_failure_count(); let err_msg = e.to_string(); *flush_state = FlushState::Failed { err_msg }; diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index a0844605f3..054545421b 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -304,8 +304,11 @@ impl RegionBasedCore { serial_exec_ctxs: &mut HashMap>, table_results: &mut HashMap>, ) -> Result<()> { - let mut table_batches = Vec::with_capacity(serial_exec_ctxs.len()); + let mut table_batches = Vec::new(); + // TODO: No `group_by` method in `VecDeque`, so implement it manually here... Self::split_log_batch_by_table(log_batch, &mut table_batches); + + // TODO: Replay logs of different tables in parallel. for table_batch in table_batches { // Some tables may have failed in previous replay, ignore them. let table_result = table_results.get(&table_batch.table_id); diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index 072de5b860..219e86102c 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -8,7 +8,7 @@ use common_types::{ row::{RowGroup, RowGroupSlicer}, schema::{IndexInWriterSchema, Schema}, }; -use common_util::{codec::row, define_result, error::GenericError}; +use common_util::{codec::row, define_result}; use log::{debug, error, info, trace, warn}; use smallvec::SmallVec; use snafu::{ensure, Backtrace, ResultExt, Snafu}; From 85441480be73555ef6709fdccb5cc7e5d4ee53c9 Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 15 Jun 2023 11:20:17 +0800 Subject: [PATCH 12/15] just return faileds in `WalReplayer::replay`. 
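`replay` used to return a per-table map of results, which forced the caller to
look up and inspect an entry for every table. Return only the failed tables
instead (`FailedTables`, a map from table id to the error that broke its
replay); a table that is absent from the map replayed successfully.

A sketch of how the caller consumes the new return value (simplified from the
`ShardOpener` change in this patch; the stage bookkeeping is elided):

    // Tables missing from `faileds` recovered fine; present ones carry the cause.
    let mut faileds = wal_replayer.replay().await?;
    for table_data in &replay_table_datas {
        match faileds.remove(&table_data.id) {
            None => { /* mark the table's open stage as success */ }
            Some(e) => { /* mark the table's open stage as failed with `e` */ }
        }
    }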
--- analytic_engine/src/instance/open.rs | 8 +-- analytic_engine/src/instance/wal_replayer.rs | 58 ++++++++++---------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 6abd2271a2..0f0f175c8b 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -359,15 +359,15 @@ impl ShardOpener { for table_data in replay_table_datas { let table_id = table_data.id; let stage = self.states.get_mut(&table_id).unwrap(); - let table_result = table_results.remove(&table_id).unwrap(); + let failed_table_opt = table_results.remove(&table_id); - match (&stage, table_result) { - (TableOpenStage::RecoverTableData(ctx), Ok(())) => { + match (&stage, failed_table_opt) { + (TableOpenStage::RecoverTableData(ctx), None) => { let space_table = SpaceAndTable::new(ctx.space.clone(), ctx.table_data.clone()); *stage = TableOpenStage::Success(Some(space_table)); } - (TableOpenStage::RecoverTableData(_), Err(e)) => { + (TableOpenStage::RecoverTableData(_), Some(e)) => { *stage = TableOpenStage::Failed(e); } diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 054545421b..e1e698f626 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -24,7 +24,7 @@ use wal::{ use crate::{ instance::{ self, - engine::{ReplayWalWithCause, Result}, + engine::{Error, ReplayWalWithCause, Result}, flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, write::MemTableWriter, @@ -77,7 +77,8 @@ impl<'a> WalReplayer<'a> { } } - pub async fn replay(&mut self) -> Result>> { + /// Replay tables and return the failed tables and the causes. + pub async fn replay(&mut self) -> Result { // Build core according to mode. info!( "Replay wal logs begin, context:{}, tables:{:?}", @@ -117,14 +118,17 @@ pub enum ReplayMode { TableBased, } +pub type FailedTables = HashMap; + /// Replay core, the abstract of different replay strategies #[async_trait] trait ReplayCore: Send + Sync + 'static { + /// Replay tables, return the failed tables and the causes. async fn replay( &self, context: &ReplayContext, table_datas: &[TableDataRef], - ) -> Result>>; + ) -> Result; } /// Table based wal replay core @@ -136,22 +140,22 @@ impl ReplayCore for TableBasedCore { &self, context: &ReplayContext, table_datas: &[TableDataRef], - ) -> Result>> { + ) -> Result { debug!("Replay wal logs on table mode, context:{context}, tables:{table_datas:?}",); - let mut results = HashMap::with_capacity(table_datas.len()); + let mut faileds = HashMap::new(); let read_ctx = ReadContext { batch_size: context.wal_replay_batch_size, ..Default::default() }; for table_data in table_datas { let table_id = table_data.id; - let result = Self::recover_table_logs(context, table_data, &read_ctx).await; - - results.insert(table_id, result); + if let Err(e) = Self::recover_table_logs(context, table_data, &read_ctx).await { + faileds.insert(table_id, e); + } } - Ok(results) + Ok(faileds) } } @@ -217,22 +221,19 @@ impl ReplayCore for RegionBasedCore { &self, context: &ReplayContext, table_datas: &[TableDataRef], - ) -> Result>> { + ) -> Result { debug!("Replay wal logs on region mode, context:{context}, states:{table_datas:?}",); // Init all table results to be oks, and modify to errs when failed to replay. 
- let mut results = table_datas - .iter() - .map(|table_data| (table_data.id, Ok(()))) - .collect(); + let mut faileds = FailedTables::new(); let scan_ctx = ScanContext { batch_size: context.wal_replay_batch_size, ..Default::default() }; - Self::replay_region_logs(context, table_datas, &scan_ctx, &mut results).await?; + Self::replay_region_logs(context, table_datas, &scan_ctx, &mut faileds).await?; - Ok(results) + Ok(faileds) } } @@ -247,7 +248,7 @@ impl RegionBasedCore { context: &ReplayContext, table_datas: &[TableDataRef], scan_ctx: &ScanContext, - table_results: &mut HashMap>, + faileds: &mut FailedTables, ) -> Result<()> { // Scan all wal logs of current shard. let scan_req = ScanRequest { @@ -286,13 +287,8 @@ impl RegionBasedCore { break; } - Self::replay_single_batch( - context, - &log_entry_buf, - &mut serial_exec_ctxs, - table_results, - ) - .await?; + Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds) + .await?; } Ok(()) @@ -302,7 +298,7 @@ impl RegionBasedCore { context: &ReplayContext, log_batch: &VecDeque>, serial_exec_ctxs: &mut HashMap>, - table_results: &mut HashMap>, + faileds: &mut FailedTables, ) -> Result<()> { let mut table_batches = Vec::new(); // TODO: No `group_by` method in `VecDeque`, so implement it manually here... @@ -311,9 +307,8 @@ impl RegionBasedCore { // TODO: Replay logs of different tables in parallel. for table_batch in table_batches { // Some tables may have failed in previous replay, ignore them. - let table_result = table_results.get(&table_batch.table_id); - if let Some(Err(_)) = table_result { - return Ok(()); + if faileds.contains_key(&table_batch.table_id) { + continue; } // Replay all log entries of current table. @@ -327,7 +322,11 @@ impl RegionBasedCore { log_batch.range(table_batch.start_log_idx..table_batch.end_log_idx), ) .await; - table_results.insert(table_batch.table_id, result); + + // If occur error, mark this table as failed and store the cause. + if let Err(e) = result { + faileds.insert(table_batch.table_id, e); + } } } @@ -372,6 +371,7 @@ impl RegionBasedCore { // Step to next start idx. start_log_idx = curr_log_idx; start_table_id = if time_to_break { + // The final round, just set it to max as an invalid flag. u64::MAX } else { log_batch.get(start_log_idx).unwrap().table_id From c8775b2ede40c4bdb071d6519b425ee6c667c16b Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 15 Jun 2023 11:45:17 +0800 Subject: [PATCH 13/15] fix naming and add comments. --- analytic_engine/src/instance/open.rs | 16 ++++--- analytic_engine/src/instance/wal_replayer.rs | 49 ++++++++++---------- 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 0f0f175c8b..212dd81e20 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -193,7 +193,7 @@ struct ShardOpener { shard_id: ShardId, manifest: ManifestRef, wal_manager: WalManagerRef, - states: HashMap, + stages: HashMap, wal_replay_batch_size: usize, flusher: Flusher, max_retry_flush_limit: usize, @@ -231,7 +231,7 @@ impl ShardOpener { shard_id: shard_context.shard_id, manifest, wal_manager, - states, + stages: states, wal_replay_batch_size, flusher, max_retry_flush_limit, @@ -247,7 +247,7 @@ impl ShardOpener { self.recover_table_datas().await?; // Retrieve the table results and return. 
- let states = std::mem::take(&mut self.states); + let states = std::mem::take(&mut self.stages); let mut table_results = HashMap::with_capacity(states.len()); for (table_id, state) in states { match state { @@ -273,7 +273,7 @@ impl ShardOpener { /// Recover table meta data from manifest based on shard. async fn recover_table_metas(&mut self) -> Result<()> { - for (table_id, state) in self.states.iter_mut() { + for (table_id, state) in self.stages.iter_mut() { match state { // Only do the meta recovery work in `RecoverTableMeta` state. TableOpenStage::RecoverTableMeta(ctx) => { @@ -319,8 +319,8 @@ impl ShardOpener { /// Recover table data based on shard. async fn recover_table_datas(&mut self) -> Result<()> { // Replay wal logs of tables. - let mut replay_table_datas = Vec::with_capacity(self.states.len()); - for (table_id, stage) in self.states.iter_mut() { + let mut replay_table_datas = Vec::with_capacity(self.stages.len()); + for (table_id, stage) in self.stages.iter_mut() { match stage { // Only do the wal recovery work in `RecoverTableData` state. TableOpenStage::RecoverTableData(ctx) => { @@ -358,7 +358,9 @@ impl ShardOpener { // Process the replay results. for table_data in replay_table_datas { let table_id = table_data.id; - let stage = self.states.get_mut(&table_id).unwrap(); + // Each `table_data` has its related `stage` in `stages`, impossible to panic + // here. + let stage = self.stages.get_mut(&table_id).unwrap(); let failed_table_opt = table_results.remove(&table_id); match (&stage, failed_table_opt) { diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index e1e698f626..bd798756bf 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -37,7 +37,7 @@ use crate::{ // TODO: limit the memory usage in `RegionBased` mode. pub struct WalReplayer<'a> { context: ReplayContext, - core: Box, + replay: Box, table_datas: &'a [TableDataRef], } @@ -59,32 +59,32 @@ impl<'a> WalReplayer<'a> { max_retry_flush_limit, }; - let core = Self::build_core(replay_mode); + let replay = Self::build_replay(replay_mode); Self { - core, + replay, context, table_datas, } } - fn build_core(mode: ReplayMode) -> Box { + fn build_replay(mode: ReplayMode) -> Box { info!("Replay wal in mode:{mode:?}"); match mode { - ReplayMode::RegionBased => Box::new(RegionBasedCore), - ReplayMode::TableBased => Box::new(TableBasedCore), + ReplayMode::RegionBased => Box::new(RegionBasedReplay), + ReplayMode::TableBased => Box::new(TableBasedReplay), } } /// Replay tables and return the failed tables and the causes. pub async fn replay(&mut self) -> Result { - // Build core according to mode. + // Build replay action according to mode. info!( "Replay wal logs begin, context:{}, tables:{:?}", self.context, self.table_datas ); - let result = self.core.replay(&self.context, self.table_datas).await; + let result = self.replay.run(&self.context, self.table_datas).await; info!( "Replay wal logs finish, context:{}, tables:{:?}", self.context, self.table_datas, @@ -120,23 +120,22 @@ pub enum ReplayMode { pub type FailedTables = HashMap; -/// Replay core, the abstract of different replay strategies +/// Replay action, the abstract of different replay strategies #[async_trait] -trait ReplayCore: Send + Sync + 'static { - /// Replay tables, return the failed tables and the causes. 
- async fn replay( +trait Replay: Send + Sync + 'static { + async fn run( &self, context: &ReplayContext, table_datas: &[TableDataRef], ) -> Result; } -/// Table based wal replay core -struct TableBasedCore; +/// Table based wal replay +struct TableBasedReplay; #[async_trait] -impl ReplayCore for TableBasedCore { - async fn replay( +impl Replay for TableBasedReplay { + async fn run( &self, context: &ReplayContext, table_datas: &[TableDataRef], @@ -159,7 +158,7 @@ impl ReplayCore for TableBasedCore { } } -impl TableBasedCore { +impl TableBasedReplay { async fn recover_table_logs( context: &ReplayContext, table_data: &TableDataRef, @@ -212,17 +211,17 @@ impl TableBasedCore { } } -/// Region based wal replay core -struct RegionBasedCore; +/// Region based wal replay +struct RegionBasedReplay; #[async_trait] -impl ReplayCore for RegionBasedCore { - async fn replay( +impl Replay for RegionBasedReplay { + async fn run( &self, context: &ReplayContext, table_datas: &[TableDataRef], ) -> Result { - debug!("Replay wal logs on region mode, context:{context}, states:{table_datas:?}",); + debug!("Replay wal logs on region mode, context:{context}, tables:{table_datas:?}",); // Init all table results to be oks, and modify to errs when failed to replay. let mut faileds = FailedTables::new(); @@ -237,7 +236,7 @@ impl ReplayCore for RegionBasedCore { } } -impl RegionBasedCore { +impl RegionBasedReplay { /// Replay logs in same region. /// /// Steps: @@ -510,7 +509,7 @@ mod tests { use table_engine::table::TableId; use wal::log_batch::LogEntry; - use crate::instance::wal_replayer::{RegionBasedCore, TableBatch}; + use crate::instance::wal_replayer::{RegionBasedReplay, TableBatch}; #[test] fn test_split_log_batch_by_table() { @@ -594,7 +593,7 @@ mod tests { fn check_split_result(batch: &VecDeque>, expected: &[TableBatch]) { let mut table_batches = Vec::new(); - RegionBasedCore::split_log_batch_by_table(batch, &mut table_batches); + RegionBasedReplay::split_log_batch_by_table(batch, &mut table_batches); assert_eq!(&table_batches, expected); } } From eafbd5b878d46bf747b47b945b31ca1f0fb6e1af Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 15 Jun 2023 14:33:12 +0800 Subject: [PATCH 14/15] add TODO to the check logic in `replay_table_log_entries`. --- analytic_engine/src/instance/wal_replayer.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index bd798756bf..407ee5102a 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -428,6 +428,8 @@ async fn replay_table_log_entries( row_group ); + // TODO: too strict check here, should be modified to like what in + // `ColumnSchema::compatible_for_write`.` let table_schema_version = table_data.schema_version(); if table_schema_version != row_group.schema().version() { // Data with old schema should already been flushed, but we avoid panic From a80131b96cd545aa709a78f6ca66a3b899ea803d Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 15 Jun 2023 14:49:37 +0800 Subject: [PATCH 15/15] fix some naming problem. 
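Apart from the renames (the leftover `_table_data` binding dropped from the
flush job, the local `states` unified to `stages` in `ShardOpener`),
`TableBatch` now stores the half-open index range of its log entries as a
`std::ops::Range<usize>` instead of a separate `start_log_idx`/`end_log_idx`
pair, so it can be handed straight to `VecDeque::range` in the replay loop
(`log_batch.range(table_batch.range)`).

A tiny self-contained illustration of that, using plain std types rather than
engine code (the test name and values are made up):

    #[test]
    fn half_open_range_over_vecdeque() {
        use std::collections::VecDeque;

        // Stand-in for the per-region log batch.
        let logs: VecDeque<u32> = (0..6).collect();
        // Same shape as `TableBatch { range: 1..4, .. }`.
        let range = 1..4;
        let picked: Vec<u32> = logs.range(range).copied().collect();
        assert_eq!(picked, vec![1, 2, 3]);
    }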
--- .../src/instance/flush_compaction.rs | 5 +---- analytic_engine/src/instance/open.rs | 12 +++++------ analytic_engine/src/instance/wal_replayer.rs | 21 +++++++------------ 3 files changed, 15 insertions(+), 23 deletions(-) diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 1225370aec..4e860def74 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -295,10 +295,7 @@ impl Flusher { runtime: self.runtime.clone(), write_sst_max_buffer_size: self.write_sst_max_buffer_size, }; - let flush_job = async move { - let _table_data = &flush_task.table_data; - flush_task.run().await - }; + let flush_job = async move { flush_task.run().await }; flush_scheduler .flush_sequentially(flush_job, block_on, opts, &self.runtime, table_data.clone()) diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 212dd81e20..70e9d75fee 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -210,7 +210,7 @@ impl ShardOpener { max_retry_flush_limit: usize, recover_mode: RecoverMode, ) -> Result { - let mut states = HashMap::with_capacity(shard_context.table_ctxs.len()); + let mut stages = HashMap::with_capacity(shard_context.table_ctxs.len()); for table_ctx in shard_context.table_ctxs { let space = &table_ctx.space; let table_id = table_ctx.table_def.id; @@ -224,14 +224,14 @@ impl ShardOpener { space: table_ctx.space, }) }; - states.insert(table_id, state); + stages.insert(table_id, state); } Ok(Self { shard_id: shard_context.shard_id, manifest, wal_manager, - stages: states, + stages, wal_replay_batch_size, flusher, max_retry_flush_limit, @@ -247,9 +247,9 @@ impl ShardOpener { self.recover_table_datas().await?; // Retrieve the table results and return. - let states = std::mem::take(&mut self.stages); - let mut table_results = HashMap::with_capacity(states.len()); - for (table_id, state) in states { + let stages = std::mem::take(&mut self.stages); + let mut table_results = HashMap::with_capacity(stages.len()); + for (table_id, state) in stages { match state { TableOpenStage::Failed(e) => { table_results.insert(table_id, Err(e)); diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 407ee5102a..5d494450e4 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -5,6 +5,7 @@ use std::{ collections::{HashMap, VecDeque}, fmt::Display, + ops::Range, }; use async_trait::async_trait; @@ -318,7 +319,7 @@ impl RegionBasedReplay { context.max_retry_flush_limit, &mut ctx.serial_exec, &ctx.table_data, - log_batch.range(table_batch.start_log_idx..table_batch.end_log_idx), + log_batch.range(table_batch.range), ) .await; @@ -363,8 +364,7 @@ impl RegionBasedReplay { if found_end_idx { table_batches.push(TableBatch { table_id: TableId::new(start_table_id), - start_log_idx, - end_log_idx: curr_log_idx, + range: start_log_idx..curr_log_idx, }); // Step to next start idx. 
@@ -388,8 +388,7 @@ impl RegionBasedReplay { #[derive(Debug, Eq, PartialEq)] struct TableBatch { table_id: TableId, - start_log_idx: usize, - end_log_idx: usize, + range: Range, } struct SerialExecContext<'a> { @@ -557,18 +556,15 @@ mod tests { let expected1 = vec![ TableBatch { table_id: TableId::new(0), - start_log_idx: 0, - end_log_idx: 3, + range: 0..3, }, TableBatch { table_id: TableId::new(1), - start_log_idx: 3, - end_log_idx: 5, + range: 3..5, }, TableBatch { table_id: TableId::new(2), - start_log_idx: 5, - end_log_idx: 6, + range: 5..6, }, ]; @@ -579,8 +575,7 @@ mod tests { }]); let expected2 = vec![TableBatch { table_id: TableId::new(0), - start_log_idx: 0, - end_log_idx: 1, + range: 0..1, }]; let test_log_batch3: VecDeque> = VecDeque::default();