diff --git a/Cargo.lock b/Cargo.lock index 03787588f9..98f6bd9a40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,6 +107,7 @@ dependencies = [ "hex", "hyperloglog", "id_allocator", + "itertools 0.10.5", "lazy_static", "logger", "lru 0.7.8", diff --git a/analytic_engine/Cargo.toml b/analytic_engine/Cargo.toml index 46ea69eee0..e575bd465f 100644 --- a/analytic_engine/Cargo.toml +++ b/analytic_engine/Cargo.toml @@ -50,6 +50,7 @@ generic_error = { workspace = true } hex = { workspace = true } hyperloglog = { workspace = true } id_allocator = { workspace = true } +itertools = { workspace = true } lazy_static = { workspace = true } logger = { workspace = true } lru = { workspace = true } diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index f8911aaecf..fa66d78953 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -196,6 +196,10 @@ pub struct TableFlushRequest { pub table_data: TableDataRef, /// Max sequence number to flush (inclusive). pub max_sequence: SequenceNumber, + + /// We may suggest new primary keys in preflush. if suggestion happened, we + /// need to ensure data is in new order. + need_reorder: bool, } #[derive(Clone)] @@ -287,7 +291,7 @@ impl FlushTask { // Start flush duration timer. let local_metrics = self.table_data.metrics.local_flush_metrics(); let _timer = local_metrics.start_flush_timer(); - self.dump_memtables(request_id, &mems_to_flush) + self.dump_memtables(request_id, &mems_to_flush, flush_req.need_reorder) .await .box_err() .context(FlushJobWithCause { @@ -316,6 +320,7 @@ impl FlushTask { let mut last_sequence = table_data.last_sequence(); // Switch (freeze) all mutable memtables. And update segment duration if // suggestion is returned. + let mut need_reorder = false; if let Some(suggest_segment_duration) = current_version.suggest_duration() { info!( "Update segment duration, table:{}, table_id:{}, segment_duration:{:?}", @@ -324,6 +329,7 @@ impl FlushTask { assert!(!suggest_segment_duration.is_zero()); if let Some(pk_idx) = current_version.suggest_primary_key() { + need_reorder = true; let mut schema = table_data.schema(); info!( "Update primary key, table:{}, table_id:{}, old:{:?}, new:{:?}", @@ -388,6 +394,7 @@ impl FlushTask { Ok(TableFlushRequest { table_data: table_data.clone(), max_sequence: last_sequence, + need_reorder, }) } @@ -401,6 +408,7 @@ impl FlushTask { &self, request_id: RequestId, mems_to_flush: &FlushableMemTables, + need_reorder: bool, ) -> Result<()> { let local_metrics = self.table_data.metrics.local_flush_metrics(); let mut files_to_level0 = Vec::with_capacity(mems_to_flush.memtables.len()); @@ -410,7 +418,12 @@ impl FlushTask { // process sampling memtable and frozen memtable if let Some(sampling_mem) = &mems_to_flush.sampling_mem { if let Some(seq) = self - .dump_sampling_memtable(request_id, sampling_mem, &mut files_to_level0) + .dump_sampling_memtable( + request_id, + sampling_mem, + &mut files_to_level0, + need_reorder, + ) .await? 
{ flushed_sequence = seq; @@ -500,6 +513,7 @@ impl FlushTask { request_id: RequestId, sampling_mem: &SamplingMemTable, files_to_level0: &mut Vec, + need_reorder: bool, ) -> Result> { let (min_key, max_key) = match (sampling_mem.mem.min_key(), sampling_mem.mem.max_key()) { (Some(min_key), Some(max_key)) => (min_key, max_key), @@ -589,11 +603,13 @@ impl FlushTask { let iter = build_mem_table_iter(sampling_mem.mem.clone(), &self.table_data)?; let timestamp_idx = self.table_data.schema().timestamp_index(); - if let Some(pk_idx) = self.table_data.current_version().suggest_primary_key() { + if need_reorder { + let schema = self.table_data.schema(); + let primary_key_indexes = schema.primary_key_indexes(); let reorder = Reorder { iter, schema: self.table_data.schema(), - order_by_col_indexes: pk_idx, + order_by_col_indexes: primary_key_indexes.to_vec(), }; let mut stream = reorder.into_stream().await.context(ReorderMemIter)?; while let Some(data) = stream.next().await { diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 186f38912c..1ff48cc927 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -61,7 +61,7 @@ use crate::{ metrics::MaybeTableLevelMetrics, }, table::data::{TableDataRef, TableShardInfo}, - RecoverMode, TableOptions, + RecoverMode, TableOptions, WalEncodeConfig, }; #[allow(clippy::enum_variant_names)] @@ -186,6 +186,7 @@ pub struct Instance { pub(crate) scan_options: ScanOptions, pub(crate) iter_options: Option, pub(crate) recover_mode: RecoverMode, + pub(crate) wal_encode: WalEncodeConfig, } impl Instance { diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index f6999ec844..c3602ac641 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -146,6 +146,7 @@ impl Instance { iter_options, scan_options, recover_mode: ctx.config.recover_mode, + wal_encode: ctx.config.wal_encode, }); Ok(instance) diff --git a/analytic_engine/src/instance/reorder_memtable.rs b/analytic_engine/src/instance/reorder_memtable.rs index b29ecac02b..2e0901bbaa 100644 --- a/analytic_engine/src/instance/reorder_memtable.rs +++ b/analytic_engine/src/instance/reorder_memtable.rs @@ -39,7 +39,7 @@ use datafusion::{ execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream as DfRecordBatchStream, SendableRecordBatchStream, Statistics, }, - prelude::{col, Expr, SessionConfig, SessionContext}, + prelude::{ident, Expr, SessionConfig, SessionContext}, sql::TableReference, }; use futures::{Stream, StreamExt}; @@ -241,7 +241,7 @@ impl Reorder { let columns = schema.columns(); let sort_exprs = sort_by_col_idx .iter() - .map(|i| col(&columns[*i].name).sort(true, true)) + .map(|i| ident(&columns[*i].name).sort(true, true)) .collect::>(); let df_plan = LogicalPlanBuilder::scan(DUMMY_TABLE_NAME, source, None)? .sort(sort_exprs)? 
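The reorder path above now builds its sort expressions with `ident` instead of `col`: in DataFusion, `col` parses its argument as a (possibly qualified) SQL identifier and normalizes unquoted names, whereas `ident` takes the column name verbatim, which is what a name coming straight from the table schema needs. A minimal sketch of that construction, assuming illustrative column names and the DataFusion version pinned here (where `Expr::sort` yields an `Expr`):

    use datafusion::prelude::{ident, Expr};

    // Illustrative names; a mixed-case column such as `myVALUE` (see the
    // integration-test change at the end of this diff) resolves with
    // `ident("myVALUE")` but not with the normalizing `col("myVALUE")`.
    fn sort_exprs_for(names: &[&str]) -> Vec<Expr> {
        names
            .iter()
            .map(|name| ident(*name).sort(true, true)) // ascending, nulls first
            .collect()
    }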
diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index ecb5894bd6..23919de453 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -18,10 +18,14 @@ use std::{ collections::{HashMap, VecDeque}, fmt::Display, ops::Range, + sync::Arc, }; use async_trait::async_trait; -use common_types::{schema::IndexInWriterSchema, table::ShardId}; +use common_types::{ + schema::{IndexInWriterSchema, Schema}, + table::ShardId, +}; use generic_error::BoxError; use lazy_static::lazy_static; use logger::{debug, error, info, trace, warn}; @@ -44,7 +48,7 @@ use crate::{ serial_executor::TableOpSerialExecutor, write::MemTableWriter, }, - payload::{ReadPayload, WalDecoder}, + payload::{ReadPayload, SingleSchemaProviderAdapter, TableSchemaProvider, WalDecoder}, table::data::TableDataRef, }; @@ -173,7 +177,7 @@ impl Replay for TableBasedReplay { ) -> Result { debug!("Replay wal logs on table mode, context:{context}, tables:{table_datas:?}",); - let mut faileds = HashMap::new(); + let mut failed_tables = HashMap::new(); let read_ctx = ReadContext { batch_size: context.wal_replay_batch_size, ..Default::default() @@ -181,11 +185,11 @@ impl Replay for TableBasedReplay { for table_data in table_datas { let table_id = table_data.id; if let Err(e) = Self::recover_table_logs(context, table_data, &read_ctx).await { - faileds.insert(table_id, e); + failed_tables.insert(table_id, e); } } - Ok(faileds) + Ok(failed_tables) } } @@ -217,9 +221,14 @@ impl TableBasedReplay { loop { // fetch entries to log_entry_buf let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer(); - let decoder = WalDecoder; + let adapter = SingleSchemaProviderAdapter { + schema: table_data.schema(), + }; + let decoder = WalDecoder::new(adapter); + // All the logs should belong the table, so no need to check again. + let filter = |_| true; log_entry_buf = log_iter - .next_log_entries(decoder, log_entry_buf) + .next_log_entries(decoder, filter, log_entry_buf) .await .box_err() .context(ReplayWalWithCause { msg: None })?; @@ -257,15 +266,26 @@ impl Replay for RegionBasedReplay { debug!("Replay wal logs on region mode, context:{context}, tables:{table_datas:?}",); // Init all table results to be oks, and modify to errs when failed to replay. - let mut faileds = FailedTables::new(); + let mut failed_tables = FailedTables::new(); let scan_ctx = ScanContext { batch_size: context.wal_replay_batch_size, ..Default::default() }; - Self::replay_region_logs(context, table_datas, &scan_ctx, &mut faileds).await?; + Self::replay_region_logs(context, table_datas, &scan_ctx, &mut failed_tables).await?; + + Ok(failed_tables) + } +} + +#[derive(Clone)] +struct TableSchemaProviderAdapter { + table_datas: Arc>, +} - Ok(faileds) +impl TableSchemaProvider for TableSchemaProviderAdapter { + fn table_schema(&self, table_id: common_types::table::TableId) -> Option { + self.table_datas.get(&table_id).map(|v| v.schema()) } } @@ -280,7 +300,7 @@ impl RegionBasedReplay { context: &ReplayContext, table_datas: &[TableDataRef], scan_ctx: &ScanContext, - faileds: &mut FailedTables, + failed_tables: &mut FailedTables, ) -> Result<()> { // Scan all wal logs of current shard. let scan_req = ScanRequest { @@ -297,6 +317,7 @@ impl RegionBasedReplay { // Lock all related tables. 
let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len()); + let mut table_datas_by_id = HashMap::with_capacity(table_datas.len()); for table_data in table_datas { let serial_exec = table_data.serial_exec.lock().await; let serial_exec_ctx = SerialExecContext { @@ -304,14 +325,21 @@ impl RegionBasedReplay { serial_exec, }; serial_exec_ctxs.insert(table_data.id, serial_exec_ctx); + table_datas_by_id.insert(table_data.id.as_u64(), table_data.clone()); } + let table_datas_by_id = Arc::new(table_datas_by_id); + let schema_provider = TableSchemaProviderAdapter { + table_datas: table_datas_by_id.clone(), + }; // Split and replay logs. loop { let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer(); - let decoder = WalDecoder; + let decoder = WalDecoder::new(schema_provider.clone()); + let table_datas_for_filter = table_datas_by_id.clone(); + let log_filter = move |log_table_id| table_datas_for_filter.contains_key(&log_table_id); log_entry_buf = log_iter - .next_log_entries(decoder, log_entry_buf) + .next_log_entries(decoder, log_filter, log_entry_buf) .await .box_err() .context(ReplayWalWithCause { msg: None })?; @@ -321,8 +349,13 @@ impl RegionBasedReplay { } let _timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer(); - Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds) - .await?; + Self::replay_single_batch( + context, + &log_entry_buf, + &mut serial_exec_ctxs, + failed_tables, + ) + .await?; } Ok(()) @@ -332,7 +365,7 @@ impl RegionBasedReplay { context: &ReplayContext, log_batch: &VecDeque>, serial_exec_ctxs: &mut HashMap>, - faileds: &mut FailedTables, + failed_tables: &mut FailedTables, ) -> Result<()> { let mut table_batches = Vec::new(); // TODO: No `group_by` method in `VecDeque`, so implement it manually here... @@ -341,7 +374,7 @@ impl RegionBasedReplay { // TODO: Replay logs of different tables in parallel. for table_batch in table_batches { // Some tables may have failed in previous replay, ignore them. - if faileds.contains_key(&table_batch.table_id) { + if failed_tables.contains_key(&table_batch.table_id) { continue; } @@ -359,7 +392,7 @@ impl RegionBasedReplay { // If occur error, mark this table as failed and store the cause. if let Err(e) = result { - faileds.insert(table_batch.table_id, e); + failed_tables.insert(table_batch.table_id, e); } } } @@ -489,8 +522,8 @@ async fn replay_table_log_entries( let index_in_writer = IndexInWriterSchema::for_same_schema(row_group.schema().num_columns()); let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec); - let memtable_write_ret = memtable_writer - .write(sequence, &row_group.into(), index_in_writer) + let write_res = memtable_writer + .write(sequence, row_group, index_in_writer) .box_err() .context(ReplayWalWithCause { msg: Some(format!( @@ -498,7 +531,7 @@ async fn replay_table_log_entries( table_data.space_id, table_data.name, table_data.id )), }); - if let Err(e) = memtable_write_ret { + if let Err(e) = write_res { // TODO: find a better way to match this. if e.to_string().contains(crate::memtable::TOO_LARGE_MESSAGE) { // ignore this error diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index b2c09fce69..bed6a71eee 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -14,13 +14,19 @@ //! 
Write logic of instance +use std::iter; + use bytes_ext::ByteVec; use ceresdbproto::{schema as schema_pb, table_requests}; -use codec::row; +use codec::{ + columnar::{ColumnarEncoder, EncodeHint}, + row, +}; use common_types::{ - row::{RowGroup, RowGroupSlicer}, + row::RowGroup, schema::{IndexInWriterSchema, Schema}, }; +use itertools::Itertools; use logger::{debug, error, info, trace, warn}; use macros::define_result; use smallvec::SmallVec; @@ -28,6 +34,7 @@ use snafu::{ensure, Backtrace, ResultExt, Snafu}; use table_engine::table::WriteRequest; use wal::{ kv_encoder::LogBatchEncoder, + log_batch::Payload, manager::{SequenceNumber, WalLocation, WriteContext}, }; @@ -40,6 +47,7 @@ use crate::{ payload::WritePayload, space::SpaceRef, table::{data::TableDataRef, version::MemTableForWrite}, + WalEncodeConfig, WalEncodeFormat, }; #[derive(Debug, Snafu)] @@ -120,33 +128,98 @@ define_result!(Error); /// Max rows in a write request, must less than [u32::MAX] const MAX_ROWS_TO_WRITE: usize = 10_000_000; +/// The version used for [`table_requests::WriteRequest.version`]. +#[derive(Clone, Copy, Debug)] +pub enum WalEncodeVersion { + RowWise = 0, + Columnar, +} + +impl WalEncodeVersion { + #[inline] + pub fn as_u32(self) -> u32 { + match self { + Self::RowWise => 0, + Self::Columnar => 1, + } + } + + #[inline] + pub fn try_from_u32(v: u32) -> Option { + match v { + 0 => Some(Self::RowWise), + 1 => Some(Self::Columnar), + _ => None, + } + } +} + pub(crate) struct EncodeContext { pub row_group: RowGroup, pub index_in_writer: IndexInWriterSchema, - pub encoded_rows: Vec, } +enum EncodedPayload { + Cols(Vec), + Rows(Vec), +} impl EncodeContext { pub fn new(row_group: RowGroup) -> Self { Self { row_group, index_in_writer: IndexInWriterSchema::default(), - encoded_rows: Vec::new(), } } - pub fn encode_rows(&mut self, table_schema: &Schema) -> Result<()> { + fn encode( + &mut self, + config: &WalEncodeConfig, + table_schema: &Schema, + ) -> Result { + match config.format { + WalEncodeFormat::Columnar => self.encode_cols(config).map(EncodedPayload::Cols), + WalEncodeFormat::RowWise => self.encode_rows(table_schema).map(EncodedPayload::Rows), + } + } + + fn encode_cols(&mut self, config: &WalEncodeConfig) -> Result> { + let row_group_schema = self.row_group.schema(); + let mut encoded_cols = Vec::with_capacity(row_group_schema.num_columns()); + + for col_idx in 0..row_group_schema.num_columns() { + let col_schema = row_group_schema.column(col_idx); + let col_iter = self.row_group.iter_column(col_idx).map(|v| v.as_view()); + let enc = ColumnarEncoder::new( + col_schema.id, + config.num_bytes_compress_threshold.as_byte() as usize, + ); + let mut hint = EncodeHint { + num_nulls: None, + num_datums: Some(self.row_group.num_rows()), + datum_kind: col_schema.data_type, + }; + let sz = enc.estimated_encoded_size(col_iter.clone(), &mut hint); + let mut buf = Vec::with_capacity(sz); + enc.encode(&mut buf, col_iter, &mut hint).unwrap(); + encoded_cols.push(buf); + } + + Ok(encoded_cols) + } + + fn encode_rows(&mut self, table_schema: &Schema) -> Result> { + let mut encoded_rows = Vec::new(); row::encode_row_group_for_wal( &self.row_group, table_schema, &self.index_in_writer, - &mut self.encoded_rows, + &mut encoded_rows, ) .context(EncodeRowGroup)?; - assert_eq!(self.row_group.num_rows(), self.encoded_rows.len()); + assert_eq!(self.row_group.num_rows(), encoded_rows.len()); - Ok(()) + Ok(encoded_rows) } } @@ -160,15 +233,9 @@ struct WriteRowGroupSplitter { max_bytes_per_batch: usize, } -enum SplitResult<'a> { - 
Splitted { - encoded_batches: Vec>, - row_group_batches: Vec>, - }, - Integrate { - encoded_rows: Vec, - row_group: RowGroupSlicer<'a>, - }, +enum SplitResult { + Splitted { encoded_batches: Vec> }, + Integrate { encoded_rows: Vec }, } impl WriteRowGroupSplitter { @@ -179,34 +246,19 @@ impl WriteRowGroupSplitter { } /// Split the write request into multiple batches. - /// - /// NOTE: The length of the `encoded_rows` should be the same as the number - /// of rows in the `row_group`. - pub fn split<'a>( - &'_ self, - encoded_rows: Vec, - row_group: &'a RowGroup, - ) -> SplitResult<'a> { + pub fn split(&self, encoded_rows: Vec) -> SplitResult { let end_row_indexes = self.compute_batches(&encoded_rows); if end_row_indexes.len() <= 1 { // No need to split. - return SplitResult::Integrate { - encoded_rows, - row_group: RowGroupSlicer::from(row_group), - }; + return SplitResult::Integrate { encoded_rows }; } let mut prev_end_row_index = 0; let mut encoded_batches = Vec::with_capacity(end_row_indexes.len()); - let mut row_group_batches = Vec::with_capacity(end_row_indexes.len()); for end_row_index in &end_row_indexes { let end_row_index = *end_row_index; let curr_batch = Vec::with_capacity(end_row_index - prev_end_row_index); encoded_batches.push(curr_batch); - let row_group_slicer = - RowGroupSlicer::new(prev_end_row_index..end_row_index, row_group); - row_group_batches.push(row_group_slicer); - prev_end_row_index = end_row_index; } @@ -218,10 +270,7 @@ impl WriteRowGroupSplitter { encoded_batches[current_batch_idx].push(encoded_row); } - SplitResult::Splitted { - encoded_batches, - row_group_batches, - } + SplitResult::Splitted { encoded_batches } } /// Compute the end row indexes in the original `encoded_rows` of each @@ -298,7 +347,7 @@ impl<'a> MemTableWriter<'a> { pub fn write( &self, sequence: SequenceNumber, - row_group: &RowGroupSlicer, + row_group: &RowGroup, index_in_writer: IndexInWriterSchema, ) -> Result<()> { let _timer = self.table_data.metrics.start_table_write_memtable_timer(); @@ -372,76 +421,96 @@ impl<'a> Writer<'a> { self.preprocess_write(&mut encode_ctx).await?; - { + let encoded_payload = { let _timer = self.table_data.metrics.start_table_write_encode_timer(); let schema = self.table_data.schema(); - encode_ctx.encode_rows(&schema)?; - } + encode_ctx.encode(&self.instance.wal_encode, &schema)? + }; + + let seq = match encoded_payload { + EncodedPayload::Rows(encoded_rows) => self.write_to_wal_in_rows(encoded_rows).await?, + EncodedPayload::Cols(encoded_cols) => self.write_to_wal_in_cols(encoded_cols).await?, + }; + // Write the row group to the memtable and update the state in the mem. 
+ let table_data = self.table_data.clone(); let EncodeContext { row_group, index_in_writer, - encoded_rows, } = encode_ctx; + self.write_to_mem(&table_data, &row_group, index_in_writer, seq) + .await?; - let table_data = self.table_data.clone(); - let split_res = self.maybe_split_write_request(encoded_rows, &row_group); + Ok(row_group.num_rows()) + } + + async fn write_to_wal_in_rows(&self, encoded_rows: Vec) -> Result { + let split_res = self.maybe_split_write_request(encoded_rows); match split_res { - SplitResult::Integrate { - encoded_rows, - row_group, - } => { - self.write_table_row_group(&table_data, row_group, index_in_writer, encoded_rows) - .await?; + SplitResult::Integrate { encoded_rows } => { + let write_req = self.make_rowwise_write_request(encoded_rows); + let payload = WritePayload::Write(&write_req); + self.write_to_wal(iter::once(payload)).await } - SplitResult::Splitted { - encoded_batches, - row_group_batches, - } => { - for (encoded_rows, row_group) in encoded_batches.into_iter().zip(row_group_batches) - { - self.write_table_row_group( - &table_data, - row_group, - index_in_writer.clone(), - encoded_rows, - ) - .await?; - } + SplitResult::Splitted { encoded_batches } => { + let write_reqs = encoded_batches + .into_iter() + .map(|v| self.make_rowwise_write_request(v)) + .collect_vec(); + + let payload = write_reqs.iter().map(WritePayload::Write); + self.write_to_wal(payload).await } } + } - Ok(row_group.num_rows()) + async fn write_to_wal_in_cols(&self, encoded_cols: Vec) -> Result { + let write_req = table_requests::WriteRequest { + version: WalEncodeVersion::Columnar.as_u32(), + schema: None, + rows: vec![], + cols: encoded_cols, + }; + let payload = WritePayload::Write(&write_req); + + self.write_to_wal(iter::once(payload)).await } - fn maybe_split_write_request<'b>( - &'a self, + fn make_rowwise_write_request( + &self, encoded_rows: Vec, - row_group: &'b RowGroup, - ) -> SplitResult<'b> { + ) -> table_requests::WriteRequest { + table_requests::WriteRequest { + version: WalEncodeVersion::RowWise.as_u32(), + // Use the table schema instead of the schema in request to avoid schema + // mismatch during replaying + schema: Some(schema_pb::TableSchema::from(&self.table_data.schema())), + rows: encoded_rows, + cols: vec![], + } + } + + fn maybe_split_write_request(&self, encoded_rows: Vec) -> SplitResult { if self.instance.max_bytes_per_write_batch.is_none() { - return SplitResult::Integrate { - encoded_rows, - row_group: RowGroupSlicer::from(row_group), - }; + return SplitResult::Integrate { encoded_rows }; } let splitter = WriteRowGroupSplitter::new(self.instance.max_bytes_per_write_batch.unwrap()); - splitter.split(encoded_rows, row_group) + splitter.split(encoded_rows) } - async fn write_table_row_group( + /// Write `row_group` to memtable and update the memory states. + async fn write_to_mem( &mut self, table_data: &TableDataRef, - row_group: RowGroupSlicer<'_>, + row_group: &RowGroup, index_in_writer: IndexInWriterSchema, - encoded_rows: Vec, + sequence: SequenceNumber, ) -> Result<()> { - let sequence = self.write_to_wal(encoded_rows).await?; let memtable_writer = MemTableWriter::new(table_data.clone(), self.serial_exec); memtable_writer - .write(sequence, &row_group, index_in_writer) + .write(sequence, row_group, index_in_writer) .map_err(|e| { error!( "Failed to write to memtable, table:{}, table_id:{}, err:{}", @@ -557,29 +626,22 @@ impl<'a> Writer<'a> { } /// Write log_batch into wal, return the sequence number of log_batch. 
- async fn write_to_wal(&self, encoded_rows: Vec) -> Result { + async fn write_to_wal(&self, payloads: I) -> Result + where + I: Iterator, + P: Payload, + { let _timer = self.table_data.metrics.start_table_write_wal_timer(); - // Convert into pb - let write_req_pb = table_requests::WriteRequest { - // FIXME: Shall we avoid the magic number here? - version: 0, - // Use the table schema instead of the schema in request to avoid schema - // mismatch during replaying - schema: Some(schema_pb::TableSchema::from(&self.table_data.schema())), - rows: encoded_rows, - cols: Vec::new(), - }; - - // Encode payload - let payload = WritePayload::Write(&write_req_pb); let table_location = self.table_data.table_location(); let wal_location = instance::create_wal_location(table_location.id, table_location.shard_info); let log_batch_encoder = LogBatchEncoder::create(wal_location); - let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads { - table: &self.table_data.name, - wal_location, - })?; + let log_batch = log_batch_encoder + .encode_batch(payloads) + .context(EncodePayloads { + table: &self.table_data.name, + wal_location, + })?; // Write to wal manager let write_ctx = WriteContext::default(); @@ -673,6 +735,7 @@ mod tests { let schema = SchemaBuilder::new() .add_key_column(column_schema) .unwrap() + .primary_key_indexes(vec![0]) .build() .unwrap(); let row_group = RowGroupBuilder::with_rows(schema, rows).unwrap().build(); @@ -736,40 +799,24 @@ mod tests { } }; for (batch_size, sizes, expected_batches) in cases { - let (encoded_rows, row_group) = generate_rows_for_test(sizes.clone()); + let (encoded_rows, _) = generate_rows_for_test(sizes.clone()); let write_row_group_splitter = WriteRowGroupSplitter::new(batch_size); - let split_res = write_row_group_splitter.split(encoded_rows, &row_group); + let split_res = write_row_group_splitter.split(encoded_rows); if expected_batches.is_empty() { assert!(matches!(split_res, SplitResult::Integrate { .. })); } else if expected_batches.len() == 1 { assert!(matches!(split_res, SplitResult::Integrate { .. })); - if let SplitResult::Integrate { - encoded_rows, - row_group, - } = split_res - { + if let SplitResult::Integrate { encoded_rows } = split_res { check_encoded_rows(&encoded_rows, &expected_batches[0]); - assert_eq!(row_group.num_rows(), expected_batches[0].len()); } } else { assert!(matches!(split_res, SplitResult::Splitted { .. 
})); - if let SplitResult::Splitted { - encoded_batches, - row_group_batches, - } = split_res - { - assert_eq!(encoded_batches.len(), row_group_batches.len()); + if let SplitResult::Splitted { encoded_batches } = split_res { assert_eq!(encoded_batches.len(), expected_batches.len()); - let mut batch_start_index = 0; - for ((encoded_batch, row_group_batch), expected_batch) in encoded_batches - .iter() - .zip(row_group_batches.iter()) - .zip(expected_batches.iter()) + for (encoded_batch, expected_batch) in + encoded_batches.iter().zip(expected_batches.iter()) { check_encoded_rows(encoded_batch, expected_batch); - assert_eq!(row_group_batch.num_rows(), expected_batch.len()); - assert_eq!(row_group_batch.slice_range().start, batch_start_index); - batch_start_index += expected_batch.len(); } } } diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index ff740d825b..9e8cb1dee2 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -103,6 +103,8 @@ pub struct Config { pub max_bytes_per_write_batch: Option, /// The interval for sampling the memory usage pub mem_usage_sampling_interval: ReadableDuration, + /// The config for log in the wal. + pub wal_encode: WalEncodeConfig, /// Wal storage config /// @@ -135,6 +137,28 @@ pub enum RecoverMode { ShardBased, } +#[derive(Debug, Clone, Copy, Deserialize, Serialize)] +pub enum WalEncodeFormat { + RowWise, + Columnar, +} +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct WalEncodeConfig { + /// The threshold of columnar bytes to do compression. + pub num_bytes_compress_threshold: ReadableSize, + /// Encode the data in a columnar layout if it is set. + pub format: WalEncodeFormat, +} + +impl Default for WalEncodeConfig { + fn default() -> Self { + Self { + num_bytes_compress_threshold: ReadableSize::kb(1), + format: WalEncodeFormat::RowWise, + } + } +} + impl Default for Config { fn default() -> Self { Self { @@ -163,6 +187,7 @@ impl Default for Config { max_retry_flush_limit: 0, max_bytes_per_write_batch: None, mem_usage_sampling_interval: ReadableDuration::secs(0), + wal_encode: WalEncodeConfig::default(), wal: StorageConfig::RocksDB(Box::default()), remote_engine_client: remote_engine_client::config::Config::default(), recover_mode: RecoverMode::TableBased, diff --git a/analytic_engine/src/manifest/details.rs b/analytic_engine/src/manifest/details.rs index 4d44cc45e4..e58bfce1b4 100644 --- a/analytic_engine/src/manifest/details.rs +++ b/analytic_engine/src/manifest/details.rs @@ -192,7 +192,7 @@ impl MetaUpdateLogEntryIterator for MetaUpdateReaderImpl { let buffer = mem::take(&mut self.buffer); self.buffer = self .iter - .next_log_entries(decoder, buffer) + .next_log_entries(decoder, |_| true, buffer) .await .context(ReadEntry)?; } @@ -766,6 +766,7 @@ mod tests { fn build_altered_schema(schema: &Schema) -> Schema { let mut builder = schema::Builder::new().auto_increment_column_id(true); + let old_pk_indexes = schema.primary_key_indexes(); for column_schema in schema.key_columns() { builder = builder .add_key_column(column_schema.clone()) @@ -783,6 +784,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(old_pk_indexes.to_vec()) .build() .unwrap() } diff --git a/analytic_engine/src/manifest/meta_edit.rs b/analytic_engine/src/manifest/meta_edit.rs index 34cc70a339..d55b66f83c 100644 --- a/analytic_engine/src/manifest/meta_edit.rs +++ b/analytic_engine/src/manifest/meta_edit.rs @@ -26,7 +26,7 @@ use macros::define_result; use prost::Message; use snafu::{Backtrace, 
OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; -use wal::log_batch::{Payload, PayloadDecoder}; +use wal::log_batch::{Payload, PayloadDecodeContext, PayloadDecoder}; use crate::{ manifest::meta_snapshot::MetaSnapshot, @@ -408,7 +408,7 @@ impl PayloadDecoder for MetaUpdateDecoder { type Error = Error; type Target = MetaUpdate; - fn decode(&self, buf: &mut B) -> Result { + fn decode(&self, _ctx: &PayloadDecodeContext, buf: &mut B) -> Result { let meta_update_pb = manifest_pb::MetaUpdate::decode(buf.chunk()).context(DecodePayloadPb)?; MetaUpdate::try_from(meta_update_pb) diff --git a/analytic_engine/src/payload.rs b/analytic_engine/src/payload.rs index 778f442100..032b40c5cf 100644 --- a/analytic_engine/src/payload.rs +++ b/analytic_engine/src/payload.rs @@ -16,17 +16,22 @@ use bytes_ext::{Buf, BufMut, SafeBuf, SafeBufMut}; use ceresdbproto::{manifest as manifest_pb, table_requests}; -use codec::{row::WalRowDecoder, Decoder}; +use codec::{ + columnar::{ColumnarDecoder, DecodeContext, DecodeResult}, + row::WalRowDecoder, + Decoder, +}; use common_types::{ - row::{RowGroup, RowGroupBuilder}, + row::{RowGroup, RowGroupBuilder, RowGroupBuilderFromColumn}, schema::Schema, + table::TableId, }; use macros::define_result; use prost::Message; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; -use wal::log_batch::{Payload, PayloadDecoder}; +use wal::log_batch::{Payload, PayloadDecodeContext, PayloadDecoder}; -use crate::{table_options, TableOptions}; +use crate::{instance::write::WalEncodeVersion, table_options, TableOptions}; #[derive(Debug, Snafu)] pub enum Error { @@ -61,10 +66,18 @@ pub enum Error { #[snafu(display("Failed to decode row, err:{}", source))] DecodeRow { source: codec::row::Error }, + #[snafu(display("Failed to decode column, err:{}", source))] + DecodeColumn { source: codec::columnar::Error }, + + #[snafu(display("Failed to build row group, err:{}", source))] + BuildRowGroup { source: common_types::row::Error }, + #[snafu(display( - "Table schema is not found in the write request.\nBacktrace:\n{}", - backtrace + "Invalid version of write request, version:{version}.\nBacktrace:\n{backtrace}" ))] + InvalidWriteReqVersion { version: u32, backtrace: Backtrace }, + + #[snafu(display("Table schema is not found.\nBacktrace:\n{}", backtrace))] TableSchemaNotFound { backtrace: Backtrace }, #[snafu(display( @@ -163,10 +176,23 @@ pub enum ReadPayload { } impl ReadPayload { - fn decode_write_from_pb(buf: &[u8]) -> Result { + fn decode_write_from_pb(schema: &Schema, buf: &[u8]) -> Result { let write_req_pb: table_requests::WriteRequest = Message::decode(buf).context(DecodeBody)?; + let version = { + let version = write_req_pb.version; + WalEncodeVersion::try_from_u32(version).context(InvalidWriteReqVersion { version })? 
+ }; + match version { + WalEncodeVersion::RowWise => Self::decode_rowwise_write_req(write_req_pb), + WalEncodeVersion::Columnar => { + Self::decode_columnar_write_req(schema.clone(), write_req_pb) + } + } + } + + fn decode_rowwise_write_req(write_req_pb: table_requests::WriteRequest) -> Result { // Consume and convert schema in pb let schema: Schema = write_req_pb .schema @@ -191,6 +217,33 @@ impl ReadPayload { Ok(Self::Write { row_group }) } + fn decode_columnar_write_req( + schema: Schema, + write_req_pb: table_requests::WriteRequest, + ) -> Result { + let encoded_cols = write_req_pb.cols; + let mut row_group_builder = + RowGroupBuilderFromColumn::with_capacity(schema, encoded_cols.len()); + let mut decode_buf = Vec::new(); + for encoded_col in encoded_cols { + let decoder = ColumnarDecoder; + let mut col_buf = encoded_col.as_slice(); + let decode_ctx = DecodeContext { + buf: &mut decode_buf, + }; + let DecodeResult { column_id, datums } = decoder + .decode(decode_ctx, &mut col_buf) + .context(DecodeColumn)?; + + row_group_builder + .try_add_column(column_id, datums) + .context(BuildRowGroup)?; + } + + let row_group = row_group_builder.build(); + Ok(Self::Write { row_group }) + } + fn decode_alter_schema_from_pb(buf: &[u8]) -> Result { let alter_schema_meta_pb: manifest_pb::AlterSchemaMeta = Message::decode(buf).context(DecodeBody)?; @@ -220,15 +273,40 @@ impl ReadPayload { } } +/// The provider is used to provide the schema according to the table id. +pub trait TableSchemaProvider { + fn table_schema(&self, table_id: TableId) -> Option; +} + +pub struct SingleSchemaProviderAdapter { + pub schema: Schema, +} + +impl TableSchemaProvider for SingleSchemaProviderAdapter { + fn table_schema(&self, _table_id: TableId) -> Option { + Some(self.schema.clone()) + } +} + /// Wal payload decoder -#[derive(Default)] -pub struct WalDecoder; +pub struct WalDecoder
<P> { + schema_provider: P, +} + +impl<P> WalDecoder<P> { + pub fn new(schema_provider: P) -> Self { + Self { schema_provider } + } +} -impl PayloadDecoder for WalDecoder { +impl<P> PayloadDecoder for WalDecoder<P>
+where + P: TableSchemaProvider + Send + Sync, +{ type Error = Error; type Target = ReadPayload; - fn decode(&self, buf: &mut B) -> Result { + fn decode(&self, ctx: &PayloadDecodeContext, buf: &mut B) -> Result { let header_value = buf.try_get_u8().context(DecodeHeader)?; let header = match Header::from_u8(header_value) { Some(header) => header, @@ -241,8 +319,12 @@ impl PayloadDecoder for WalDecoder { }; let chunk = buf.chunk(); + let schema = self + .schema_provider + .table_schema(ctx.table_id) + .context(TableSchemaNotFound)?; let payload = match header { - Header::Write => ReadPayload::decode_write_from_pb(chunk)?, + Header::Write => ReadPayload::decode_write_from_pb(&schema, chunk)?, Header::AlterSchema => ReadPayload::decode_alter_schema_from_pb(chunk)?, Header::AlterOption => ReadPayload::decode_alter_option_from_pb(chunk)?, }; diff --git a/analytic_engine/src/sampler.rs b/analytic_engine/src/sampler.rs index 0e5445169c..86d85e56fc 100644 --- a/analytic_engine/src/sampler.rs +++ b/analytic_engine/src/sampler.rs @@ -292,7 +292,7 @@ impl PrimaryKeySampler { .iter() .enumerate() .map(|(idx, col)| { - if idx == timestamp_index { + if col.data_type.is_timestamp() { return None; } diff --git a/analytic_engine/src/sst/meta_data/cache.rs b/analytic_engine/src/sst/meta_data/cache.rs index 720fc77971..d853b3fe98 100644 --- a/analytic_engine/src/sst/meta_data/cache.rs +++ b/analytic_engine/src/sst/meta_data/cache.rs @@ -307,6 +307,7 @@ mod tests { .unwrap() .add_key_column(timestamp_column_schema) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .unwrap() }; diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index 554f2d08eb..a502715e32 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -637,6 +637,7 @@ mod tests { ) -> WriteRequest { let schema = FixedSchemaTable::default_schema_builder() .version(schema_version) + .primary_key_indexes(vec![0, 1]) .build() .unwrap(); let mut schema_rows = Vec::with_capacity(num_rows); diff --git a/analytic_engine/src/tests/alter_test.rs b/analytic_engine/src/tests/alter_test.rs index 5687cfcc00..579c2be65d 100644 --- a/analytic_engine/src/tests/alter_test.rs +++ b/analytic_engine/src/tests/alter_test.rs @@ -132,7 +132,10 @@ async fn alter_schema_same_schema_version_case( let mut schema_builder = FixedSchemaTable::default_schema_builder(); schema_builder = add_columns(schema_builder); - let new_schema = schema_builder.build().unwrap(); + let new_schema = schema_builder + .primary_key_indexes(vec![0, 1]) + .build() + .unwrap(); let table = test_ctx.table(table_name); let old_schema = table.schema(); @@ -160,6 +163,7 @@ async fn alter_schema_old_pre_version_case( let new_schema = schema_builder .version(old_schema.version() + 1) + .primary_key_indexes(old_schema.primary_key_indexes().to_vec()) .build() .unwrap(); @@ -190,6 +194,7 @@ async fn alter_schema_add_column_case( let new_schema = schema_builder .version(old_schema.version() + 1) + .primary_key_indexes(old_schema.primary_key_indexes().to_vec()) .build() .unwrap(); diff --git a/analytic_engine/src/tests/drop_test.rs b/analytic_engine/src/tests/drop_test.rs index 2f7dfa97fe..a9c0510dc5 100644 --- a/analytic_engine/src/tests/drop_test.rs +++ b/analytic_engine/src/tests/drop_test.rs @@ -278,8 +278,10 @@ fn test_alter_schema_drop_create(engine_context: T) { .unwrap(), ) .unwrap(); + let new_schema = schema_builder .version(old_schema.version() + 1) + .primary_key_indexes(old_schema.primary_key_indexes().to_vec()) .build() .unwrap(); let 
request = AlterSchemaRequest { diff --git a/analytic_engine/src/tests/table.rs b/analytic_engine/src/tests/table.rs index 4e139c30ec..ec6236d92f 100644 --- a/analytic_engine/src/tests/table.rs +++ b/analytic_engine/src/tests/table.rs @@ -336,7 +336,8 @@ pub fn create_schema_builder( assert!(!key_tuples.is_empty()); let mut schema_builder = schema::Builder::with_capacity(key_tuples.len() + normal_tuples.len()) - .auto_increment_column_id(true); + .auto_increment_column_id(true) + .primary_key_indexes((0..key_tuples.len()).collect()); for tuple in key_tuples { // Key column is not nullable. diff --git a/benchmarks/src/wal_write_bench.rs b/benchmarks/src/wal_write_bench.rs index a3dbf1147c..17d435dd15 100644 --- a/benchmarks/src/wal_write_bench.rs +++ b/benchmarks/src/wal_write_bench.rs @@ -80,7 +80,7 @@ impl WalWriteBench { let wal = WalNamespaceImpl::open( MemoryImpl::default(), runtimes.clone(), - "ceresedb", + "ceresdb", NamespaceConfig::default(), ) .await @@ -88,8 +88,9 @@ impl WalWriteBench { let values = self.build_value_vec(); let wal_encoder = LogBatchEncoder::create(WalLocation::new(1, 1)); + let payloads = values.iter().map(|v| WritePayload(v)); let log_batch = wal_encoder - .encode_batch::>(values.as_slice()) + .encode_batch(payloads) .expect("should succeed to encode payload batch"); // Write to wal manager diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index 83cd2fc87f..e157b870da 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -521,6 +521,7 @@ impl Datum { /// Cast datum to timestamp. pub fn as_timestamp(&self) -> Option { match self { + Datum::Time(v) => Some(Timestamp::new(*v)), Datum::Timestamp(v) => Some(*v), _ => None, } @@ -1270,6 +1271,7 @@ impl<'a> DatumView<'a> { pub fn as_timestamp(&self) -> Option { match self { DatumView::Timestamp(v) => Some(*v), + DatumView::Time(v) => Some(Timestamp::new(*v)), _ => None, } } diff --git a/common_types/src/row/mod.rs b/common_types/src/row/mod.rs index c43295b1ec..7d861c4bfd 100644 --- a/common_types/src/row/mod.rs +++ b/common_types/src/row/mod.rs @@ -16,13 +16,14 @@ use std::{ cmp, - ops::{Index, IndexMut, Range}, + collections::HashMap, + ops::{Index, IndexMut}, }; use snafu::{ensure, Backtrace, OptionExt, Snafu}; use crate::{ - column_schema::ColumnSchema, + column_schema::{ColumnId, ColumnSchema}, datum::{Datum, DatumKind, DatumView}, record_batch::RecordBatchWithKey, schema::{RecordSchemaWithKey, Schema}, @@ -47,7 +48,19 @@ pub enum Error { }, #[snafu(display( - "Invalid column num of row, expect:{}, given:{}.\nBacktrace:\n{}", + "Invalid row num, expect:{}, given:{}.\nBacktrace:\n{}", + expect, + given, + backtrace + ))] + InvalidRowNum { + expect: usize, + given: usize, + backtrace: Backtrace, + }, + + #[snafu(display( + "Invalid column num, expect:{}, given:{}.\nBacktrace:\n{}", expect, given, backtrace @@ -99,6 +112,14 @@ pub enum Error { column: String, backtrace: Backtrace, }, + + #[snafu(display( + "Duplicate column id is found, column_id:{column_id}.\nBacktrace:\n{backtrace}", + ))] + DuplicateColumnId { + column_id: ColumnId, + backtrace: Backtrace, + }, } // Do not depend on test_util crates @@ -215,54 +236,6 @@ pub fn check_row_schema(row: &Row, schema: &Schema) -> Result<()> { Ok(()) } -#[derive(Debug)] -pub struct RowGroupSlicer<'a> { - range: Range, - row_group: &'a RowGroup, -} - -impl<'a> From<&'a RowGroup> for RowGroupSlicer<'a> { - fn from(value: &'a RowGroup) -> RowGroupSlicer<'a> { - Self { - range: 0..value.rows.len(), - row_group: value, - } - } -} - 
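As the instance/write.rs and wal_write_bench hunks above show, `LogBatchEncoder::encode_batch` now accepts an iterator of payloads rather than a slice, so a split write encodes all of its requests in one call. A minimal sketch of the new call shape, written as if inside analytic_engine and with an illustrative `WalLocation`:

    use ceresdbproto::table_requests;
    use wal::{kv_encoder::LogBatchEncoder, manager::WalLocation};

    use crate::payload::WritePayload;

    // One WAL payload per pre-encoded write request, encoded as a single batch.
    fn encode_write_reqs(write_reqs: &[table_requests::WriteRequest]) {
        let payloads = write_reqs.iter().map(WritePayload::Write);
        let _log_batch = LogBatchEncoder::create(WalLocation::new(1, 1))
            .encode_batch(payloads)
            .expect("should succeed to encode payload batch");
    }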
-impl<'a> RowGroupSlicer<'a> { - pub fn new(range: Range, row_group: &'a RowGroup) -> Self { - Self { range, row_group } - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.range.is_empty() - } - - #[inline] - pub fn schema(&self) -> &Schema { - self.row_group.schema() - } - - #[inline] - pub fn iter(&self) -> IterRow<'a> { - IterRow { - iter: self.row_group.rows[self.range.start..self.range.end].iter(), - } - } - - #[inline] - pub fn slice_range(&self) -> Range { - self.range.clone() - } - - #[inline] - pub fn num_rows(&self) -> usize { - self.range.len() - } -} - // TODO(yingwen): For multiple rows that share the same schema, no need to store // Datum for each row element, we can store the whole row as a binary and // provide more efficient way to convert rows into columns @@ -390,7 +363,7 @@ impl<'a> Iterator for IterRow<'a> { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct IterCol<'a> { rows: &'a Vec, row_index: usize, @@ -421,6 +394,118 @@ impl<'a> Iterator for IterCol<'a> { } } +/// Build the [`RowGroup`] from the columns. +pub struct RowGroupBuilderFromColumn { + schema: Schema, + cols: HashMap>, +} + +impl RowGroupBuilderFromColumn { + pub fn with_capacity(schema: Schema, num_cols: usize) -> Self { + Self { + schema, + cols: HashMap::with_capacity(num_cols), + } + } + + /// The newly-added column should have the same elements as the + /// previously-added column's. + pub fn try_add_column(&mut self, col_id: ColumnId, col: Vec) -> Result<()> { + if let Some(num_rows) = self.num_rows() { + ensure!( + num_rows == col.len(), + InvalidRowNum { + expect: num_rows, + given: col.len(), + } + ); + } + + let old = self.cols.insert(col_id, col); + ensure!(old.is_none(), DuplicateColumnId { column_id: col_id }); + + Ok(()) + } + + pub fn build(mut self) -> RowGroup { + let num_rows = self.num_rows(); + if Some(0) == num_rows { + return RowGroup { + schema: self.schema, + rows: vec![], + min_timestamp: Timestamp::new(0), + max_timestamp: Timestamp::new(0), + }; + }; + + let num_rows = num_rows.unwrap(); + let num_cols = self.schema.num_columns(); + let mut rows = Vec::with_capacity(num_rows); + + // Pre-allocate the memory for column data in every row. 
+ for _ in 0..num_rows { + let row = Vec::with_capacity(num_cols); + rows.push(row); + } + + let mut add_column_to_row = |row_idx: usize, datum: Datum| { + rows[row_idx].push(datum); + }; + + for col_schema in self.schema.columns() { + let col_id = col_schema.id; + let datums = self.cols.remove(&col_id); + + match datums { + Some(v) => { + for (row_idx, datum) in v.into_iter().enumerate() { + add_column_to_row(row_idx, datum); + } + } + None => { + for row_idx in 0..num_rows { + add_column_to_row(row_idx, Datum::Null); + } + } + } + } + + let rows = rows.into_iter().map(Row::from_datums).collect::>(); + + let (min_timestamp, max_timestamp) = self + .collect_minmax_timestamps(&rows) + .unwrap_or_else(|| (Timestamp::default(), Timestamp::default())); + + RowGroup { + schema: self.schema, + rows, + min_timestamp, + max_timestamp, + } + } + + #[inline] + fn num_rows(&self) -> Option { + self.cols.iter().next().map(|(_, v)| v.len()) + } + + fn collect_minmax_timestamps(&self, rows: &[Row]) -> Option<(Timestamp, Timestamp)> { + let timestamp_idx = self.schema.timestamp_index(); + if rows.is_empty() { + return None; + } + + rows.iter() + .fold(None, |prev: Option<(Timestamp, Timestamp)>, row| { + let timestamp = row[timestamp_idx].as_timestamp()?; + match prev { + None => Some((timestamp, timestamp)), + Some((min_ts, max_ts)) => Some((min_ts.min(timestamp), max_ts.max(timestamp))), + } + }) + } +} + /// RowGroup builder #[derive(Debug)] pub struct RowGroupBuilder { diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index fca2142832..e114c1e4d6 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -87,6 +87,16 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display( + "Column id is missing in schema, id:{}.\nBacktrace:\n{}", + id, + backtrace + ))] + ColumnIdMissing { id: ColumnId, backtrace: Backtrace }, + + #[snafu(display("Primary key indexes cannot be empty.\nBacktrace:\n{}", backtrace))] + EmptyPirmaryKeyIndexes { backtrace: Backtrace }, + #[snafu(display( "Unsupported key column type, name:{}, type:{:?}.\nBacktrace:\n{}", name, @@ -114,13 +124,6 @@ pub enum Error { #[snafu(display("Timestamp not in primary key.\nBacktrace:\n{}", backtrace))] TimestampNotInPrimaryKey { backtrace: Backtrace }, - #[snafu(display( - "Key column cannot be nullable, name:{}.\nBacktrace:\n{}", - name, - backtrace - ))] - NullKeyColumn { name: String, backtrace: Backtrace }, - #[snafu(display( "Invalid arrow field, field_name:{}, arrow_schema:{:?}, err:{}", field_name, @@ -958,62 +961,37 @@ impl Schema { impl TryFrom for Schema { type Error = Error; - // We can't use Builder directly here, since it will disorder columns. 
fn try_from(schema: schema_pb::TableSchema) -> Result { + let mut builder = Builder::with_capacity(schema.columns.len()).version(schema.version); let primary_key_ids = schema.primary_key_ids; - let column_schemas = schema - .columns - .into_iter() - .map(|column_schema_pb| { - ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed) + + let primary_key_indexes = primary_key_ids + .iter() + .cloned() + .map(|id| { + let col_idx = schema + .columns + .iter() + .enumerate() + .find_map(|(idx, col)| if col.id == id { Some(idx) } else { None }) + .context(ColumnIdMissing { id })?; + + Ok(col_idx) }) .collect::>>()?; + builder = builder.primary_key_indexes(primary_key_indexes); - let mut primary_key_indexes = Vec::with_capacity(primary_key_ids.len()); - let mut timestamp_index = None; - for pk_id in &primary_key_ids { - for (idx, col) in column_schemas.iter().enumerate() { - if col.id == *pk_id { - primary_key_indexes.push(idx); - if DatumKind::Timestamp == col.data_type { - // TODO: add a timestamp_id in schema_pb, so we can have two timestamp - // columns in primary keys. - if let Some(idx) = timestamp_index { - let column_schema: &ColumnSchema = &column_schemas[idx]; - return TimestampKeyExists { - timestamp_column: column_schema.name.to_string(), - given_column: col.name.clone(), - } - .fail(); - } - - timestamp_index = Some(idx); - } - break; - } + for column_schema_pb in schema.columns { + let column = + ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)?; + if primary_key_ids.contains(&column.id) { + builder = builder.add_key_column(column)?; + } else { + builder = builder.add_normal_column(column)?; } } - let timestamp_index = timestamp_index.context(TimestampNotInPrimaryKey)?; - let tsid_index = Builder::find_tsid_index(&column_schemas); - let fields = column_schemas - .iter() - .map(|c| c.to_arrow_field()) - .collect::>(); - let meta = Builder::build_arrow_schema_meta( - primary_key_indexes.clone(), - timestamp_index, - schema.version, - ); - - Ok(Schema { - arrow_schema: Arc::new(ArrowSchema::new_with_metadata(fields, meta)), - primary_key_indexes, - column_schemas: Arc::new(ColumnSchemas::new(column_schemas)), - version: schema.version, - tsid_index, - timestamp_index, - }) + builder.build() } } @@ -1090,8 +1068,6 @@ impl Builder { self.may_alloc_column_id(&mut column); self.validate_column(&column, true)?; - ensure!(!column.is_nullable, NullKeyColumn { name: column.name }); - // FIXME(xikai): it seems not reasonable to decide the timestamp column in this // way. 
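Because `add_key_column` no longer records the column as part of the primary key (and the non-null restriction on key columns is gone), every caller now has to state the key ordering explicitly through `primary_key_indexes`, which is what the many test updates in this diff do. A minimal sketch of the new pattern, with illustrative column names and auto-allocated column ids:

    use common_types::{column_schema, datum::DatumKind, schema};

    // Two key columns (a varbinary key plus the timestamp); without the
    // explicit primary_key_indexes call, build() now fails with
    // EmptyPirmaryKeyIndexes.
    fn build_example_schema() -> schema::Schema {
        schema::Builder::new()
            .auto_increment_column_id(true)
            .add_key_column(
                column_schema::Builder::new("key".to_string(), DatumKind::Varbinary)
                    .build()
                    .expect("should succeed build column schema"),
            )
            .unwrap()
            .add_key_column(
                column_schema::Builder::new("ts".to_string(), DatumKind::Timestamp)
                    .build()
                    .expect("should succeed build column schema"),
            )
            .unwrap()
            .primary_key_indexes(vec![0, 1])
            .build()
            .unwrap()
    }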
let is_timestamp = DatumKind::Timestamp == column.data_type; @@ -1106,7 +1082,6 @@ impl Builder { self.timestamp_index = Some(self.columns.len()); } - self.primary_key_indexes.push(self.columns.len()); self.insert_new_column(column); Ok(self) @@ -1122,6 +1097,12 @@ impl Builder { Ok(self) } + /// Set primary key indexes of the schema + pub fn primary_key_indexes(mut self, indexes: Vec) -> Self { + self.primary_key_indexes = indexes; + self + } + /// Set version of the schema pub fn version(mut self, version: Version) -> Self { self.version = version; @@ -1264,7 +1245,10 @@ impl Builder { let timestamp_index = self.timestamp_index.context(TimestampNotInPrimaryKey)?; // Timestamp key column is exists, so key columns should not be zero - assert!(!self.primary_key_indexes.is_empty()); + ensure!( + !self.primary_key_indexes.is_empty(), + EmptyPirmaryKeyIndexes {} + ); let tsid_index = Self::find_tsid_index(&self.columns); let fields = self @@ -1373,6 +1357,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .unwrap() } @@ -1466,6 +1451,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![1, 2]) .build() .unwrap(); @@ -1566,6 +1552,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0]) .build() .unwrap(); } @@ -1586,23 +1573,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap(); - assert!(builder.build().is_err()); - } - - // Currently we allow null key column, maybe we can rename it to sorted column. - // Since we primary key in ceresdb isn't same with MySQL, and it only served for - // sort. - #[test] - fn test_null_key() { - assert!(Builder::new() - .add_key_column( - column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary) - .id(1) - .is_nullable(true) - .build() - .expect("should succeed build column schema") - ) - .is_err()); + assert!(builder.primary_key_indexes(vec![0]).build().is_err()); } #[test] @@ -1637,6 +1608,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0]) .build() .unwrap(); @@ -1690,6 +1662,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .unwrap(); @@ -1796,6 +1769,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .expect("should succeed to build schema"); diff --git a/common_types/src/tests.rs b/common_types/src/tests.rs index 666a1a6c91..29a442fdb9 100644 --- a/common_types/src/tests.rs +++ b/common_types/src/tests.rs @@ -73,11 +73,13 @@ fn base_schema_builder() -> schema::Builder { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0, 1]) } fn default_value_schema_builder() -> schema::Builder { schema::Builder::new() .auto_increment_column_id(true) + .primary_key_indexes(vec![0, 1]) .add_key_column( column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary) .build() @@ -233,7 +235,7 @@ pub fn build_schema_for_cpu() -> Schema { ) .unwrap(); - builder.build().unwrap() + builder.primary_key_indexes(vec![0, 1]).build().unwrap() } #[allow(clippy::too_many_arguments)] diff --git a/components/codec/src/columnar/mod.rs b/components/codec/src/columnar/mod.rs index d15604f579..46bcd3163e 100644 --- a/components/codec/src/columnar/mod.rs +++ b/components/codec/src/columnar/mod.rs @@ -88,6 +88,9 @@ pub enum Error { #[snafu(display("Bytes is 
not enough, length:{len}.\nBacktrace:\n{backtrace}"))] NotEnoughBytes { len: usize, backtrace: Backtrace }, + + #[snafu(display("Number operation overflowed, msg:{msg}.\nBacktrace:\n{backtrace}"))] + Overflow { msg: String, backtrace: Backtrace }, } define_result!(Error); @@ -117,7 +120,7 @@ trait ValuesEncoder { /// The decode context for decoding column. pub struct DecodeContext<'a> { /// Buffer for reuse during decoding. - buf: &'a mut Vec, + pub buf: &'a mut Vec, } /// The trait bound on the decoders for different types. @@ -258,11 +261,9 @@ impl ColumnarEncoder { let enc = ValuesEncoderImpl::default(); let data_size = match hint.datum_kind { DatumKind::Null => 0, - DatumKind::Timestamp => enc.estimated_encoded_size( - datums - .clone() - .filter_map(|v| v.as_timestamp().map(|v| v.as_i64())), - ), + DatumKind::Timestamp => { + enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_timestamp())) + } DatumKind::Double => { enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_f64())) } @@ -305,7 +306,9 @@ impl ColumnarEncoder { DatumKind::Date => { enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_date_i32())) } - DatumKind::Time => todo!(), + DatumKind::Time => { + enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_timestamp())) + } }; Self::header_size() + bit_set_size + data_size @@ -321,10 +324,7 @@ impl ColumnarEncoder { }; match datum_kind { DatumKind::Null => Ok(()), - DatumKind::Timestamp => enc.encode( - buf, - datums.filter_map(|v| v.as_timestamp().map(|v| v.as_i64())), - ), + DatumKind::Timestamp => enc.encode(buf, datums.filter_map(|v| v.as_timestamp())), DatumKind::Double => enc.encode(buf, datums.filter_map(|v| v.as_f64())), DatumKind::Float => enc.encode(buf, datums.filter_map(|v| v.as_f32())), DatumKind::Varbinary => enc.encode(buf, datums.filter_map(|v| v.into_bytes())), @@ -342,7 +342,7 @@ impl ColumnarEncoder { DatumKind::Int8 => enc.encode(buf, datums.filter_map(|v| v.as_i8())), DatumKind::Boolean => enc.encode(buf, datums.filter_map(|v| v.as_bool())), DatumKind::Date => enc.encode(buf, datums.filter_map(|v| v.as_date_i32())), - DatumKind::Time => todo!(), + DatumKind::Time => enc.encode(buf, datums.filter_map(|v| v.as_timestamp())), } } } @@ -445,8 +445,8 @@ impl ColumnarDecoder { match datum_kind { DatumKind::Null => Ok(()), DatumKind::Timestamp => { - let with_i64 = |v| f(Datum::from(Timestamp::new(v))); - ValuesDecoderImpl.decode(ctx, buf, with_i64) + let with_timestamp = |v: Timestamp| f(Datum::from(v)); + ValuesDecoderImpl.decode(ctx, buf, with_timestamp) } DatumKind::Double => { let with_float = |v: f64| f(Datum::from(v)); @@ -531,7 +531,10 @@ impl ColumnarDecoder { }; ValuesDecoderImpl.decode(ctx, buf, with_i32) } - DatumKind::Time => todo!(), + DatumKind::Time => { + let with_timestamp = |v: Timestamp| f(Datum::Time(v.as_i64())); + ValuesDecoderImpl.decode(ctx, buf, with_timestamp) + } } } } @@ -609,6 +612,26 @@ mod tests { ); } + #[test] + fn test_with_empty_datums() { + check_encode_end_decode(1, vec![], DatumKind::Null); + check_encode_end_decode(1, vec![], DatumKind::Timestamp); + check_encode_end_decode(1, vec![], DatumKind::Double); + check_encode_end_decode(1, vec![], DatumKind::Float); + check_encode_end_decode(1, vec![], DatumKind::Varbinary); + check_encode_end_decode(1, vec![], DatumKind::String); + check_encode_end_decode(1, vec![], DatumKind::UInt64); + check_encode_end_decode(1, vec![], DatumKind::UInt32); + check_encode_end_decode(1, vec![], DatumKind::UInt8); + check_encode_end_decode(1, vec![], 
DatumKind::Int64); + check_encode_end_decode(1, vec![], DatumKind::Int32); + check_encode_end_decode(1, vec![], DatumKind::Int16); + check_encode_end_decode(1, vec![], DatumKind::Int8); + check_encode_end_decode(1, vec![], DatumKind::Boolean); + check_encode_end_decode(1, vec![], DatumKind::Date); + check_encode_end_decode(1, vec![], DatumKind::Time); + } + #[test] fn test_i32_with_null() { let datums = vec![ @@ -680,6 +703,72 @@ mod tests { check_encode_end_decode(10, datums, DatumKind::Int64); } + #[test] + fn test_u64() { + let datums = vec![ + Datum::from(10u64), + Datum::from(1u64), + Datum::from(2u64), + Datum::from(18u64), + Datum::from(38u64), + Datum::from(48u64), + Datum::from(80u64), + Datum::from(81u64), + Datum::from(82u64), + ]; + + check_encode_end_decode(10, datums, DatumKind::UInt64); + } + + #[test] + fn test_timestamp() { + let datums = vec![ + Datum::from(Timestamp::new(-10)), + Datum::from(Timestamp::new(10)), + Datum::from(Timestamp::new(1024)), + Datum::from(Timestamp::new(1024)), + Datum::from(Timestamp::new(1025)), + ]; + + check_encode_end_decode(10, datums, DatumKind::Timestamp); + } + + #[test] + fn test_time() { + let datums = vec![ + Datum::Time(-10), + Datum::Time(10), + Datum::Time(1024), + Datum::Time(1024), + Datum::Time(1025), + ]; + + check_encode_end_decode(10, datums, DatumKind::Time); + } + + #[test] + fn test_overflow_timestamp() { + let datums = vec![ + Datum::from(Timestamp::new(i64::MIN)), + Datum::from(Timestamp::new(10)), + Datum::from(Timestamp::new(1024)), + Datum::from(Timestamp::new(1024)), + Datum::from(Timestamp::new(1025)), + ]; + + let encoder = ColumnarEncoder::new(0, 256); + let views = datums.iter().map(|v| v.as_view()); + let mut hint = EncodeHint { + num_nulls: None, + num_datums: None, + datum_kind: DatumKind::Timestamp, + }; + + let mut buf = Vec::new(); + let enc_res = encoder.encode(&mut buf, views, &mut hint); + assert!(enc_res.is_err()); + } + #[test] fn test_string() { let datums = vec![ diff --git a/components/codec/src/columnar/number.rs b/components/codec/src/columnar/number.rs index 1ed85746ac..e9c461860c 100644 --- a/components/codec/src/columnar/number.rs +++ b/components/codec/src/columnar/number.rs @@ -118,6 +118,8 @@ impl ValuesEncoder for ValuesEncoderImpl { B: BufMut, I: Iterator, { + buf.put_u8(VERSION); + for v in values { varint::encode_uvarint(buf, v).context(Varint)?; } diff --git a/components/codec/src/columnar/timestamp.rs b/components/codec/src/columnar/timestamp.rs index eab1a8bc35..9a6efd3275 100644 --- a/components/codec/src/columnar/timestamp.rs +++ b/components/codec/src/columnar/timestamp.rs @@ -11,3 +11,94 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +use common_types::time::Timestamp; +use snafu::{ensure, OptionExt, ResultExt}; + +use super::{DecodeContext, Overflow, Result, ValuesDecoder, ValuesDecoderImpl, Varint}; +use crate::{ + columnar::{InvalidVersion, ValuesEncoder, ValuesEncoderImpl}, + consts::MAX_VARINT_BYTES, + varint, +}; + +/// The layout for the timestamp values: +/// ```plaintext +/// +-------------+----------------------+--------+ +/// | version(u8) | first_timestamp(i64) | deltas | +/// +-------------+----------------------+--------+ +/// ``` +/// +/// This encoding assume the timestamps are have very small differences between +/// each other, so we just store the deltas from the first timestamp in varint. 
+struct Encoding; + +impl Encoding { + const VERSION: u8 = 0; + const VERSION_SIZE: usize = 1; +} + +impl ValuesEncoder for ValuesEncoderImpl { + fn encode(&self, buf: &mut B, mut values: I) -> Result<()> + where + B: bytes_ext::BufMut, + I: Iterator + Clone, + { + buf.put_u8(Encoding::VERSION); + + let first_ts = match values.next() { + Some(v) => v.as_i64(), + None => return Ok(()), + }; + + buf.put_i64(first_ts); + + for value in values { + let ts = value.as_i64(); + let delta = ts.checked_sub(first_ts).with_context(|| Overflow { + msg: format!("first timestamp:{ts}, current timestamp:{first_ts}"), + })?; + varint::encode_varint(buf, delta).context(Varint)?; + } + + Ok(()) + } + + fn estimated_encoded_size(&self, values: I) -> usize + where + I: Iterator, + { + let (lower, higher) = values.size_hint(); + let num = lower.max(higher.unwrap_or_default()); + num * MAX_VARINT_BYTES + Encoding::VERSION_SIZE + } +} + +impl ValuesDecoder for ValuesDecoderImpl { + fn decode(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()> + where + B: bytes_ext::Buf, + F: FnMut(Timestamp) -> Result<()>, + { + let version = buf.get_u8(); + + ensure!(version == Encoding::VERSION, InvalidVersion { version }); + + if buf.remaining() == 0 { + return Ok(()); + } + + let first_ts = buf.get_i64(); + f(Timestamp::new(first_ts))?; + + while buf.remaining() > 0 { + let delta = varint::decode_varint(buf).context(Varint)?; + let ts = first_ts.checked_add(delta).with_context(|| Overflow { + msg: format!("first timestamp:{first_ts}, delta:{delta}"), + })?; + f(Timestamp::new(ts))?; + } + + Ok(()) + } +} diff --git a/integration_tests/cases/env/local/ddl/sampling-primary-key.result b/integration_tests/cases/env/local/ddl/sampling-primary-key.result index 27e8da08c7..0324b1fc1b 100644 --- a/integration_tests/cases/env/local/ddl/sampling-primary-key.result +++ b/integration_tests/cases/env/local/ddl/sampling-primary-key.result @@ -8,7 +8,7 @@ CREATE TABLE `sampling_primary_key_table` ( v3 double, v5 double, name string TAG, - value int64 NOT NULL, + myVALUE int64 NOT NULL, t timestamp NOT NULL, timestamp KEY (t)) ENGINE = Analytic WITH ( update_mode='append', @@ -20,10 +20,10 @@ affected_rows: 0 show create table `sampling_primary_key_table`; Table,Create Table, -String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `value` bigint NOT NULL, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"), +String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `myVALUE` bigint NOT NULL, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"), -INSERT INTO `sampling_primary_key_table` (t, name, value) +INSERT INTO `sampling_primary_key_table` (t, name, myVALUE) VALUES (1695348000000, "ceresdb2", 
200), (1695348000005, "ceresdb2", 100), @@ -32,12 +32,30 @@ INSERT INTO `sampling_primary_key_table` (t, name, value) affected_rows: 4 +select * from `sampling_primary_key_table`; + +tsid,t,v1,v2,v3,v5,name,myVALUE, +UInt64(5478297384049724685),Timestamp(1695348000000),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(200), +UInt64(5478297384049724685),Timestamp(1695348000005),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(100), +UInt64(9680600349107584624),Timestamp(1695348000001),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb1"),Int64(100), +UInt64(13753293625875895842),Timestamp(1695348000003),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb3"),Int64(200), + + -- After flush, its primary key should changed. -- SQLNESS ARG pre_cmd=flush show create table `sampling_primary_key_table`; Table,Create Table, -String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `value` bigint NOT NULL, PRIMARY KEY(value,name,tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"), +String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `myVALUE` bigint NOT NULL, PRIMARY KEY(myVALUE,name,tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"), + + +select * from `sampling_primary_key_table`; + +tsid,t,v1,v2,v3,v5,name,myVALUE, +UInt64(9680600349107584624),Timestamp(1695348000001),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb1"),Int64(100), +UInt64(5478297384049724685),Timestamp(1695348000005),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(100), +UInt64(5478297384049724685),Timestamp(1695348000000),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(200), +UInt64(13753293625875895842),Timestamp(1695348000003),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb3"),Int64(200), DROP TABLE IF EXISTS `sampling_primary_key_table`; diff --git a/integration_tests/cases/env/local/ddl/sampling-primary-key.sql b/integration_tests/cases/env/local/ddl/sampling-primary-key.sql index 2536e721b1..4c1612ee67 100644 --- a/integration_tests/cases/env/local/ddl/sampling-primary-key.sql +++ b/integration_tests/cases/env/local/ddl/sampling-primary-key.sql @@ -7,7 +7,7 @@ CREATE TABLE `sampling_primary_key_table` ( v3 double, v5 double, name string TAG, - value int64 NOT NULL, + myVALUE int64 NOT NULL, t timestamp NOT NULL, timestamp KEY (t)) ENGINE = Analytic WITH ( update_mode='append', @@ -16,15 +16,19 @@ CREATE TABLE `sampling_primary_key_table` ( show create table `sampling_primary_key_table`; -INSERT INTO `sampling_primary_key_table` (t, name, value) +INSERT INTO `sampling_primary_key_table` (t, name, myVALUE) VALUES (1695348000000, "ceresdb2", 200), (1695348000005, 
"ceresdb2", 100), (1695348000001, "ceresdb1", 100), (1695348000003, "ceresdb3", 200); +select * from `sampling_primary_key_table`; + -- After flush, its primary key should changed. -- SQLNESS ARG pre_cmd=flush show create table `sampling_primary_key_table`; +select * from `sampling_primary_key_table`; + DROP TABLE IF EXISTS `sampling_primary_key_table`; diff --git a/interpreters/src/alter_table.rs b/interpreters/src/alter_table.rs index f3893442e8..4ce0aedabf 100644 --- a/interpreters/src/alter_table.rs +++ b/interpreters/src/alter_table.rs @@ -94,6 +94,7 @@ fn build_new_schema(current_schema: &Schema, column_schemas: Vec) let mut builder = schema::Builder::with_capacity(current_schema.num_columns() + column_schemas.len()) + .primary_key_indexes(current_schema.primary_key_indexes().to_vec()) // Increment the schema version. .version(current_version + 1); for (idx, column) in current_schema.columns().iter().enumerate() { diff --git a/proxy/src/grpc/prom_query.rs b/proxy/src/grpc/prom_query.rs index 375d9ca25d..13f18865e4 100644 --- a/proxy/src/grpc/prom_query.rs +++ b/proxy/src/grpc/prom_query.rs @@ -390,6 +390,7 @@ mod tests { .unwrap(), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .unwrap() } diff --git a/proxy/src/http/prom.rs b/proxy/src/http/prom.rs index b5c9565822..b84107124a 100644 --- a/proxy/src/http/prom.rs +++ b/proxy/src/http/prom.rs @@ -605,6 +605,7 @@ mod tests { .unwrap(), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .unwrap() } diff --git a/proxy/src/influxdb/types.rs b/proxy/src/influxdb/types.rs index bad6af396c..6a9a4acb87 100644 --- a/proxy/src/influxdb/types.rs +++ b/proxy/src/influxdb/types.rs @@ -796,6 +796,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0]) .build() .unwrap(); diff --git a/proxy/src/write.rs b/proxy/src/write.rs index d1bc231ebf..2f07f43d04 100644 --- a/proxy/src/write.rs +++ b/proxy/src/write.rs @@ -1112,6 +1112,7 @@ mod test { .unwrap(), ) .unwrap() + .primary_key_indexes(vec![0, 1, 2]) .build() .unwrap() } diff --git a/query_frontend/src/planner.rs b/query_frontend/src/planner.rs index f8a889d5a3..9245c2ca36 100644 --- a/query_frontend/src/planner.rs +++ b/query_frontend/src/planner.rs @@ -534,7 +534,10 @@ pub fn build_schema_from_write_table_request( .context(BuildTableSchema {})?; } - schema_builder.build().context(BuildTableSchema {}) + schema_builder + .primary_key_indexes(vec![0, 1]) + .build() + .context(BuildTableSchema {}) } fn ensure_data_type_compatible( @@ -647,6 +650,10 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { schema::Builder::with_capacity(columns_by_name.len()).auto_increment_column_id(true); // Collect the key columns. + // TODO: Here we put key column in front of all columns, this may change column + // order defined by users. + let primary_key_indexes = (0..primary_key_columns.len()).collect(); + schema_builder = schema_builder.primary_key_indexes(primary_key_indexes); for key_col in primary_key_columns { let col_name = key_col.value.as_str(); let col = columns_by_name diff --git a/src/wal/src/kv_encoder.rs b/src/wal/src/kv_encoder.rs index 4ee243d519..81f9d36319 100644 --- a/src/wal/src/kv_encoder.rs +++ b/src/wal/src/kv_encoder.rs @@ -562,18 +562,16 @@ impl LogBatchEncoder { /// Consume LogBatchEncoder and encode raw payload batch to LogWriteBatch. /// Note: To build payload from raw payload in `encode_batch`, raw payload /// need implement From trait. 
- pub fn encode_batch<'a, P: Payload, I>( - self, - raw_payload_batch: &'a [I], - ) -> manager::Result<LogWriteBatch> + pub fn encode_batch<I, P>(self, raw_payloads: I) -> manager::Result<LogWriteBatch> where - &'a I: Into<P>
, + I: Iterator, + P: Payload, { let mut write_batch = LogWriteBatch::new(self.location); let mut buf = BytesMut::new(); - for raw_payload in raw_payload_batch.iter() { + for raw_payload in raw_payloads { self.log_encoding - .encode_value(&mut buf, &raw_payload.into()) + .encode_value(&mut buf, &raw_payload) .box_err() .context(Encoding)?; @@ -750,7 +748,7 @@ mod tests { use super::*; use crate::{ kv_encoder::CommonLogKey, - log_batch::{MemoryPayload, MemoryPayloadDecoder, PayloadDecoder}, + log_batch::{MemoryPayload, MemoryPayloadDecoder, PayloadDecodeContext, PayloadDecoder}, }; #[test] @@ -777,7 +775,9 @@ mod tests { encoding.encode_value(&mut buf, &payload).unwrap(); let mut value = encoding.decode_value(&buf).unwrap(); - let decoded_value = decoder.decode(&mut value).unwrap(); + let decoded_value = decoder + .decode(&PayloadDecodeContext::default(), &mut value) + .unwrap(); assert_eq!(payload, decoded_value); } diff --git a/src/wal/src/lib.rs b/src/wal/src/lib.rs index c9eb0e1962..134a4048cf 100644 --- a/src/wal/src/lib.rs +++ b/src/wal/src/lib.rs @@ -14,6 +14,8 @@ //! Write Ahead Log +#![feature(trait_alias)] + pub mod config; pub mod kv_encoder; pub mod log_batch; diff --git a/src/wal/src/log_batch.rs b/src/wal/src/log_batch.rs index 44abf8a14d..623e0af95e 100644 --- a/src/wal/src/log_batch.rs +++ b/src/wal/src/log_batch.rs @@ -107,11 +107,21 @@ impl LogWriteBatch { } } +/// The context to decode payload. +#[derive(Debug, Default, Clone)] +pub struct PayloadDecodeContext { + pub table_id: TableId, +} + pub trait PayloadDecoder: Send + Sync { type Error: std::error::Error + Send + Sync + 'static; type Target: Send + Sync; /// Decode `Target` from the `bytes`. - fn decode(&self, buf: &mut B) -> Result; + fn decode( + &self, + ctx: &PayloadDecodeContext, + buf: &mut B, + ) -> Result; } pub struct MemoryPayloadDecoder; @@ -120,7 +130,11 @@ impl PayloadDecoder for MemoryPayloadDecoder { type Error = Error; type Target = MemoryPayload; - fn decode(&self, buf: &mut B) -> Result { + fn decode( + &self, + _ctx: &PayloadDecodeContext, + buf: &mut B, + ) -> Result { let val = buf.try_get_u32().expect("should succeed to read u32"); Ok(MemoryPayload { val }) } diff --git a/src/wal/src/manager.rs b/src/wal/src/manager.rs index 113b42061d..78404cf640 100644 --- a/src/wal/src/manager.rs +++ b/src/wal/src/manager.rs @@ -29,10 +29,12 @@ use snafu::ResultExt; use crate::{ config::StorageConfig, - log_batch::{LogEntry, LogWriteBatch, PayloadDecoder}, + log_batch::{LogEntry, LogWriteBatch, PayloadDecodeContext, PayloadDecoder}, metrics::WAL_WRITE_BYTES_HISTOGRAM, }; +pub trait TableFilter = Fn(TableId) -> bool; + pub mod error { use generic_error::GenericError; use macros::define_result; @@ -398,24 +400,36 @@ impl BatchLogIteratorAdapter { } } - async fn simulated_async_next( + async fn simulated_async_next( &mut self, decoder: D, + filter: F, runtime: Arc, sync_iter: Box, mut buffer: VecDeque>, - ) -> Result<(VecDeque>, Option)> { + ) -> Result<(VecDeque>, Option)> + where + D: PayloadDecoder + Send + 'static, + F: TableFilter + Send + 'static, + { buffer.clear(); let mut iter = sync_iter; let batch_size = self.batch_size; let (log_entries, iter_opt) = runtime .spawn_blocking(move || { - for _ in 0..batch_size { + while buffer.len() < batch_size { if let Some(raw_log_entry) = iter.next_log_entry()? 
{ + if !filter(raw_log_entry.table_id) { + continue; + } + let mut raw_payload = raw_log_entry.payload; + let ctx = PayloadDecodeContext { + table_id: raw_log_entry.table_id, + }; let payload = decoder - .decode(&mut raw_payload) + .decode(&ctx, &mut raw_payload) .box_err() .context(error::Decoding)?; let log_entry = LogEntry { @@ -440,20 +454,31 @@ impl BatchLogIteratorAdapter { } } - async fn async_next( + async fn async_next( &mut self, decoder: D, + filter: F, async_iter: Box, mut buffer: VecDeque>, - ) -> Result<(VecDeque>, Option)> { + ) -> Result<(VecDeque>, Option)> + where + D: PayloadDecoder + Send + 'static, + F: TableFilter + Send + 'static, + { buffer.clear(); let mut async_iter = async_iter; - for _ in 0..self.batch_size { + while buffer.len() < self.batch_size { if let Some(raw_log_entry) = async_iter.next_log_entry().await? { + if !filter(raw_log_entry.table_id) { + continue; + } let mut raw_payload = raw_log_entry.payload; + let ctx = PayloadDecodeContext { + table_id: raw_log_entry.table_id, + }; let payload = decoder - .decode(&mut raw_payload) + .decode(&ctx, &mut raw_payload) .box_err() .context(error::Decoding)?; let log_entry = LogEntry { @@ -470,11 +495,21 @@ impl BatchLogIteratorAdapter { Ok((buffer, Some(LogIterator::Async(async_iter)))) } - pub async fn next_log_entries( + /// Read the next batch of wal logs. + /// + /// The `filter` will be used to determine whether a log is needed anymore, + /// so unnecessary decoding work can be avoided. + /// NOTE: the logs will be accepted if the return result of filter is true. + pub async fn next_log_entries( &mut self, decoder: D, + filter: F, buffer: VecDeque>, - ) -> Result>> { + ) -> Result>> + where + D: PayloadDecoder + Send + 'static, + F: TableFilter + Send + 'static, + { if self.iter.is_none() { return Ok(VecDeque::new()); } @@ -482,10 +517,10 @@ impl BatchLogIteratorAdapter { let iter = self.iter.take().unwrap(); let (log_entries, iter) = match iter { LogIterator::Sync { iter, runtime } => { - self.simulated_async_next(decoder, runtime, iter, buffer) + self.simulated_async_next(decoder, filter, runtime, iter, buffer) .await? 
} - LogIterator::Async(iter) => self.async_next(decoder, iter, buffer).await?, + LogIterator::Async(iter) => self.async_next(decoder, filter, iter, buffer).await?, }; self.iter = iter; @@ -607,7 +642,7 @@ mod tests { loop { buffer = iter - .next_log_entries(MemoryPayloadDecoder, buffer) + .next_log_entries(MemoryPayloadDecoder, |_| true, buffer) .await .unwrap(); for entry in buffer.iter() { @@ -631,7 +666,7 @@ mod tests { let mut buffer = VecDeque::with_capacity(3); loop { buffer = iter - .next_log_entries(MemoryPayloadDecoder, buffer) + .next_log_entries(MemoryPayloadDecoder, |_| true, buffer) .await .unwrap(); for entry in buffer.iter() { diff --git a/src/wal/src/message_queue_impl/region.rs b/src/wal/src/message_queue_impl/region.rs index ff2037726f..7008f3e761 100644 --- a/src/wal/src/message_queue_impl/region.rs +++ b/src/wal/src/message_queue_impl/region.rs @@ -957,7 +957,7 @@ mod tests { }; use crate::{ - log_batch::PayloadDecoder, + log_batch::{PayloadDecodeContext, PayloadDecoder}, manager::{ReadContext, WriteContext}, message_queue_impl::{encoding::MetaEncoding, test_util::TestContext}, }; @@ -1033,9 +1033,12 @@ mod tests { .unwrap(); while let Some(log_entry) = msg_iter.next_log_entry().await.unwrap() { let mut payload = log_entry.payload; + let ctx = PayloadDecodeContext { + table_id: log_entry.table_id, + }; let decoded_payload = test_context .test_payload_encoder - .decode(&mut payload) + .decode(&ctx, &mut payload) .unwrap(); mixed_decoded_res.push(decoded_payload.val); } diff --git a/src/wal/src/message_queue_impl/test_util.rs b/src/wal/src/message_queue_impl/test_util.rs index 73bf8278c9..76f9edb8b2 100644 --- a/src/wal/src/message_queue_impl/test_util.rs +++ b/src/wal/src/message_queue_impl/test_util.rs @@ -71,9 +71,8 @@ impl TestContext { .map(|(table_id, data)| { let log_batch_encoder = LogBatchEncoder::create(WalLocation::new(region_id, table_id)); - let log_write_batch = log_batch_encoder - .encode_batch::(&data) - .unwrap(); + let payloads = data.iter().map(|v| MemoryPayload { val: *v }); + let log_write_batch = log_batch_encoder.encode_batch(payloads).unwrap(); (table_id, TestDataOfTable::new(data, log_write_batch)) }) diff --git a/src/wal/src/table_kv_impl/namespace.rs b/src/wal/src/table_kv_impl/namespace.rs index bb61b11cc5..5793139369 100644 --- a/src/wal/src/table_kv_impl/namespace.rs +++ b/src/wal/src/table_kv_impl/namespace.rs @@ -1640,7 +1640,7 @@ mod tests { use super::*; use crate::{ kv_encoder::{LogBatchEncoder, LogEncoding}, - log_batch::{MemoryPayload, MemoryPayloadDecoder, PayloadDecoder}, + log_batch::{MemoryPayload, MemoryPayloadDecoder, PayloadDecodeContext, PayloadDecoder}, table_kv_impl::consts, }; @@ -2001,7 +2001,10 @@ mod tests { while iter.valid() { let decoded_key = log_encoding.decode_key(iter.key()).unwrap(); let mut raw_value = log_encoding.decode_value(iter.value()).unwrap(); - let decoded_value = decoder.decode(&mut raw_value).unwrap(); + let ctx = PayloadDecodeContext { + table_id: region_id, + }; + let decoded_value = decoder.decode(&ctx, &mut raw_value).unwrap(); key_values.push((decoded_key.1, decoded_value)); iter.next().unwrap(); @@ -2025,7 +2028,7 @@ mod tests { let log_entries = (start_sequence..end_sequence).collect::>(); let wal_encoder = LogBatchEncoder::create(location); let log_batch = wal_encoder - .encode_batch::(&log_entries) + .encode_batch(log_entries.iter().map(|v| MemoryPayload { val: *v })) .expect("should succeed to encode payload batch"); let write_ctx = manager::WriteContext::default(); namespace diff --git 
a/src/wal/tests/read_write.rs b/src/wal/tests/read_write.rs index a2f6181283..49d4aea662 100644 --- a/src/wal/tests/read_write.rs +++ b/src/wal/tests/read_write.rs @@ -1021,11 +1021,11 @@ impl TestEnv { start: u32, end: u32, ) -> (Vec, LogWriteBatch) { - let log_entries = (start..end).collect::>(); + let log_entries = start..end; let log_batch_encoder = LogBatchEncoder::create(location); let log_batch = log_batch_encoder - .encode_batch::(&log_entries) + .encode_batch(log_entries.map(|v| MemoryPayload { val: v })) .expect("should succeed to encode payloads"); let payload_batch = self.build_payload_batch(start, end); @@ -1047,7 +1047,7 @@ impl TestEnv { loop { let dec = MemoryPayloadDecoder; let log_entries = iter - .next_log_entries(dec, VecDeque::new()) + .next_log_entries(dec, |_| true, VecDeque::new()) .await .expect("should succeed to fetch next log entry"); if log_entries.is_empty() { diff --git a/system_catalog/src/sys_catalog_table.rs b/system_catalog/src/sys_catalog_table.rs index 53941bb0d9..dcef633ca6 100644 --- a/system_catalog/src/sys_catalog_table.rs +++ b/system_catalog/src/sys_catalog_table.rs @@ -724,6 +724,7 @@ fn new_sys_catalog_schema() -> schema::Result { .build() .expect("Should succeed to build column schema of catalog"), )? + .primary_key_indexes(vec![0, 1]) .build() } diff --git a/system_catalog/src/tables.rs b/system_catalog/src/tables.rs index f6f1d2d526..179051399d 100644 --- a/system_catalog/src/tables.rs +++ b/system_catalog/src/tables.rs @@ -91,6 +91,7 @@ fn tables_schema() -> Schema { .unwrap(), ) .unwrap() + .primary_key_indexes(vec![0, 1, 2]) .build() .unwrap() } diff --git a/table_engine/src/partition/rule/df_adapter/mod.rs b/table_engine/src/partition/rule/df_adapter/mod.rs index bb423f956f..69bee374ea 100644 --- a/table_engine/src/partition/rule/df_adapter/mod.rs +++ b/table_engine/src/partition/rule/df_adapter/mod.rs @@ -286,6 +286,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .expect("should succeed to build schema") }
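One pattern repeated across the call sites above is that the schema builder no longer infers the primary key, so each construction site now passes `primary_key_indexes(...)` explicitly. A minimal sketch of that builder shape, using hypothetical types rather than the real `common_types` schema builder:

```rust
// Hypothetical, trimmed-down builder; the real one carries column types,
// schema versions and many more checks.
#[derive(Debug, PartialEq)]
struct Schema {
    columns: Vec<String>,
    primary_key_indexes: Vec<usize>,
}

#[derive(Default)]
struct Builder {
    columns: Vec<String>,
    primary_key_indexes: Vec<usize>,
}

impl Builder {
    fn add_column(mut self, name: &str) -> Self {
        self.columns.push(name.to_string());
        self
    }

    // Explicit primary key, as in the `.primary_key_indexes(vec![0, 1])`
    // calls added throughout this diff.
    fn primary_key_indexes(mut self, indexes: Vec<usize>) -> Self {
        self.primary_key_indexes = indexes;
        self
    }

    fn build(self) -> Result<Schema, String> {
        // Reject indexes that point past the declared columns instead of
        // building a schema whose primary key refers to nothing.
        if let Some(bad) = self
            .primary_key_indexes
            .iter()
            .find(|idx| **idx >= self.columns.len())
        {
            return Err(format!("primary key index {bad} out of range"));
        }
        Ok(Schema {
            columns: self.columns,
            primary_key_indexes: self.primary_key_indexes,
        })
    }
}

fn main() {
    let schema = Builder::default()
        .add_column("tsid")
        .add_column("t")
        .add_column("value")
        .primary_key_indexes(vec![0, 1])
        .build()
        .unwrap();
    assert_eq!(schema.primary_key_indexes, vec![0, 1]);
}
```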