diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 7094bb35ab..880eb10ca9 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -22,8 +22,8 @@ use std::{ }; use common_types::{ - projected_schema::ProjectedSchema, - record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, + projected_schema::{ProjectedSchema, RowProjectorBuilder}, + record_batch::{FetchedRecordBatch, FetchedRecordBatchBuilder}, request_id::RequestId, row::RowViewOnBatch, time::TimeRange, @@ -46,8 +46,8 @@ use wal::manager::WalLocation; use crate::{ compaction::{CompactionInputFiles, CompactionTask, ExpiredFiles}, instance::{ - self, create_sst_read_option, reorder_memtable::Reorder, - serial_executor::TableFlushScheduler, ScanType, SpaceStore, SpaceStoreRef, + self, reorder_memtable::Reorder, serial_executor::TableFlushScheduler, ScanType, + SpaceStore, SpaceStoreRef, SstReadOptionsBuilder, }, manifest::meta_edit::{ AlterOptionsMeta, AlterSchemaMeta, MetaEdit, MetaEditRequest, MetaUpdate, VersionEditMeta, @@ -593,7 +593,7 @@ impl FlushTask { for time_range in &time_ranges { let (batch_record_sender, batch_record_receiver) = - channel::>(DEFAULT_CHANNEL_SIZE); + channel::>(DEFAULT_CHANNEL_SIZE); let file_id = self .table_data .alloc_file_id(&self.space_store.manifest) @@ -933,20 +933,26 @@ impl SpaceStore { let table_options = table_data.table_options(); let projected_schema = ProjectedSchema::no_projection(schema.clone()); let predicate = Arc::new(Predicate::empty()); - let sst_read_options = create_sst_read_option( + let maybe_table_level_metrics = table_data + .metrics + .maybe_table_level_metrics() + .sst_metrics + .clone(); + let sst_read_options_builder = SstReadOptionsBuilder::new( ScanType::Compaction, scan_options, - table_data - .metrics - .maybe_table_level_metrics() - .sst_metrics - .clone(), + maybe_table_level_metrics, table_options.num_rows_per_row_group, - projected_schema.clone(), predicate, self.meta_cache.clone(), runtime, ); + let fetched_schema = projected_schema.to_record_schema_with_key(); + let primary_key_indexes = fetched_schema.primary_key_idx().to_vec(); + let fetched_schema = fetched_schema.into_record_schema(); + let table_schema = projected_schema.table_schema().clone(); + let row_projector_builder = + RowProjectorBuilder::new(fetched_schema, table_schema, Some(primary_key_indexes)); let iter_options = IterOptions { batch_size: table_options.num_rows_per_row_group, @@ -966,8 +972,8 @@ impl SpaceStore { sequence, projected_schema, predicate: Arc::new(Predicate::empty()), + sst_read_options_builder: sst_read_options_builder.clone(), sst_factory: &self.sst_factory, - sst_read_options: sst_read_options.clone(), store_picker: self.store_picker(), merge_iter_options: iter_options.clone(), need_dedup: table_options.need_dedup(), @@ -992,6 +998,8 @@ impl SpaceStore { row_iter::record_batch_with_key_iter_to_stream(merge_iter) }; + // TODO: eliminate the duplicated building of `SstReadOptions`. + let sst_read_options = sst_read_options_builder.build(row_projector_builder); let (sst_meta, column_stats) = { let meta_reader = SstMetaReader { space_id: table_data.space_id, @@ -1157,12 +1165,17 @@ fn collect_column_stats_from_meta_datas(metas: &[SstMetaData]) -> HashMap Result> { - let mut builders: Vec = (0..time_ranges.len()) - .map(|_| RecordBatchWithKeyBuilder::new(record_batch.schema_with_key().clone())) +) -> Result> { + let fetched_schema = record_batch.schema(); + let primary_key_indexes = record_batch.primary_key_indexes(); + let mut builders: Vec = (0..time_ranges.len()) + .map(|_| { + let primary_key_indexes = primary_key_indexes.map(|idxs| idxs.to_vec()); + FetchedRecordBatchBuilder::new(fetched_schema.clone(), primary_key_indexes) + }) .collect(); for row_idx in 0..record_batch.num_rows() { @@ -1203,11 +1216,18 @@ fn build_mem_table_iter( table_data: &TableDataRef, ) -> Result { let scan_ctx = ScanContext::default(); + let projected_schema = ProjectedSchema::no_projection(table_data.schema()); + let fetched_schema = projected_schema.to_record_schema_with_key(); + let primary_key_indexes = fetched_schema.primary_key_idx().to_vec(); + let fetched_schema = fetched_schema.into_record_schema(); + let table_schema = projected_schema.table_schema().clone(); + let row_projector_builder = + RowProjectorBuilder::new(fetched_schema, table_schema, Some(primary_key_indexes)); let scan_req = ScanRequest { start_user_key: Bound::Unbounded, end_user_key: Bound::Unbounded, sequence: common_types::MAX_SEQUENCE_NUMBER, - projected_schema: ProjectedSchema::no_projection(table_data.schema()), + row_projector_builder, need_dedup: table_data.dedup(), reverse: false, metrics_collector: None, @@ -1226,7 +1246,7 @@ mod tests { use common_types::{ schema::Schema, tests::{ - build_record_batch_with_key_by_rows, build_row, build_row_opt, build_schema, + build_fetched_record_batch_by_rows, build_row, build_row_opt, build_schema, check_record_batch_with_key_with_rows, }, time::TimeRange, @@ -1275,7 +1295,7 @@ mod tests { .into_iter() .flatten() .collect(); - let record_batch_with_key = build_record_batch_with_key_by_rows(rows); + let record_batch_with_key = build_fetched_record_batch_by_rows(rows); let column_num = record_batch_with_key.num_columns(); let time_ranges = vec![ TimeRange::new_unchecked_for_test(0, 100), diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 031f867ef5..ab8df1ef9b 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -33,7 +33,7 @@ pub(crate) mod write; use std::sync::Arc; -use common_types::{projected_schema::ProjectedSchema, table::TableId}; +use common_types::{projected_schema::RowProjectorBuilder, table::TableId}; use generic_error::{BoxError, GenericError}; use logger::{error, info}; use macros::define_result; @@ -327,32 +327,55 @@ impl Instance { } } -// TODO: make it a builder -#[allow(clippy::too_many_arguments)] -fn create_sst_read_option( +#[derive(Debug, Clone)] +pub struct SstReadOptionsBuilder { scan_type: ScanType, scan_options: ScanOptions, maybe_table_level_metrics: Arc, num_rows_per_row_group: usize, - projected_schema: ProjectedSchema, predicate: PredicateRef, meta_cache: Option, runtime: Arc, -) -> SstReadOptions { - SstReadOptions { - maybe_table_level_metrics, - num_rows_per_row_group, - frequency: scan_type.into(), - projected_schema, - predicate, - meta_cache, - scan_options, - runtime, +} + +impl SstReadOptionsBuilder { + pub fn new( + scan_type: ScanType, + scan_options: ScanOptions, + maybe_table_level_metrics: Arc, + num_rows_per_row_group: usize, + predicate: PredicateRef, + meta_cache: Option, + runtime: Arc, + ) -> Self { + Self { + scan_type, + scan_options, + maybe_table_level_metrics, + num_rows_per_row_group, + predicate, + meta_cache, + runtime, + } + } + + pub fn build(self, row_projector_builder: RowProjectorBuilder) -> SstReadOptions { + SstReadOptions { + maybe_table_level_metrics: self.maybe_table_level_metrics, + num_rows_per_row_group: self.num_rows_per_row_group, + frequency: self.scan_type.into(), + row_projector_builder, + predicate: self.predicate, + meta_cache: self.meta_cache, + scan_options: self.scan_options, + runtime: self.runtime, + } } } /// Scan type which mapped to the low level `ReadFrequency` in sst reader. -enum ScanType { +#[derive(Debug, Clone, Copy)] +pub enum ScanType { Query, Compaction, } diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index f769ec689d..9624f4cfbb 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -23,7 +23,7 @@ use std::{ use async_stream::try_stream; use common_types::{ projected_schema::ProjectedSchema, - record_batch::{RecordBatch, RecordBatchWithKey}, + record_batch::{FetchedRecordBatch, RecordBatch}, schema::RecordSchema, time::TimeRange, }; @@ -42,15 +42,14 @@ use time_ext::current_time_millis; use trace_metric::Metric; use crate::{ - instance::{create_sst_read_option, Instance, ScanType}, + instance::{Instance, ScanType, SstReadOptionsBuilder}, row_iter::{ chain, chain::{ChainConfig, ChainIterator}, dedup::DedupIterator, merge::{MergeBuilder, MergeConfig, MergeIterator}, - IterOptions, RecordBatchWithKeyIterator, + FetchedRecordBatchIterator, IterOptions, }, - sst::factory::SstReadOptions, table::{ data::TableData, version::{ReadView, TableVersion}, @@ -123,12 +122,11 @@ impl Instance { None, )); - let sst_read_options = create_sst_read_option( + let sst_read_options_builder = SstReadOptionsBuilder::new( ScanType::Query, self.scan_options.clone(), table_metrics.sst_metrics.clone(), table_options.num_rows_per_row_group, - request.projected_schema.clone(), request.predicate.clone(), self.meta_cache.clone(), self.read_runtime().clone(), @@ -136,12 +134,22 @@ impl Instance { if need_merge_sort { let merge_iters = self - .build_merge_iters(table_data, &request, &table_options, sst_read_options) + .build_merge_iters( + table_data, + &request, + &table_options, + sst_read_options_builder, + ) .await?; self.build_partitioned_streams(&request, merge_iters) } else { let chain_iters = self - .build_chain_iters(table_data, &request, &table_options, sst_read_options) + .build_chain_iters( + table_data, + &request, + &table_options, + sst_read_options_builder, + ) .await?; self.build_partitioned_streams(&request, chain_iters) } @@ -150,7 +158,7 @@ impl Instance { fn build_partitioned_streams( &self, request: &ReadRequest, - partitioned_iters: Vec, + partitioned_iters: Vec, ) -> Result { let read_parallelism = request.opts.read_parallelism; @@ -179,7 +187,7 @@ impl Instance { table_data: &TableData, request: &ReadRequest, table_options: &TableOptions, - sst_read_options: SstReadOptions, + sst_read_options_builder: SstReadOptionsBuilder, ) -> Result>> { // Current visible sequence let sequence = table_data.last_sequence(); @@ -203,7 +211,7 @@ impl Instance { projected_schema: request.projected_schema.clone(), predicate: request.predicate.clone(), sst_factory: &self.space_store.sst_factory, - sst_read_options: sst_read_options.clone(), + sst_read_options_builder: sst_read_options_builder.clone(), store_picker: self.space_store.store_picker(), merge_iter_options: iter_options.clone(), need_dedup: table_options.need_dedup(), @@ -239,7 +247,7 @@ impl Instance { table_data: &TableData, request: &ReadRequest, table_options: &TableOptions, - sst_read_options: SstReadOptions, + sst_read_options_builder: SstReadOptionsBuilder, ) -> Result> { let projected_schema = request.projected_schema.clone(); @@ -261,7 +269,7 @@ impl Instance { table_id: table_data.id, projected_schema: projected_schema.clone(), predicate: request.predicate.clone(), - sst_read_options: sst_read_options.clone(), + sst_read_options_builder: sst_read_options_builder.clone(), sst_factory: &self.space_store.sst_factory, store_picker: self.space_store.store_picker(), }; @@ -347,7 +355,7 @@ struct StreamStateOnMultiIters { projected_schema: ProjectedSchema, } -impl StreamStateOnMultiIters { +impl StreamStateOnMultiIters { fn is_exhausted(&self) -> bool { self.curr_iter_idx >= self.iters.len() } @@ -362,7 +370,7 @@ impl StreamStateOnMultiIters { async fn fetch_next_batch( &mut self, - ) -> Option> { + ) -> Option> { loop { if self.is_exhausted() { return None; @@ -379,7 +387,7 @@ impl StreamStateOnMultiIters { } fn iters_to_stream( - iters: Vec, + iters: Vec, projected_schema: ProjectedSchema, ) -> SendableRecordBatchStream { let mut state = StreamStateOnMultiIters { diff --git a/analytic_engine/src/instance/reorder_memtable.rs b/analytic_engine/src/instance/reorder_memtable.rs index be1db287dc..5a7a03de42 100644 --- a/analytic_engine/src/instance/reorder_memtable.rs +++ b/analytic_engine/src/instance/reorder_memtable.rs @@ -26,7 +26,7 @@ pub use arrow::{ }; use async_trait::async_trait; use common_types::{ - record_batch::{RecordBatchData, RecordBatchWithKey}, + record_batch::{FetchedRecordBatch, RecordBatchData}, schema::Schema, }; use datafusion::{ @@ -70,8 +70,8 @@ pub enum Error { define_result!(Error); pub type DfResult = std::result::Result; -type SendableRecordBatchWithkeyStream = - Pin> + Send>>; +type SendableFetchingRecordBatchStream = + Pin> + Send>>; impl From for Error { fn from(df_err: DataFusionError) -> Self { @@ -253,7 +253,7 @@ impl Reorder { // TODO: In theory we can construct a physical plan directly, here we choose // logical because it has a convenient builder API for use. - pub async fn into_stream(self) -> Result { + pub async fn into_stream(self) -> Result { // 1. Init datafusion context let runtime = Arc::new(RuntimeEnv::default()); let state = SessionState::with_config_rt(SessionConfig::new(), runtime); @@ -275,12 +275,16 @@ impl Reorder { // 3. Execute plan and transform stream let stream = execute_stream(physical_plan, ctx.task_ctx())?; - let schema_with_key = self.schema.to_record_schema_with_key(); + let record_schema = self.schema.to_record_schema(); let stream = stream.map(move |batch| { let batch = batch.context(FetchRecordBatch)?; let data = RecordBatchData::try_from(batch).context(ConvertRecordBatchData)?; - Ok(RecordBatchWithKey::new(schema_with_key.clone(), data)) + Ok(FetchedRecordBatch::new_from_parts( + record_schema.clone(), + None, + data, + )) }); Ok(Box::pin(stream)) diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index 4e951c3445..e7d7f81027 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -43,7 +43,11 @@ use size_ext::ReadableSize; use time_ext::ReadableDuration; use wal::config::Config as WalConfig; -pub use crate::{compaction::scheduler::SchedulerConfig, table_options::TableOptions}; +pub use crate::{ + compaction::scheduler::SchedulerConfig, + instance::{ScanType, SstReadOptionsBuilder}, + table_options::TableOptions, +}; /// Config of analytic engine #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/analytic_engine/src/memtable/columnar/iter.rs b/analytic_engine/src/memtable/columnar/iter.rs index e10739f240..57ea4e6ebd 100644 --- a/analytic_engine/src/memtable/columnar/iter.rs +++ b/analytic_engine/src/memtable/columnar/iter.rs @@ -27,8 +27,8 @@ use common_types::{ column::Column, column_schema::ColumnId, datum::Datum, - projected_schema::{ProjectedSchema, RowProjector}, - record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, + projected_schema::RowProjector, + record_batch::{FetchedRecordBatch, FetchedRecordBatchBuilder}, row::Row, schema::Schema, SequenceNumber, @@ -66,8 +66,7 @@ pub struct ColumnarIterImpl + Clone + Sync + Send> /// Schema of this memtable, used to decode row memtable_schema: Schema, /// Projection of schema to read - projected_schema: ProjectedSchema, - projector: RowProjector, + row_projector: RowProjector, // Options related: batch_size: usize, @@ -101,17 +100,16 @@ impl + Clone + Sync + Send> ColumnarIterImpl { last_sequence: SequenceNumber, skiplist: Skiplist, ) -> Result { - let projector = request - .projected_schema - .try_project_with_key(&schema) + let row_projector = request + .row_projector_builder + .build(&schema) .context(ProjectSchema)?; let mut columnar_iter = Self { memtable, row_num, current_idx: 0, memtable_schema: schema, - projected_schema: request.projected_schema, - projector, + row_projector, batch_size: ctx.batch_size, deadline: ctx.deadline, start_user_key: request.start_user_key, @@ -190,7 +188,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { } /// Fetch next record batch - fn fetch_next_record_batch(&mut self) -> Result> { + fn fetch_next_record_batch(&mut self) -> Result> { debug_assert_eq!(State::Initialized, self.state); assert!(self.batch_size > 0); let rows = if !self.need_dedup { @@ -207,8 +205,14 @@ impl + Clone + Sync + Send> ColumnarIterImpl { } } - let mut builder = RecordBatchWithKeyBuilder::with_capacity( - self.projected_schema.to_record_schema_with_key(), + let fetched_schema = self.row_projector.fetched_schema().clone(); + let primary_key_indexes = self + .row_projector + .primary_key_indexes() + .map(|idxs| idxs.to_vec()); + let mut builder = FetchedRecordBatchBuilder::with_capacity( + fetched_schema, + primary_key_indexes, self.batch_size, ); for row in rows.into_iter() { @@ -308,7 +312,12 @@ impl + Clone + Sync + Send> ColumnarIterImpl { Row::from_datums(vec![Datum::Null; self.memtable_schema.num_columns()]); self.batch_size ]; - for (col_idx, column_schema_idx) in self.projector.source_projection().iter().enumerate() { + for (col_idx, column_schema_idx) in self + .row_projector + .fetched_source_column_indexes() + .iter() + .enumerate() + { if let Some(column_schema_idx) = column_schema_idx { let column_schema = self.memtable_schema.column(*column_schema_idx); if let Some(column) = memtable.get(&column_schema.id) { @@ -328,11 +337,16 @@ impl + Clone + Sync + Send> ColumnarIterImpl { let mut num_rows = 0; let memtable = self.memtable.read().unwrap(); - let record_schema = self.projected_schema.to_record_schema(); + let record_schema = self.row_projector.fetched_schema(); let mut rows = vec![Row::from_datums(vec![Datum::Null; record_schema.num_columns()]); self.batch_size]; - for (col_idx, column_schema_idx) in self.projector.source_projection().iter().enumerate() { + for (col_idx, column_schema_idx) in self + .row_projector + .fetched_source_column_indexes() + .iter() + .enumerate() + { if let Some(column_schema_idx) = column_schema_idx { let column_schema = self.memtable_schema.column(*column_schema_idx); if let Some(column) = memtable.get(&column_schema.id) { @@ -378,7 +392,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { } impl Iterator for ColumnarIterImpl { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { if self.state != State::Initialized { diff --git a/analytic_engine/src/memtable/mod.rs b/analytic_engine/src/memtable/mod.rs index 28d0eeb8b3..ed3b20d348 100644 --- a/analytic_engine/src/memtable/mod.rs +++ b/analytic_engine/src/memtable/mod.rs @@ -24,8 +24,8 @@ use std::{ops::Bound, sync::Arc, time::Instant}; use bytes_ext::{ByteVec, Bytes}; use common_types::{ - projected_schema::ProjectedSchema, - record_batch::RecordBatchWithKey, + projected_schema::RowProjectorBuilder, + record_batch::FetchedRecordBatch, row::Row, schema::{IndexInWriterSchema, Schema}, time::TimeRange, @@ -203,7 +203,7 @@ pub struct ScanRequest { /// visible. pub sequence: SequenceNumber, /// Schema and projection to read. - pub projected_schema: ProjectedSchema, + pub row_projector_builder: RowProjectorBuilder, pub need_dedup: bool, pub reverse: bool, /// Collector for scan metrics. @@ -291,4 +291,4 @@ pub struct Metrics { pub type MemTableRef = Arc; /// A pointer to columnar iterator -pub type ColumnarIterPtr = Box> + Send + Sync>; +pub type ColumnarIterPtr = Box> + Send + Sync>; diff --git a/analytic_engine/src/memtable/reversed_iter.rs b/analytic_engine/src/memtable/reversed_iter.rs index 475eb704f3..5a9d5d75d3 100644 --- a/analytic_engine/src/memtable/reversed_iter.rs +++ b/analytic_engine/src/memtable/reversed_iter.rs @@ -14,7 +14,7 @@ use std::iter::Rev; -use common_types::record_batch::RecordBatchWithKey; +use common_types::record_batch::FetchedRecordBatch; use generic_error::BoxError; use snafu::ResultExt; @@ -26,13 +26,13 @@ use crate::memtable::{IterReverse, Result}; // reverse order naturally. pub struct ReversedColumnarIterator { iter: I, - reversed_iter: Option>>>, + reversed_iter: Option>>>, num_record_batch: usize, } impl ReversedColumnarIterator where - I: Iterator>, + I: Iterator>, { pub fn new(iter: I, num_rows: usize, batch_size: usize) -> Self { Self { @@ -57,9 +57,9 @@ where impl Iterator for ReversedColumnarIterator where - I: Iterator>, + I: Iterator>, { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { self.init_if_necessary(); diff --git a/analytic_engine/src/memtable/skiplist/iter.rs b/analytic_engine/src/memtable/skiplist/iter.rs index b101746096..60dd18ca0b 100644 --- a/analytic_engine/src/memtable/skiplist/iter.rs +++ b/analytic_engine/src/memtable/skiplist/iter.rs @@ -20,8 +20,8 @@ use arena::{Arena, BasicStats}; use bytes_ext::{Bytes, BytesMut}; use codec::row; use common_types::{ - projected_schema::{ProjectedSchema, RowProjector}, - record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, + projected_schema::RowProjector, + record_batch::{FetchedRecordBatch, FetchedRecordBatchBuilder}, row::contiguous::{ContiguousRowReader, ProjectedContiguousRow}, schema::Schema, SequenceNumber, @@ -57,8 +57,7 @@ pub struct ColumnarIterImpl + Clone + Sync + Send> /// Schema of this memtable, used to decode row memtable_schema: Schema, /// Projection of schema to read - projected_schema: ProjectedSchema, - projector: RowProjector, + row_projector: RowProjector, // Options related: batch_size: usize, @@ -86,17 +85,16 @@ impl + Clone + Sync + Send> ColumnarIterImpl { request: ScanRequest, ) -> Result { // Create projection for the memtable schema - let projector = request - .projected_schema - .try_project_with_key(&memtable.schema) + let row_projector = request + .row_projector_builder + .build(&memtable.schema) .context(ProjectSchema)?; let iter = memtable.skiplist.iter(); let mut columnar_iter = Self { iter, memtable_schema: memtable.schema.clone(), - projected_schema: request.projected_schema, - projector, + row_projector, batch_size: ctx.batch_size, deadline: ctx.deadline, start_user_key: request.start_user_key, @@ -148,12 +146,18 @@ impl + Clone + Sync + Send> ColumnarIterImpl { } /// Fetch next record batch - fn fetch_next_record_batch(&mut self) -> Result> { + fn fetch_next_record_batch(&mut self) -> Result> { debug_assert_eq!(State::Initialized, self.state); assert!(self.batch_size > 0); - let mut builder = RecordBatchWithKeyBuilder::with_capacity( - self.projected_schema.to_record_schema_with_key(), + let record_schema = self.row_projector.fetched_schema().clone(); + let primary_key_indexes = self + .row_projector + .primary_key_indexes() + .map(|idxs| idxs.to_vec()); + let mut builder = FetchedRecordBatchBuilder::with_capacity( + record_schema, + primary_key_indexes, self.batch_size, ); let mut num_rows = 0; @@ -161,7 +165,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { if let Some(row) = self.fetch_next_row()? { let row_reader = ContiguousRowReader::try_new(&row, &self.memtable_schema) .context(DecodeContinuousRow)?; - let projected_row = ProjectedContiguousRow::new(row_reader, &self.projector); + let projected_row = ProjectedContiguousRow::new(row_reader, &self.row_projector); trace!("Column iterator fetch next row, row:{:?}", projected_row); @@ -293,7 +297,7 @@ impl + Clone + Sync + Send> ColumnarIterImpl { } impl + Clone + Sync + Send> Iterator for ColumnarIterImpl { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { if self.state != State::Initialized { diff --git a/analytic_engine/src/memtable/skiplist/mod.rs b/analytic_engine/src/memtable/skiplist/mod.rs index 6298903904..a71a82a612 100644 --- a/analytic_engine/src/memtable/skiplist/mod.rs +++ b/analytic_engine/src/memtable/skiplist/mod.rs @@ -274,8 +274,8 @@ mod tests { use codec::memcomparable::MemComparable; use common_types::{ datum::Datum, - projected_schema::ProjectedSchema, - record_batch::RecordBatchWithKey, + projected_schema::{ProjectedSchema, RowProjectorBuilder}, + record_batch::FetchedRecordBatch, row::Row, schema::IndexInWriterSchema, tests::{build_row, build_schema}, @@ -294,7 +294,10 @@ mod tests { ) { let projection: Vec = (0..schema.num_columns()).collect(); let projected_schema = ProjectedSchema::new(schema, Some(projection)).unwrap(); - + let fetched_schema = projected_schema.to_record_schema(); + let table_schema = projected_schema.table_schema(); + let row_projector_builder = + RowProjectorBuilder::new(fetched_schema, table_schema.clone(), None); let testcases = vec![ ( // limited by sequence @@ -302,7 +305,7 @@ mod tests { start_user_key: Bound::Unbounded, end_user_key: Bound::Unbounded, sequence: 2, - projected_schema: projected_schema.clone(), + row_projector_builder: row_projector_builder.clone(), need_dedup: true, reverse: false, metrics_collector: None, @@ -322,7 +325,7 @@ mod tests { start_user_key: Bound::Included(build_scan_key("a", 1)), end_user_key: Bound::Excluded(build_scan_key("e", 5)), sequence: 2, - projected_schema: projected_schema.clone(), + row_projector_builder: row_projector_builder.clone(), need_dedup: true, reverse: false, metrics_collector: None, @@ -341,7 +344,7 @@ mod tests { start_user_key: Bound::Included(build_scan_key("a", 1)), end_user_key: Bound::Excluded(build_scan_key("e", 5)), sequence: 1, - projected_schema, + row_projector_builder, need_dedup: true, reverse: false, metrics_collector: None, @@ -367,13 +370,16 @@ mod tests { ) { let projection: Vec = (0..2).collect(); let projected_schema = ProjectedSchema::new(schema, Some(projection)).unwrap(); - + let fetched_schema = projected_schema.to_record_schema(); + let table_schema = projected_schema.table_schema(); + let row_projector_builder = + RowProjectorBuilder::new(fetched_schema, table_schema.clone(), None); let testcases = vec![( ScanRequest { start_user_key: Bound::Included(build_scan_key("a", 1)), end_user_key: Bound::Excluded(build_scan_key("e", 5)), sequence: 2, - projected_schema, + row_projector_builder, need_dedup: true, reverse: false, metrics_collector: None, @@ -457,7 +463,7 @@ mod tests { test_memtable_scan_for_projection(schema, memtable); } - fn check_iterator>>( + fn check_iterator>>( iter: T, expected_rows: Vec, ) { diff --git a/analytic_engine/src/row_iter/chain.rs b/analytic_engine/src/row_iter/chain.rs index 71df6e2f9c..3f8bff6bb9 100644 --- a/analytic_engine/src/row_iter/chain.rs +++ b/analytic_engine/src/row_iter/chain.rs @@ -19,7 +19,9 @@ use std::{ use async_trait::async_trait; use common_types::{ - projected_schema::ProjectedSchema, record_batch::RecordBatchWithKey, request_id::RequestId, + projected_schema::{ProjectedSchema, RowProjectorBuilder}, + record_batch::FetchedRecordBatch, + request_id::RequestId, schema::RecordSchemaWithKey, }; use generic_error::GenericError; @@ -30,13 +32,16 @@ use table_engine::{predicate::PredicateRef, table::TableId}; use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; use crate::{ + instance::SstReadOptionsBuilder, row_iter::{ - record_batch_stream, record_batch_stream::BoxedPrefetchableRecordBatchStream, - RecordBatchWithKeyIterator, + record_batch_stream::{ + self, BoxedPrefetchableRecordBatchStream, MemtableStreamContext, SstStreamContext, + }, + FetchedRecordBatchIterator, }, space::SpaceId, sst::{ - factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, SstReadOptions}, + factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef}, file::FileHandle, }, table::version::{MemTableVec, SamplingMemTable}, @@ -74,7 +79,7 @@ pub struct ChainConfig<'a> { pub predicate: PredicateRef, pub num_streams_to_prefetch: usize, - pub sst_read_options: SstReadOptions, + pub sst_read_options_builder: SstReadOptionsBuilder, /// Sst factory pub sst_factory: &'a SstFactoryRef, /// Store picker for persisting sst. @@ -119,6 +124,29 @@ impl<'a> Builder<'a> { impl<'a> Builder<'a> { pub async fn build(self) -> Result { + let fetched_schema = self.config.projected_schema.to_record_schema(); + let table_schema = self.config.projected_schema.table_schema(); + let row_projector_builder = + RowProjectorBuilder::new(fetched_schema.clone(), table_schema.clone(), None); + let sst_read_options = self + .config + .sst_read_options_builder + .build(row_projector_builder.clone()); + + let memtable_stream_ctx = MemtableStreamContext { + row_projector_builder, + fetched_schema: fetched_schema.clone(), + predicate: self.config.predicate, + need_dedup: false, + reverse: false, + deadline: self.config.deadline, + }; + + let sst_stream_ctx = SstStreamContext { + sst_read_options, + fetched_schema, + }; + let total_sst_streams: usize = self.ssts.iter().map(|v| v.len()).sum(); let mut total_streams = self.memtables.len() + total_sst_streams; if self.sampling_mem.is_some() { @@ -128,12 +156,8 @@ impl<'a> Builder<'a> { if let Some(v) = &self.sampling_mem { let stream = record_batch_stream::filtered_stream_from_memtable( - self.config.projected_schema.clone(), - false, &v.mem, - false, - self.config.predicate.as_ref(), - self.config.deadline, + &memtable_stream_ctx, self.config.metrics_collector.clone(), ) .context(BuildStreamFromMemtable)?; @@ -142,14 +166,10 @@ impl<'a> Builder<'a> { for memtable in &self.memtables { let stream = record_batch_stream::filtered_stream_from_memtable( - self.config.projected_schema.clone(), - false, // chain iterator only handle the case reading in no order so just read in asc // order by default. &memtable.mem, - false, - self.config.predicate.as_ref(), - self.config.deadline, + &memtable_stream_ctx, self.config.metrics_collector.clone(), ) .context(BuildStreamFromMemtable)?; @@ -163,8 +183,8 @@ impl<'a> Builder<'a> { self.config.table_id, sst, self.config.sst_factory, - &self.config.sst_read_options, self.config.store_picker, + &sst_stream_ctx, self.config.metrics_collector.clone(), ) .await @@ -307,7 +327,7 @@ impl ChainIterator { } } - async fn next_batch_internal(&mut self) -> Result> { + async fn next_batch_internal(&mut self) -> Result> { self.init_if_necessary(); self.maybe_prefetch().await; @@ -357,14 +377,14 @@ impl Drop for ChainIterator { } #[async_trait] -impl RecordBatchWithKeyIterator for ChainIterator { +impl FetchedRecordBatchIterator for ChainIterator { type Error = Error; fn schema(&self) -> &RecordSchemaWithKey { &self.schema } - async fn next_batch(&mut self) -> Result> { + async fn next_batch(&mut self) -> Result> { let timer = Instant::now(); let res = self.next_batch_internal().await; self.metrics.scan_duration += timer.elapsed(); diff --git a/analytic_engine/src/row_iter/dedup.rs b/analytic_engine/src/row_iter/dedup.rs index cdcffeb5a7..a35d1489f2 100644 --- a/analytic_engine/src/row_iter/dedup.rs +++ b/analytic_engine/src/row_iter/dedup.rs @@ -16,7 +16,7 @@ use std::cmp::Ordering; use async_trait::async_trait; use common_types::{ - record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, + record_batch::{FetchedRecordBatch, FetchedRecordBatchBuilder}, request_id::RequestId, row::{Row, RowViewOnBatch, RowWithMeta}, schema::RecordSchemaWithKey, @@ -26,7 +26,7 @@ use logger::{info, trace}; use macros::define_result; use snafu::{ResultExt, Snafu}; -use crate::row_iter::{IterOptions, RecordBatchWithKeyIterator}; +use crate::row_iter::{FetchedRecordBatchIterator, IterOptions}; #[derive(Debug, Snafu)] pub enum Error { @@ -54,7 +54,7 @@ define_result!(Error); pub struct DedupIterator { request_id: RequestId, schema: RecordSchemaWithKey, - record_batch_builder: RecordBatchWithKeyBuilder, + record_batch_builder: FetchedRecordBatchBuilder, iter: I, /// Previous row returned. prev_row: Option, @@ -67,15 +67,19 @@ pub struct DedupIterator { total_selected_rows: usize, } -impl DedupIterator { +impl DedupIterator { pub fn new(request_id: RequestId, iter: I, iter_options: IterOptions) -> Self { - let schema = iter.schema(); - - let record_batch_builder = - RecordBatchWithKeyBuilder::with_capacity(schema.clone(), iter_options.batch_size); + let schema_with_key = iter.schema(); + let primary_key_indexes = schema_with_key.primary_key_idx().to_vec(); + let fetched_schema = schema_with_key.to_record_schema(); + let record_batch_builder = FetchedRecordBatchBuilder::with_capacity( + fetched_schema, + Some(primary_key_indexes), + iter_options.batch_size, + ); Self { request_id, - schema: schema.clone(), + schema: schema_with_key.clone(), record_batch_builder, iter, prev_row: None, @@ -85,7 +89,7 @@ impl DedupIterator { } } - fn dedup_batch(&mut self, record_batch: RecordBatchWithKey) -> Result { + fn dedup_batch(&mut self, record_batch: FetchedRecordBatch) -> Result { self.selected_rows.clear(); // Ignore all rows by default. self.selected_rows.resize(record_batch.num_rows(), false); @@ -141,9 +145,9 @@ impl DedupIterator { /// Filter batch by `selected_rows`. fn filter_batch( &mut self, - record_batch: RecordBatchWithKey, + record_batch: FetchedRecordBatch, selected_num: usize, - ) -> Result { + ) -> Result { self.total_selected_rows += selected_num; self.total_duplications += record_batch.num_rows() - selected_num; @@ -169,14 +173,14 @@ impl DedupIterator { } #[async_trait] -impl RecordBatchWithKeyIterator for DedupIterator { +impl FetchedRecordBatchIterator for DedupIterator { type Error = Error; fn schema(&self) -> &RecordSchemaWithKey { &self.schema } - async fn next_batch(&mut self) -> Result> { + async fn next_batch(&mut self) -> Result> { match self .iter .next_batch() @@ -210,7 +214,9 @@ mod tests { use common_types::tests::{build_row, build_schema}; use super::*; - use crate::row_iter::tests::{build_record_batch_with_key, check_iterator, VectorIterator}; + use crate::row_iter::tests::{ + build_fetched_record_batch_with_key, check_iterator, VectorIterator, + }; #[tokio::test] async fn test_dedup_iterator() { @@ -219,7 +225,7 @@ mod tests { let iter = VectorIterator::new( schema.to_record_schema_with_key(), vec![ - build_record_batch_with_key( + build_fetched_record_batch_with_key( schema.clone(), vec![ build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000), @@ -227,7 +233,7 @@ mod tests { build_row(b"a", 2, 10.0, "v2", 2000, 2_000_000), ], ), - build_record_batch_with_key( + build_fetched_record_batch_with_key( schema, vec![ build_row(b"a", 2, 10.0, "v", 2000, 2_000_000), diff --git a/analytic_engine/src/row_iter/merge.rs b/analytic_engine/src/row_iter/merge.rs index db39f78d2f..e9029060cc 100644 --- a/analytic_engine/src/row_iter/merge.rs +++ b/analytic_engine/src/row_iter/merge.rs @@ -23,8 +23,8 @@ use std::{ use async_trait::async_trait; use common_types::{ - projected_schema::ProjectedSchema, - record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, + projected_schema::{ProjectedSchema, RowProjectorBuilder}, + record_batch::{FetchedRecordBatch, FetchedRecordBatchBuilder}, request_id::RequestId, row::RowViewOnBatch, schema::RecordSchemaWithKey, @@ -39,14 +39,17 @@ use table_engine::{predicate::PredicateRef, table::TableId}; use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; use crate::{ + instance::SstReadOptionsBuilder, row_iter::{ - record_batch_stream, - record_batch_stream::{BoxedPrefetchableRecordBatchStream, SequencedRecordBatch}, - IterOptions, RecordBatchWithKeyIterator, + record_batch_stream::{ + self, BoxedPrefetchableRecordBatchStream, MemtableStreamContext, SequencedRecordBatch, + SstStreamContext, + }, + FetchedRecordBatchIterator, IterOptions, }, space::SpaceId, sst::{ - factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef, SstReadOptions}, + factory::{FactoryRef as SstFactoryRef, ObjectStorePickerRef}, file::{FileHandle, Level, SST_LEVEL_NUM}, }, table::version::{MemTableVec, SamplingMemTable}, @@ -108,7 +111,7 @@ pub struct MergeConfig<'a> { /// The predicate of the query. pub predicate: PredicateRef, - pub sst_read_options: SstReadOptions, + pub sst_read_options_builder: SstReadOptionsBuilder, /// Sst factory pub sst_factory: &'a SstFactoryRef, /// Store picker for persisting sst. @@ -129,8 +132,10 @@ pub struct MergeBuilder<'a> { /// Sampling memtable to read. sampling_mem: Option, + /// MemTables to read. memtables: MemTableVec, + /// Ssts to read of each level. ssts: Vec>, } @@ -170,6 +175,34 @@ impl<'a> MergeBuilder<'a> { } pub async fn build(self) -> Result { + let fetched_schema = self.config.projected_schema.to_record_schema_with_key(); + let primary_key_indexes = fetched_schema.primary_key_idx().to_vec(); + let fetched_schema = fetched_schema.into_record_schema(); + let table_schema = self.config.projected_schema.table_schema(); + let row_projector_builder = RowProjectorBuilder::new( + fetched_schema.clone(), + table_schema.clone(), + Some(primary_key_indexes), + ); + let sst_read_options = self + .config + .sst_read_options_builder + .build(row_projector_builder.clone()); + + let memtable_stream_ctx = MemtableStreamContext { + row_projector_builder, + fetched_schema: fetched_schema.clone(), + predicate: self.config.predicate, + need_dedup: self.config.need_dedup, + reverse: self.config.reverse, + deadline: self.config.deadline, + }; + + let sst_stream_ctx = SstStreamContext { + sst_read_options, + fetched_schema, + }; + let sst_streams_num: usize = self .ssts .iter() @@ -192,12 +225,8 @@ impl<'a> MergeBuilder<'a> { if let Some(v) = &self.sampling_mem { let stream = record_batch_stream::filtered_stream_from_memtable( - self.config.projected_schema.clone(), - self.config.need_dedup, &v.mem, - self.config.reverse, - self.config.predicate.as_ref(), - self.config.deadline, + &memtable_stream_ctx, self.config.metrics_collector.clone(), ) .context(BuildStreamFromMemtable)?; @@ -206,12 +235,8 @@ impl<'a> MergeBuilder<'a> { for memtable in &self.memtables { let stream = record_batch_stream::filtered_stream_from_memtable( - self.config.projected_schema.clone(), - self.config.need_dedup, &memtable.mem, - self.config.reverse, - self.config.predicate.as_ref(), - self.config.deadline, + &memtable_stream_ctx, self.config.metrics_collector.clone(), ) .context(BuildStreamFromMemtable)?; @@ -226,8 +251,8 @@ impl<'a> MergeBuilder<'a> { self.config.table_id, f, self.config.sst_factory, - &self.config.sst_read_options, self.config.store_picker, + &sst_stream_ctx, self.config.metrics_collector.clone(), ) .await @@ -324,7 +349,7 @@ impl BufferedStreamState { /// Returns number of rows added. fn append_rows_to( &mut self, - builder: &mut RecordBatchWithKeyBuilder, + builder: &mut FetchedRecordBatchBuilder, len: usize, ) -> Result { let added = builder @@ -336,7 +361,7 @@ impl BufferedStreamState { /// Take record batch slice with at most `len` rows from cursor and advance /// the cursor. - fn take_record_batch_slice(&mut self, len: usize) -> RecordBatchWithKey { + fn take_record_batch_slice(&mut self, len: usize) -> FetchedRecordBatch { let len_to_fetch = cmp::min( self.buffered_record_batch.record_batch.num_rows() - self.cursor, len, @@ -403,14 +428,14 @@ impl BufferedStream { /// REQUIRE: the buffer is not exhausted. fn append_rows_to( &mut self, - builder: &mut RecordBatchWithKeyBuilder, + builder: &mut FetchedRecordBatchBuilder, len: usize, ) -> Result { self.state.as_mut().unwrap().append_rows_to(builder, len) } /// REQUIRE: the buffer is not exhausted. - fn take_record_batch_slice(&mut self, len: usize) -> RecordBatchWithKey { + fn take_record_batch_slice(&mut self, len: usize) -> FetchedRecordBatch { self.state.as_mut().unwrap().take_record_batch_slice(len) } @@ -634,7 +659,7 @@ pub struct MergeIterator { request_id: RequestId, inited: bool, schema: RecordSchemaWithKey, - record_batch_builder: RecordBatchWithKeyBuilder, + record_batch_builder: FetchedRecordBatchBuilder, origin_streams: Vec, /// ssts are kept here to avoid them from being purged. #[allow(dead_code)] @@ -661,8 +686,14 @@ impl MergeIterator { metrics: Metrics, ) -> Self { let heap_cap = streams.len(); - let record_batch_builder = - RecordBatchWithKeyBuilder::with_capacity(schema.clone(), iter_options.batch_size); + let primary_key_indexes = schema.primary_key_idx().to_vec(); + let fetched_schema = schema.to_record_schema(); + let record_batch_builder = FetchedRecordBatchBuilder::with_capacity( + fetched_schema, + Some(primary_key_indexes), + iter_options.batch_size, + ); + Self { table_id, request_id, @@ -790,7 +821,7 @@ impl MergeIterator { async fn fetch_rows_from_one_stream( &mut self, num_rows_to_fetch: usize, - ) -> Result> { + ) -> Result> { assert_eq!(self.hot.len(), 1); self.metrics.times_fetch_rows_from_one += 1; @@ -834,7 +865,7 @@ impl MergeIterator { /// Fetch the next batch from the streams. /// /// `init_if_necessary` should be finished before this method. - async fn fetch_next_batch(&mut self) -> Result> { + async fn fetch_next_batch(&mut self) -> Result> { self.init_if_necessary().await?; self.record_batch_builder.clear(); @@ -869,14 +900,14 @@ impl MergeIterator { } #[async_trait] -impl RecordBatchWithKeyIterator for MergeIterator { +impl FetchedRecordBatchIterator for MergeIterator { type Error = Error; fn schema(&self) -> &RecordSchemaWithKey { &self.schema } - async fn next_batch(&mut self) -> Result> { + async fn next_batch(&mut self) -> Result> { let record_batch = self.fetch_next_batch().await?; trace!("MergeIterator send next record batch:{:?}", record_batch); diff --git a/analytic_engine/src/row_iter/mod.rs b/analytic_engine/src/row_iter/mod.rs index a2e28dc24b..f3c5ac4d35 100644 --- a/analytic_engine/src/row_iter/mod.rs +++ b/analytic_engine/src/row_iter/mod.rs @@ -16,7 +16,7 @@ use async_stream::try_stream; use async_trait::async_trait; -use common_types::{record_batch::RecordBatchWithKey, schema::RecordSchemaWithKey}; +use common_types::{record_batch::FetchedRecordBatch, schema::RecordSchemaWithKey}; use generic_error::BoxError; use crate::sst::writer::RecordBatchStream; @@ -38,15 +38,15 @@ pub struct IterOptions { /// The `schema()` should be the same as the RecordBatch from `read()`. /// The reader is exhausted if the `read()` returns the `Ok(None)`. #[async_trait] -pub trait RecordBatchWithKeyIterator: Send { +pub trait FetchedRecordBatchIterator: Send { type Error: std::error::Error + Send + Sync + 'static; fn schema(&self) -> &RecordSchemaWithKey; - async fn next_batch(&mut self) -> std::result::Result, Self::Error>; + async fn next_batch(&mut self) -> std::result::Result, Self::Error>; } -pub fn record_batch_with_key_iter_to_stream( +pub fn record_batch_with_key_iter_to_stream( mut iter: I, ) -> RecordBatchStream { let record_batch_stream = try_stream! { diff --git a/analytic_engine/src/row_iter/record_batch_stream.rs b/analytic_engine/src/row_iter/record_batch_stream.rs index 0c0fe35ae5..dd0f4d132e 100644 --- a/analytic_engine/src/row_iter/record_batch_stream.rs +++ b/analytic_engine/src/row_iter/record_batch_stream.rs @@ -23,7 +23,8 @@ use arrow::{ datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef}, }; use common_types::{ - projected_schema::ProjectedSchema, record_batch::RecordBatchWithKey, SequenceNumber, + projected_schema::RowProjectorBuilder, record_batch::FetchedRecordBatch, schema::RecordSchema, + SequenceNumber, }; use datafusion::{ common::ToDFSchema, @@ -34,9 +35,13 @@ use datafusion::{ }; use futures::stream::{self, StreamExt}; use generic_error::{BoxError, GenericResult}; +use itertools::Itertools; use macros::define_result; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; -use table_engine::{predicate::Predicate, table::TableId}; +use table_engine::{ + predicate::{Predicate, PredicateRef}, + table::TableId, +}; use trace_metric::MetricsCollector; use crate::{ @@ -125,11 +130,11 @@ pub enum Error { define_result!(Error); -// TODO(yingwen): Can we move sequence to RecordBatchWithKey and remove this +// TODO(yingwen): Can we move sequence to FetchedRecordBatch and remove this // struct? But what is the sequence after merge? #[derive(Debug)] pub struct SequencedRecordBatch { - pub record_batch: RecordBatchWithKey, + pub record_batch: FetchedRecordBatch, pub sequence: SequenceNumber, } @@ -212,56 +217,44 @@ pub fn filter_stream( /// Build filtered (by `predicate`) [SequencedRecordBatchStream] from a /// memtable. pub fn filtered_stream_from_memtable( - projected_schema: ProjectedSchema, - need_dedup: bool, memtable: &MemTableRef, - reverse: bool, - predicate: &Predicate, - deadline: Option, + ctx: &MemtableStreamContext, metrics_collector: Option, ) -> Result { - stream_from_memtable( - projected_schema.clone(), - need_dedup, - memtable, - reverse, - deadline, - metrics_collector, - ) - .and_then(|origin_stream| { + stream_from_memtable(memtable, ctx, metrics_collector).and_then(|origin_stream| { filter_stream( origin_stream, - projected_schema - .as_record_schema_with_key() - .to_arrow_schema_ref(), - predicate, + ctx.fetched_schema.to_arrow_schema_ref(), + &ctx.predicate, ) }) } /// Build [SequencedRecordBatchStream] from a memtable. pub fn stream_from_memtable( - projected_schema: ProjectedSchema, - need_dedup: bool, memtable: &MemTableRef, - reverse: bool, - deadline: Option, + ctx: &MemtableStreamContext, metrics_collector: Option, ) -> Result { let scan_ctx = ScanContext { - deadline, + deadline: ctx.deadline, ..Default::default() }; let max_seq = memtable.last_sequence(); - let scan_memtable_desc = format!("scan_memtable_{max_seq}"); + let fetched_cols = ctx + .fetched_schema + .columns() + .iter() + .format_with(",", |col, f| f(&format_args!("{}", col.name))); + let scan_memtable_desc = format!("scan_memtable_{max_seq}, fetched_columns:[{fetched_cols}]",); let metrics_collector = metrics_collector.map(|v| v.span(scan_memtable_desc)); let scan_req = ScanRequest { start_user_key: Bound::Unbounded, end_user_key: Bound::Unbounded, sequence: max_seq, - projected_schema, - need_dedup, - reverse, + row_projector_builder: ctx.row_projector_builder.clone(), + need_dedup: ctx.need_dedup, + reverse: ctx.reverse, metrics_collector, }; @@ -277,6 +270,15 @@ pub fn stream_from_memtable( Ok(Box::new(NoopPrefetcher(Box::new(stream)))) } +pub struct MemtableStreamContext { + pub row_projector_builder: RowProjectorBuilder, + pub fetched_schema: RecordSchema, + pub predicate: PredicateRef, + pub need_dedup: bool, + pub reverse: bool, + pub deadline: Option, +} + /// Build the filtered by `sst_read_options.predicate` /// [SequencedRecordBatchStream] from a sst. pub async fn filtered_stream_from_sst_file( @@ -284,8 +286,8 @@ pub async fn filtered_stream_from_sst_file( table_id: TableId, sst_file: &FileHandle, sst_factory: &SstFactoryRef, - sst_read_options: &SstReadOptions, store_picker: &ObjectStorePickerRef, + ctx: &SstStreamContext, metrics_collector: Option, ) -> Result { stream_from_sst_file( @@ -293,19 +295,16 @@ pub async fn filtered_stream_from_sst_file( table_id, sst_file, sst_factory, - sst_read_options, store_picker, + ctx, metrics_collector, ) .await .and_then(|origin_stream| { filter_stream( origin_stream, - sst_read_options - .projected_schema - .as_record_schema_with_key() - .to_arrow_schema_ref(), - sst_read_options.predicate.as_ref(), + ctx.fetched_schema.to_arrow_schema_ref(), + &ctx.sst_read_options.predicate, ) }) } @@ -316,8 +315,8 @@ pub async fn stream_from_sst_file( table_id: TableId, sst_file: &FileHandle, sst_factory: &SstFactoryRef, - sst_read_options: &SstReadOptions, store_picker: &ObjectStorePickerRef, + ctx: &SstStreamContext, metrics_collector: Option, ) -> Result { sst_file.read_meter().mark(); @@ -327,12 +326,20 @@ pub async fn stream_from_sst_file( file_size: Some(sst_file.size() as usize), file_format: Some(sst_file.storage_format()), }; - let scan_sst_desc = format!("scan_sst_{}", sst_file.id()); + let fetched_cols = ctx + .fetched_schema + .columns() + .iter() + .format_with(",", |col, f| f(&format_args!("{}", col.name))); + let scan_sst_desc = format!( + "scan_sst_{}, fetched_columns:[{fetched_cols}]", + sst_file.id() + ); let metrics_collector = metrics_collector.map(|v| v.span(scan_sst_desc)); let mut sst_reader = sst_factory .create_reader( &path, - sst_read_options, + &ctx.sst_read_options, read_hint, store_picker, metrics_collector, @@ -353,6 +360,11 @@ pub async fn stream_from_sst_file( Ok(Box::new(stream)) } +pub struct SstStreamContext { + pub sst_read_options: SstReadOptions, + pub fetched_schema: RecordSchema, +} + #[cfg(test)] pub mod tests { use common_types::{row::Row, schema::Schema}; @@ -369,7 +381,7 @@ pub mod tests { .into_iter() .map(|(seq, rows)| { let batch = SequencedRecordBatch { - record_batch: row_iter::tests::build_record_batch_with_key( + record_batch: row_iter::tests::build_fetched_record_batch_with_key( schema.clone(), rows, ), diff --git a/analytic_engine/src/row_iter/tests.rs b/analytic_engine/src/row_iter/tests.rs index 3484980c29..0db3c8bd91 100644 --- a/analytic_engine/src/row_iter/tests.rs +++ b/analytic_engine/src/row_iter/tests.rs @@ -14,8 +14,8 @@ use async_trait::async_trait; use common_types::{ - projected_schema::ProjectedSchema, - record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, + projected_schema::{ProjectedSchema, RowProjector}, + record_batch::{FetchedRecordBatch, FetchedRecordBatchBuilder}, row::{ contiguous::{ContiguousRowReader, ContiguousRowWriter, ProjectedContiguousRow}, Row, @@ -25,7 +25,7 @@ use common_types::{ use macros::define_result; use snafu::Snafu; -use crate::row_iter::RecordBatchWithKeyIterator; +use crate::row_iter::FetchedRecordBatchIterator; #[derive(Debug, Snafu)] pub enum Error {} @@ -34,12 +34,12 @@ define_result!(Error); pub struct VectorIterator { schema: RecordSchemaWithKey, - items: Vec>, + items: Vec>, idx: usize, } impl VectorIterator { - pub fn new(schema: RecordSchemaWithKey, items: Vec) -> Self { + pub fn new(schema: RecordSchemaWithKey, items: Vec) -> Self { Self { schema, items: items.into_iter().map(Some).collect(), @@ -49,14 +49,14 @@ impl VectorIterator { } #[async_trait] -impl RecordBatchWithKeyIterator for VectorIterator { +impl FetchedRecordBatchIterator for VectorIterator { type Error = Error; fn schema(&self) -> &RecordSchemaWithKey { &self.schema } - async fn next_batch(&mut self) -> Result> { + async fn next_batch(&mut self) -> Result> { if self.idx == self.items.len() { return Ok(None); } @@ -68,13 +68,26 @@ impl RecordBatchWithKeyIterator for VectorIterator { } } -pub fn build_record_batch_with_key(schema: Schema, rows: Vec) -> RecordBatchWithKey { +pub fn build_fetched_record_batch_with_key(schema: Schema, rows: Vec) -> FetchedRecordBatch { assert!(schema.num_columns() > 1); let projection: Vec = (0..schema.num_columns()).collect(); let projected_schema = ProjectedSchema::new(schema.clone(), Some(projection)).unwrap(); - let row_projected_schema = projected_schema.try_project_with_key(&schema).unwrap(); + let fetched_schema = projected_schema.to_record_schema_with_key(); + let primary_key_indexes = fetched_schema.primary_key_idx().to_vec(); + let fetched_schema = fetched_schema.to_record_schema(); + let table_schema = projected_schema.table_schema(); + let row_projector = RowProjector::new( + &fetched_schema, + Some(primary_key_indexes), + table_schema, + table_schema, + ) + .unwrap(); + let primary_key_indexes = row_projector + .primary_key_indexes() + .map(|idxs| idxs.to_vec()); let mut builder = - RecordBatchWithKeyBuilder::with_capacity(projected_schema.to_record_schema_with_key(), 2); + FetchedRecordBatchBuilder::with_capacity(fetched_schema, primary_key_indexes, 2); let index_in_writer = IndexInWriterSchema::for_same_schema(schema.num_columns()); let mut buf = Vec::new(); @@ -84,7 +97,7 @@ pub fn build_record_batch_with_key(schema: Schema, rows: Vec) -> RecordBatc writer.write_row(&row).unwrap(); let source_row = ContiguousRowReader::try_new(&buf, &schema).unwrap(); - let projected_row = ProjectedContiguousRow::new(source_row, &row_projected_schema); + let projected_row = ProjectedContiguousRow::new(source_row, &row_projector); builder .append_projected_contiguous_row(&projected_row) .unwrap(); @@ -92,7 +105,7 @@ pub fn build_record_batch_with_key(schema: Schema, rows: Vec) -> RecordBatc builder.build().unwrap() } -pub async fn check_iterator(iter: &mut T, expected_rows: Vec) { +pub async fn check_iterator(iter: &mut T, expected_rows: Vec) { let mut visited_rows = 0; while let Some(batch) = iter.next_batch().await.unwrap() { for row_idx in 0..batch.num_rows() { diff --git a/analytic_engine/src/sst/factory.rs b/analytic_engine/src/sst/factory.rs index 8d507c6e34..9f0c00313d 100644 --- a/analytic_engine/src/sst/factory.rs +++ b/analytic_engine/src/sst/factory.rs @@ -17,7 +17,7 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc}; use async_trait::async_trait; -use common_types::projected_schema::ProjectedSchema; +use common_types::projected_schema::RowProjectorBuilder; use macros::define_result; use object_store::{ObjectStoreRef, Path}; use runtime::Runtime; @@ -140,7 +140,7 @@ pub struct SstReadOptions { pub frequency: ReadFrequency, pub num_rows_per_row_group: usize, - pub projected_schema: ProjectedSchema, + pub row_projector_builder: RowProjectorBuilder, pub predicate: PredicateRef, pub meta_cache: Option, pub scan_options: ScanOptions, diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 687949182f..be98479619 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -26,8 +26,8 @@ use arrow::{datatypes::SchemaRef, record_batch::RecordBatch as ArrowRecordBatch} use async_trait::async_trait; use bytes_ext::Bytes; use common_types::{ - projected_schema::{ProjectedSchema, RowProjector}, - record_batch::{ArrowRecordBatchProjector, RecordBatchWithKey}, + projected_schema::{RowProjector, RowProjectorBuilder}, + record_batch::FetchedRecordBatch, }; use datafusion::{ common::ToDFSchema, @@ -77,7 +77,7 @@ use crate::{ const PRUNE_ROW_GROUPS_METRICS_COLLECTOR_NAME: &str = "prune_row_groups"; type SendableRecordBatchStream = Pin> + Send>>; -type RecordBatchWithKeyStream = Box> + Send + Unpin>; +type FetchedRecordBatchStream = Box> + Send + Unpin>; pub struct Reader<'a> { /// The path where the data is persisted. @@ -87,13 +87,14 @@ pub struct Reader<'a> { /// The hint for the sst file size. file_size_hint: Option, num_rows_per_row_group: usize, - projected_schema: ProjectedSchema, meta_cache: Option, predicate: PredicateRef, /// Current frequency decides the cache policy. frequency: ReadFrequency, /// Init those fields in `init_if_necessary` meta_data: Option, + + row_projector_builder: RowProjectorBuilder, row_projector: Option, /// Options for `read_parallelly` @@ -138,11 +139,11 @@ impl<'a> Reader<'a> { store, file_size_hint, num_rows_per_row_group: options.num_rows_per_row_group, - projected_schema: options.projected_schema.clone(), meta_cache: options.meta_cache.clone(), predicate: options.predicate.clone(), frequency: options.frequency, meta_data: None, + row_projector_builder: options.row_projector_builder.clone(), row_projector: None, metrics, df_plan_metrics, @@ -153,7 +154,7 @@ impl<'a> Reader<'a> { async fn maybe_read_parallelly( &mut self, read_parallelism: usize, - ) -> Result> { + ) -> Result> { assert!(read_parallelism > 0); self.init_if_necessary().await?; @@ -162,11 +163,7 @@ impl<'a> Reader<'a> { return Ok(Vec::new()); } - let row_projector = { - let row_projector = self.row_projector.take().unwrap(); - ArrowRecordBatchProjector::from(row_projector) - }; - + let row_projector = self.row_projector.take().unwrap(); let streams: Vec<_> = streams .into_iter() .map(|stream| { @@ -366,12 +363,14 @@ impl<'a> Reader<'a> { }; let row_projector = self - .projected_schema - .try_project_with_key(&meta_data.custom().schema) + .row_projector_builder + .build(&meta_data.custom().schema) .box_err() .context(Projection)?; + self.meta_data = Some(meta_data); self.row_projector = Some(row_projector); + Ok(()) } @@ -493,7 +492,7 @@ pub(crate) struct ProjectorMetrics { struct RecordBatchProjector { stream: SendableRecordBatchStream, - row_projector: ArrowRecordBatchProjector, + row_projector: RowProjector, metrics: ProjectorMetrics, start_time: Instant, @@ -502,7 +501,7 @@ struct RecordBatchProjector { impl RecordBatchProjector { fn new( stream: SendableRecordBatchStream, - row_projector: ArrowRecordBatchProjector, + row_projector: RowProjector, metrics_collector: Option, ) -> Self { let metrics = ProjectorMetrics { @@ -520,7 +519,7 @@ impl RecordBatchProjector { } impl Stream for RecordBatchProjector { - type Item = Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let projector = self.get_mut(); @@ -541,11 +540,10 @@ impl Stream for RecordBatchProjector { } projector.metrics.row_num += record_batch.num_rows(); - let projected_batch = projector - .row_projector - .project_to_record_batch_with_key(record_batch) - .box_err() - .context(DecodeRecordBatch {}); + let projected_batch = + FetchedRecordBatch::try_new(&projector.row_projector, record_batch) + .box_err() + .context(DecodeRecordBatch {}); Poll::Ready(Some(projected_batch)) } @@ -576,7 +574,7 @@ impl<'a> SstReader for Reader<'a> { async fn read( &mut self, - ) -> Result>>> { + ) -> Result>>> { let mut streams = self.maybe_read_parallelly(1).await?; assert_eq!(streams.len(), 1); let stream = streams.pop().expect("impossible to fetch no stream"); @@ -587,7 +585,7 @@ impl<'a> SstReader for Reader<'a> { struct RecordBatchReceiver { bg_prefetch_tx: Option>, - rx_group: Vec>>, + rx_group: Vec>>, cur_rx_idx: usize, #[allow(dead_code)] drop_helper: AbortOnDropMany<()>, @@ -595,13 +593,13 @@ struct RecordBatchReceiver { #[async_trait] impl PrefetchableStream for RecordBatchReceiver { - type Item = Result; + type Item = Result; async fn start_prefetch(&mut self) { // Start the prefetch work in background when first poll is called. if let Some(tx) = self.bg_prefetch_tx.take() { if tx.send(()).is_err() { - error!("The receiver for start prefetching has been closed"); + error!("The receiver for start prefetched has been closed"); } } } @@ -612,7 +610,7 @@ impl PrefetchableStream for RecordBatchReceiver { } impl Stream for RecordBatchReceiver { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.rx_group.is_empty() { @@ -622,7 +620,7 @@ impl Stream for RecordBatchReceiver { // Start the prefetch work in background when first poll is called. if let Some(tx) = self.bg_prefetch_tx.take() { if tx.send(()).is_err() { - error!("The receiver for start prefetching has been closed"); + error!("The receiver for start prefetched has been closed"); } } @@ -692,8 +690,8 @@ impl<'a> ThreadedReader<'a> { fn read_record_batches_from_sub_reader( &mut self, - mut reader: Box> + Send + Unpin>, - tx: Sender>, + mut reader: Box> + Send + Unpin>, + tx: Sender>, mut rx: watch::Receiver<()>, ) -> JoinHandle<()> { self.runtime.spawn(async move { @@ -720,7 +718,7 @@ impl<'a> SstReader for ThreadedReader<'a> { async fn read( &mut self, - ) -> Result>>> { + ) -> Result>>> { // Get underlying sst readers and channels. let sub_readers = self .inner @@ -744,7 +742,7 @@ impl<'a> SstReader for ThreadedReader<'a> { let channel_cap_per_sub_reader = self.channel_cap / sub_readers.len(); let channel_cap_per_sub_reader = channel_cap_per_sub_reader.max(1); let (tx_group, rx_group): (Vec<_>, Vec<_>) = (0..read_parallelism) - .map(|_| mpsc::channel::>(channel_cap_per_sub_reader)) + .map(|_| mpsc::channel::>(channel_cap_per_sub_reader)) .unzip(); let (bg_prefetch_tx, bg_prefetch_rx) = watch::channel(()); diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index ef233f7053..e84adea7f8 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -18,7 +18,7 @@ use std::collections::{HashMap, HashSet}; use async_trait::async_trait; use common_types::{ - datum::DatumKind, record_batch::RecordBatchWithKey, request_id::RequestId, time::TimeRange, + datum::DatumKind, record_batch::FetchedRecordBatch, request_id::RequestId, time::TimeRange, }; use datafusion::parquet::basic::Compression; use futures::StreamExt; @@ -41,8 +41,9 @@ use crate::{ }, }, writer::{ - self, BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, Io, - MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage, + self, BuildParquetFilter, BuildParquetFilterNoCause, EncodePbData, EncodeRecordBatch, + ExpectTimestampColumn, Io, MetaData, PollRecordBatch, RecordBatchStream, Result, + SstInfo, SstWriter, Storage, }, }, table::sst_util, @@ -160,8 +161,8 @@ impl<'a> RecordBatchGroupWriter<'a> { /// the left rows. async fn fetch_next_row_group( &mut self, - prev_record_batch: &mut Option, - ) -> Result> { + prev_record_batch: &mut Option, + ) -> Result> { let mut curr_row_group = vec![]; // Used to record the number of remaining rows to fill `curr_row_group`. let mut remaining = self.options.num_rows_per_row_group; @@ -217,7 +218,7 @@ impl<'a> RecordBatchGroupWriter<'a> { fn build_column_encodings( &self, - sample_row_groups: &[RecordBatchWithKey], + sample_row_groups: &[FetchedRecordBatch], column_encodings: &mut HashMap, ) -> Result<()> { let mut sampler = ColumnEncodingSampler { @@ -233,9 +234,15 @@ impl<'a> RecordBatchGroupWriter<'a> { /// Build the parquet filter for the given `row_group`. fn build_row_group_filter( &self, - row_group_batch: &[RecordBatchWithKey], + row_group_batch: &[FetchedRecordBatch], ) -> Result { - let mut builder = RowGroupFilterBuilder::new(row_group_batch[0].schema_with_key()); + let schema_with_key = + row_group_batch[0] + .schema_with_key() + .with_context(|| BuildParquetFilterNoCause { + msg: "primary key indexes not exist", + })?; + let mut builder = RowGroupFilterBuilder::new(&schema_with_key); for partial_batch in row_group_batch { for (col_idx, column) in partial_batch.columns().iter().enumerate() { @@ -253,7 +260,7 @@ impl<'a> RecordBatchGroupWriter<'a> { fn update_column_values( column_values: &mut [Option], - record_batch: &RecordBatchWithKey, + record_batch: &FetchedRecordBatch, ) { for (col_idx, col_values) in column_values.iter_mut().enumerate() { let mut too_many_values = false; @@ -320,7 +327,7 @@ impl<'a> RecordBatchGroupWriter<'a> { sink: W, meta_path: &Path, ) -> Result<(usize, ParquetMetaData)> { - let mut prev_record_batch: Option = None; + let mut prev_record_batch: Option = None; let mut arrow_row_group = Vec::new(); let mut total_num_rows = 0; @@ -531,7 +538,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> { /// A sampler to decide the column encoding options (whether to do dictionary /// encoding) with a bunch of sample row groups. struct ColumnEncodingSampler<'a> { - sample_row_groups: &'a [RecordBatchWithKey], + sample_row_groups: &'a [FetchedRecordBatch], meta_data: &'a MetaData, min_num_sample_rows: usize, max_unique_value_ratio: f64, @@ -613,7 +620,7 @@ mod tests { use bytes_ext::Bytes; use common_types::{ - projected_schema::ProjectedSchema, + projected_schema::{ProjectedSchema, RowProjectorBuilder}, tests::{build_row, build_row_for_dictionary, build_schema, build_schema_with_dictionary}, time::{TimeRange, Timestamp}, }; @@ -625,7 +632,7 @@ mod tests { use super::*; use crate::{ - row_iter::tests::build_record_batch_with_key, + row_iter::tests::build_fetched_record_batch_with_key, sst::{ factory::{ Factory, FactoryImpl, ReadFrequency, ScanOptions, SstReadOptions, SstWriteOptions, @@ -722,7 +729,7 @@ mod tests { "tagv2", ), ]; - let batch = build_record_batch_with_key(schema.clone(), rows); + let batch = build_fetched_record_batch_with_key(schema.clone(), rows); Poll::Ready(Some(Ok(batch))) })); @@ -748,15 +755,20 @@ mod tests { let scan_options = ScanOptions::default(); // read sst back to test + let row_projector_builder = RowProjectorBuilder::new( + reader_projected_schema.to_record_schema(), + reader_projected_schema.table_schema().clone(), + None, + ); let sst_read_options = SstReadOptions { maybe_table_level_metrics: Arc::new(MaybeTableLevelMetrics::new("test")), frequency: ReadFrequency::Frequent, num_rows_per_row_group: 5, - projected_schema: reader_projected_schema, predicate: Arc::new(Predicate::empty()), meta_cache: None, scan_options, runtime: runtime.clone(), + row_projector_builder, }; let mut reader: Box = { @@ -889,7 +901,7 @@ mod tests { .map(|_| build_row(b"a", 100, 10.0, "v4", 1000, 1_000_000)) .collect::>(); - let batch = build_record_batch_with_key(schema_clone.clone(), rows); + let batch = build_fetched_record_batch_with_key(schema_clone.clone(), rows); poll_cnt += 1; Poll::Ready(Some(Ok(batch))) @@ -964,8 +976,9 @@ mod tests { .into_iter() .map(|v| build_row(v.0, v.1, v.2, v.3, v.4, v.5)) .collect(); - let record_batch_with_key0 = build_record_batch_with_key(schema.clone(), rows.clone()); - let record_batch_with_key1 = build_record_batch_with_key(schema.clone(), rows); + let record_batch_with_key0 = + build_fetched_record_batch_with_key(schema.clone(), rows.clone()); + let record_batch_with_key1 = build_fetched_record_batch_with_key(schema.clone(), rows); let meta_data = MetaData { min_key: Bytes::from_static(b""), max_key: Bytes::from_static(b""), diff --git a/analytic_engine/src/sst/reader.rs b/analytic_engine/src/sst/reader.rs index fbb36364bc..66cebc047c 100644 --- a/analytic_engine/src/sst/reader.rs +++ b/analytic_engine/src/sst/reader.rs @@ -15,7 +15,7 @@ //! Sst reader trait definition. use async_trait::async_trait; -use common_types::record_batch::RecordBatchWithKey; +use common_types::record_batch::FetchedRecordBatch; use crate::{prefetchable_stream::PrefetchableStream, sst::meta_data::SstMetaData}; @@ -105,7 +105,7 @@ pub trait SstReader { async fn read( &mut self, - ) -> Result>>>; + ) -> Result>>>; } #[cfg(test)] @@ -117,7 +117,7 @@ pub mod tests { pub async fn check_stream(stream: &mut S, expected_rows: Vec) where - S: PrefetchableStream> + Unpin, + S: PrefetchableStream> + Unpin, { let mut visited_rows = 0; while let Some(batch) = stream.fetch_next().await { diff --git a/analytic_engine/src/sst/writer.rs b/analytic_engine/src/sst/writer.rs index 355ef1827e..773715cdb9 100644 --- a/analytic_engine/src/sst/writer.rs +++ b/analytic_engine/src/sst/writer.rs @@ -19,7 +19,7 @@ use std::cmp; use async_trait::async_trait; use bytes_ext::Bytes; use common_types::{ - record_batch::RecordBatchWithKey, request_id::RequestId, schema::Schema, time::TimeRange, + record_batch::FetchedRecordBatch, request_id::RequestId, schema::Schema, time::TimeRange, SequenceNumber, }; use futures::Stream; @@ -82,6 +82,9 @@ pub mod error { #[snafu(display("Failed to build parquet filter, err:{}", source))] BuildParquetFilter { source: GenericError }, + #[snafu(display("Failed to build parquet filter msg:{msg}.\nBacktrace:\n{backtrace}"))] + BuildParquetFilterNoCause { msg: String, backtrace: Backtrace }, + #[snafu(display("Failed to poll record batch, err:{}", source))] PollRecordBatch { source: GenericError }, @@ -97,7 +100,7 @@ pub mod error { pub use error::*; -pub type RecordBatchStreamItem = std::result::Result; +pub type RecordBatchStreamItem = std::result::Result; // TODO(yingwen): SstReader also has a RecordBatchStream, can we use same type? pub type RecordBatchStream = Box + Send + Unpin>; diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs index 1ca655e50b..35765a0a96 100644 --- a/benchmarks/src/merge_memtable_bench.rs +++ b/benchmarks/src/merge_memtable_bench.rs @@ -24,14 +24,11 @@ use analytic_engine::{ row_iter::{ dedup::DedupIterator, merge::{MergeBuilder, MergeConfig}, - IterOptions, RecordBatchWithKeyIterator, + FetchedRecordBatchIterator, IterOptions, }, space::SpaceId, sst::{ - factory::{ - FactoryImpl, FactoryRef as SstFactoryRef, ObjectStorePickerRef, ReadFrequency, - ScanOptions, SstReadOptions, - }, + factory::{FactoryImpl, FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions}, meta_data::cache::MetaCacheRef, metrics::MaybeTableLevelMetrics as SstMaybeTableLevelMetrics, }, @@ -39,6 +36,7 @@ use analytic_engine::{ sst_util, version::{MemTableState, MemTableVec}, }, + ScanType, SstReadOptionsBuilder, }; use arena::NoopCollector; use common_types::{ @@ -61,7 +59,8 @@ pub struct MergeMemTableBench { space_id: SpaceId, table_id: TableId, dedup: bool, - sst_read_options: SstReadOptions, + sst_read_options_builder: SstReadOptionsBuilder, + num_rows_per_row_group: usize, } impl MergeMemTableBench { @@ -113,7 +112,8 @@ impl MergeMemTableBench { id: *id, }); } - let sst_read_options = mock_sst_read_options(projected_schema.clone(), runtime.clone()); + let sst_read_options_builder = + mock_sst_read_options_builder(projected_schema.clone(), runtime.clone()); MergeMemTableBench { store, @@ -125,7 +125,8 @@ impl MergeMemTableBench { space_id, table_id, dedup: true, - sst_read_options, + sst_read_options_builder, + num_rows_per_row_group: 500, } } @@ -149,7 +150,7 @@ impl MergeMemTableBench { let projected_schema = self.projected_schema.clone(); let sst_factory: SstFactoryRef = Arc::new(FactoryImpl); let iter_options = IterOptions { - batch_size: self.sst_read_options.num_rows_per_row_group, + batch_size: self.num_rows_per_row_group, }; let request_id = RequestId::next_id(); @@ -164,7 +165,7 @@ impl MergeMemTableBench { projected_schema, predicate: Arc::new(Predicate::empty()), sst_factory: &sst_factory, - sst_read_options: self.sst_read_options.clone(), + sst_read_options_builder: self.sst_read_options_builder.clone(), store_picker: &store_picker, merge_iter_options: iter_options.clone(), need_dedup: true, @@ -206,23 +207,24 @@ impl MergeMemTableBench { } } -fn mock_sst_read_options( - projected_schema: ProjectedSchema, +fn mock_sst_read_options_builder( + _projected_schema: ProjectedSchema, runtime: Arc, -) -> SstReadOptions { +) -> SstReadOptionsBuilder { let scan_options = ScanOptions { background_read_parallelism: 1, max_record_batches_in_flight: 1024, num_streams_to_prefetch: 0, }; - SstReadOptions { - maybe_table_level_metrics: Arc::new(SstMaybeTableLevelMetrics::new("bench")), - frequency: ReadFrequency::Frequent, - num_rows_per_row_group: 500, - projected_schema, - predicate: Arc::new(Predicate::empty()), - meta_cache: None, + let maybe_table_level_metrics = Arc::new(SstMaybeTableLevelMetrics::new("bench")); + + SstReadOptionsBuilder::new( + ScanType::Query, scan_options, + maybe_table_level_metrics, + 500, + Arc::new(Predicate::empty()), + None, runtime, - } + ) } diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs index 434f452b70..c8b07a21b2 100644 --- a/benchmarks/src/merge_sst_bench.rs +++ b/benchmarks/src/merge_sst_bench.rs @@ -22,19 +22,17 @@ use analytic_engine::{ chain::ChainConfig, dedup::DedupIterator, merge::{MergeBuilder, MergeConfig}, - IterOptions, RecordBatchWithKeyIterator, + FetchedRecordBatchIterator, IterOptions, }, space::SpaceId, sst::{ - factory::{ - FactoryImpl, FactoryRef as SstFactoryRef, ObjectStorePickerRef, ReadFrequency, - ScanOptions, SstReadOptions, - }, + factory::{FactoryImpl, FactoryRef as SstFactoryRef, ObjectStorePickerRef, ScanOptions}, file::{FileHandle, FilePurgeQueue, Level, Request}, meta_data::cache::MetaCacheRef, metrics::MaybeTableLevelMetrics as SstMaybeTableLevelMetrics, }, table::sst_util, + ScanType, SstReadOptionsBuilder, }; use common_types::{projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema}; use logger::info; @@ -49,7 +47,9 @@ pub struct MergeSstBench { store: ObjectStoreRef, max_projections: usize, schema: Schema, - sst_read_options: SstReadOptions, + projected_schema: Option, + sst_read_options_builder: SstReadOptionsBuilder, + num_rows_per_row_group: usize, runtime: Arc, space_id: SpaceId, table_id: TableId, @@ -73,22 +73,24 @@ impl MergeSstBench { let schema = runtime.block_on(util::schema_from_sst(&store, &sst_path, &meta_cache)); let predicate = config.predicate.into_predicate(); - let projected_schema = ProjectedSchema::no_projection(schema.clone()); + let _projected_schema = ProjectedSchema::no_projection(schema.clone()); let scan_options = ScanOptions { background_read_parallelism: 1, max_record_batches_in_flight: 1024, num_streams_to_prefetch: 0, }; - let sst_read_options = SstReadOptions { - maybe_table_level_metrics: Arc::new(SstMaybeTableLevelMetrics::new("bench")), - frequency: ReadFrequency::Frequent, - num_rows_per_row_group: config.num_rows_per_row_group, - projected_schema, - predicate, - meta_cache: meta_cache.clone(), + + let maybe_table_level_metrics = Arc::new(SstMaybeTableLevelMetrics::new("bench")); + let scan_type = ScanType::Query; + let sst_read_options_builder = SstReadOptionsBuilder::new( + scan_type, scan_options, - runtime: runtime.clone(), - }; + maybe_table_level_metrics, + config.num_rows_per_row_group, + predicate, + meta_cache.clone(), + runtime.clone(), + ); let max_projections = cmp::min(config.max_projections, schema.num_columns()); let (tx, rx) = mpsc::unbounded_channel(); @@ -107,7 +109,9 @@ impl MergeSstBench { store, max_projections, schema, - sst_read_options, + sst_read_options_builder, + num_rows_per_row_group: config.num_rows_per_row_group, + projected_schema: None, runtime, space_id, table_id, @@ -126,7 +130,7 @@ impl MergeSstBench { let projected_schema = util::projected_schema_by_number(&self.schema, i, self.max_projections); - self.sst_read_options.projected_schema = projected_schema; + self.projected_schema = Some(projected_schema); self.dedup = dedup; } @@ -134,10 +138,10 @@ impl MergeSstBench { let space_id = self.space_id; let table_id = self.table_id; let sequence = u64::MAX; - let projected_schema = self.sst_read_options.projected_schema.clone(); + let projected_schema = self.projected_schema.clone().unwrap(); let sst_factory: SstFactoryRef = Arc::new(FactoryImpl); let iter_options = IterOptions { - batch_size: self.sst_read_options.num_rows_per_row_group, + batch_size: self.num_rows_per_row_group, }; let request_id = RequestId::next_id(); @@ -152,7 +156,7 @@ impl MergeSstBench { projected_schema, predicate: Arc::new(Predicate::empty()), sst_factory: &sst_factory, - sst_read_options: self.sst_read_options.clone(), + sst_read_options_builder: self.sst_read_options_builder.clone(), store_picker: &store_picker, merge_iter_options: iter_options.clone(), need_dedup: true, @@ -190,7 +194,7 @@ impl MergeSstBench { fn run_no_dedup_bench(&self) { let space_id = self.space_id; let table_id = self.table_id; - let projected_schema = self.sst_read_options.projected_schema.clone(); + let projected_schema = self.projected_schema.clone().unwrap(); let sst_factory: SstFactoryRef = Arc::new(FactoryImpl); let request_id = RequestId::next_id(); @@ -204,7 +208,7 @@ impl MergeSstBench { projected_schema, predicate: Arc::new(Predicate::empty()), sst_factory: &sst_factory, - sst_read_options: self.sst_read_options.clone(), + sst_read_options_builder: self.sst_read_options_builder.clone(), store_picker: &store_picker, num_streams_to_prefetch: 0, }) diff --git a/benchmarks/src/scan_memtable_bench.rs b/benchmarks/src/scan_memtable_bench.rs index a738a9c100..72e09a054c 100644 --- a/benchmarks/src/scan_memtable_bench.rs +++ b/benchmarks/src/scan_memtable_bench.rs @@ -25,7 +25,7 @@ use analytic_engine::{ sst::meta_data::cache::MetaCacheRef, }; use arena::NoopCollector; -use common_types::projected_schema::ProjectedSchema; +use common_types::projected_schema::{ProjectedSchema, RowProjectorBuilder}; use logger::info; use object_store::{LocalFileSystem, Path}; @@ -91,14 +91,18 @@ impl ScanMemTableBench { pub fn run_bench(&self) { let scan_ctx = ScanContext::default(); + let fetched_schema = self.projected_schema.to_record_schema(); + let table_schema = self.projected_schema.table_schema(); + let row_projector_builder = + RowProjectorBuilder::new(fetched_schema, table_schema.clone(), None); let scan_req = ScanRequest { start_user_key: Bound::Unbounded, end_user_key: Bound::Unbounded, sequence: common_types::MAX_SEQUENCE_NUMBER, - projected_schema: self.projected_schema.clone(), need_dedup: true, reverse: false, metrics_collector: None, + row_projector_builder, }; let iter = self.memtable.scan(scan_ctx, scan_req).unwrap(); diff --git a/benchmarks/src/sst_bench.rs b/benchmarks/src/sst_bench.rs index 38273bb758..3e9ed3d8da 100644 --- a/benchmarks/src/sst_bench.rs +++ b/benchmarks/src/sst_bench.rs @@ -16,15 +16,18 @@ use std::{cmp, sync::Arc, time::Instant}; -use analytic_engine::sst::{ - factory::{ - Factory, FactoryImpl, ObjectStorePickerRef, ReadFrequency, ScanOptions, SstReadHint, - SstReadOptions, +use analytic_engine::{ + sst::{ + factory::{Factory, FactoryImpl, ObjectStorePickerRef, ScanOptions, SstReadHint}, + meta_data::cache::{MetaCache, MetaCacheRef}, + metrics::MaybeTableLevelMetrics as SstMaybeTableLevelMetrics, }, - meta_data::cache::{MetaCache, MetaCacheRef}, - metrics::MaybeTableLevelMetrics as SstMaybeTableLevelMetrics, + ScanType, SstReadOptionsBuilder, +}; +use common_types::{ + projected_schema::{ProjectedSchema, RowProjectorBuilder}, + schema::Schema, }; -use common_types::{projected_schema::ProjectedSchema, schema::Schema}; use logger::info; use object_store::{LocalFileSystem, ObjectStoreRef, Path}; use runtime::Runtime; @@ -36,7 +39,8 @@ pub struct SstBench { pub sst_file_name: String, max_projections: usize, schema: Schema, - sst_read_options: SstReadOptions, + projected_schema: Option, + sst_read_options_builder: SstReadOptionsBuilder, runtime: Arc, } @@ -57,16 +61,16 @@ impl SstBench { max_record_batches_in_flight: 1024, num_streams_to_prefetch: 0, }; - let sst_read_options = SstReadOptions { - maybe_table_level_metrics: Arc::new(SstMaybeTableLevelMetrics::new("bench")), - frequency: ReadFrequency::Frequent, - num_rows_per_row_group: config.num_rows_per_row_group, - projected_schema, + let maybe_table_level_metrics = Arc::new(SstMaybeTableLevelMetrics::new("bench")); + let sst_read_options_builder = SstReadOptionsBuilder::new( + ScanType::Query, + scan_options, + maybe_table_level_metrics, + config.num_rows_per_row_group, predicate, meta_cache, - scan_options, - runtime: runtime.clone(), - }; + runtime.clone(), + ); let max_projections = cmp::min(config.max_projections, schema.num_columns()); SstBench { @@ -74,7 +78,8 @@ impl SstBench { sst_file_name: config.sst_file_name, max_projections, schema, - sst_read_options, + projected_schema: Some(projected_schema), + sst_read_options_builder: sst_read_options_builder.clone(), runtime, } } @@ -88,7 +93,7 @@ impl SstBench { let projected_schema = util::projected_schema_by_number(&self.schema, i, self.max_projections); - self.sst_read_options.projected_schema = projected_schema; + self.projected_schema = Some(projected_schema); } pub fn run_bench(&self) { @@ -97,11 +102,23 @@ impl SstBench { let sst_factory = FactoryImpl; let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone()); + let fetched_schema = self.projected_schema.as_ref().unwrap().to_record_schema(); + let table_schema = self + .projected_schema + .as_ref() + .unwrap() + .table_schema() + .clone(); + let row_projector_builder = RowProjectorBuilder::new(fetched_schema, table_schema, None); + let sst_read_options = self + .sst_read_options_builder + .clone() + .build(row_projector_builder); self.runtime.block_on(async { let mut sst_reader = sst_factory .create_reader( &sst_path, - &self.sst_read_options, + &sst_read_options, SstReadHint::default(), &store_picker, None, diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index 62653d3c30..12a090e0ba 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -38,8 +38,12 @@ use analytic_engine::{ }, table::sst_util, table_options::{Compression, StorageFormatHint}, + ScanType, SstReadOptionsBuilder, +}; +use common_types::{ + projected_schema::{ProjectedSchema, RowProjectorBuilder}, + request_id::RequestId, }; -use common_types::{projected_schema::ProjectedSchema, request_id::RequestId}; use generic_error::BoxError; use logger::info; use object_store::{LocalFileSystem, ObjectStoreRef, Path}; @@ -121,15 +125,19 @@ pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc) { max_record_batches_in_flight: 1024, num_streams_to_prefetch: 2, }; + + let fetched_schema = projected_schema.to_record_schema(); + let table_schema = projected_schema.table_schema().clone(); + let row_projector_builder = RowProjectorBuilder::new(fetched_schema, table_schema, None); let sst_read_options = SstReadOptions { maybe_table_level_metrics: Arc::new(SstMaybeTableLevelMetrics::new("bench")), frequency: ReadFrequency::Once, num_rows_per_row_group: config.num_rows_per_row_group, - projected_schema, predicate: config.predicate.into_predicate(), meta_cache: None, scan_options, runtime, + row_projector_builder, }; let record_batch_stream = @@ -224,6 +232,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let iter_options = IterOptions { batch_size: config.num_rows_per_row_group, }; + let scan_options = ScanOptions { background_read_parallelism: 1, max_record_batches_in_flight: 1024, @@ -234,16 +243,23 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { let sst_factory: SstFactoryRef = Arc::new(FactoryImpl); let store_picker: ObjectStorePickerRef = Arc::new(store); let projected_schema = ProjectedSchema::no_projection(schema.clone()); - let sst_read_options = SstReadOptions { - maybe_table_level_metrics: Arc::new(SstMaybeTableLevelMetrics::new("bench")), - frequency: ReadFrequency::Once, - num_rows_per_row_group: config.num_rows_per_row_group, - projected_schema: projected_schema.clone(), - predicate: config.predicate.into_predicate(), - meta_cache: None, + let maybe_table_level_metrics = Arc::new(SstMaybeTableLevelMetrics::new("bench")); + let sst_read_options_builder = SstReadOptionsBuilder::new( + ScanType::Query, scan_options, - runtime: runtime.clone(), - }; + maybe_table_level_metrics, + config.num_rows_per_row_group, + config.predicate.into_predicate(), + None, + runtime.clone(), + ); + let fetched_schema = projected_schema.to_record_schema_with_key(); + let primary_key_indexes = fetched_schema.primary_key_idx().to_vec(); + let fetched_schema = fetched_schema.into_record_schema(); + let table_schema = projected_schema.table_schema().clone(); + let row_projector_builder = + RowProjectorBuilder::new(fetched_schema, table_schema, Some(primary_key_indexes)); + let iter = { let space_id = config.space_id; let table_id = config.table_id; @@ -260,11 +276,11 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { projected_schema, predicate: Arc::new(Predicate::empty()), sst_factory: &sst_factory, - sst_read_options: sst_read_options.clone(), store_picker: &store_picker, merge_iter_options: iter_options.clone(), need_dedup: true, reverse: false, + sst_read_options_builder: sst_read_options_builder.clone(), }); builder .mut_ssts_of_level(Level::MIN) @@ -280,6 +296,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc) { row_iter::record_batch_with_key_iter_to_stream(iter) }; + let sst_read_options = sst_read_options_builder.build(row_projector_builder); let sst_meta = { let meta_reader = SstMetaReader { space_id, diff --git a/benchmarks/src/util.rs b/benchmarks/src/util.rs index d00c00ef8b..3c52b26011 100644 --- a/benchmarks/src/util.rs +++ b/benchmarks/src/util.rs @@ -35,7 +35,7 @@ use analytic_engine::{ }; use bytes_ext::{BufMut, SafeBufMut}; use common_types::{ - projected_schema::ProjectedSchema, + projected_schema::{ProjectedSchema, RowProjectorBuilder}, schema::{IndexInWriterSchema, Schema}, }; use macros::define_result; @@ -123,15 +123,20 @@ pub async fn load_sst_to_memtable( max_record_batches_in_flight: 1024, num_streams_to_prefetch: 0, }; + let projected_schema = ProjectedSchema::no_projection(schema.clone()); + + let fetched_schema = projected_schema.to_record_schema(); + let table_schema = projected_schema.table_schema().clone(); + let row_projector_builder = RowProjectorBuilder::new(fetched_schema, table_schema, None); let sst_read_options = SstReadOptions { maybe_table_level_metrics: Arc::new(SstMaybeTableLevelMetrics::new("bench")), frequency: ReadFrequency::Frequent, num_rows_per_row_group: 8192, - projected_schema: ProjectedSchema::no_projection(schema.clone()), predicate: Arc::new(Predicate::empty()), meta_cache: None, scan_options, runtime, + row_projector_builder, }; let sst_factory = FactoryImpl; let store_picker: ObjectStorePickerRef = Arc::new(store.clone()); diff --git a/catalog/src/schema.rs b/catalog/src/schema.rs index 01d27d5447..51fb7f82d2 100644 --- a/catalog/src/schema.rs +++ b/catalog/src/schema.rs @@ -26,6 +26,8 @@ use table_engine::{ table::{SchemaId, TableId, TableRef}, }; +// FIXME: `CreateExistTable` can lead to `segmentation fault` if including +// backtrace. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { @@ -118,12 +120,8 @@ pub enum Error { #[snafu(display("Failed to close table, source:{}", source))] CloseTableWithCause { source: GenericError }, - #[snafu(display( - "Failed to create table, table already exists, table:{}.\nBacktrace:\n{}", - table, - backtrace - ))] - CreateExistTable { table: String, backtrace: Backtrace }, + #[snafu(display("Failed to create table, table already exists, table:{table}."))] + CreateExistTable { table: String }, #[snafu(display( "Failed to create table, cannot persist meta, table:{}, err:{}", diff --git a/common_types/src/projected_schema.rs b/common_types/src/projected_schema.rs index 77962765d9..d0f780d8b6 100644 --- a/common_types/src/projected_schema.rs +++ b/common_types/src/projected_schema.rs @@ -62,19 +62,155 @@ pub type Result = std::result::Result; #[derive(Debug, Clone)] pub struct RowProjector { - schema_with_key: RecordSchemaWithKey, + /// The schema for data fetched + /// It is derived from table schema and some columns may not exist in data + /// source. + target_record_schema: RecordSchema, + + /// Primary key indexes in `fetched_schema`. + /// It will be `None` if update mode of table is `append`, + /// and will be `Some` if the mode is `overwrite`. + primary_key_indexes: Option>, + + /// Schema in data source + /// It is possible to be different with the table + /// schema caused by table schema altering. source_schema: Schema, - /// The Vec stores the column index in source, and `None` means this column - /// is not in source but required by reader, and need to filled by null. - /// The length of Vec is the same as the number of columns reader intended - /// to read. - source_projection: Vec>, + + /// The Vec stores the column index in data source, and `None` means this + /// column is not in source but required by reader, and need to filled + /// by null. The length of Vec is the same as the number of columns + /// reader intended to read. + source_projection_indexes: Vec>, + + /// Used to reorder columns in arrow record batch fetched from sst to the + /// needed projection order. + /// Actually, It stores the record column indexes in + /// projected order similar as `source_projection_indexes`. + /// + /// Why we need it? + /// Because in current rust parquet impl, we can just define which columns + /// we wanted to fetch without their order. + /// + /// For example: + /// wanted columns in order: 2,1,3 + /// actual fetched columns: 1,2,3 + /// + /// However, projection is not only wanted columns but with wanted order, so + /// we need this remapping to reorder the fetched record. + /// + /// For example: + /// source columns in sst: 0,1,2,3,4 + /// target projection columns: 2,1,3 + /// + /// the actual columns in fetched record: 1,2,3 + /// relative columns indexes in fetched record: 0,1,2 + /// + /// finally, the remapping to the relative indexes: 1,0,2 + target_record_projection_remapping: Vec>, } impl RowProjector { + pub fn new( + fetched_schema: &RecordSchema, + primary_key_indexes: Option>, + table_schema: &Schema, + source_schema: &Schema, + ) -> Result { + // Get `fetched_source_column_indexes`. + let mut fetched_source_column_indexes = Vec::with_capacity(fetched_schema.num_columns()); + let mut projected_source_indexes = Vec::with_capacity(fetched_schema.num_columns()); + for column_schema in fetched_schema.columns() { + Self::try_project_column( + column_schema, + table_schema, + source_schema, + &mut fetched_source_column_indexes, + &mut projected_source_indexes, + )?; + } + + // Get `fetched_projected_source_column_indexes` from + // `fetched_source_column_indexes`. + projected_source_indexes.sort_unstable(); + let fetched_projected_source_column_indexes = fetched_source_column_indexes + .iter() + .map(|source_idx_opt| { + source_idx_opt.map(|src_idx| { + // Safe to unwrap, index exists in `fetched_source_column_indexes` is ensured + // to exist in `projected_source_indexes`. + projected_source_indexes + .iter() + .position(|proj_idx| src_idx == *proj_idx) + .unwrap() + }) + }) + .collect(); + + Ok(RowProjector { + target_record_schema: fetched_schema.clone(), + primary_key_indexes, + source_schema: source_schema.clone(), + source_projection_indexes: fetched_source_column_indexes, + target_record_projection_remapping: fetched_projected_source_column_indexes, + }) + } + + fn try_project_column( + column: &ColumnSchema, + table_schema: &Schema, + source_schema: &Schema, + fetched_source_column_indexes: &mut Vec>, + projected_source_indexes: &mut Vec, + ) -> Result<()> { + match source_schema.index_of(&column.name) { + Some(source_idx) => { + // Column is in source + if table_schema.version() == source_schema.version() { + // Same version, just use that column in source + fetched_source_column_indexes.push(Some(source_idx)); + projected_source_indexes.push(source_idx); + } else { + // Different version, need to check column schema + let source_column = source_schema.column(source_idx); + // TODO(yingwen): Data type is not checked here because we do not support alter + // data type now. + match column + .compatible_for_read(source_column) + .context(IncompatReadColumn)? + { + ReadOp::Exact => { + fetched_source_column_indexes.push(Some(source_idx)); + projected_source_indexes.push(source_idx); + } + ReadOp::FillNull => { + fetched_source_column_indexes.push(None); + } + } + } + } + None => { + // Column is not in source + ensure!(column.is_nullable, MissingReadColumn { name: &column.name }); + // Column is nullable, fill this column by null + fetched_source_column_indexes.push(None); + } + } + + Ok(()) + } + + pub fn source_schema(&self) -> &Schema { + &self.source_schema + } + + pub fn fetched_schema(&self) -> &RecordSchema { + &self.target_record_schema + } + /// The projected indexes of existed columns in the source schema. pub fn existed_source_projection(&self) -> Vec { - self.source_projection + self.source_projection_indexes .iter() .filter_map(|index| *index) .collect() @@ -82,12 +218,18 @@ impl RowProjector { /// The projected indexes of all columns(existed and not exist) in the /// source schema. - pub fn source_projection(&self) -> &[Option] { - &self.source_projection + pub fn fetched_source_column_indexes(&self) -> &[Option] { + &self.source_projection_indexes } - pub fn schema_with_key(&self) -> &RecordSchemaWithKey { - &self.schema_with_key + /// The projected indexes of all columns(existed and not exist) in the + /// projected source schema. + pub fn fetched_projected_source_column_indexes(&self) -> &[Option] { + &self.target_record_projection_remapping + } + + pub fn primary_key_indexes(&self) -> Option<&[usize]> { + self.primary_key_indexes.as_deref() } /// Project the row. @@ -96,9 +238,9 @@ impl RowProjector { pub fn project_row(&self, row: &Row, mut datums_buffer: Vec) -> Row { assert_eq!(self.source_schema.num_columns(), row.num_columns()); - datums_buffer.reserve(self.schema_with_key.num_columns()); + datums_buffer.reserve(self.target_record_schema.num_columns()); - for p in &self.source_projection { + for p in &self.source_projection_indexes { let datum = match p { Some(index_in_source) => row[*index_in_source].clone(), None => Datum::Null, @@ -119,13 +261,43 @@ impl RowProjector { } } +#[derive(Debug, Clone)] +pub struct RowProjectorBuilder { + fetched_schema: RecordSchema, + table_schema: Schema, + primary_key_indexes: Option>, +} + +impl RowProjectorBuilder { + pub fn new( + fetched_schema: RecordSchema, + table_schema: Schema, + primary_key_indexes: Option>, + ) -> Self { + Self { + fetched_schema, + table_schema, + primary_key_indexes, + } + } + + pub fn build(&self, source_schema: &Schema) -> Result { + RowProjector::new( + &self.fetched_schema, + self.primary_key_indexes.clone(), + &self.table_schema, + source_schema, + ) + } +} + #[derive(Clone)] pub struct ProjectedSchema(Arc); impl fmt::Debug for ProjectedSchema { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ProjectedSchema") - .field("original_schema", &self.0.original_schema) + .field("original_schema", &self.0.table_schema) .field("projection", &self.0.projection) .finish() } @@ -137,8 +309,8 @@ impl ProjectedSchema { Self(Arc::new(inner)) } - pub fn new(schema: Schema, projection: Option>) -> Result { - let inner = ProjectedSchemaInner::new(schema, projection)?; + pub fn new(table_schema: Schema, projection: Option>) -> Result { + let inner = ProjectedSchemaInner::new(table_schema, projection)?; Ok(Self(Arc::new(inner))) } @@ -150,42 +322,33 @@ impl ProjectedSchema { self.0.projection() } - /// Returns the [RowProjector] to project the rows with source schema to - /// rows with [RecordSchemaWithKey]. - /// - /// REQUIRE: The key columns are the same as this schema. - #[inline] - pub fn try_project_with_key(&self, source_schema: &Schema) -> Result { - self.0.try_project_with_key(source_schema) - } - // Returns the record schema after projection with key. pub fn to_record_schema_with_key(&self) -> RecordSchemaWithKey { - self.0.schema_with_key.clone() + self.0.record_schema_with_key.clone() } pub fn as_record_schema_with_key(&self) -> &RecordSchemaWithKey { - &self.0.schema_with_key + &self.0.record_schema_with_key } // Returns the record schema after projection. pub fn to_record_schema(&self) -> RecordSchema { - self.0.record_schema.clone() + self.0.target_record_schema.clone() } /// Returns the arrow schema after projection. pub fn to_projected_arrow_schema(&self) -> ArrowSchemaRef { - self.0.record_schema.to_arrow_schema_ref() + self.0.target_record_schema.to_arrow_schema_ref() } - pub fn original_schema(&self) -> &Schema { - &self.0.original_schema + pub fn table_schema(&self) -> &Schema { + &self.0.table_schema } } impl From for ceresdbproto::schema::ProjectedSchema { fn from(request: ProjectedSchema) -> Self { - let table_schema_pb = (&request.0.original_schema).into(); + let table_schema_pb = (&request.0.table_schema).into(); let projection_pb = request.0.projection.as_ref().map(|project| { let project = project .iter() @@ -223,55 +386,56 @@ impl TryFrom for ProjectedSchema { /// Schema with projection informations struct ProjectedSchemaInner { - /// The schema before projection that the reader intended to read, may - /// differ from current schema of the table. - original_schema: Schema, + /// The table schema used to generate plan, possible to differ from + /// schema in ssts/memtable. + table_schema: Schema, /// Index of the projected columns in `self.schema`, `None` if /// all columns are needed. projection: Option>, - /// The record schema from `self.schema` with key columns after projection. - schema_with_key: RecordSchemaWithKey, - /// The record schema from `self.schema` after projection. - record_schema: RecordSchema, + /// The fetched record schema from `self.schema` with key columns after + /// projection. + record_schema_with_key: RecordSchemaWithKey, + /// The fetched record schema from `self.schema` after projection. + target_record_schema: RecordSchema, } impl ProjectedSchemaInner { - fn no_projection(schema: Schema) -> Self { - let schema_with_key = schema.to_record_schema_with_key(); - let record_schema = schema.to_record_schema(); + fn no_projection(table_schema: Schema) -> Self { + let record_schema_with_key = table_schema.to_record_schema_with_key(); + let target_record_schema = table_schema.to_record_schema(); Self { - original_schema: schema, + table_schema, projection: None, - schema_with_key, - record_schema, + record_schema_with_key, + target_record_schema, } } - fn new(schema: Schema, projection: Option>) -> Result { + fn new(table_schema: Schema, projection: Option>) -> Result { if let Some(p) = &projection { // Projection is provided, validate the projection is valid. This is necessary // to avoid panic when creating RecordSchema and // RecordSchemaWithKey. if let Some(max_idx) = p.iter().max() { ensure!( - *max_idx < schema.num_columns(), + *max_idx < table_schema.num_columns(), InvalidProjectionIndex { index: *max_idx } ); } - let schema_with_key = schema.project_record_schema_with_key(p); - let record_schema = schema.project_record_schema(p); + let record_schema_with_key = table_schema.project_record_schema_with_key(p); + let target_record_schema = table_schema.project_record_schema(p); Ok(Self { - original_schema: schema, + table_schema, projection, - schema_with_key, - record_schema, + record_schema_with_key, + target_record_schema, }) } else { - Ok(Self::no_projection(schema)) + Ok(Self::no_projection(table_schema)) } } @@ -283,75 +447,6 @@ impl ProjectedSchemaInner { fn projection(&self) -> Option> { self.projection.clone() } - - // TODO(yingwen): We can fill missing not null column with default value instead - // of returning error. - fn try_project_with_key(&self, source_schema: &Schema) -> Result { - // When do primary key sample, this will assert will fail. - // TODO: maybe we can add a flag to only skip this assert when sampling. - // - // debug_assert_eq!( - // self.schema_with_key.key_columns(), - // source_schema.key_columns() - // ); - // We consider the two schema is equal if they have same version. - // if self.original_schema.version() == source_schema.version() { - // debug_assert_eq!(self.original_schema, *source_schema); - // } - - let mut source_projection = Vec::with_capacity(self.schema_with_key.num_columns()); - // For each column in `schema_with_key` - for column_schema in self.schema_with_key.columns() { - self.try_project_column(column_schema, source_schema, &mut source_projection)?; - } - - Ok(RowProjector { - schema_with_key: self.schema_with_key.clone(), - source_schema: source_schema.clone(), - source_projection, - }) - } - - fn try_project_column( - &self, - column: &ColumnSchema, - source_schema: &Schema, - source_projection: &mut Vec>, - ) -> Result<()> { - match source_schema.index_of(&column.name) { - Some(source_idx) => { - // Column is in source - if self.original_schema.version() == source_schema.version() { - // Same version, just use that column in source - source_projection.push(Some(source_idx)); - } else { - // Different version, need to check column schema - let source_column = source_schema.column(source_idx); - // TODO(yingwen): Data type is not checked here because we do not support alter - // data type now. - match column - .compatible_for_read(source_column) - .context(IncompatReadColumn)? - { - ReadOp::Exact => { - source_projection.push(Some(source_idx)); - } - ReadOp::FillNull => { - source_projection.push(None); - } - } - } - } - None => { - // Column is not in source - ensure!(column.is_nullable, MissingReadColumn { name: &column.name }); - // Column is nullable, fill this column by null - source_projection.push(None); - } - } - - Ok(()) - } } #[cfg(test)] @@ -365,7 +460,7 @@ mod tests { let projection: Vec = (0..schema.num_columns() - 1).collect(); let projected_schema = ProjectedSchema::new(schema.clone(), Some(projection)).unwrap(); assert_eq!( - projected_schema.0.schema_with_key.num_columns(), + projected_schema.0.record_schema_with_key.num_columns(), schema.num_columns() - 1 ); assert!(!projected_schema.is_all_projection()); diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs index af9cca487d..1b7d610d8e 100644 --- a/common_types/src/record_batch.rs +++ b/common_types/src/record_batch.rs @@ -362,15 +362,70 @@ fn cast_arrow_record_batch(source: ArrowRecordBatch) -> Result } #[derive(Debug)] -pub struct RecordBatchWithKey { - schema_with_key: RecordSchemaWithKey, +pub struct FetchedRecordBatch { + schema: RecordSchema, + // TODO: remove it later, `FetchedRecordBatch` is unnecessary to know anything about primary + // keys. + primary_key_indexes: Option>, data: RecordBatchData, } -impl RecordBatchWithKey { - pub fn new(schema_with_key: RecordSchemaWithKey, data: RecordBatchData) -> Self { +impl FetchedRecordBatch { + pub fn try_new(ctx: &RowProjector, arrow_record_batch: ArrowRecordBatch) -> Result { + let column_indexes = ctx.fetched_projected_source_column_indexes(); + let schema = ctx.fetched_schema().clone(); + let mut column_blocks = Vec::with_capacity(schema.num_columns()); + + let num_rows = arrow_record_batch.num_rows(); + let num_columns = arrow_record_batch.num_columns(); + for (col_idx_opt, col_schema) in column_indexes.iter().zip(schema.columns()) { + match col_idx_opt { + Some(col_idx) => { + ensure!( + *col_idx < num_columns, + OutOfIndexProjection { + source_projection: column_indexes, + arrow_schema: arrow_record_batch.schema() + } + ); + + let array = arrow_record_batch.column(*col_idx); + let column_block = + ColumnBlock::try_from_arrow_array_ref(&col_schema.data_type, array) + .context(CreateColumnBlock)?; + + column_blocks.push(column_block); + } + None => { + // Need to push row with specific type. + let null_block = ColumnBlock::new_null_with_type( + &col_schema.data_type, + num_rows, + col_schema.is_dictionary, + ) + .context(CreateColumnBlock)?; + column_blocks.push(null_block); + } + } + } + + let data = RecordBatchData::new(schema.to_arrow_schema_ref(), column_blocks)?; + + Ok(FetchedRecordBatch { + schema, + primary_key_indexes: ctx.primary_key_indexes().map(|idxs| idxs.to_vec()), + data, + }) + } + + pub fn new_from_parts( + schema: RecordSchema, + primary_key_indexes: Option>, + data: RecordBatchData, + ) -> Self { Self { - schema_with_key, + schema, + primary_key_indexes, data, } } @@ -398,27 +453,22 @@ impl RecordBatchWithKey { Row::from_datums(datums) } - /// Project the [RecordBatchWithKey] into a [RecordBatch] according to + /// Project the [FetchedRecordBatch] into a [RecordBatch] according to /// [ProjectedSchema]. - /// - /// REQUIRE: The schema_with_key of the [RecordBatchWithKey] is the same as - /// the schema_with_key of [ProjectedSchema]. + // TODO: how do we ensure `ProjectedSchema` passed here is same as the source + // `ProjectedSchema` of `RecordSchema` here? pub fn try_project(mut self, projected_schema: &ProjectedSchema) -> Result { - debug_assert_eq!( - &self.schema_with_key, - projected_schema.as_record_schema_with_key() - ); - // Get the schema after projection. let record_schema = projected_schema.to_record_schema(); let mut column_blocks = Vec::with_capacity(record_schema.num_columns()); for column_schema in record_schema.columns() { - let column_index = self.schema_with_key.index_of(&column_schema.name).context( - ColumnNotInSchemaWithKey { - name: &column_schema.name, - }, - )?; + let column_index = + self.schema + .index_of(&column_schema.name) + .context(ColumnNotInSchemaWithKey { + name: &column_schema.name, + })?; // Take the column block out. let column_block = self.data.take_column_block(column_index); @@ -435,7 +485,7 @@ impl RecordBatchWithKey { pub fn into_record_batch(self) -> RecordBatch { RecordBatch { - schema: self.schema_with_key.into_record_schema(), + schema: self.schema, data: self.data, } } @@ -448,9 +498,20 @@ impl RecordBatchWithKey { self.data.arrow_record_batch } + pub fn schema_with_key(&self) -> Option { + self.primary_key_indexes + .clone() + .map(|idxs| RecordSchemaWithKey::new(self.schema.clone(), idxs)) + } + + #[inline] + pub fn schema(&self) -> &RecordSchema { + &self.schema + } + #[inline] - pub fn schema_with_key(&self) -> &RecordSchemaWithKey { - &self.schema_with_key + pub fn primary_key_indexes(&self) -> Option<&[usize]> { + self.primary_key_indexes.as_deref() } #[inline] @@ -485,7 +546,8 @@ impl RecordBatchWithKey { #[must_use] pub fn slice(&self, offset: usize, length: usize) -> Self { Self { - schema_with_key: self.schema_with_key.clone(), + schema: self.schema.clone(), + primary_key_indexes: self.primary_key_indexes.clone(), data: self.data.slice(offset, length), } } @@ -506,14 +568,15 @@ impl RecordBatchWithKey { } } -pub struct RecordBatchWithKeyBuilder { - schema_with_key: RecordSchemaWithKey, +pub struct FetchedRecordBatchBuilder { + fetched_schema: RecordSchema, + primary_key_indexes: Option>, builders: Vec, } -impl RecordBatchWithKeyBuilder { - pub fn new(schema_with_key: RecordSchemaWithKey) -> Self { - let builders = schema_with_key +impl FetchedRecordBatchBuilder { + pub fn new(fetched_schema: RecordSchema, primary_key_indexes: Option>) -> Self { + let builders = fetched_schema .columns() .iter() .map(|column_schema| { @@ -525,13 +588,18 @@ impl RecordBatchWithKeyBuilder { }) .collect(); Self { - schema_with_key, + fetched_schema, + primary_key_indexes, builders, } } - pub fn with_capacity(schema_with_key: RecordSchemaWithKey, capacity: usize) -> Self { - let builders = schema_with_key + pub fn with_capacity( + record_schema: RecordSchema, + primary_key_indexes: Option>, + capacity: usize, + ) -> Self { + let builders = record_schema .columns() .iter() .map(|column_schema| { @@ -543,7 +611,8 @@ impl RecordBatchWithKeyBuilder { }) .collect(); Self { - schema_with_key, + fetched_schema: record_schema, + primary_key_indexes, builders, } } @@ -598,7 +667,7 @@ impl RecordBatchWithKeyBuilder { /// - The `record_batch` and the builder must have the same schema. pub fn append_batch_range( &mut self, - record_batch: &RecordBatchWithKey, + record_batch: &FetchedRecordBatch, start: usize, len: usize, ) -> Result { @@ -638,115 +707,41 @@ impl RecordBatchWithKeyBuilder { } } - /// Build [RecordBatchWithKey] and reset the builder. - pub fn build(&mut self) -> Result { + /// Build [FetchedRecordBatch] and reset the builder. + pub fn build(&mut self) -> Result { let column_blocks: Vec<_> = self .builders .iter_mut() .map(|builder| builder.build()) .collect(); - let arrow_schema = self.schema_with_key.to_arrow_schema_ref(); + let arrow_schema = self.fetched_schema.to_arrow_schema_ref(); - Ok(RecordBatchWithKey { - schema_with_key: self.schema_with_key.clone(), + Ok(FetchedRecordBatch { + schema: self.fetched_schema.clone(), + primary_key_indexes: self.primary_key_indexes.clone(), data: RecordBatchData::new(arrow_schema, column_blocks)?, }) } } -#[derive(Debug, Clone)] -pub struct ArrowRecordBatchProjector { - row_projector: RowProjector, -} - -impl From for ArrowRecordBatchProjector { - fn from(row_projector: RowProjector) -> Self { - Self { row_projector } - } -} - -impl ArrowRecordBatchProjector { - /// Project the [arrow::RecordBatch] to [RecordBatchWithKey] and these - /// things are to be done: - /// - Insert the null column if the projected column does not appear in the - /// source schema. - /// - Convert the [arrow::RecordBatch] to [RecordBatchWithKey]. - /// - /// REQUIRE: Schema of the `arrow_record_batch` is the same as the - /// projection of existing column in the source schema. - pub fn project_to_record_batch_with_key( - &self, - arrow_record_batch: ArrowRecordBatch, - ) -> Result { - let schema_with_key = self.row_projector.schema_with_key().clone(); - let source_projection = self.row_projector.source_projection(); - let mut column_blocks = Vec::with_capacity(schema_with_key.num_columns()); - - let num_rows = arrow_record_batch.num_rows(); - // ensure next_arrow_column_idx < num_columns - let mut next_arrow_column_idx = 0; - let num_columns = arrow_record_batch.num_columns(); - - for (source_idx, column_schema) in source_projection.iter().zip(schema_with_key.columns()) { - match source_idx { - Some(_) => { - ensure!( - next_arrow_column_idx < num_columns, - OutOfIndexProjection { - source_projection, - arrow_schema: arrow_record_batch.schema() - } - ); - - let array = arrow_record_batch.column(next_arrow_column_idx); - next_arrow_column_idx += 1; - - let column_block = - ColumnBlock::try_from_arrow_array_ref(&column_schema.data_type, array) - .context(CreateColumnBlock)?; - - column_blocks.push(column_block); - } - None => { - // Need to push row with specific type. - let null_block = ColumnBlock::new_null_with_type( - &column_schema.data_type, - num_rows, - column_schema.is_dictionary, - ) - .context(CreateColumnBlock)?; - column_blocks.push(null_block); - } - } - } - - let data = RecordBatchData::new(schema_with_key.to_arrow_schema_ref(), column_blocks)?; - - Ok(RecordBatchWithKey { - schema_with_key, - data, - }) - } -} - #[cfg(test)] mod tests { use crate::{ - record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, + record_batch::{FetchedRecordBatch, FetchedRecordBatchBuilder}, row::RowViewOnBatch, tests::{ - build_projected_schema, build_record_batch_with_key_by_rows, build_rows, + build_fetched_record_batch_by_rows, build_projected_schema, build_rows, check_record_batch_with_key_with_rows, }, }; - fn build_record_batch_with_key() -> RecordBatchWithKey { + fn build_fetched_record_batch() -> FetchedRecordBatch { let rows = build_rows(); - build_record_batch_with_key_by_rows(rows) + build_fetched_record_batch_by_rows(rows) } fn check_record_batch_with_key( - record_batch_with_key: RecordBatchWithKey, + record_batch_with_key: FetchedRecordBatch, row_num: usize, column_num: usize, ) -> bool { @@ -756,7 +751,7 @@ mod tests { #[test] fn test_append_projected_contiguous_row() { - let record_batch_with_key = build_record_batch_with_key(); + let record_batch_with_key = build_fetched_record_batch(); assert_eq!(record_batch_with_key.num_rows(), 5); assert_eq!(record_batch_with_key.num_columns(), 5); @@ -766,15 +761,11 @@ mod tests { #[test] fn test_append_row_view() { let projected_schema = build_projected_schema(); - - let record_batch_with_key = build_record_batch_with_key(); - - let mut builder = RecordBatchWithKeyBuilder::with_capacity( - projected_schema.to_record_schema_with_key(), - 2, - ); + let fetched_record_batch = build_fetched_record_batch(); + let mut builder = + FetchedRecordBatchBuilder::with_capacity(projected_schema.to_record_schema(), None, 2); let view = RowViewOnBatch { - record_batch: &record_batch_with_key, + record_batch: &fetched_record_batch, row_idx: 1, }; builder.append_row_view(&view).unwrap(); @@ -788,13 +779,10 @@ mod tests { #[test] fn test_append_batch_range() { let projected_schema = build_projected_schema(); + let record_batch_with_key = build_fetched_record_batch(); - let record_batch_with_key = build_record_batch_with_key(); - - let mut builder = RecordBatchWithKeyBuilder::with_capacity( - projected_schema.to_record_schema_with_key(), - 2, - ); + let mut builder = + FetchedRecordBatchBuilder::with_capacity(projected_schema.to_record_schema(), None, 2); builder .append_batch_range(&record_batch_with_key, 0, 2) .unwrap(); diff --git a/common_types/src/row/contiguous.rs b/common_types/src/row/contiguous.rs index db055e66ea..d16960959b 100644 --- a/common_types/src/row/contiguous.rs +++ b/common_types/src/row/contiguous.rs @@ -248,27 +248,24 @@ fn datum_view_at<'a>( /// schema of source row. pub struct ProjectedContiguousRow<'a, T> { source_row: T, - projector: &'a RowProjector, + ctx: &'a RowProjector, } impl<'a, T: ContiguousRow> ProjectedContiguousRow<'a, T> { - pub fn new(source_row: T, projector: &'a RowProjector) -> Self { - Self { - source_row, - projector, - } + pub fn new(source_row: T, ctx: &'a RowProjector) -> Self { + Self { source_row, ctx } } pub fn num_datum_views(&self) -> usize { - self.projector.source_projection().len() + self.ctx.fetched_source_column_indexes().len() } pub fn datum_view_at(&self, index: usize) -> DatumView { - let p = self.projector.source_projection()[index]; + let p = self.ctx.fetched_source_column_indexes()[index]; match p { Some(index_in_source) => { - let datum_kind = self.projector.datum_kind(index_in_source); + let datum_kind = self.ctx.datum_kind(index_in_source); self.source_row.datum_view_at(index_in_source, datum_kind) } None => DatumView::Null, @@ -801,7 +798,13 @@ mod tests { let projection: Vec = (0..schema.num_columns() - 1).collect(); let projected_schema = ProjectedSchema::new(schema.clone(), Some(projection.clone())).unwrap(); - let row_projected_schema = projected_schema.try_project_with_key(&schema).unwrap(); + let ctx = RowProjector::new( + &projected_schema.to_record_schema(), + None, + projected_schema.table_schema(), + &schema, + ) + .unwrap(); let rows = build_rows(); let index_in_writer = IndexInWriterSchema::for_same_schema(schema.num_columns()); @@ -812,7 +815,7 @@ mod tests { writer.write_row(&row).unwrap(); let source_row = ContiguousRowReader::try_new(&buf, &schema).unwrap(); - let projected_row = ProjectedContiguousRow::new(source_row, &row_projected_schema); + let projected_row = ProjectedContiguousRow::new(source_row, &ctx); let range = projection.clone(); for i in range { diff --git a/common_types/src/row/mod.rs b/common_types/src/row/mod.rs index 4fb8139283..652611a892 100644 --- a/common_types/src/row/mod.rs +++ b/common_types/src/row/mod.rs @@ -24,7 +24,7 @@ use snafu::{ensure, Backtrace, OptionExt, Snafu}; use crate::{ column_schema::{ColumnId, ColumnSchema}, datum::{Datum, DatumKind, DatumView}, - record_batch::RecordBatchWithKey, + record_batch::FetchedRecordBatch, schema::{RecordSchemaWithKey, Schema}, time::Timestamp, }; @@ -560,13 +560,13 @@ pub trait RowView { fn column_by_idx(&self, column_idx: usize) -> Datum; } -// TODO(yingwen): Add a method to get row view on RecordBatchWithKey. -/// A row view on the [RecordBatchWithKey]. +// TODO(yingwen): Add a method to get row view on FetchedRecordBatch. +/// A row view on the [FetchedRecordBatch]. /// /// `row_idx < record_batch.num_rows()` is ensured. #[derive(Debug)] pub struct RowViewOnBatch<'a> { - pub record_batch: &'a RecordBatchWithKey, + pub record_batch: &'a FetchedRecordBatch, pub row_idx: usize, } @@ -583,18 +583,18 @@ impl<'a> RowViewOnBatch<'a> { pub struct RowViewOnBatchColumnIter<'a> { next_column_idx: usize, row_idx: usize, - record_batch: &'a RecordBatchWithKey, + record_batch: &'a FetchedRecordBatch, } impl<'a> RowView for RowViewOnBatch<'a> { fn try_get_column_by_name(&self, column_name: &str) -> Result> { - let column_idx = self - .record_batch - .schema_with_key() - .index_of(column_name) - .context(ColumnNameNotFound { - column: column_name, - })?; + let column_idx = + self.record_batch + .schema() + .index_of(column_name) + .context(ColumnNameNotFound { + column: column_name, + })?; Ok(Some(self.column_by_idx(column_idx))) } diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index 2ceaa46576..5abdeabb95 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -539,6 +539,13 @@ pub struct RecordSchemaWithKey { } impl RecordSchemaWithKey { + pub fn new(record_schema: RecordSchema, primary_key_indexes: Vec) -> Self { + Self { + record_schema, + primary_key_indexes, + } + } + pub fn num_columns(&self) -> usize { self.record_schema.num_columns() } @@ -578,7 +585,11 @@ impl RecordSchemaWithKey { .collect::>() } - pub(crate) fn into_record_schema(self) -> RecordSchema { + pub fn to_record_schema(&self) -> RecordSchema { + self.record_schema.clone() + } + + pub fn into_record_schema(self) -> RecordSchema { self.record_schema } diff --git a/common_types/src/tests.rs b/common_types/src/tests.rs index c3abca0060..4d5d8e1f54 100644 --- a/common_types/src/tests.rs +++ b/common_types/src/tests.rs @@ -18,8 +18,8 @@ use sqlparser::ast::{BinaryOperator, Expr, Value}; use crate::{ column_schema, datum::{Datum, DatumKind}, - projected_schema::ProjectedSchema, - record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, + projected_schema::{ProjectedSchema, RowProjector}, + record_batch::{FetchedRecordBatch, FetchedRecordBatchBuilder}, row::{ contiguous::{ContiguousRowReader, ContiguousRowWriter, ProjectedContiguousRow}, Row, @@ -357,15 +357,16 @@ pub fn build_rows() -> Vec { ] } -pub fn build_record_batch_with_key_by_rows(rows: Vec) -> RecordBatchWithKey { +pub fn build_fetched_record_batch_by_rows(rows: Vec) -> FetchedRecordBatch { let schema = build_schema(); assert!(schema.num_columns() > 1); let projection: Vec = (0..schema.num_columns() - 1).collect(); let projected_schema = ProjectedSchema::new(schema.clone(), Some(projection)).unwrap(); - let row_projected_schema = projected_schema.try_project_with_key(&schema).unwrap(); + let row_projector = + RowProjector::new(&projected_schema.to_record_schema(), None, &schema, &schema).unwrap(); let mut builder = - RecordBatchWithKeyBuilder::with_capacity(projected_schema.to_record_schema_with_key(), 2); + FetchedRecordBatchBuilder::with_capacity(row_projector.fetched_schema().clone(), None, 2); let index_in_writer = IndexInWriterSchema::for_same_schema(schema.num_columns()); let mut buf = Vec::new(); @@ -375,7 +376,7 @@ pub fn build_record_batch_with_key_by_rows(rows: Vec) -> RecordBatchWithKey writer.write_row(&row).unwrap(); let source_row = ContiguousRowReader::try_new(&buf, &schema).unwrap(); - let projected_row = ProjectedContiguousRow::new(source_row, &row_projected_schema); + let projected_row = ProjectedContiguousRow::new(source_row, &row_projector); builder .append_projected_contiguous_row(&projected_row) .unwrap(); @@ -384,7 +385,7 @@ pub fn build_record_batch_with_key_by_rows(rows: Vec) -> RecordBatchWithKey } pub fn check_record_batch_with_key_with_rows( - record_batch_with_key: &RecordBatchWithKey, + record_batch_with_key: &FetchedRecordBatch, row_num: usize, column_num: usize, rows: Vec, diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs index 5b37431c20..53d537ffa6 100644 --- a/components/object_store/src/disk_cache.rs +++ b/components/object_store/src/disk_cache.rs @@ -825,7 +825,7 @@ impl ObjectStore for DiskCacheStore { } async fn get(&self, location: &Path) -> Result { - // In sst module, we only use get_range, fetching a whole file is not used, and + // In sst module, we only use get_range, fetched a whole file is not used, and // it is not good for disk cache. self.underlying_store.get(location).await } diff --git a/integration_tests/cases/env/local/ddl/query-plan.result b/integration_tests/cases/env/local/ddl/query-plan.result index ec2258d64d..26dcf9098e 100644 --- a/integration_tests/cases/env/local/ddl/query-plan.result +++ b/integration_tests/cases/env/local/ddl/query-plan.result @@ -2,6 +2,10 @@ DROP TABLE IF EXISTS `03_dml_select_real_time_range`; affected_rows: 0 +DROP TABLE IF EXISTS `03_append_mode_table`; + +affected_rows: 0 + CREATE TABLE `03_dml_select_real_time_range` ( name string TAG, value double NOT NULL, @@ -27,7 +31,7 @@ explain analyze select t from `03_dml_select_real_time_range` where t > 1695348001000; plan_type,plan, -String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1:\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=1\n num_ssts=0\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_memtable_1, fetched_columns:[tsid,t]:\n=0]\n"), -- This query should not include memtable @@ -47,7 +51,7 @@ explain analyze select t from `03_dml_select_real_time_range` where t > 1695348001000; plan_type,plan, -String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=1\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_sst_1:\n meta_data_cache_hit=false\n parallelism=1\n project_record_batch=xxs\n read_meta_data_duration=xxs\n row_mem=320\n row_num=3\n prune_row_groups:\n pruned_by_custom_filter=0\n pruned_by_min_max=0\n row_groups_after_prune=1\n total_row_groups=1\n use_custom_filter=false\n=0]\n"), +String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348001001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=1\n scan_count=2\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=1\n total_rows_fetch_from_one=1\n scan_sst_1, fetched_columns:[tsid,t]:\n meta_data_cache_hit=false\n parallelism=1\n project_record_batch=xxs\n read_meta_data_duration=xxs\n row_mem=320\n row_num=3\n prune_row_groups:\n pruned_by_custom_filter=0\n pruned_by_min_max=0\n row_groups_after_prune=1\n total_row_groups=1\n use_custom_filter=false\n=0]\n"), -- This query should not include SST @@ -58,7 +62,58 @@ plan_type,plan, String("Plan with Metrics"),String("ScanTable: table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)], time_range:TimeRange { inclusive_start: Timestamp(1695348002001), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=0\n=0]\n"), +-- Table with an 'append' update mode +CREATE TABLE `03_append_mode_table` ( + name string TAG, + value double NOT NULL, + t timestamp NOT NULL, + timestamp KEY (t)) ENGINE = Analytic WITH ( + enable_ttl = 'false', + segment_duration = '2h', + update_mode = 'append' +); + +affected_rows: 0 + +INSERT INTO `03_append_mode_table` (t, name, value) + VALUES + (1695348000000, "ceresdb", 100), + (1695348001000, "ceresdb", 200), + (1695348002000, "ceresdb", 300); + +affected_rows: 3 + +-- Should just fetch projected columns from memtable +-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE since_create=\d+.?\d*(µ|m|n) since_create=xx +-- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx +-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx +explain analyze select t from `03_append_mode_table` +where t >= 1695348001000 and name = 'ceresdb'; + +plan_type,plan, +String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=03_append_mode_table, parallelism=8, metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n num_memtables=1\n num_ssts=0\n scan_duration=xxs\n since_create=xxs\n since_init=xxs\n total_batch_fetched=1\n total_rows_fetched=2\n scan_memtable_1, fetched_columns:[t,name]:\n=0]\n"), + + +-- Should just fetch projected columns from SST +-- SQLNESS ARG pre_cmd=flush +-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE since_create=\d+.?\d*(µ|m|n) since_create=xx +-- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx +-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx +-- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx +explain analyze select t from `03_append_mode_table` +where t >= 1695348001000 and name = 'ceresdb'; + +plan_type,plan, +String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable: table=03_append_mode_table, parallelism=8, metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n num_memtables=0\n num_ssts=1\n scan_duration=xxs\n since_create=xxs\n since_init=xxs\n total_batch_fetched=1\n total_rows_fetched=2\n scan_sst_1, fetched_columns:[t,name]:\n meta_data_cache_hit=false\n parallelism=1\n project_record_batch=xxs\n read_meta_data_duration=xxs\n row_mem=408\n row_num=3\n prune_row_groups:\n pruned_by_custom_filter=0\n pruned_by_min_max=0\n row_groups_after_prune=1\n total_row_groups=1\n use_custom_filter=false\n=0]\n"), + + DROP TABLE `03_dml_select_real_time_range`; affected_rows: 0 +DROP TABLE `03_append_mode_table`; + +affected_rows: 0 + diff --git a/integration_tests/cases/env/local/ddl/query-plan.sql b/integration_tests/cases/env/local/ddl/query-plan.sql index 00fb19e05c..a0baff5b81 100644 --- a/integration_tests/cases/env/local/ddl/query-plan.sql +++ b/integration_tests/cases/env/local/ddl/query-plan.sql @@ -1,4 +1,5 @@ DROP TABLE IF EXISTS `03_dml_select_real_time_range`; +DROP TABLE IF EXISTS `03_append_mode_table`; CREATE TABLE `03_dml_select_real_time_range` ( name string TAG, @@ -36,4 +37,40 @@ where t > 1695348001000; explain analyze select t from `03_dml_select_real_time_range` where t > 1695348002000; +-- Table with an 'append' update mode +CREATE TABLE `03_append_mode_table` ( + name string TAG, + value double NOT NULL, + t timestamp NOT NULL, + timestamp KEY (t)) ENGINE = Analytic WITH ( + enable_ttl = 'false', + segment_duration = '2h', + update_mode = 'append' +); + +INSERT INTO `03_append_mode_table` (t, name, value) + VALUES + (1695348000000, "ceresdb", 100), + (1695348001000, "ceresdb", 200), + (1695348002000, "ceresdb", 300); + +-- Should just fetch projected columns from memtable +-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE since_create=\d+.?\d*(µ|m|n) since_create=xx +-- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx +-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx +explain analyze select t from `03_append_mode_table` +where t >= 1695348001000 and name = 'ceresdb'; + +-- Should just fetch projected columns from SST +-- SQLNESS ARG pre_cmd=flush +-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx +-- SQLNESS REPLACE since_create=\d+.?\d*(µ|m|n) since_create=xx +-- SQLNESS REPLACE since_init=\d+.?\d*(µ|m|n) since_init=xx +-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx +-- SQLNESS REPLACE project_record_batch=\d+.?\d*(µ|m|n) project_record_batch=xx +explain analyze select t from `03_append_mode_table` +where t >= 1695348001000 and name = 'ceresdb'; + DROP TABLE `03_dml_select_real_time_range`; +DROP TABLE `03_append_mode_table`; diff --git a/partition_table_engine/src/scan_builder.rs b/partition_table_engine/src/scan_builder.rs index 1291508fd2..247bcae98b 100644 --- a/partition_table_engine/src/scan_builder.rs +++ b/partition_table_engine/src/scan_builder.rs @@ -79,7 +79,7 @@ impl PartitionedTableScanBuilder { impl TableScanBuilder for PartitionedTableScanBuilder { async fn build(&self, request: ReadRequest) -> Result> { // Build partition rule. - let table_schema_snapshot = request.projected_schema.original_schema(); + let table_schema_snapshot = request.projected_schema.table_schema(); let df_partition_rule = DfPartitionRuleAdapter::new(self.partition_info.clone(), table_schema_snapshot) .map_err(|e| { diff --git a/src/wal/src/message_queue_impl/region.rs b/src/wal/src/message_queue_impl/region.rs index 1ebfd176a7..292d0469c9 100644 --- a/src/wal/src/message_queue_impl/region.rs +++ b/src/wal/src/message_queue_impl/region.rs @@ -810,7 +810,7 @@ pub struct MessageQueueLogIterator { /// Polling's end point /// - /// While fetching in slave node, it will be set to `None`, and + /// While fetched in slave node, it will be set to `None`, and /// reading will not stop. /// Otherwise, it will be set to high watermark. terminate_offset: Option, diff --git a/system_catalog/src/tables.rs b/system_catalog/src/tables.rs index 7593f7d754..dc1113f784 100644 --- a/system_catalog/src/tables.rs +++ b/system_catalog/src/tables.rs @@ -21,7 +21,8 @@ use catalog::{manager::ManagerRef, schema::SchemaRef, CatalogRef}; use common_types::{ column_schema, datum::{Datum, DatumKind}, - record_batch::RecordBatchWithKeyBuilder, + projected_schema::RowProjector, + record_batch::FetchedRecordBatchBuilder, row::Row, schema, schema::Schema, @@ -153,13 +154,22 @@ impl SystemTable for Tables { .all_catalogs() .box_err() .context(table_engine::table::Scan { table: self.name() })?; - let projected_record_schema = request.projected_schema.to_record_schema_with_key(); - let mut builder = RecordBatchWithKeyBuilder::new(projected_record_schema); + let fetched_schema = request.projected_schema.to_record_schema_with_key(); + let primary_key_indexes = fetched_schema.primary_key_idx().to_vec(); + let fetched_schema = fetched_schema.to_record_schema(); + let mut builder = FetchedRecordBatchBuilder::new( + fetched_schema.clone(), + Some(primary_key_indexes.clone()), + ); - let projector = request - .projected_schema - .try_project_with_key(&self.schema) - .expect("Should succeed to try_project_key of sys_tables"); + let table_schema = request.projected_schema.table_schema(); + let row_projector = RowProjector::new( + &fetched_schema, + Some(primary_key_indexes), + table_schema, + &self.schema, + ) + .expect("Should succeed to try_project_key of sys_tables"); for catalog in &catalogs { for schema in &catalog .all_schemas() @@ -172,7 +182,7 @@ impl SystemTable for Tables { .context(table_engine::table::Scan { table: self.name() })? { let row = self.from_table(catalog.clone(), schema.clone(), table.clone()); - let projected_row = projector.project_row(&row, Vec::new()); + let projected_row = row_projector.project_row(&row, Vec::new()); builder .append_row(projected_row) .box_err() diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs index 7f9e974708..6b0c38a770 100644 --- a/table_engine/src/provider.rs +++ b/table_engine/src/provider.rs @@ -141,10 +141,11 @@ impl TableScanBuilder for NormalTableScanBuilder { #[derive(Debug)] pub struct TableProviderAdapter { table: TableRef, + /// The schema of the table when this adapter is created, used as schema /// snapshot for read to avoid the reader sees different schema during /// query - read_schema: Schema, + current_table_schema: Schema, /// Table scan builder builder: B, @@ -153,11 +154,11 @@ pub struct TableProviderAdapter { impl TableProviderAdapter { pub fn new(table: TableRef, builder: B) -> Self { // Take a snapshot of the schema - let read_schema = table.schema(); + let current_table_schema = table.schema(); Self { table, - read_schema, + current_table_schema, builder, } } @@ -193,12 +194,14 @@ impl TableProviderAdapter { ); let predicate = self.check_and_build_predicate_from_filters(filters); - let projected_schema = ProjectedSchema::new(self.read_schema.clone(), projection.cloned()) - .map_err(|e| { - DataFusionError::Internal(format!( - "Invalid projection, plan:{self:?}, projection:{projection:?}, err:{e:?}" - )) - })?; + let projected_schema = + ProjectedSchema::new(self.current_table_schema.clone(), projection.cloned()).map_err( + |e| { + DataFusionError::Internal(format!( + "Invalid projection, plan:{self:?}, projection:{projection:?}, err:{e:?}" + )) + }, + )?; let opts = ReadOptions { deadline, @@ -224,7 +227,9 @@ impl TableProviderAdapter { .filter_map(|filter| { let filter_cols = visitor::find_columns_by_expr(filter); - let support_pushdown = self.table.support_pushdown(&self.read_schema, &filter_cols); + let support_pushdown = self + .table + .support_pushdown(&self.current_table_schema, &filter_cols); if support_pushdown { Some(filter.clone()) } else { @@ -235,7 +240,7 @@ impl TableProviderAdapter { PredicateBuilder::default() .add_pushdown_exprs(&pushdown_filters) - .extract_time_range(&self.read_schema, filters) + .extract_time_range(&self.current_table_schema, filters) .build() } @@ -245,7 +250,9 @@ impl TableProviderAdapter { .map(|filter| { let filter_cols = visitor::find_columns_by_expr(filter); - let support_pushdown = self.table.support_pushdown(&self.read_schema, &filter_cols); + let support_pushdown = self + .table + .support_pushdown(&self.current_table_schema, &filter_cols); if support_pushdown { TableProviderFilterPushDown::Exact } else { @@ -264,7 +271,7 @@ impl TableProvider for TableProviderAdapter { fn schema(&self) -> SchemaRef { // We use the `read_schema` as the schema of this `TableProvider` - self.read_schema.clone().into_arrow_schema_ref() + self.current_table_schema.clone().into_arrow_schema_ref() } async fn scan( @@ -297,7 +304,7 @@ impl TableSource for TableProviderAdapter { /// Get a reference to the schema for this table fn schema(&self) -> SchemaRef { - self.read_schema.clone().into_arrow_schema_ref() + self.current_table_schema.clone().into_arrow_schema_ref() } /// Get the type of this table for metadata/catalog purposes. diff --git a/tools/src/bin/sst-convert.rs b/tools/src/bin/sst-convert.rs index 0021a1425b..57c8f8f5fa 100644 --- a/tools/src/bin/sst-convert.rs +++ b/tools/src/bin/sst-convert.rs @@ -30,7 +30,10 @@ use analytic_engine::{ }; use anyhow::{Context, Result}; use clap::Parser; -use common_types::{projected_schema::ProjectedSchema, request_id::RequestId}; +use common_types::{ + projected_schema::{ProjectedSchema, RowProjectorBuilder}, + request_id::RequestId, +}; use generic_error::BoxError; use object_store::{LocalFileSystem, Path}; use runtime::Runtime; @@ -92,15 +95,20 @@ async fn run(args: Args, runtime: Arc) -> Result<()> { let sst_meta = sst_util::meta_from_sst(&store, &input_path).await; let factory = FactoryImpl; let scan_options = ScanOptions::default(); + let projected_schema = ProjectedSchema::no_projection(sst_meta.schema.clone()); + + let fetched_schema = projected_schema.to_record_schema(); + let table_schema = projected_schema.table_schema().clone(); + let row_projector_builder = RowProjectorBuilder::new(fetched_schema, table_schema, None); let reader_opts = SstReadOptions { maybe_table_level_metrics: Arc::new(SstMaybeTableLevelMetrics::new("tool")), frequency: ReadFrequency::Once, num_rows_per_row_group: 8192, - projected_schema: ProjectedSchema::no_projection(sst_meta.schema.clone()), predicate: Arc::new(Predicate::empty()), meta_cache: None, scan_options, runtime, + row_projector_builder, }; let store_picker: ObjectStorePickerRef = Arc::new(store); let mut reader = factory