From 4860cd43bddec51354f059f45f5bf6c658d3806c Mon Sep 17 00:00:00 2001 From: WEI Xikai Date: Mon, 6 Nov 2023 16:52:11 +0800 Subject: [PATCH 1/5] feat: support write wal logs in columnar format (#1179) ## Rationale After the columnar encoding is supported for wal format, we should adapt such encoding in the wal write procedure. ## Detailed Changes Adapt the columnar encoding in the wal write procedure. And now, both rowwise and columnar encoding in the wal write procedure are supported, which to use depends on the configuration. ## Test Plan Existing tests. The compatibility tests have been designed as: - Write rowwise-encoded wal logs and restart the server to replay it - Write columnar-encoded wal logs and restart the server to replay it --- Cargo.lock | 1 + analytic_engine/Cargo.toml | 1 + analytic_engine/src/instance/mod.rs | 3 +- analytic_engine/src/instance/open.rs | 1 + analytic_engine/src/instance/wal_replayer.rs | 75 +++-- analytic_engine/src/instance/write.rs | 286 +++++++++++-------- analytic_engine/src/lib.rs | 25 ++ analytic_engine/src/manifest/details.rs | 2 +- analytic_engine/src/manifest/meta_edit.rs | 4 +- analytic_engine/src/payload.rs | 106 ++++++- benchmarks/src/wal_write_bench.rs | 5 +- common_types/src/datum.rs | 2 + common_types/src/row/mod.rs | 189 ++++++++---- components/codec/src/columnar/mod.rs | 119 +++++++- components/codec/src/columnar/number.rs | 2 + components/codec/src/columnar/timestamp.rs | 91 ++++++ src/wal/src/kv_encoder.rs | 18 +- src/wal/src/lib.rs | 2 + src/wal/src/log_batch.rs | 18 +- src/wal/src/manager.rs | 65 ++++- src/wal/src/message_queue_impl/region.rs | 7 +- src/wal/src/message_queue_impl/test_util.rs | 5 +- src/wal/src/table_kv_impl/namespace.rs | 9 +- src/wal/tests/read_write.rs | 6 +- 24 files changed, 779 insertions(+), 263 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 03787588f9..98f6bd9a40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,6 +107,7 @@ dependencies = [ "hex", "hyperloglog", "id_allocator", + "itertools 0.10.5", "lazy_static", "logger", "lru 0.7.8", diff --git a/analytic_engine/Cargo.toml b/analytic_engine/Cargo.toml index 46ea69eee0..e575bd465f 100644 --- a/analytic_engine/Cargo.toml +++ b/analytic_engine/Cargo.toml @@ -50,6 +50,7 @@ generic_error = { workspace = true } hex = { workspace = true } hyperloglog = { workspace = true } id_allocator = { workspace = true } +itertools = { workspace = true } lazy_static = { workspace = true } logger = { workspace = true } lru = { workspace = true } diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 186f38912c..1ff48cc927 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -61,7 +61,7 @@ use crate::{ metrics::MaybeTableLevelMetrics, }, table::data::{TableDataRef, TableShardInfo}, - RecoverMode, TableOptions, + RecoverMode, TableOptions, WalEncodeConfig, }; #[allow(clippy::enum_variant_names)] @@ -186,6 +186,7 @@ pub struct Instance { pub(crate) scan_options: ScanOptions, pub(crate) iter_options: Option, pub(crate) recover_mode: RecoverMode, + pub(crate) wal_encode: WalEncodeConfig, } impl Instance { diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index f6999ec844..c3602ac641 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -146,6 +146,7 @@ impl Instance { iter_options, scan_options, recover_mode: ctx.config.recover_mode, + wal_encode: ctx.config.wal_encode, }); Ok(instance) diff --git 
a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index ecb5894bd6..23919de453 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -18,10 +18,14 @@ use std::{ collections::{HashMap, VecDeque}, fmt::Display, ops::Range, + sync::Arc, }; use async_trait::async_trait; -use common_types::{schema::IndexInWriterSchema, table::ShardId}; +use common_types::{ + schema::{IndexInWriterSchema, Schema}, + table::ShardId, +}; use generic_error::BoxError; use lazy_static::lazy_static; use logger::{debug, error, info, trace, warn}; @@ -44,7 +48,7 @@ use crate::{ serial_executor::TableOpSerialExecutor, write::MemTableWriter, }, - payload::{ReadPayload, WalDecoder}, + payload::{ReadPayload, SingleSchemaProviderAdapter, TableSchemaProvider, WalDecoder}, table::data::TableDataRef, }; @@ -173,7 +177,7 @@ impl Replay for TableBasedReplay { ) -> Result { debug!("Replay wal logs on table mode, context:{context}, tables:{table_datas:?}",); - let mut faileds = HashMap::new(); + let mut failed_tables = HashMap::new(); let read_ctx = ReadContext { batch_size: context.wal_replay_batch_size, ..Default::default() @@ -181,11 +185,11 @@ impl Replay for TableBasedReplay { for table_data in table_datas { let table_id = table_data.id; if let Err(e) = Self::recover_table_logs(context, table_data, &read_ctx).await { - faileds.insert(table_id, e); + failed_tables.insert(table_id, e); } } - Ok(faileds) + Ok(failed_tables) } } @@ -217,9 +221,14 @@ impl TableBasedReplay { loop { // fetch entries to log_entry_buf let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer(); - let decoder = WalDecoder; + let adapter = SingleSchemaProviderAdapter { + schema: table_data.schema(), + }; + let decoder = WalDecoder::new(adapter); + // All the logs should belong the table, so no need to check again. + let filter = |_| true; log_entry_buf = log_iter - .next_log_entries(decoder, log_entry_buf) + .next_log_entries(decoder, filter, log_entry_buf) .await .box_err() .context(ReplayWalWithCause { msg: None })?; @@ -257,15 +266,26 @@ impl Replay for RegionBasedReplay { debug!("Replay wal logs on region mode, context:{context}, tables:{table_datas:?}",); // Init all table results to be oks, and modify to errs when failed to replay. - let mut faileds = FailedTables::new(); + let mut failed_tables = FailedTables::new(); let scan_ctx = ScanContext { batch_size: context.wal_replay_batch_size, ..Default::default() }; - Self::replay_region_logs(context, table_datas, &scan_ctx, &mut faileds).await?; + Self::replay_region_logs(context, table_datas, &scan_ctx, &mut failed_tables).await?; + + Ok(failed_tables) + } +} + +#[derive(Clone)] +struct TableSchemaProviderAdapter { + table_datas: Arc>, +} - Ok(faileds) +impl TableSchemaProvider for TableSchemaProviderAdapter { + fn table_schema(&self, table_id: common_types::table::TableId) -> Option { + self.table_datas.get(&table_id).map(|v| v.schema()) } } @@ -280,7 +300,7 @@ impl RegionBasedReplay { context: &ReplayContext, table_datas: &[TableDataRef], scan_ctx: &ScanContext, - faileds: &mut FailedTables, + failed_tables: &mut FailedTables, ) -> Result<()> { // Scan all wal logs of current shard. let scan_req = ScanRequest { @@ -297,6 +317,7 @@ impl RegionBasedReplay { // Lock all related tables. 
let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len()); + let mut table_datas_by_id = HashMap::with_capacity(table_datas.len()); for table_data in table_datas { let serial_exec = table_data.serial_exec.lock().await; let serial_exec_ctx = SerialExecContext { @@ -304,14 +325,21 @@ impl RegionBasedReplay { serial_exec, }; serial_exec_ctxs.insert(table_data.id, serial_exec_ctx); + table_datas_by_id.insert(table_data.id.as_u64(), table_data.clone()); } + let table_datas_by_id = Arc::new(table_datas_by_id); + let schema_provider = TableSchemaProviderAdapter { + table_datas: table_datas_by_id.clone(), + }; // Split and replay logs. loop { let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer(); - let decoder = WalDecoder; + let decoder = WalDecoder::new(schema_provider.clone()); + let table_datas_for_filter = table_datas_by_id.clone(); + let log_filter = move |log_table_id| table_datas_for_filter.contains_key(&log_table_id); log_entry_buf = log_iter - .next_log_entries(decoder, log_entry_buf) + .next_log_entries(decoder, log_filter, log_entry_buf) .await .box_err() .context(ReplayWalWithCause { msg: None })?; @@ -321,8 +349,13 @@ impl RegionBasedReplay { } let _timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer(); - Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds) - .await?; + Self::replay_single_batch( + context, + &log_entry_buf, + &mut serial_exec_ctxs, + failed_tables, + ) + .await?; } Ok(()) @@ -332,7 +365,7 @@ impl RegionBasedReplay { context: &ReplayContext, log_batch: &VecDeque>, serial_exec_ctxs: &mut HashMap>, - faileds: &mut FailedTables, + failed_tables: &mut FailedTables, ) -> Result<()> { let mut table_batches = Vec::new(); // TODO: No `group_by` method in `VecDeque`, so implement it manually here... @@ -341,7 +374,7 @@ impl RegionBasedReplay { // TODO: Replay logs of different tables in parallel. for table_batch in table_batches { // Some tables may have failed in previous replay, ignore them. - if faileds.contains_key(&table_batch.table_id) { + if failed_tables.contains_key(&table_batch.table_id) { continue; } @@ -359,7 +392,7 @@ impl RegionBasedReplay { // If occur error, mark this table as failed and store the cause. if let Err(e) = result { - faileds.insert(table_batch.table_id, e); + failed_tables.insert(table_batch.table_id, e); } } } @@ -489,8 +522,8 @@ async fn replay_table_log_entries( let index_in_writer = IndexInWriterSchema::for_same_schema(row_group.schema().num_columns()); let memtable_writer = MemTableWriter::new(table_data.clone(), serial_exec); - let memtable_write_ret = memtable_writer - .write(sequence, &row_group.into(), index_in_writer) + let write_res = memtable_writer + .write(sequence, row_group, index_in_writer) .box_err() .context(ReplayWalWithCause { msg: Some(format!( @@ -498,7 +531,7 @@ async fn replay_table_log_entries( table_data.space_id, table_data.name, table_data.id )), }); - if let Err(e) = memtable_write_ret { + if let Err(e) = write_res { // TODO: find a better way to match this. if e.to_string().contains(crate::memtable::TOO_LARGE_MESSAGE) { // ignore this error diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index b2c09fce69..246e5da1ee 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -14,13 +14,19 @@ //! 
Write logic of instance +use std::iter; + use bytes_ext::ByteVec; use ceresdbproto::{schema as schema_pb, table_requests}; -use codec::row; +use codec::{ + columnar::{ColumnarEncoder, EncodeHint}, + row, +}; use common_types::{ - row::{RowGroup, RowGroupSlicer}, + row::RowGroup, schema::{IndexInWriterSchema, Schema}, }; +use itertools::Itertools; use logger::{debug, error, info, trace, warn}; use macros::define_result; use smallvec::SmallVec; @@ -28,6 +34,7 @@ use snafu::{ensure, Backtrace, ResultExt, Snafu}; use table_engine::table::WriteRequest; use wal::{ kv_encoder::LogBatchEncoder, + log_batch::Payload, manager::{SequenceNumber, WalLocation, WriteContext}, }; @@ -40,6 +47,7 @@ use crate::{ payload::WritePayload, space::SpaceRef, table::{data::TableDataRef, version::MemTableForWrite}, + WalEncodeConfig, WalEncodeFormat, }; #[derive(Debug, Snafu)] @@ -120,33 +128,98 @@ define_result!(Error); /// Max rows in a write request, must less than [u32::MAX] const MAX_ROWS_TO_WRITE: usize = 10_000_000; +/// The version used for [`table_requests::WriteRequest.version`]. +#[derive(Clone, Copy, Debug)] +pub enum WalEncodeVersion { + RowWise = 0, + Columnar, +} + +impl WalEncodeVersion { + #[inline] + pub fn as_u32(self) -> u32 { + match self { + Self::RowWise => 0, + Self::Columnar => 1, + } + } + + #[inline] + pub fn try_from_u32(v: u32) -> Option { + match v { + 0 => Some(Self::RowWise), + 1 => Some(Self::Columnar), + _ => None, + } + } +} + pub(crate) struct EncodeContext { pub row_group: RowGroup, pub index_in_writer: IndexInWriterSchema, - pub encoded_rows: Vec, } +enum EncodedPayload { + Cols(Vec), + Rows(Vec), +} impl EncodeContext { pub fn new(row_group: RowGroup) -> Self { Self { row_group, index_in_writer: IndexInWriterSchema::default(), - encoded_rows: Vec::new(), } } - pub fn encode_rows(&mut self, table_schema: &Schema) -> Result<()> { + fn encode( + &mut self, + config: &WalEncodeConfig, + table_schema: &Schema, + ) -> Result { + match config.format { + WalEncodeFormat::Columnar => self.encode_cols(config).map(EncodedPayload::Cols), + WalEncodeFormat::RowWise => self.encode_rows(table_schema).map(EncodedPayload::Rows), + } + } + + fn encode_cols(&mut self, config: &WalEncodeConfig) -> Result> { + let row_group_schema = self.row_group.schema(); + let mut encoded_cols = Vec::with_capacity(row_group_schema.num_columns()); + + for col_idx in 0..row_group_schema.num_columns() { + let col_schema = row_group_schema.column(col_idx); + let col_iter = self.row_group.iter_column(col_idx).map(|v| v.as_view()); + let enc = ColumnarEncoder::new( + col_schema.id, + config.num_bytes_compress_threshold.as_byte() as usize, + ); + let mut hint = EncodeHint { + num_nulls: None, + num_datums: Some(self.row_group.num_rows()), + datum_kind: col_schema.data_type, + }; + let sz = enc.estimated_encoded_size(col_iter.clone(), &mut hint); + let mut buf = Vec::with_capacity(sz); + enc.encode(&mut buf, col_iter, &mut hint).unwrap(); + encoded_cols.push(buf); + } + + Ok(encoded_cols) + } + + fn encode_rows(&mut self, table_schema: &Schema) -> Result> { + let mut encoded_rows = Vec::new(); row::encode_row_group_for_wal( &self.row_group, table_schema, &self.index_in_writer, - &mut self.encoded_rows, + &mut encoded_rows, ) .context(EncodeRowGroup)?; - assert_eq!(self.row_group.num_rows(), self.encoded_rows.len()); + assert_eq!(self.row_group.num_rows(), encoded_rows.len()); - Ok(()) + Ok(encoded_rows) } } @@ -160,15 +233,9 @@ struct WriteRowGroupSplitter { max_bytes_per_batch: usize, } -enum SplitResult<'a> { - 
Splitted { - encoded_batches: Vec>, - row_group_batches: Vec>, - }, - Integrate { - encoded_rows: Vec, - row_group: RowGroupSlicer<'a>, - }, +enum SplitResult { + Splitted { encoded_batches: Vec> }, + Integrate { encoded_rows: Vec }, } impl WriteRowGroupSplitter { @@ -179,34 +246,19 @@ impl WriteRowGroupSplitter { } /// Split the write request into multiple batches. - /// - /// NOTE: The length of the `encoded_rows` should be the same as the number - /// of rows in the `row_group`. - pub fn split<'a>( - &'_ self, - encoded_rows: Vec, - row_group: &'a RowGroup, - ) -> SplitResult<'a> { + pub fn split(&self, encoded_rows: Vec) -> SplitResult { let end_row_indexes = self.compute_batches(&encoded_rows); if end_row_indexes.len() <= 1 { // No need to split. - return SplitResult::Integrate { - encoded_rows, - row_group: RowGroupSlicer::from(row_group), - }; + return SplitResult::Integrate { encoded_rows }; } let mut prev_end_row_index = 0; let mut encoded_batches = Vec::with_capacity(end_row_indexes.len()); - let mut row_group_batches = Vec::with_capacity(end_row_indexes.len()); for end_row_index in &end_row_indexes { let end_row_index = *end_row_index; let curr_batch = Vec::with_capacity(end_row_index - prev_end_row_index); encoded_batches.push(curr_batch); - let row_group_slicer = - RowGroupSlicer::new(prev_end_row_index..end_row_index, row_group); - row_group_batches.push(row_group_slicer); - prev_end_row_index = end_row_index; } @@ -218,10 +270,7 @@ impl WriteRowGroupSplitter { encoded_batches[current_batch_idx].push(encoded_row); } - SplitResult::Splitted { - encoded_batches, - row_group_batches, - } + SplitResult::Splitted { encoded_batches } } /// Compute the end row indexes in the original `encoded_rows` of each @@ -298,7 +347,7 @@ impl<'a> MemTableWriter<'a> { pub fn write( &self, sequence: SequenceNumber, - row_group: &RowGroupSlicer, + row_group: &RowGroup, index_in_writer: IndexInWriterSchema, ) -> Result<()> { let _timer = self.table_data.metrics.start_table_write_memtable_timer(); @@ -372,76 +421,96 @@ impl<'a> Writer<'a> { self.preprocess_write(&mut encode_ctx).await?; - { + let encoded_payload = { let _timer = self.table_data.metrics.start_table_write_encode_timer(); let schema = self.table_data.schema(); - encode_ctx.encode_rows(&schema)?; - } + encode_ctx.encode(&self.instance.wal_encode, &schema)? + }; + + let seq = match encoded_payload { + EncodedPayload::Rows(encoded_rows) => self.write_to_wal_in_rows(encoded_rows).await?, + EncodedPayload::Cols(encoded_cols) => self.write_to_wal_in_cols(encoded_cols).await?, + }; + // Write the row group to the memtable and update the state in the mem. 
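The memtable write below reuses the original `row_group` together with the sequence number returned from the WAL step above. For the row-wise branch, WAL batches are still bounded by `max_bytes_per_write_batch` through `WriteRowGroupSplitter`; its `compute_batches` is unchanged by this patch and not shown in the hunk, so the following is only an illustrative sketch of the idea, assuming a greedy byte budget:

```rust
// Illustrative only: group encoded rows into batches whose accumulated size reaches
// `max_bytes_per_batch`, returning the exclusive end row index of every batch.
fn end_row_indexes(encoded_rows: &[Vec<u8>], max_bytes_per_batch: usize) -> Vec<usize> {
    let mut ends = Vec::new();
    let mut batch_bytes = 0usize;
    for (idx, row) in encoded_rows.iter().enumerate() {
        batch_bytes += row.len();
        if batch_bytes >= max_bytes_per_batch {
            // Close the current batch after this row.
            ends.push(idx + 1);
            batch_bytes = 0;
        }
    }
    // Any remaining rows form the last batch.
    if ends.last() != Some(&encoded_rows.len()) {
        ends.push(encoded_rows.len());
    }
    ends
}
```

A single returned index means the whole request fits in one batch (`SplitResult::Integrate`); more indexes drive the per-batch loop in `split` above.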
+ let table_data = self.table_data.clone(); let EncodeContext { row_group, index_in_writer, - encoded_rows, } = encode_ctx; + self.write_to_mem(&table_data, &row_group, index_in_writer, seq) + .await?; - let table_data = self.table_data.clone(); - let split_res = self.maybe_split_write_request(encoded_rows, &row_group); + Ok(row_group.num_rows()) + } + + async fn write_to_wal_in_rows(&self, encoded_rows: Vec) -> Result { + let split_res = self.maybe_split_write_request(encoded_rows); match split_res { - SplitResult::Integrate { - encoded_rows, - row_group, - } => { - self.write_table_row_group(&table_data, row_group, index_in_writer, encoded_rows) - .await?; + SplitResult::Integrate { encoded_rows } => { + let write_req = self.make_rowwise_write_request(encoded_rows); + let payload = WritePayload::Write(&write_req); + self.write_to_wal(iter::once(payload)).await } - SplitResult::Splitted { - encoded_batches, - row_group_batches, - } => { - for (encoded_rows, row_group) in encoded_batches.into_iter().zip(row_group_batches) - { - self.write_table_row_group( - &table_data, - row_group, - index_in_writer.clone(), - encoded_rows, - ) - .await?; - } + SplitResult::Splitted { encoded_batches } => { + let write_reqs = encoded_batches + .into_iter() + .map(|v| self.make_rowwise_write_request(v)) + .collect_vec(); + + let payload = write_reqs.iter().map(WritePayload::Write); + self.write_to_wal(payload).await } } + } - Ok(row_group.num_rows()) + async fn write_to_wal_in_cols(&self, encoded_cols: Vec) -> Result { + let write_req = table_requests::WriteRequest { + version: WalEncodeVersion::Columnar.as_u32(), + schema: None, + rows: vec![], + cols: encoded_cols, + }; + let payload = WritePayload::Write(&write_req); + + self.write_to_wal(iter::once(payload)).await } - fn maybe_split_write_request<'b>( - &'a self, + fn make_rowwise_write_request( + &self, encoded_rows: Vec, - row_group: &'b RowGroup, - ) -> SplitResult<'b> { + ) -> table_requests::WriteRequest { + table_requests::WriteRequest { + version: WalEncodeVersion::RowWise.as_u32(), + // Use the table schema instead of the schema in request to avoid schema + // mismatch during replaying + schema: Some(schema_pb::TableSchema::from(&self.table_data.schema())), + rows: encoded_rows, + cols: vec![], + } + } + + fn maybe_split_write_request(&self, encoded_rows: Vec) -> SplitResult { if self.instance.max_bytes_per_write_batch.is_none() { - return SplitResult::Integrate { - encoded_rows, - row_group: RowGroupSlicer::from(row_group), - }; + return SplitResult::Integrate { encoded_rows }; } let splitter = WriteRowGroupSplitter::new(self.instance.max_bytes_per_write_batch.unwrap()); - splitter.split(encoded_rows, row_group) + splitter.split(encoded_rows) } - async fn write_table_row_group( + /// Write `row_group` to memtable and update the memory states. + async fn write_to_mem( &mut self, table_data: &TableDataRef, - row_group: RowGroupSlicer<'_>, + row_group: &RowGroup, index_in_writer: IndexInWriterSchema, - encoded_rows: Vec, + sequence: SequenceNumber, ) -> Result<()> { - let sequence = self.write_to_wal(encoded_rows).await?; let memtable_writer = MemTableWriter::new(table_data.clone(), self.serial_exec); memtable_writer - .write(sequence, &row_group, index_in_writer) + .write(sequence, row_group, index_in_writer) .map_err(|e| { error!( "Failed to write to memtable, table:{}, table_id:{}, err:{}", @@ -557,29 +626,22 @@ impl<'a> Writer<'a> { } /// Write log_batch into wal, return the sequence number of log_batch. 
- async fn write_to_wal(&self, encoded_rows: Vec) -> Result { + async fn write_to_wal(&self, payloads: I) -> Result + where + I: Iterator, + P: Payload, + { let _timer = self.table_data.metrics.start_table_write_wal_timer(); - // Convert into pb - let write_req_pb = table_requests::WriteRequest { - // FIXME: Shall we avoid the magic number here? - version: 0, - // Use the table schema instead of the schema in request to avoid schema - // mismatch during replaying - schema: Some(schema_pb::TableSchema::from(&self.table_data.schema())), - rows: encoded_rows, - cols: Vec::new(), - }; - - // Encode payload - let payload = WritePayload::Write(&write_req_pb); let table_location = self.table_data.table_location(); let wal_location = instance::create_wal_location(table_location.id, table_location.shard_info); let log_batch_encoder = LogBatchEncoder::create(wal_location); - let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads { - table: &self.table_data.name, - wal_location, - })?; + let log_batch = log_batch_encoder + .encode_batch(payloads) + .context(EncodePayloads { + table: &self.table_data.name, + wal_location, + })?; // Write to wal manager let write_ctx = WriteContext::default(); @@ -736,40 +798,24 @@ mod tests { } }; for (batch_size, sizes, expected_batches) in cases { - let (encoded_rows, row_group) = generate_rows_for_test(sizes.clone()); + let (encoded_rows, _) = generate_rows_for_test(sizes.clone()); let write_row_group_splitter = WriteRowGroupSplitter::new(batch_size); - let split_res = write_row_group_splitter.split(encoded_rows, &row_group); + let split_res = write_row_group_splitter.split(encoded_rows); if expected_batches.is_empty() { assert!(matches!(split_res, SplitResult::Integrate { .. })); } else if expected_batches.len() == 1 { assert!(matches!(split_res, SplitResult::Integrate { .. })); - if let SplitResult::Integrate { - encoded_rows, - row_group, - } = split_res - { + if let SplitResult::Integrate { encoded_rows } = split_res { check_encoded_rows(&encoded_rows, &expected_batches[0]); - assert_eq!(row_group.num_rows(), expected_batches[0].len()); } } else { assert!(matches!(split_res, SplitResult::Splitted { .. })); - if let SplitResult::Splitted { - encoded_batches, - row_group_batches, - } = split_res - { - assert_eq!(encoded_batches.len(), row_group_batches.len()); + if let SplitResult::Splitted { encoded_batches } = split_res { assert_eq!(encoded_batches.len(), expected_batches.len()); - let mut batch_start_index = 0; - for ((encoded_batch, row_group_batch), expected_batch) in encoded_batches - .iter() - .zip(row_group_batches.iter()) - .zip(expected_batches.iter()) + for (encoded_batch, expected_batch) in + encoded_batches.iter().zip(expected_batches.iter()) { check_encoded_rows(encoded_batch, expected_batch); - assert_eq!(row_group_batch.num_rows(), expected_batch.len()); - assert_eq!(row_group_batch.slice_range().start, batch_start_index); - batch_start_index += expected_batch.len(); } } } diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index ff740d825b..9e8cb1dee2 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -103,6 +103,8 @@ pub struct Config { pub max_bytes_per_write_batch: Option, /// The interval for sampling the memory usage pub mem_usage_sampling_interval: ReadableDuration, + /// The config for log in the wal. 
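This `wal_encode` field is the switch the Rationale refers to: it selects between the row-wise and the columnar WAL payload encoding. A minimal sketch of opting into the columnar format, using only the types defined below (in a TOML deployment file this would map to a `wal_encode` section with `format = "Columnar"`; the exact nesting depends on how the engine config is embedded, which is an assumption here):

```rust
// Hypothetical usage: build an engine Config that writes columnar WAL payloads.
let config = Config {
    wal_encode: WalEncodeConfig {
        format: WalEncodeFormat::Columnar,
        // Encoded columns above this size are compressed (see the field doc below).
        num_bytes_compress_threshold: ReadableSize::kb(1),
    },
    ..Config::default()
};
```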
+ pub wal_encode: WalEncodeConfig, /// Wal storage config /// @@ -135,6 +137,28 @@ pub enum RecoverMode { ShardBased, } +#[derive(Debug, Clone, Copy, Deserialize, Serialize)] +pub enum WalEncodeFormat { + RowWise, + Columnar, +} +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct WalEncodeConfig { + /// The threshold of columnar bytes to do compression. + pub num_bytes_compress_threshold: ReadableSize, + /// Encode the data in a columnar layout if it is set. + pub format: WalEncodeFormat, +} + +impl Default for WalEncodeConfig { + fn default() -> Self { + Self { + num_bytes_compress_threshold: ReadableSize::kb(1), + format: WalEncodeFormat::RowWise, + } + } +} + impl Default for Config { fn default() -> Self { Self { @@ -163,6 +187,7 @@ impl Default for Config { max_retry_flush_limit: 0, max_bytes_per_write_batch: None, mem_usage_sampling_interval: ReadableDuration::secs(0), + wal_encode: WalEncodeConfig::default(), wal: StorageConfig::RocksDB(Box::default()), remote_engine_client: remote_engine_client::config::Config::default(), recover_mode: RecoverMode::TableBased, diff --git a/analytic_engine/src/manifest/details.rs b/analytic_engine/src/manifest/details.rs index 4d44cc45e4..6da9271477 100644 --- a/analytic_engine/src/manifest/details.rs +++ b/analytic_engine/src/manifest/details.rs @@ -192,7 +192,7 @@ impl MetaUpdateLogEntryIterator for MetaUpdateReaderImpl { let buffer = mem::take(&mut self.buffer); self.buffer = self .iter - .next_log_entries(decoder, buffer) + .next_log_entries(decoder, |_| true, buffer) .await .context(ReadEntry)?; } diff --git a/analytic_engine/src/manifest/meta_edit.rs b/analytic_engine/src/manifest/meta_edit.rs index 34cc70a339..d55b66f83c 100644 --- a/analytic_engine/src/manifest/meta_edit.rs +++ b/analytic_engine/src/manifest/meta_edit.rs @@ -26,7 +26,7 @@ use macros::define_result; use prost::Message; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; -use wal::log_batch::{Payload, PayloadDecoder}; +use wal::log_batch::{Payload, PayloadDecodeContext, PayloadDecoder}; use crate::{ manifest::meta_snapshot::MetaSnapshot, @@ -408,7 +408,7 @@ impl PayloadDecoder for MetaUpdateDecoder { type Error = Error; type Target = MetaUpdate; - fn decode(&self, buf: &mut B) -> Result { + fn decode(&self, _ctx: &PayloadDecodeContext, buf: &mut B) -> Result { let meta_update_pb = manifest_pb::MetaUpdate::decode(buf.chunk()).context(DecodePayloadPb)?; MetaUpdate::try_from(meta_update_pb) diff --git a/analytic_engine/src/payload.rs b/analytic_engine/src/payload.rs index 778f442100..032b40c5cf 100644 --- a/analytic_engine/src/payload.rs +++ b/analytic_engine/src/payload.rs @@ -16,17 +16,22 @@ use bytes_ext::{Buf, BufMut, SafeBuf, SafeBufMut}; use ceresdbproto::{manifest as manifest_pb, table_requests}; -use codec::{row::WalRowDecoder, Decoder}; +use codec::{ + columnar::{ColumnarDecoder, DecodeContext, DecodeResult}, + row::WalRowDecoder, + Decoder, +}; use common_types::{ - row::{RowGroup, RowGroupBuilder}, + row::{RowGroup, RowGroupBuilder, RowGroupBuilderFromColumn}, schema::Schema, + table::TableId, }; use macros::define_result; use prost::Message; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; -use wal::log_batch::{Payload, PayloadDecoder}; +use wal::log_batch::{Payload, PayloadDecodeContext, PayloadDecoder}; -use crate::{table_options, TableOptions}; +use crate::{instance::write::WalEncodeVersion, table_options, TableOptions}; #[derive(Debug, Snafu)] pub enum Error { @@ -61,10 +66,18 @@ pub enum Error { 
#[snafu(display("Failed to decode row, err:{}", source))] DecodeRow { source: codec::row::Error }, + #[snafu(display("Failed to decode column, err:{}", source))] + DecodeColumn { source: codec::columnar::Error }, + + #[snafu(display("Failed to build row group, err:{}", source))] + BuildRowGroup { source: common_types::row::Error }, + #[snafu(display( - "Table schema is not found in the write request.\nBacktrace:\n{}", - backtrace + "Invalid version of write request, version:{version}.\nBacktrace:\n{backtrace}" ))] + InvalidWriteReqVersion { version: u32, backtrace: Backtrace }, + + #[snafu(display("Table schema is not found.\nBacktrace:\n{}", backtrace))] TableSchemaNotFound { backtrace: Backtrace }, #[snafu(display( @@ -163,10 +176,23 @@ pub enum ReadPayload { } impl ReadPayload { - fn decode_write_from_pb(buf: &[u8]) -> Result { + fn decode_write_from_pb(schema: &Schema, buf: &[u8]) -> Result { let write_req_pb: table_requests::WriteRequest = Message::decode(buf).context(DecodeBody)?; + let version = { + let version = write_req_pb.version; + WalEncodeVersion::try_from_u32(version).context(InvalidWriteReqVersion { version })? + }; + match version { + WalEncodeVersion::RowWise => Self::decode_rowwise_write_req(write_req_pb), + WalEncodeVersion::Columnar => { + Self::decode_columnar_write_req(schema.clone(), write_req_pb) + } + } + } + + fn decode_rowwise_write_req(write_req_pb: table_requests::WriteRequest) -> Result { // Consume and convert schema in pb let schema: Schema = write_req_pb .schema @@ -191,6 +217,33 @@ impl ReadPayload { Ok(Self::Write { row_group }) } + fn decode_columnar_write_req( + schema: Schema, + write_req_pb: table_requests::WriteRequest, + ) -> Result { + let encoded_cols = write_req_pb.cols; + let mut row_group_builder = + RowGroupBuilderFromColumn::with_capacity(schema, encoded_cols.len()); + let mut decode_buf = Vec::new(); + for encoded_col in encoded_cols { + let decoder = ColumnarDecoder; + let mut col_buf = encoded_col.as_slice(); + let decode_ctx = DecodeContext { + buf: &mut decode_buf, + }; + let DecodeResult { column_id, datums } = decoder + .decode(decode_ctx, &mut col_buf) + .context(DecodeColumn)?; + + row_group_builder + .try_add_column(column_id, datums) + .context(BuildRowGroup)?; + } + + let row_group = row_group_builder.build(); + Ok(Self::Write { row_group }) + } + fn decode_alter_schema_from_pb(buf: &[u8]) -> Result { let alter_schema_meta_pb: manifest_pb::AlterSchemaMeta = Message::decode(buf).context(DecodeBody)?; @@ -220,15 +273,40 @@ impl ReadPayload { } } +/// The provider is used to provide the schema according to the table id. +pub trait TableSchemaProvider { + fn table_schema(&self, table_id: TableId) -> Option; +} + +pub struct SingleSchemaProviderAdapter { + pub schema: Schema, +} + +impl TableSchemaProvider for SingleSchemaProviderAdapter { + fn table_schema(&self, _table_id: TableId) -> Option { + Some(self.schema.clone()) + } +} + /// Wal payload decoder -#[derive(Default)] -pub struct WalDecoder; +pub struct WalDecoder
<P>
{ + schema_provider: P, +} + +impl WalDecoder
<P>
{ + pub fn new(schema_provider: P) -> Self { + Self { schema_provider } + } +} -impl PayloadDecoder for WalDecoder { +impl
<P>
PayloadDecoder for WalDecoder
<P>
+where + P: TableSchemaProvider + Send + Sync, +{ type Error = Error; type Target = ReadPayload; - fn decode(&self, buf: &mut B) -> Result { + fn decode(&self, ctx: &PayloadDecodeContext, buf: &mut B) -> Result { let header_value = buf.try_get_u8().context(DecodeHeader)?; let header = match Header::from_u8(header_value) { Some(header) => header, @@ -241,8 +319,12 @@ impl PayloadDecoder for WalDecoder { }; let chunk = buf.chunk(); + let schema = self + .schema_provider + .table_schema(ctx.table_id) + .context(TableSchemaNotFound)?; let payload = match header { - Header::Write => ReadPayload::decode_write_from_pb(chunk)?, + Header::Write => ReadPayload::decode_write_from_pb(&schema, chunk)?, Header::AlterSchema => ReadPayload::decode_alter_schema_from_pb(chunk)?, Header::AlterOption => ReadPayload::decode_alter_option_from_pb(chunk)?, }; diff --git a/benchmarks/src/wal_write_bench.rs b/benchmarks/src/wal_write_bench.rs index a3dbf1147c..17d435dd15 100644 --- a/benchmarks/src/wal_write_bench.rs +++ b/benchmarks/src/wal_write_bench.rs @@ -80,7 +80,7 @@ impl WalWriteBench { let wal = WalNamespaceImpl::open( MemoryImpl::default(), runtimes.clone(), - "ceresedb", + "ceresdb", NamespaceConfig::default(), ) .await @@ -88,8 +88,9 @@ impl WalWriteBench { let values = self.build_value_vec(); let wal_encoder = LogBatchEncoder::create(WalLocation::new(1, 1)); + let payloads = values.iter().map(|v| WritePayload(v)); let log_batch = wal_encoder - .encode_batch::>(values.as_slice()) + .encode_batch(payloads) .expect("should succeed to encode payload batch"); // Write to wal manager diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index 83cd2fc87f..e157b870da 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -521,6 +521,7 @@ impl Datum { /// Cast datum to timestamp. 
pub fn as_timestamp(&self) -> Option { match self { + Datum::Time(v) => Some(Timestamp::new(*v)), Datum::Timestamp(v) => Some(*v), _ => None, } @@ -1270,6 +1271,7 @@ impl<'a> DatumView<'a> { pub fn as_timestamp(&self) -> Option { match self { DatumView::Timestamp(v) => Some(*v), + DatumView::Time(v) => Some(Timestamp::new(*v)), _ => None, } } diff --git a/common_types/src/row/mod.rs b/common_types/src/row/mod.rs index c43295b1ec..7d861c4bfd 100644 --- a/common_types/src/row/mod.rs +++ b/common_types/src/row/mod.rs @@ -16,13 +16,14 @@ use std::{ cmp, - ops::{Index, IndexMut, Range}, + collections::HashMap, + ops::{Index, IndexMut}, }; use snafu::{ensure, Backtrace, OptionExt, Snafu}; use crate::{ - column_schema::ColumnSchema, + column_schema::{ColumnId, ColumnSchema}, datum::{Datum, DatumKind, DatumView}, record_batch::RecordBatchWithKey, schema::{RecordSchemaWithKey, Schema}, @@ -47,7 +48,19 @@ pub enum Error { }, #[snafu(display( - "Invalid column num of row, expect:{}, given:{}.\nBacktrace:\n{}", + "Invalid row num, expect:{}, given:{}.\nBacktrace:\n{}", + expect, + given, + backtrace + ))] + InvalidRowNum { + expect: usize, + given: usize, + backtrace: Backtrace, + }, + + #[snafu(display( + "Invalid column num, expect:{}, given:{}.\nBacktrace:\n{}", expect, given, backtrace @@ -99,6 +112,14 @@ pub enum Error { column: String, backtrace: Backtrace, }, + + #[snafu(display( + "Duplicate column id is found, column_id:{column_id}.\nBacktrace:\n{backtrace}", + ))] + DuplicateColumnId { + column_id: ColumnId, + backtrace: Backtrace, + }, } // Do not depend on test_util crates @@ -215,54 +236,6 @@ pub fn check_row_schema(row: &Row, schema: &Schema) -> Result<()> { Ok(()) } -#[derive(Debug)] -pub struct RowGroupSlicer<'a> { - range: Range, - row_group: &'a RowGroup, -} - -impl<'a> From<&'a RowGroup> for RowGroupSlicer<'a> { - fn from(value: &'a RowGroup) -> RowGroupSlicer<'a> { - Self { - range: 0..value.rows.len(), - row_group: value, - } - } -} - -impl<'a> RowGroupSlicer<'a> { - pub fn new(range: Range, row_group: &'a RowGroup) -> Self { - Self { range, row_group } - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.range.is_empty() - } - - #[inline] - pub fn schema(&self) -> &Schema { - self.row_group.schema() - } - - #[inline] - pub fn iter(&self) -> IterRow<'a> { - IterRow { - iter: self.row_group.rows[self.range.start..self.range.end].iter(), - } - } - - #[inline] - pub fn slice_range(&self) -> Range { - self.range.clone() - } - - #[inline] - pub fn num_rows(&self) -> usize { - self.range.len() - } -} - // TODO(yingwen): For multiple rows that share the same schema, no need to store // Datum for each row element, we can store the whole row as a binary and // provide more efficient way to convert rows into columns @@ -390,7 +363,7 @@ impl<'a> Iterator for IterRow<'a> { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct IterCol<'a> { rows: &'a Vec, row_index: usize, @@ -421,6 +394,118 @@ impl<'a> Iterator for IterCol<'a> { } } +/// Build the [`RowGroup`] from the columns. +pub struct RowGroupBuilderFromColumn { + schema: Schema, + cols: HashMap>, +} + +impl RowGroupBuilderFromColumn { + pub fn with_capacity(schema: Schema, num_cols: usize) -> Self { + Self { + schema, + cols: HashMap::with_capacity(num_cols), + } + } + + /// The newly-added column should have the same elements as the + /// previously-added column's. 
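A usage sketch of `RowGroupBuilderFromColumn`, mirroring how `decode_columnar_write_req` feeds it earlier in the patch: columns are added by column id, every added column must hold the same number of datums, and any schema column that was never added is filled with `Datum::Null` when `build` assembles the rows. The schema and the column ids below are hypothetical:

```rust
// Assume `schema` has three columns with ids 1 (timestamp), 2 (u64 field), 3 (string tag).
let mut builder = RowGroupBuilderFromColumn::with_capacity(schema, 2);
builder
    .try_add_column(
        1,
        vec![
            Datum::from(Timestamp::new(1000)),
            Datum::from(Timestamp::new(1001)),
        ],
    )
    .unwrap();
builder
    .try_add_column(2, vec![Datum::from(10u64), Datum::from(11u64)])
    .unwrap();
// Column 3 is never provided, so both rows carry Datum::Null in that slot.
let row_group = builder.build();
assert_eq!(row_group.num_rows(), 2);
```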
+ pub fn try_add_column(&mut self, col_id: ColumnId, col: Vec) -> Result<()> { + if let Some(num_rows) = self.num_rows() { + ensure!( + num_rows == col.len(), + InvalidRowNum { + expect: num_rows, + given: col.len(), + } + ); + } + + let old = self.cols.insert(col_id, col); + ensure!(old.is_none(), DuplicateColumnId { column_id: col_id }); + + Ok(()) + } + + pub fn build(mut self) -> RowGroup { + let num_rows = self.num_rows(); + if Some(0) == num_rows { + return RowGroup { + schema: self.schema, + rows: vec![], + min_timestamp: Timestamp::new(0), + max_timestamp: Timestamp::new(0), + }; + }; + + let num_rows = num_rows.unwrap(); + let num_cols = self.schema.num_columns(); + let mut rows = Vec::with_capacity(num_rows); + + // Pre-allocate the memory for column data in every row. + for _ in 0..num_rows { + let row = Vec::with_capacity(num_cols); + rows.push(row); + } + + let mut add_column_to_row = |row_idx: usize, datum: Datum| { + rows[row_idx].push(datum); + }; + + for col_schema in self.schema.columns() { + let col_id = col_schema.id; + let datums = self.cols.remove(&col_id); + + match datums { + Some(v) => { + for (row_idx, datum) in v.into_iter().enumerate() { + add_column_to_row(row_idx, datum); + } + } + None => { + for row_idx in 0..num_rows { + add_column_to_row(row_idx, Datum::Null); + } + } + } + } + + let rows = rows.into_iter().map(Row::from_datums).collect::>(); + + let (min_timestamp, max_timestamp) = self + .collect_minmax_timestamps(&rows) + .unwrap_or_else(|| (Timestamp::default(), Timestamp::default())); + + RowGroup { + schema: self.schema, + rows, + min_timestamp, + max_timestamp, + } + } + + #[inline] + fn num_rows(&self) -> Option { + self.cols.iter().next().map(|(_, v)| v.len()) + } + + fn collect_minmax_timestamps(&self, rows: &[Row]) -> Option<(Timestamp, Timestamp)> { + let timestamp_idx = self.schema.timestamp_index(); + if rows.is_empty() { + return None; + } + + rows.iter() + .fold(None, |prev: Option<(Timestamp, Timestamp)>, row| { + let timestamp = row[timestamp_idx].as_timestamp()?; + match prev { + None => Some((timestamp, timestamp)), + Some((min_ts, max_ts)) => Some((min_ts.min(timestamp), max_ts.max(timestamp))), + } + }) + } +} + /// RowGroup builder #[derive(Debug)] pub struct RowGroupBuilder { diff --git a/components/codec/src/columnar/mod.rs b/components/codec/src/columnar/mod.rs index d15604f579..46bcd3163e 100644 --- a/components/codec/src/columnar/mod.rs +++ b/components/codec/src/columnar/mod.rs @@ -88,6 +88,9 @@ pub enum Error { #[snafu(display("Bytes is not enough, length:{len}.\nBacktrace:\n{backtrace}"))] NotEnoughBytes { len: usize, backtrace: Backtrace }, + + #[snafu(display("Number operation overflowed, msg:{msg}.\nBacktrace:\n{backtrace}"))] + Overflow { msg: String, backtrace: Backtrace }, } define_result!(Error); @@ -117,7 +120,7 @@ trait ValuesEncoder { /// The decode context for decoding column. pub struct DecodeContext<'a> { /// Buffer for reuse during decoding. - buf: &'a mut Vec, + pub buf: &'a mut Vec, } /// The trait bound on the decoders for different types. 
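For reference, a hedged round-trip sketch of this columnar codec as the write path (`encode_cols` in write.rs) and the replay path (`decode_columnar_write_req` in payload.rs) drive it in this patch; the column id, the 256-byte compression threshold and the `unwrap` error handling are placeholders:

```rust
use codec::columnar::{ColumnarDecoder, ColumnarEncoder, DecodeContext, DecodeResult, EncodeHint};
use common_types::datum::{Datum, DatumKind};

fn roundtrip(datums: &[Datum], kind: DatumKind) -> Vec<Datum> {
    // Encode a single column: column id 1, compress buffers above 256 bytes.
    let encoder = ColumnarEncoder::new(1, 256);
    let views = datums.iter().map(|v| v.as_view());
    let mut hint = EncodeHint {
        num_nulls: None,
        num_datums: Some(datums.len()),
        datum_kind: kind,
    };
    let size = encoder.estimated_encoded_size(views.clone(), &mut hint);
    let mut encoded = Vec::with_capacity(size);
    encoder.encode(&mut encoded, views, &mut hint).unwrap();

    // Decode it back; the scratch buffer is what replay reuses across columns.
    let mut scratch = Vec::new();
    let ctx = DecodeContext { buf: &mut scratch };
    let DecodeResult { column_id, datums } = ColumnarDecoder
        .decode(ctx, &mut encoded.as_slice())
        .unwrap();
    assert_eq!(column_id, 1);
    datums
}
```

This is essentially what the `check_encode_end_decode` tests below exercise for each datum kind.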
@@ -258,11 +261,9 @@ impl ColumnarEncoder { let enc = ValuesEncoderImpl::default(); let data_size = match hint.datum_kind { DatumKind::Null => 0, - DatumKind::Timestamp => enc.estimated_encoded_size( - datums - .clone() - .filter_map(|v| v.as_timestamp().map(|v| v.as_i64())), - ), + DatumKind::Timestamp => { + enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_timestamp())) + } DatumKind::Double => { enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_f64())) } @@ -305,7 +306,9 @@ impl ColumnarEncoder { DatumKind::Date => { enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_date_i32())) } - DatumKind::Time => todo!(), + DatumKind::Time => { + enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_timestamp())) + } }; Self::header_size() + bit_set_size + data_size @@ -321,10 +324,7 @@ impl ColumnarEncoder { }; match datum_kind { DatumKind::Null => Ok(()), - DatumKind::Timestamp => enc.encode( - buf, - datums.filter_map(|v| v.as_timestamp().map(|v| v.as_i64())), - ), + DatumKind::Timestamp => enc.encode(buf, datums.filter_map(|v| v.as_timestamp())), DatumKind::Double => enc.encode(buf, datums.filter_map(|v| v.as_f64())), DatumKind::Float => enc.encode(buf, datums.filter_map(|v| v.as_f32())), DatumKind::Varbinary => enc.encode(buf, datums.filter_map(|v| v.into_bytes())), @@ -342,7 +342,7 @@ impl ColumnarEncoder { DatumKind::Int8 => enc.encode(buf, datums.filter_map(|v| v.as_i8())), DatumKind::Boolean => enc.encode(buf, datums.filter_map(|v| v.as_bool())), DatumKind::Date => enc.encode(buf, datums.filter_map(|v| v.as_date_i32())), - DatumKind::Time => todo!(), + DatumKind::Time => enc.encode(buf, datums.filter_map(|v| v.as_timestamp())), } } } @@ -445,8 +445,8 @@ impl ColumnarDecoder { match datum_kind { DatumKind::Null => Ok(()), DatumKind::Timestamp => { - let with_i64 = |v| f(Datum::from(Timestamp::new(v))); - ValuesDecoderImpl.decode(ctx, buf, with_i64) + let with_timestamp = |v: Timestamp| f(Datum::from(v)); + ValuesDecoderImpl.decode(ctx, buf, with_timestamp) } DatumKind::Double => { let with_float = |v: f64| f(Datum::from(v)); @@ -531,7 +531,10 @@ impl ColumnarDecoder { }; ValuesDecoderImpl.decode(ctx, buf, with_i32) } - DatumKind::Time => todo!(), + DatumKind::Time => { + let with_timestamp = |v: Timestamp| f(Datum::Time(v.as_i64())); + ValuesDecoderImpl.decode(ctx, buf, with_timestamp) + } } } } @@ -609,6 +612,26 @@ mod tests { ); } + #[test] + fn test_with_empty_datums() { + check_encode_end_decode(1, vec![], DatumKind::Null); + check_encode_end_decode(1, vec![], DatumKind::Timestamp); + check_encode_end_decode(1, vec![], DatumKind::Double); + check_encode_end_decode(1, vec![], DatumKind::Float); + check_encode_end_decode(1, vec![], DatumKind::Varbinary); + check_encode_end_decode(1, vec![], DatumKind::String); + check_encode_end_decode(1, vec![], DatumKind::UInt64); + check_encode_end_decode(1, vec![], DatumKind::UInt32); + check_encode_end_decode(1, vec![], DatumKind::UInt8); + check_encode_end_decode(1, vec![], DatumKind::Int64); + check_encode_end_decode(1, vec![], DatumKind::Int32); + check_encode_end_decode(1, vec![], DatumKind::Int16); + check_encode_end_decode(1, vec![], DatumKind::Int8); + check_encode_end_decode(1, vec![], DatumKind::Boolean); + check_encode_end_decode(1, vec![], DatumKind::Date); + check_encode_end_decode(1, vec![], DatumKind::Time); + } + #[test] fn test_i32_with_null() { let datums = vec![ @@ -680,6 +703,72 @@ mod tests { check_encode_end_decode(10, datums, DatumKind::Int64); } + #[test] + fn test_u64() { + let 
datums = vec![ + Datum::from(10u64), + Datum::from(1u64), + Datum::from(2u64), + Datum::from(18u64), + Datum::from(38u64), + Datum::from(48u64), + Datum::from(80u64), + Datum::from(81u64), + Datum::from(82u64), + ]; + + check_encode_end_decode(10, datums, DatumKind::UInt64); + } + + #[test] + fn test_timestamp() { + let datums = vec![ + Datum::from(Timestamp::new(-10)), + Datum::from(Timestamp::new(10)), + Datum::from(Timestamp::new(1024)), + Datum::from(Timestamp::new(1024)), + Datum::from(Timestamp::new(1025)), + ]; + + check_encode_end_decode(10, datums, DatumKind::Timestamp); + } + + #[test] + fn test_time() { + let datums = vec![ + Datum::Time(-10), + Datum::Time(10), + Datum::Time(1024), + Datum::Time(1024), + Datum::Time(1025), + ]; + + check_encode_end_decode(10, datums, DatumKind::Time); + } + + #[test] + fn test_overflow_timestamp() { + let datums = vec![ + Datum::from(Timestamp::new(i64::MIN)), + Datum::from(Timestamp::new(10)), + Datum::from(Timestamp::new(1024)), + Datum::from(Timestamp::new(1024)), + Datum::from(Timestamp::new(1025)), + ]; + + let encoder = ColumnarEncoder::new(0, 256); + let views = datums.iter().map(|v| v.as_view()); + let mut hint = EncodeHint { + num_nulls: None, + num_datums: None, + datum_kind: DatumKind::Timestamp, + }; + + let mut buf = Vec::new(); + let enc_res = encoder.encode(&mut buf, views, &mut hint); + assert!(enc_res.is_err()); + } + #[test] fn test_string() { let datums = vec![ diff --git a/components/codec/src/columnar/number.rs b/components/codec/src/columnar/number.rs index 1ed85746ac..e9c461860c 100644 --- a/components/codec/src/columnar/number.rs +++ b/components/codec/src/columnar/number.rs @@ -118,6 +118,8 @@ impl ValuesEncoder for ValuesEncoderImpl { B: BufMut, I: Iterator, { + buf.put_u8(VERSION); + for v in values { varint::encode_uvarint(buf, v).context(Varint)?; } diff --git a/components/codec/src/columnar/timestamp.rs b/components/codec/src/columnar/timestamp.rs index eab1a8bc35..9a6efd3275 100644 --- a/components/codec/src/columnar/timestamp.rs +++ b/components/codec/src/columnar/timestamp.rs @@ -11,3 +11,94 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +use common_types::time::Timestamp; +use snafu::{ensure, OptionExt, ResultExt}; + +use super::{DecodeContext, Overflow, Result, ValuesDecoder, ValuesDecoderImpl, Varint}; +use crate::{ + columnar::{InvalidVersion, ValuesEncoder, ValuesEncoderImpl}, + consts::MAX_VARINT_BYTES, + varint, +}; + +/// The layout for the timestamp values: +/// ```plaintext +/// +-------------+----------------------+--------+ +/// | version(u8) | first_timestamp(i64) | deltas | +/// +-------------+----------------------+--------+ +/// ``` +/// +/// This encoding assume the timestamps are have very small differences between +/// each other, so we just store the deltas from the first timestamp in varint. 
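As a worked example of this layout (assuming the crate's signed varint needs one byte for small magnitudes), encoding the timestamps [1000, 1002, 999] yields:

```plaintext
version  : 0x00                          (1 byte)
first ts : 1000 written as an i64        (8 bytes via put_i64)
delta #1 : 1002 - 1000 =  2  -> varint   (1 byte for small values)
delta #2 :  999 - 1000 = -1  -> varint   (1 byte, signed varint assumed)
```

Decoding mirrors this: read the version byte, read the first timestamp, then add each decoded delta to that first timestamp until the buffer is exhausted; every delta is taken against the first value, not the previous one.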
+struct Encoding; + +impl Encoding { + const VERSION: u8 = 0; + const VERSION_SIZE: usize = 1; +} + +impl ValuesEncoder for ValuesEncoderImpl { + fn encode(&self, buf: &mut B, mut values: I) -> Result<()> + where + B: bytes_ext::BufMut, + I: Iterator + Clone, + { + buf.put_u8(Encoding::VERSION); + + let first_ts = match values.next() { + Some(v) => v.as_i64(), + None => return Ok(()), + }; + + buf.put_i64(first_ts); + + for value in values { + let ts = value.as_i64(); + let delta = ts.checked_sub(first_ts).with_context(|| Overflow { + msg: format!("first timestamp:{ts}, current timestamp:{first_ts}"), + })?; + varint::encode_varint(buf, delta).context(Varint)?; + } + + Ok(()) + } + + fn estimated_encoded_size(&self, values: I) -> usize + where + I: Iterator, + { + let (lower, higher) = values.size_hint(); + let num = lower.max(higher.unwrap_or_default()); + num * MAX_VARINT_BYTES + Encoding::VERSION_SIZE + } +} + +impl ValuesDecoder for ValuesDecoderImpl { + fn decode(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()> + where + B: bytes_ext::Buf, + F: FnMut(Timestamp) -> Result<()>, + { + let version = buf.get_u8(); + + ensure!(version == Encoding::VERSION, InvalidVersion { version }); + + if buf.remaining() == 0 { + return Ok(()); + } + + let first_ts = buf.get_i64(); + f(Timestamp::new(first_ts))?; + + while buf.remaining() > 0 { + let delta = varint::decode_varint(buf).context(Varint)?; + let ts = first_ts.checked_add(delta).with_context(|| Overflow { + msg: format!("first timestamp:{first_ts}, delta:{delta}"), + })?; + f(Timestamp::new(ts))?; + } + + Ok(()) + } +} diff --git a/src/wal/src/kv_encoder.rs b/src/wal/src/kv_encoder.rs index 4ee243d519..81f9d36319 100644 --- a/src/wal/src/kv_encoder.rs +++ b/src/wal/src/kv_encoder.rs @@ -562,18 +562,16 @@ impl LogBatchEncoder { /// Consume LogBatchEncoder and encode raw payload batch to LogWriteBatch. /// Note: To build payload from raw payload in `encode_batch`, raw payload /// need implement From trait. - pub fn encode_batch<'a, P: Payload, I>( - self, - raw_payload_batch: &'a [I], - ) -> manager::Result + pub fn encode_batch(self, raw_payloads: I) -> manager::Result where - &'a I: Into
<P>
, + I: Iterator, + P: Payload, { let mut write_batch = LogWriteBatch::new(self.location); let mut buf = BytesMut::new(); - for raw_payload in raw_payload_batch.iter() { + for raw_payload in raw_payloads { self.log_encoding - .encode_value(&mut buf, &raw_payload.into()) + .encode_value(&mut buf, &raw_payload) .box_err() .context(Encoding)?; @@ -750,7 +748,7 @@ mod tests { use super::*; use crate::{ kv_encoder::CommonLogKey, - log_batch::{MemoryPayload, MemoryPayloadDecoder, PayloadDecoder}, + log_batch::{MemoryPayload, MemoryPayloadDecoder, PayloadDecodeContext, PayloadDecoder}, }; #[test] @@ -777,7 +775,9 @@ mod tests { encoding.encode_value(&mut buf, &payload).unwrap(); let mut value = encoding.decode_value(&buf).unwrap(); - let decoded_value = decoder.decode(&mut value).unwrap(); + let decoded_value = decoder + .decode(&PayloadDecodeContext::default(), &mut value) + .unwrap(); assert_eq!(payload, decoded_value); } diff --git a/src/wal/src/lib.rs b/src/wal/src/lib.rs index c9eb0e1962..134a4048cf 100644 --- a/src/wal/src/lib.rs +++ b/src/wal/src/lib.rs @@ -14,6 +14,8 @@ //! Write Ahead Log +#![feature(trait_alias)] + pub mod config; pub mod kv_encoder; pub mod log_batch; diff --git a/src/wal/src/log_batch.rs b/src/wal/src/log_batch.rs index 44abf8a14d..623e0af95e 100644 --- a/src/wal/src/log_batch.rs +++ b/src/wal/src/log_batch.rs @@ -107,11 +107,21 @@ impl LogWriteBatch { } } +/// The context to decode payload. +#[derive(Debug, Default, Clone)] +pub struct PayloadDecodeContext { + pub table_id: TableId, +} + pub trait PayloadDecoder: Send + Sync { type Error: std::error::Error + Send + Sync + 'static; type Target: Send + Sync; /// Decode `Target` from the `bytes`. - fn decode(&self, buf: &mut B) -> Result; + fn decode( + &self, + ctx: &PayloadDecodeContext, + buf: &mut B, + ) -> Result; } pub struct MemoryPayloadDecoder; @@ -120,7 +130,11 @@ impl PayloadDecoder for MemoryPayloadDecoder { type Error = Error; type Target = MemoryPayload; - fn decode(&self, buf: &mut B) -> Result { + fn decode( + &self, + _ctx: &PayloadDecodeContext, + buf: &mut B, + ) -> Result { let val = buf.try_get_u32().expect("should succeed to read u32"); Ok(MemoryPayload { val }) } diff --git a/src/wal/src/manager.rs b/src/wal/src/manager.rs index 113b42061d..78404cf640 100644 --- a/src/wal/src/manager.rs +++ b/src/wal/src/manager.rs @@ -29,10 +29,12 @@ use snafu::ResultExt; use crate::{ config::StorageConfig, - log_batch::{LogEntry, LogWriteBatch, PayloadDecoder}, + log_batch::{LogEntry, LogWriteBatch, PayloadDecodeContext, PayloadDecoder}, metrics::WAL_WRITE_BYTES_HISTOGRAM, }; +pub trait TableFilter = Fn(TableId) -> bool; + pub mod error { use generic_error::GenericError; use macros::define_result; @@ -398,24 +400,36 @@ impl BatchLogIteratorAdapter { } } - async fn simulated_async_next( + async fn simulated_async_next( &mut self, decoder: D, + filter: F, runtime: Arc, sync_iter: Box, mut buffer: VecDeque>, - ) -> Result<(VecDeque>, Option)> { + ) -> Result<(VecDeque>, Option)> + where + D: PayloadDecoder + Send + 'static, + F: TableFilter + Send + 'static, + { buffer.clear(); let mut iter = sync_iter; let batch_size = self.batch_size; let (log_entries, iter_opt) = runtime .spawn_blocking(move || { - for _ in 0..batch_size { + while buffer.len() < batch_size { if let Some(raw_log_entry) = iter.next_log_entry()? 
{ + if !filter(raw_log_entry.table_id) { + continue; + } + let mut raw_payload = raw_log_entry.payload; + let ctx = PayloadDecodeContext { + table_id: raw_log_entry.table_id, + }; let payload = decoder - .decode(&mut raw_payload) + .decode(&ctx, &mut raw_payload) .box_err() .context(error::Decoding)?; let log_entry = LogEntry { @@ -440,20 +454,31 @@ impl BatchLogIteratorAdapter { } } - async fn async_next( + async fn async_next( &mut self, decoder: D, + filter: F, async_iter: Box, mut buffer: VecDeque>, - ) -> Result<(VecDeque>, Option)> { + ) -> Result<(VecDeque>, Option)> + where + D: PayloadDecoder + Send + 'static, + F: TableFilter + Send + 'static, + { buffer.clear(); let mut async_iter = async_iter; - for _ in 0..self.batch_size { + while buffer.len() < self.batch_size { if let Some(raw_log_entry) = async_iter.next_log_entry().await? { + if !filter(raw_log_entry.table_id) { + continue; + } let mut raw_payload = raw_log_entry.payload; + let ctx = PayloadDecodeContext { + table_id: raw_log_entry.table_id, + }; let payload = decoder - .decode(&mut raw_payload) + .decode(&ctx, &mut raw_payload) .box_err() .context(error::Decoding)?; let log_entry = LogEntry { @@ -470,11 +495,21 @@ impl BatchLogIteratorAdapter { Ok((buffer, Some(LogIterator::Async(async_iter)))) } - pub async fn next_log_entries( + /// Read the next batch of wal logs. + /// + /// The `filter` will be used to determine whether a log is needed anymore, + /// so unnecessary decoding work can be avoided. + /// NOTE: the logs will be accepted if the return result of filter is true. + pub async fn next_log_entries( &mut self, decoder: D, + filter: F, buffer: VecDeque>, - ) -> Result>> { + ) -> Result>> + where + D: PayloadDecoder + Send + 'static, + F: TableFilter + Send + 'static, + { if self.iter.is_none() { return Ok(VecDeque::new()); } @@ -482,10 +517,10 @@ impl BatchLogIteratorAdapter { let iter = self.iter.take().unwrap(); let (log_entries, iter) = match iter { LogIterator::Sync { iter, runtime } => { - self.simulated_async_next(decoder, runtime, iter, buffer) + self.simulated_async_next(decoder, filter, runtime, iter, buffer) .await? 
} - LogIterator::Async(iter) => self.async_next(decoder, iter, buffer).await?, + LogIterator::Async(iter) => self.async_next(decoder, filter, iter, buffer).await?, }; self.iter = iter; @@ -607,7 +642,7 @@ mod tests { loop { buffer = iter - .next_log_entries(MemoryPayloadDecoder, buffer) + .next_log_entries(MemoryPayloadDecoder, |_| true, buffer) .await .unwrap(); for entry in buffer.iter() { @@ -631,7 +666,7 @@ mod tests { let mut buffer = VecDeque::with_capacity(3); loop { buffer = iter - .next_log_entries(MemoryPayloadDecoder, buffer) + .next_log_entries(MemoryPayloadDecoder, |_| true, buffer) .await .unwrap(); for entry in buffer.iter() { diff --git a/src/wal/src/message_queue_impl/region.rs b/src/wal/src/message_queue_impl/region.rs index ff2037726f..7008f3e761 100644 --- a/src/wal/src/message_queue_impl/region.rs +++ b/src/wal/src/message_queue_impl/region.rs @@ -957,7 +957,7 @@ mod tests { }; use crate::{ - log_batch::PayloadDecoder, + log_batch::{PayloadDecodeContext, PayloadDecoder}, manager::{ReadContext, WriteContext}, message_queue_impl::{encoding::MetaEncoding, test_util::TestContext}, }; @@ -1033,9 +1033,12 @@ mod tests { .unwrap(); while let Some(log_entry) = msg_iter.next_log_entry().await.unwrap() { let mut payload = log_entry.payload; + let ctx = PayloadDecodeContext { + table_id: log_entry.table_id, + }; let decoded_payload = test_context .test_payload_encoder - .decode(&mut payload) + .decode(&ctx, &mut payload) .unwrap(); mixed_decoded_res.push(decoded_payload.val); } diff --git a/src/wal/src/message_queue_impl/test_util.rs b/src/wal/src/message_queue_impl/test_util.rs index 73bf8278c9..76f9edb8b2 100644 --- a/src/wal/src/message_queue_impl/test_util.rs +++ b/src/wal/src/message_queue_impl/test_util.rs @@ -71,9 +71,8 @@ impl TestContext { .map(|(table_id, data)| { let log_batch_encoder = LogBatchEncoder::create(WalLocation::new(region_id, table_id)); - let log_write_batch = log_batch_encoder - .encode_batch::(&data) - .unwrap(); + let payloads = data.iter().map(|v| MemoryPayload { val: *v }); + let log_write_batch = log_batch_encoder.encode_batch(payloads).unwrap(); (table_id, TestDataOfTable::new(data, log_write_batch)) }) diff --git a/src/wal/src/table_kv_impl/namespace.rs b/src/wal/src/table_kv_impl/namespace.rs index bb61b11cc5..5793139369 100644 --- a/src/wal/src/table_kv_impl/namespace.rs +++ b/src/wal/src/table_kv_impl/namespace.rs @@ -1640,7 +1640,7 @@ mod tests { use super::*; use crate::{ kv_encoder::{LogBatchEncoder, LogEncoding}, - log_batch::{MemoryPayload, MemoryPayloadDecoder, PayloadDecoder}, + log_batch::{MemoryPayload, MemoryPayloadDecoder, PayloadDecodeContext, PayloadDecoder}, table_kv_impl::consts, }; @@ -2001,7 +2001,10 @@ mod tests { while iter.valid() { let decoded_key = log_encoding.decode_key(iter.key()).unwrap(); let mut raw_value = log_encoding.decode_value(iter.value()).unwrap(); - let decoded_value = decoder.decode(&mut raw_value).unwrap(); + let ctx = PayloadDecodeContext { + table_id: region_id, + }; + let decoded_value = decoder.decode(&ctx, &mut raw_value).unwrap(); key_values.push((decoded_key.1, decoded_value)); iter.next().unwrap(); @@ -2025,7 +2028,7 @@ mod tests { let log_entries = (start_sequence..end_sequence).collect::>(); let wal_encoder = LogBatchEncoder::create(location); let log_batch = wal_encoder - .encode_batch::(&log_entries) + .encode_batch(log_entries.iter().map(|v| MemoryPayload { val: *v })) .expect("should succeed to encode payload batch"); let write_ctx = manager::WriteContext::default(); namespace diff --git 
a/src/wal/tests/read_write.rs b/src/wal/tests/read_write.rs index a2f6181283..49d4aea662 100644 --- a/src/wal/tests/read_write.rs +++ b/src/wal/tests/read_write.rs @@ -1021,11 +1021,11 @@ impl TestEnv { start: u32, end: u32, ) -> (Vec, LogWriteBatch) { - let log_entries = (start..end).collect::>(); + let log_entries = start..end; let log_batch_encoder = LogBatchEncoder::create(location); let log_batch = log_batch_encoder - .encode_batch::(&log_entries) + .encode_batch(log_entries.map(|v| MemoryPayload { val: v })) .expect("should succeed to encode payloads"); let payload_batch = self.build_payload_batch(start, end); @@ -1047,7 +1047,7 @@ impl TestEnv { loop { let dec = MemoryPayloadDecoder; let log_entries = iter - .next_log_entries(dec, VecDeque::new()) + .next_log_entries(dec, |_| true, VecDeque::new()) .await .expect("should succeed to fetch next log entry"); if log_entries.is_empty() { From 2cb70f7ff7e89361de9a0596648ef313d81a4d96 Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Mon, 6 Nov 2023 19:12:49 +0800 Subject: [PATCH 2/5] fix: ensure primary key order (#1292) --- analytic_engine/src/instance/write.rs | 1 + analytic_engine/src/manifest/details.rs | 2 + analytic_engine/src/sst/meta_data/cache.rs | 1 + analytic_engine/src/table/mod.rs | 1 + analytic_engine/src/tests/alter_test.rs | 7 +- analytic_engine/src/tests/drop_test.rs | 2 + analytic_engine/src/tests/table.rs | 3 +- common_types/src/schema.rs | 126 +++++++----------- common_types/src/tests.rs | 4 +- interpreters/src/alter_table.rs | 1 + proxy/src/grpc/prom_query.rs | 1 + proxy/src/http/prom.rs | 1 + proxy/src/influxdb/types.rs | 1 + proxy/src/write.rs | 1 + query_frontend/src/planner.rs | 9 +- system_catalog/src/sys_catalog_table.rs | 1 + system_catalog/src/tables.rs | 1 + .../src/partition/rule/df_adapter/mod.rs | 1 + 18 files changed, 84 insertions(+), 80 deletions(-) diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index 246e5da1ee..bed6a71eee 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -735,6 +735,7 @@ mod tests { let schema = SchemaBuilder::new() .add_key_column(column_schema) .unwrap() + .primary_key_indexes(vec![0]) .build() .unwrap(); let row_group = RowGroupBuilder::with_rows(schema, rows).unwrap().build(); diff --git a/analytic_engine/src/manifest/details.rs b/analytic_engine/src/manifest/details.rs index 6da9271477..e58bfce1b4 100644 --- a/analytic_engine/src/manifest/details.rs +++ b/analytic_engine/src/manifest/details.rs @@ -766,6 +766,7 @@ mod tests { fn build_altered_schema(schema: &Schema) -> Schema { let mut builder = schema::Builder::new().auto_increment_column_id(true); + let old_pk_indexes = schema.primary_key_indexes(); for column_schema in schema.key_columns() { builder = builder .add_key_column(column_schema.clone()) @@ -783,6 +784,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(old_pk_indexes.to_vec()) .build() .unwrap() } diff --git a/analytic_engine/src/sst/meta_data/cache.rs b/analytic_engine/src/sst/meta_data/cache.rs index 720fc77971..d853b3fe98 100644 --- a/analytic_engine/src/sst/meta_data/cache.rs +++ b/analytic_engine/src/sst/meta_data/cache.rs @@ -307,6 +307,7 @@ mod tests { .unwrap() .add_key_column(timestamp_column_schema) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .unwrap() }; diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index 554f2d08eb..a502715e32 100644 --- a/analytic_engine/src/table/mod.rs +++ 
b/analytic_engine/src/table/mod.rs @@ -637,6 +637,7 @@ mod tests { ) -> WriteRequest { let schema = FixedSchemaTable::default_schema_builder() .version(schema_version) + .primary_key_indexes(vec![0, 1]) .build() .unwrap(); let mut schema_rows = Vec::with_capacity(num_rows); diff --git a/analytic_engine/src/tests/alter_test.rs b/analytic_engine/src/tests/alter_test.rs index 5687cfcc00..579c2be65d 100644 --- a/analytic_engine/src/tests/alter_test.rs +++ b/analytic_engine/src/tests/alter_test.rs @@ -132,7 +132,10 @@ async fn alter_schema_same_schema_version_case( let mut schema_builder = FixedSchemaTable::default_schema_builder(); schema_builder = add_columns(schema_builder); - let new_schema = schema_builder.build().unwrap(); + let new_schema = schema_builder + .primary_key_indexes(vec![0, 1]) + .build() + .unwrap(); let table = test_ctx.table(table_name); let old_schema = table.schema(); @@ -160,6 +163,7 @@ async fn alter_schema_old_pre_version_case( let new_schema = schema_builder .version(old_schema.version() + 1) + .primary_key_indexes(old_schema.primary_key_indexes().to_vec()) .build() .unwrap(); @@ -190,6 +194,7 @@ async fn alter_schema_add_column_case( let new_schema = schema_builder .version(old_schema.version() + 1) + .primary_key_indexes(old_schema.primary_key_indexes().to_vec()) .build() .unwrap(); diff --git a/analytic_engine/src/tests/drop_test.rs b/analytic_engine/src/tests/drop_test.rs index 2f7dfa97fe..a9c0510dc5 100644 --- a/analytic_engine/src/tests/drop_test.rs +++ b/analytic_engine/src/tests/drop_test.rs @@ -278,8 +278,10 @@ fn test_alter_schema_drop_create(engine_context: T) { .unwrap(), ) .unwrap(); + let new_schema = schema_builder .version(old_schema.version() + 1) + .primary_key_indexes(old_schema.primary_key_indexes().to_vec()) .build() .unwrap(); let request = AlterSchemaRequest { diff --git a/analytic_engine/src/tests/table.rs b/analytic_engine/src/tests/table.rs index 4e139c30ec..ec6236d92f 100644 --- a/analytic_engine/src/tests/table.rs +++ b/analytic_engine/src/tests/table.rs @@ -336,7 +336,8 @@ pub fn create_schema_builder( assert!(!key_tuples.is_empty()); let mut schema_builder = schema::Builder::with_capacity(key_tuples.len() + normal_tuples.len()) - .auto_increment_column_id(true); + .auto_increment_column_id(true) + .primary_key_indexes((0..key_tuples.len()).collect()); for tuple in key_tuples { // Key column is not nullable. 
diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index fca2142832..e114c1e4d6 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -87,6 +87,16 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display( + "Column id is missing in schema, id:{}.\nBacktrace:\n{}", + id, + backtrace + ))] + ColumnIdMissing { id: ColumnId, backtrace: Backtrace }, + + #[snafu(display("Primary key indexes cannot be empty.\nBacktrace:\n{}", backtrace))] + EmptyPirmaryKeyIndexes { backtrace: Backtrace }, + #[snafu(display( "Unsupported key column type, name:{}, type:{:?}.\nBacktrace:\n{}", name, @@ -114,13 +124,6 @@ pub enum Error { #[snafu(display("Timestamp not in primary key.\nBacktrace:\n{}", backtrace))] TimestampNotInPrimaryKey { backtrace: Backtrace }, - #[snafu(display( - "Key column cannot be nullable, name:{}.\nBacktrace:\n{}", - name, - backtrace - ))] - NullKeyColumn { name: String, backtrace: Backtrace }, - #[snafu(display( "Invalid arrow field, field_name:{}, arrow_schema:{:?}, err:{}", field_name, @@ -958,62 +961,37 @@ impl Schema { impl TryFrom for Schema { type Error = Error; - // We can't use Builder directly here, since it will disorder columns. fn try_from(schema: schema_pb::TableSchema) -> Result { + let mut builder = Builder::with_capacity(schema.columns.len()).version(schema.version); let primary_key_ids = schema.primary_key_ids; - let column_schemas = schema - .columns - .into_iter() - .map(|column_schema_pb| { - ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed) + + let primary_key_indexes = primary_key_ids + .iter() + .cloned() + .map(|id| { + let col_idx = schema + .columns + .iter() + .enumerate() + .find_map(|(idx, col)| if col.id == id { Some(idx) } else { None }) + .context(ColumnIdMissing { id })?; + + Ok(col_idx) }) .collect::>>()?; + builder = builder.primary_key_indexes(primary_key_indexes); - let mut primary_key_indexes = Vec::with_capacity(primary_key_ids.len()); - let mut timestamp_index = None; - for pk_id in &primary_key_ids { - for (idx, col) in column_schemas.iter().enumerate() { - if col.id == *pk_id { - primary_key_indexes.push(idx); - if DatumKind::Timestamp == col.data_type { - // TODO: add a timestamp_id in schema_pb, so we can have two timestamp - // columns in primary keys. 
- if let Some(idx) = timestamp_index { - let column_schema: &ColumnSchema = &column_schemas[idx]; - return TimestampKeyExists { - timestamp_column: column_schema.name.to_string(), - given_column: col.name.clone(), - } - .fail(); - } - - timestamp_index = Some(idx); - } - break; - } + for column_schema_pb in schema.columns { + let column = + ColumnSchema::try_from(column_schema_pb).context(ColumnSchemaDeserializeFailed)?; + if primary_key_ids.contains(&column.id) { + builder = builder.add_key_column(column)?; + } else { + builder = builder.add_normal_column(column)?; } } - let timestamp_index = timestamp_index.context(TimestampNotInPrimaryKey)?; - let tsid_index = Builder::find_tsid_index(&column_schemas); - let fields = column_schemas - .iter() - .map(|c| c.to_arrow_field()) - .collect::>(); - let meta = Builder::build_arrow_schema_meta( - primary_key_indexes.clone(), - timestamp_index, - schema.version, - ); - - Ok(Schema { - arrow_schema: Arc::new(ArrowSchema::new_with_metadata(fields, meta)), - primary_key_indexes, - column_schemas: Arc::new(ColumnSchemas::new(column_schemas)), - version: schema.version, - tsid_index, - timestamp_index, - }) + builder.build() } } @@ -1090,8 +1068,6 @@ impl Builder { self.may_alloc_column_id(&mut column); self.validate_column(&column, true)?; - ensure!(!column.is_nullable, NullKeyColumn { name: column.name }); - // FIXME(xikai): it seems not reasonable to decide the timestamp column in this // way. let is_timestamp = DatumKind::Timestamp == column.data_type; @@ -1106,7 +1082,6 @@ impl Builder { self.timestamp_index = Some(self.columns.len()); } - self.primary_key_indexes.push(self.columns.len()); self.insert_new_column(column); Ok(self) @@ -1122,6 +1097,12 @@ impl Builder { Ok(self) } + /// Set primary key indexes of the schema + pub fn primary_key_indexes(mut self, indexes: Vec) -> Self { + self.primary_key_indexes = indexes; + self + } + /// Set version of the schema pub fn version(mut self, version: Version) -> Self { self.version = version; @@ -1264,7 +1245,10 @@ impl Builder { let timestamp_index = self.timestamp_index.context(TimestampNotInPrimaryKey)?; // Timestamp key column is exists, so key columns should not be zero - assert!(!self.primary_key_indexes.is_empty()); + ensure!( + !self.primary_key_indexes.is_empty(), + EmptyPirmaryKeyIndexes {} + ); let tsid_index = Self::find_tsid_index(&self.columns); let fields = self @@ -1373,6 +1357,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .unwrap() } @@ -1466,6 +1451,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![1, 2]) .build() .unwrap(); @@ -1566,6 +1552,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0]) .build() .unwrap(); } @@ -1586,23 +1573,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap(); - assert!(builder.build().is_err()); - } - - // Currently we allow null key column, maybe we can rename it to sorted column. - // Since we primary key in ceresdb isn't same with MySQL, and it only served for - // sort. 
- #[test] - fn test_null_key() { - assert!(Builder::new() - .add_key_column( - column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary) - .id(1) - .is_nullable(true) - .build() - .expect("should succeed build column schema") - ) - .is_err()); + assert!(builder.primary_key_indexes(vec![0]).build().is_err()); } #[test] @@ -1637,6 +1608,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0]) .build() .unwrap(); @@ -1690,6 +1662,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .unwrap(); @@ -1796,6 +1769,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .expect("should succeed to build schema"); diff --git a/common_types/src/tests.rs b/common_types/src/tests.rs index 666a1a6c91..29a442fdb9 100644 --- a/common_types/src/tests.rs +++ b/common_types/src/tests.rs @@ -73,11 +73,13 @@ fn base_schema_builder() -> schema::Builder { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0, 1]) } fn default_value_schema_builder() -> schema::Builder { schema::Builder::new() .auto_increment_column_id(true) + .primary_key_indexes(vec![0, 1]) .add_key_column( column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary) .build() @@ -233,7 +235,7 @@ pub fn build_schema_for_cpu() -> Schema { ) .unwrap(); - builder.build().unwrap() + builder.primary_key_indexes(vec![0, 1]).build().unwrap() } #[allow(clippy::too_many_arguments)] diff --git a/interpreters/src/alter_table.rs b/interpreters/src/alter_table.rs index f3893442e8..4ce0aedabf 100644 --- a/interpreters/src/alter_table.rs +++ b/interpreters/src/alter_table.rs @@ -94,6 +94,7 @@ fn build_new_schema(current_schema: &Schema, column_schemas: Vec) let mut builder = schema::Builder::with_capacity(current_schema.num_columns() + column_schemas.len()) + .primary_key_indexes(current_schema.primary_key_indexes().to_vec()) // Increment the schema version. 
.version(current_version + 1); for (idx, column) in current_schema.columns().iter().enumerate() { diff --git a/proxy/src/grpc/prom_query.rs b/proxy/src/grpc/prom_query.rs index 375d9ca25d..13f18865e4 100644 --- a/proxy/src/grpc/prom_query.rs +++ b/proxy/src/grpc/prom_query.rs @@ -390,6 +390,7 @@ mod tests { .unwrap(), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .unwrap() } diff --git a/proxy/src/http/prom.rs b/proxy/src/http/prom.rs index b5c9565822..b84107124a 100644 --- a/proxy/src/http/prom.rs +++ b/proxy/src/http/prom.rs @@ -605,6 +605,7 @@ mod tests { .unwrap(), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .unwrap() } diff --git a/proxy/src/influxdb/types.rs b/proxy/src/influxdb/types.rs index bad6af396c..6a9a4acb87 100644 --- a/proxy/src/influxdb/types.rs +++ b/proxy/src/influxdb/types.rs @@ -796,6 +796,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0]) .build() .unwrap(); diff --git a/proxy/src/write.rs b/proxy/src/write.rs index d1bc231ebf..2f07f43d04 100644 --- a/proxy/src/write.rs +++ b/proxy/src/write.rs @@ -1112,6 +1112,7 @@ mod test { .unwrap(), ) .unwrap() + .primary_key_indexes(vec![0, 1, 2]) .build() .unwrap() } diff --git a/query_frontend/src/planner.rs b/query_frontend/src/planner.rs index f8a889d5a3..9245c2ca36 100644 --- a/query_frontend/src/planner.rs +++ b/query_frontend/src/planner.rs @@ -534,7 +534,10 @@ pub fn build_schema_from_write_table_request( .context(BuildTableSchema {})?; } - schema_builder.build().context(BuildTableSchema {}) + schema_builder + .primary_key_indexes(vec![0, 1]) + .build() + .context(BuildTableSchema {}) } fn ensure_data_type_compatible( @@ -647,6 +650,10 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> { schema::Builder::with_capacity(columns_by_name.len()).auto_increment_column_id(true); // Collect the key columns. + // TODO: Here we put key column in front of all columns, this may change column + // order defined by users. + let primary_key_indexes = (0..primary_key_columns.len()).collect(); + schema_builder = schema_builder.primary_key_indexes(primary_key_indexes); for key_col in primary_key_columns { let col_name = key_col.value.as_str(); let col = columns_by_name diff --git a/system_catalog/src/sys_catalog_table.rs b/system_catalog/src/sys_catalog_table.rs index 53941bb0d9..dcef633ca6 100644 --- a/system_catalog/src/sys_catalog_table.rs +++ b/system_catalog/src/sys_catalog_table.rs @@ -724,6 +724,7 @@ fn new_sys_catalog_schema() -> schema::Result { .build() .expect("Should succeed to build column schema of catalog"), )? 
+ .primary_key_indexes(vec![0, 1]) .build() } diff --git a/system_catalog/src/tables.rs b/system_catalog/src/tables.rs index f6f1d2d526..179051399d 100644 --- a/system_catalog/src/tables.rs +++ b/system_catalog/src/tables.rs @@ -91,6 +91,7 @@ fn tables_schema() -> Schema { .unwrap(), ) .unwrap() + .primary_key_indexes(vec![0, 1, 2]) .build() .unwrap() } diff --git a/table_engine/src/partition/rule/df_adapter/mod.rs b/table_engine/src/partition/rule/df_adapter/mod.rs index bb423f956f..69bee374ea 100644 --- a/table_engine/src/partition/rule/df_adapter/mod.rs +++ b/table_engine/src/partition/rule/df_adapter/mod.rs @@ -286,6 +286,7 @@ mod tests { .expect("should succeed build column schema"), ) .unwrap() + .primary_key_indexes(vec![0, 1]) .build() .expect("should succeed to build schema") } From f6de24853c4e298e898ecbfce1e6a37ebc204760 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 6 Nov 2023 21:10:30 +0800 Subject: [PATCH 3/5] fix: use flag in preflush to indicate whether reorder is required --- .../src/instance/flush_compaction.rs | 24 ++++++++++++++--- .../src/instance/reorder_memtable.rs | 4 +-- .../env/local/ddl/sampling-primary-key.result | 26 ++++++++++++++++--- .../env/local/ddl/sampling-primary-key.sql | 8 ++++-- 4 files changed, 50 insertions(+), 12 deletions(-) diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index f8911aaecf..1b24166932 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -196,6 +196,10 @@ pub struct TableFlushRequest { pub table_data: TableDataRef, /// Max sequence number to flush (inclusive). pub max_sequence: SequenceNumber, + + /// We may suggest new primary keys in preflush. if suggest happens, we need + /// to ensure data is in new order. + need_reorder: bool, } #[derive(Clone)] @@ -287,7 +291,7 @@ impl FlushTask { // Start flush duration timer. let local_metrics = self.table_data.metrics.local_flush_metrics(); let _timer = local_metrics.start_flush_timer(); - self.dump_memtables(request_id, &mems_to_flush) + self.dump_memtables(request_id, &mems_to_flush, flush_req.need_reorder) .await .box_err() .context(FlushJobWithCause { @@ -316,6 +320,7 @@ impl FlushTask { let mut last_sequence = table_data.last_sequence(); // Switch (freeze) all mutable memtables. And update segment duration if // suggestion is returned. 
+ let mut need_reorder = false; if let Some(suggest_segment_duration) = current_version.suggest_duration() { info!( "Update segment duration, table:{}, table_id:{}, segment_duration:{:?}", @@ -324,6 +329,7 @@ impl FlushTask { assert!(!suggest_segment_duration.is_zero()); if let Some(pk_idx) = current_version.suggest_primary_key() { + need_reorder = true; let mut schema = table_data.schema(); info!( "Update primary key, table:{}, table_id:{}, old:{:?}, new:{:?}", @@ -388,6 +394,7 @@ impl FlushTask { Ok(TableFlushRequest { table_data: table_data.clone(), max_sequence: last_sequence, + need_reorder, }) } @@ -401,6 +408,7 @@ impl FlushTask { &self, request_id: RequestId, mems_to_flush: &FlushableMemTables, + need_reorder: bool, ) -> Result<()> { let local_metrics = self.table_data.metrics.local_flush_metrics(); let mut files_to_level0 = Vec::with_capacity(mems_to_flush.memtables.len()); @@ -410,7 +418,12 @@ impl FlushTask { // process sampling memtable and frozen memtable if let Some(sampling_mem) = &mems_to_flush.sampling_mem { if let Some(seq) = self - .dump_sampling_memtable(request_id, sampling_mem, &mut files_to_level0) + .dump_sampling_memtable( + request_id, + sampling_mem, + &mut files_to_level0, + need_reorder, + ) .await? { flushed_sequence = seq; @@ -500,6 +513,7 @@ impl FlushTask { request_id: RequestId, sampling_mem: &SamplingMemTable, files_to_level0: &mut Vec, + need_reorder: bool, ) -> Result> { let (min_key, max_key) = match (sampling_mem.mem.min_key(), sampling_mem.mem.max_key()) { (Some(min_key), Some(max_key)) => (min_key, max_key), @@ -589,11 +603,13 @@ impl FlushTask { let iter = build_mem_table_iter(sampling_mem.mem.clone(), &self.table_data)?; let timestamp_idx = self.table_data.schema().timestamp_index(); - if let Some(pk_idx) = self.table_data.current_version().suggest_primary_key() { + if need_reorder { + let schema = self.table_data.schema(); + let primary_key_indexes = schema.primary_key_indexes(); let reorder = Reorder { iter, schema: self.table_data.schema(), - order_by_col_indexes: pk_idx, + order_by_col_indexes: primary_key_indexes.to_vec(), }; let mut stream = reorder.into_stream().await.context(ReorderMemIter)?; while let Some(data) = stream.next().await { diff --git a/analytic_engine/src/instance/reorder_memtable.rs b/analytic_engine/src/instance/reorder_memtable.rs index b29ecac02b..2e0901bbaa 100644 --- a/analytic_engine/src/instance/reorder_memtable.rs +++ b/analytic_engine/src/instance/reorder_memtable.rs @@ -39,7 +39,7 @@ use datafusion::{ execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream as DfRecordBatchStream, SendableRecordBatchStream, Statistics, }, - prelude::{col, Expr, SessionConfig, SessionContext}, + prelude::{ident, Expr, SessionConfig, SessionContext}, sql::TableReference, }; use futures::{Stream, StreamExt}; @@ -241,7 +241,7 @@ impl Reorder { let columns = schema.columns(); let sort_exprs = sort_by_col_idx .iter() - .map(|i| col(&columns[*i].name).sort(true, true)) + .map(|i| ident(&columns[*i].name).sort(true, true)) .collect::>(); let df_plan = LogicalPlanBuilder::scan(DUMMY_TABLE_NAME, source, None)? .sort(sort_exprs)? 
diff --git a/integration_tests/cases/env/local/ddl/sampling-primary-key.result b/integration_tests/cases/env/local/ddl/sampling-primary-key.result index 27e8da08c7..0324b1fc1b 100644 --- a/integration_tests/cases/env/local/ddl/sampling-primary-key.result +++ b/integration_tests/cases/env/local/ddl/sampling-primary-key.result @@ -8,7 +8,7 @@ CREATE TABLE `sampling_primary_key_table` ( v3 double, v5 double, name string TAG, - value int64 NOT NULL, + myVALUE int64 NOT NULL, t timestamp NOT NULL, timestamp KEY (t)) ENGINE = Analytic WITH ( update_mode='append', @@ -20,10 +20,10 @@ affected_rows: 0 show create table `sampling_primary_key_table`; Table,Create Table, -String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `value` bigint NOT NULL, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"), +String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `myVALUE` bigint NOT NULL, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"), -INSERT INTO `sampling_primary_key_table` (t, name, value) +INSERT INTO `sampling_primary_key_table` (t, name, myVALUE) VALUES (1695348000000, "ceresdb2", 200), (1695348000005, "ceresdb2", 100), @@ -32,12 +32,30 @@ INSERT INTO `sampling_primary_key_table` (t, name, value) affected_rows: 4 +select * from `sampling_primary_key_table`; + +tsid,t,v1,v2,v3,v5,name,myVALUE, +UInt64(5478297384049724685),Timestamp(1695348000000),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(200), +UInt64(5478297384049724685),Timestamp(1695348000005),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(100), +UInt64(9680600349107584624),Timestamp(1695348000001),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb1"),Int64(100), +UInt64(13753293625875895842),Timestamp(1695348000003),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb3"),Int64(200), + + -- After flush, its primary key should changed. 
-- SQLNESS ARG pre_cmd=flush show create table `sampling_primary_key_table`; Table,Create Table, -String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `value` bigint NOT NULL, PRIMARY KEY(value,name,tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"), +String("sampling_primary_key_table"),String("CREATE TABLE `sampling_primary_key_table` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `v1` double, `v2` double, `v3` double, `v5` double, `name` string TAG, `myVALUE` bigint NOT NULL, PRIMARY KEY(myVALUE,name,tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='APPEND', write_buffer_size='33554432')"), + + +select * from `sampling_primary_key_table`; + +tsid,t,v1,v2,v3,v5,name,myVALUE, +UInt64(9680600349107584624),Timestamp(1695348000001),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb1"),Int64(100), +UInt64(5478297384049724685),Timestamp(1695348000005),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(100), +UInt64(5478297384049724685),Timestamp(1695348000000),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb2"),Int64(200), +UInt64(13753293625875895842),Timestamp(1695348000003),Double(0.0),Double(0.0),Double(0.0),Double(0.0),String("ceresdb3"),Int64(200), DROP TABLE IF EXISTS `sampling_primary_key_table`; diff --git a/integration_tests/cases/env/local/ddl/sampling-primary-key.sql b/integration_tests/cases/env/local/ddl/sampling-primary-key.sql index 2536e721b1..4c1612ee67 100644 --- a/integration_tests/cases/env/local/ddl/sampling-primary-key.sql +++ b/integration_tests/cases/env/local/ddl/sampling-primary-key.sql @@ -7,7 +7,7 @@ CREATE TABLE `sampling_primary_key_table` ( v3 double, v5 double, name string TAG, - value int64 NOT NULL, + myVALUE int64 NOT NULL, t timestamp NOT NULL, timestamp KEY (t)) ENGINE = Analytic WITH ( update_mode='append', @@ -16,15 +16,19 @@ CREATE TABLE `sampling_primary_key_table` ( show create table `sampling_primary_key_table`; -INSERT INTO `sampling_primary_key_table` (t, name, value) +INSERT INTO `sampling_primary_key_table` (t, name, myVALUE) VALUES (1695348000000, "ceresdb2", 200), (1695348000005, "ceresdb2", 100), (1695348000001, "ceresdb1", 100), (1695348000003, "ceresdb3", 200); +select * from `sampling_primary_key_table`; + -- After flush, its primary key should changed. 
 -- SQLNESS ARG pre_cmd=flush
 show create table `sampling_primary_key_table`;
 
+select * from `sampling_primary_key_table`;
+
 DROP TABLE IF EXISTS `sampling_primary_key_table`;

From e5c773105ff185e5d924e9f2090ce6c8c296d289 Mon Sep 17 00:00:00 2001
From: jiacai2050
Date: Mon, 6 Nov 2023 22:11:36 +0800
Subject: [PATCH 4/5] do not sample timestamp

---
 analytic_engine/src/sampler.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/analytic_engine/src/sampler.rs b/analytic_engine/src/sampler.rs
index 0e5445169c..86d85e56fc 100644
--- a/analytic_engine/src/sampler.rs
+++ b/analytic_engine/src/sampler.rs
@@ -292,7 +292,7 @@ impl PrimaryKeySampler {
             .iter()
             .enumerate()
             .map(|(idx, col)| {
-                if idx == timestamp_index {
+                if col.data_type.is_timestamp() {
                     return None;
                 }
 

From 3dc2d221b2915a8a208ebdd6785d9d9e0c7b75d6 Mon Sep 17 00:00:00 2001
From: jiacai2050
Date: Wed, 8 Nov 2023 10:35:01 +0800
Subject: [PATCH 5/5] fix CR

---
 analytic_engine/src/instance/flush_compaction.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index 1b24166932..fa66d78953 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -197,8 +197,8 @@ pub struct TableFlushRequest {
     /// Max sequence number to flush (inclusive).
     pub max_sequence: SequenceNumber,
 
-    /// We may suggest new primary keys in preflush. if suggest happens, we need
-    /// to ensure data is in new order.
+    /// We may suggest new primary keys in preflush. If a suggestion happens, we
+    /// need to ensure the data is in the new order.
    need_reorder: bool,
 }
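
The recurring change in PATCH 2 is that `schema::Builder` no longer derives the primary key implicitly from the order of `add_key_column` calls; callers must declare the key columns explicitly via `primary_key_indexes`, and `build()` now rejects an empty index list. A minimal sketch of the new call shape, modeled on the test builders touched above; the column names, datum kinds, and module paths are illustrative assumptions rather than code from the repository:

    // Sketch only: mirrors the builder calls used in the tests above; the concrete
    // column names, types and import paths are assumptions for illustration.
    use common_types::{column_schema, datum::DatumKind, schema};

    fn build_example_schema() -> schema::Schema {
        schema::Builder::new()
            .auto_increment_column_id(true)
            // The primary key is now declared explicitly by column index;
            // build() fails if this list is left empty.
            .primary_key_indexes(vec![0, 1])
            .add_key_column(
                column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary)
                    .build()
                    .expect("should succeed to build column schema"),
            )
            .unwrap()
            .add_key_column(
                column_schema::Builder::new("key2".to_string(), DatumKind::Timestamp)
                    .build()
                    .expect("should succeed to build column schema"),
            )
            .unwrap()
            .add_normal_column(
                column_schema::Builder::new("field1".to_string(), DatumKind::Double)
                    .is_nullable(true)
                    .build()
                    .expect("should succeed to build column schema"),
            )
            .unwrap()
            .build()
            .unwrap()
    }

This is also the shape the reworked `TryFrom<schema_pb::TableSchema>` produces: it maps `primary_key_ids` to column positions and passes them through `primary_key_indexes` instead of reconstructing the arrow schema by hand.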
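
PATCH 3 also switches the reorder plan from `col` to `ident` when building its sort expressions, which is why the integration test renames the column to the mixed-case `myVALUE`: in DataFusion's expression API, `col` parses its argument as a SQL identifier and normalizes an unquoted name to lowercase, whereas `ident` keeps the string verbatim, so mixed-case column names still resolve against the schema. A small sketch of the distinction, assuming the DataFusion version used by this patch (where `Expr::sort` still returns an `Expr`):

    // Sketch only: `col("myVALUE")` would be normalized to the lowercase column
    // "myvalue" and fail to resolve; `ident("myVALUE")` references the column
    // name exactly as written.
    use datafusion::prelude::{ident, Expr};

    fn sort_expr(column_name: &str) -> Expr {
        // Keep the column name verbatim so mixed-case columns such as `myVALUE`
        // resolve, sorting ascending with nulls first as the reorder plan does.
        ident(column_name).sort(true, true)
    }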