From 92dfb2c9c5c40b38d1b60fbd0174bfa8c4a9ba7d Mon Sep 17 00:00:00 2001 From: dantengsky Date: Sun, 18 Jun 2023 10:15:33 +0800 Subject: [PATCH] refactor: optimize replace into data block reading (#11780) * optimize replace into data block reading * fix sql logic test * fix sqllogic test * more sqllogic test --- .../fuse/src/operations/replace_into/mod.rs | 2 +- .../mutator/merge_into_mutator.rs | 189 +++++++++++++----- .../base/09_fuse_engine/09_0023_replace_into | 47 +++++ 3 files changed, 185 insertions(+), 53 deletions(-) diff --git a/src/query/storages/fuse/src/operations/replace_into/mod.rs b/src/query/storages/fuse/src/operations/replace_into/mod.rs index 8ea492ec5c815..8a173111eb4f2 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mod.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mod.rs @@ -20,7 +20,7 @@ pub use processors::BroadcastProcessor; pub use processors::MergeIntoOperationAggregator; pub use processors::ReplaceIntoProcessor; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct OnConflictField { pub table_field: common_expression::TableField, pub field_index: common_expression::FieldIndex, diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs index 1f3ebe5366f9b..a3212f17ce617 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs @@ -28,8 +28,12 @@ use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use common_expression::ColumnId; +use common_expression::ComputedExpr; +use common_expression::DataBlock; +use common_expression::FieldIndex; use common_expression::Scalar; use common_expression::TableSchema; +use common_sql::evaluator::BlockOperator; use opendal::Operator; use siphasher::sip128; use siphasher::sip128::Hasher128; @@ -59,11 +63,16 @@ use crate::operations::replace_into::meta::merge_into_operation_meta::MergeIntoO use crate::operations::replace_into::meta::merge_into_operation_meta::UniqueKeyDigest; use crate::operations::replace_into::mutator::deletion_accumulator::DeletionAccumulator; use crate::operations::replace_into::OnConflictField; - struct AggregationContext { segment_locations: HashMap, + // the fields specified in ON CONFLICT clause on_conflict_fields: Vec, - block_reader: Arc, + // table fields excludes `on_conflict_fields` + remain_column_field_ids: Vec, + // reader that reads the ON CONFLICT key fields + key_column_reader: Arc, + // reader that reads the `remain_column_field_ids` + remain_column_reader: Option>, data_accessor: Operator, write_settings: WriteSettings, read_settings: ReadSettings, @@ -94,22 +103,61 @@ impl MergeIntoOperationAggregator { let deletion_accumulator = DeletionAccumulator::default(); let segment_reader = MetaReaders::segment_info_reader(data_accessor.clone(), table_schema.clone()); - let indices = (0..table_schema.fields().len()).collect::>(); - let projection = Projection::Columns(indices); - let block_reader = BlockReader::create( - data_accessor.clone(), - table_schema, - projection, - ctx.clone(), - false, - )?; + + // order matters, later the projection that used by block readers depends on the order + let key_column_field_indexes: Vec = + on_conflict_fields.iter().map(|i| i.field_index).collect(); + + let remain_column_field_ids: Vec = { + let all_field_indexes = table_schema + .fields() + .iter() + .enumerate() + .filter(|(_, f)| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_)))) + .map(|(i, _)| i) + .collect::>(); + + all_field_indexes + .into_iter() + .filter(|index| !key_column_field_indexes.contains(index)) + .collect() + }; + + let key_column_reader = { + let projection = Projection::Columns(key_column_field_indexes); + BlockReader::create( + data_accessor.clone(), + table_schema.clone(), + projection, + ctx.clone(), + false, + ) + }?; + + let remain_column_reader = { + if remain_column_field_ids.is_empty() { + None + } else { + let projection = Projection::Columns(remain_column_field_ids.clone()); + let reader = BlockReader::create( + data_accessor.clone(), + table_schema, + projection, + ctx.clone(), + false, + )?; + Some(reader) + } + }; Ok(Self { deletion_accumulator, aggregation_ctx: Arc::new(AggregationContext { segment_locations: HashMap::from_iter(segment_locations.into_iter()), on_conflict_fields, - block_reader, + remain_column_field_ids, + key_column_reader, + remain_column_reader, data_accessor, write_settings, read_settings, @@ -245,51 +293,25 @@ impl AggregationContext { deleted_key_hashes: &HashSet, ) -> Result> { info!( - "apply delete to segment idx {}, block idx {}", - segment_index, block_index + "apply delete to segment idx {}, block idx {}, num of deletion key hashes: {}", + segment_index, + block_index, + deleted_key_hashes.len() ); + if block_meta.row_count == 0 { return Ok(None); } - let reader = &self.block_reader; - let on_conflict_fields = &self.on_conflict_fields; + let key_columns_data = self.read_block(&self.key_column_reader, block_meta).await?; - let merged_io_read_result = reader - .read_columns_data_by_merge_io( - &self.read_settings, - &block_meta.location.0, - &block_meta.col_metas, - ) - .await?; - - // deserialize block data - // cpu intensive task, send them to dedicated thread pool - let data_block = { - let storage_format = self.write_settings.storage_format; - let block_meta_ptr = block_meta.clone(); - let reader = reader.clone(); - GlobalIORuntime::instance() - .spawn_blocking(move || { - let column_chunks = merged_io_read_result.columns_chunks()?; - reader.deserialize_chunks( - block_meta_ptr.location.0.as_str(), - block_meta_ptr.row_count as usize, - &block_meta_ptr.compression, - &block_meta_ptr.col_metas, - column_chunks, - &storage_format, - ) - }) - .await? - }; - - let num_rows = data_block.num_rows(); + let num_rows = key_columns_data.num_rows(); + let on_conflict_fields = &self.on_conflict_fields; let mut columns = Vec::with_capacity(on_conflict_fields.len()); - for field in on_conflict_fields { - let on_conflict_field_index = field.field_index; - let key_column = data_block + for (field, _) in on_conflict_fields.iter().enumerate() { + let on_conflict_field_index = field; + let key_column = key_columns_data .columns() .get(on_conflict_field_index) .ok_or_else(|| { @@ -322,7 +344,9 @@ impl AggregationContext { } let delete_nums = bitmap.unset_bits(); - // shortcuts + info!("number of row deleted: {}", delete_nums); + + // shortcut: nothing to be deleted if delete_nums == 0 { info!("nothing deleted"); // nothing to be deleted @@ -334,11 +358,13 @@ impl AggregationContext { // ignore bytes. bytes: 0, }; + self.block_builder .ctx .get_write_progress() .incr(&progress_values); + // shortcut: nothing to be deleted if delete_nums == block_meta.row_count as usize { info!("whole block deletion"); // whole block deletion @@ -355,8 +381,38 @@ impl AggregationContext { } let bitmap = bitmap.into(); - let new_block = data_block.filter_with_bitmap(&bitmap)?; - info!("number of row deleted: {}", delete_nums); + let mut key_columns_data_after_deletion = key_columns_data.filter_with_bitmap(&bitmap)?; + + let new_block = match &self.remain_column_reader { + None => key_columns_data_after_deletion, + Some(remain_columns_reader) => { + // read the remaining columns + let remain_columns_data = + self.read_block(remain_columns_reader, block_meta).await?; + + // remove the deleted rows + let remain_columns_data_after_deletion = + remain_columns_data.filter_with_bitmap(&bitmap)?; + + // merge the remaining columns + for col in remain_columns_data_after_deletion.columns() { + key_columns_data_after_deletion.add_column(col.clone()); + } + + // resort the block + let col_indexes = self + .on_conflict_fields + .iter() + .map(|f| f.field_index) + .chain(self.remain_column_field_ids.iter().copied()) + .collect::>(); + let mut projection = (0..col_indexes.len()).collect::>(); + projection.sort_by_key(|&i| col_indexes[i]); + let func_ctx = self.block_builder.ctx.get_function_context()?; + BlockOperator::Project { projection } + .execute(&func_ctx, key_columns_data_after_deletion)? + } + }; // serialization and compression is cpu intensive, send them to dedicated thread pool // and wait (asyncly, which will NOT block the executor thread) @@ -446,6 +502,35 @@ impl AggregationContext { false } } + + async fn read_block(&self, reader: &BlockReader, block_meta: &BlockMeta) -> Result { + let merged_io_read_result = reader + .read_columns_data_by_merge_io( + &self.read_settings, + &block_meta.location.0, + &block_meta.col_metas, + ) + .await?; + + // deserialize block data + // cpu intensive task, send them to dedicated thread pool + let storage_format = self.write_settings.storage_format; + let block_meta_ptr = block_meta.clone(); + let reader = reader.clone(); + GlobalIORuntime::instance() + .spawn_blocking(move || { + let column_chunks = merged_io_read_result.columns_chunks()?; + reader.deserialize_chunks( + block_meta_ptr.location.0.as_str(), + block_meta_ptr.row_count as usize, + &block_meta_ptr.compression, + &block_meta_ptr.col_metas, + column_chunks, + &storage_format, + ) + }) + .await + } } #[cfg(test)] diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0023_replace_into b/tests/sqllogictests/suites/base/09_fuse_engine/09_0023_replace_into index 76bc7a5159e10..628ce4bb053fc 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0023_replace_into +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0023_replace_into @@ -377,6 +377,53 @@ statement ok DROP TABLE test + +########################################### +# test cases for "prewhere" optimization # +########################################### + +statement ok +drop table if exists t; + +statement ok +create table t (id int, c1 int, c2 int, c3 int, c4 int) row_per_block=3; + +statement ok +insert into t select number, number * 3, number * 5, number * 7, number * 9 from numbers(100); + +query IIIII +select sum(id), sum(c1), sum(c2), sum(c3), sum(c4) from t; +---- +4950 14850 24750 34650 44550 + +statement ok +replace into t on(c1, c3) select * from t; + +# verify the replace into is idempotent +query IIIII +select sum(id), sum(c1), sum(c2), sum(c3), sum(c4) from t; +---- +4950 14850 24750 34650 44550 + + +# update half of the rows, set columns other than c1 and c3 to 0 +statement ok +replace into t on(c3, c1) select 0 , c1, 0, c3, 0 from t where t.id % 2 = 0; + +# verify the result is as expected +query IIIII +select sum(t.id), sum(t.c1), sum(t.c2), sum(t.c3), sum(t.c4) from (select 0 as id , c1, 0 as c2, c3, 0 as c4 from t where t.id % 2 = 0 union select * from t where t.id % 2 != 0) t; +---- +2500 14850 12500 34650 22500 + +query IIIII +select sum(id), sum(c1), sum(c2), sum(c3), sum(c4) from t; +---- +2500 14850 12500 34650 22500 + +statement ok +drop table t; + statement ok DROP DATABASE db_09_0023