refactor: optimize replace into data block reading (databendlabs#11780)
* optimize replace into data block reading

* fix sql logic test

* fix sqllogic test

* more sqllogic test
dantengsky authored and andylokandy committed Nov 27, 2023
1 parent ccb4827 commit 92dfb2c
Showing 3 changed files with 185 additions and 53 deletions.
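In summary: instead of reading every column of a candidate block before applying deletions, the aggregator now reads only the ON CONFLICT key columns, and fetches the remaining columns only when some (but not all) rows survive. The sketch below is a minimal, self-contained model of that two-phase strategy, not Databend's actual API: the types are simplified and a stand-in sum replaces the real SipHash-128 key digest.

use std::collections::HashSet;

type Column = Vec<i64>;

struct Block {
    key_columns: Vec<Column>,    // the ON CONFLICT key fields: cheap to read
    remain_columns: Vec<Column>, // everything else: often much wider
    row_count: usize,
}

// Stand-in for the real SipHash-128 digest over the ON CONFLICT fields.
fn key_digest(keys: &[Column], row: usize) -> i64 {
    keys.iter().map(|c| c[row]).sum()
}

// Two-phase deletion: phase 1 touches only the key columns; phase 2 (the
// expensive read of the remaining columns) runs only if some rows survive.
fn apply_deletion(block: &Block, deleted: &HashSet<i64>) -> Option<Vec<Column>> {
    let keep: Vec<bool> = (0..block.row_count)
        .map(|row| !deleted.contains(&key_digest(&block.key_columns, row)))
        .collect();
    let kept = keep.iter().filter(|&&k| k).count();

    if kept == block.row_count {
        return None; // nothing deleted: the remaining columns are never read
    }
    if kept == 0 {
        return Some(vec![]); // whole block deleted: no second read either
    }

    // only now pay for "reading" and filtering the remaining columns
    let filter = |cols: &[Column]| -> Vec<Column> {
        cols.iter()
            .map(|c| {
                c.iter()
                    .zip(&keep)
                    .filter(|(_, k)| **k)
                    .map(|(v, _)| *v)
                    .collect()
            })
            .collect()
    };
    let mut out = filter(&block.key_columns);
    out.extend(filter(&block.remain_columns));
    Some(out)
}

fn main() {
    let block = Block {
        key_columns: vec![vec![1, 2, 3]],
        remain_columns: vec![vec![10, 20, 30]],
        row_count: 3,
    };
    let deleted = HashSet::from([2]);
    assert_eq!(
        apply_deletion(&block, &deleted),
        Some(vec![vec![1, 3], vec![10, 30]])
    );
}

When nothing matches, or the whole block is deleted, the second (and typically much larger) read is skipped entirely, which is where the savings come from.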
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/replace_into/mod.rs
@@ -20,7 +20,7 @@ pub use processors::BroadcastProcessor;
pub use processors::MergeIntoOperationAggregator;
pub use processors::ReplaceIntoProcessor;

-#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct OnConflictField {
pub table_field: common_expression::TableField,
pub field_index: common_expression::FieldIndex,
@@ -28,8 +28,12 @@ use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::ColumnId;
use common_expression::ComputedExpr;
use common_expression::DataBlock;
use common_expression::FieldIndex;
use common_expression::Scalar;
use common_expression::TableSchema;
use common_sql::evaluator::BlockOperator;
use opendal::Operator;
use siphasher::sip128;
use siphasher::sip128::Hasher128;
@@ -59,11 +63,16 @@ use crate::operations::replace_into::meta::merge_into_operation_meta::MergeIntoO
use crate::operations::replace_into::meta::merge_into_operation_meta::UniqueKeyDigest;
use crate::operations::replace_into::mutator::deletion_accumulator::DeletionAccumulator;
use crate::operations::replace_into::OnConflictField;

struct AggregationContext {
segment_locations: HashMap<SegmentIndex, Location>,
// the fields specified in ON CONFLICT clause
on_conflict_fields: Vec<OnConflictField>,
-block_reader: Arc<BlockReader>,
// table fields, excluding `on_conflict_fields`
remain_column_field_ids: Vec<FieldIndex>,
// reader that reads the ON CONFLICT key fields
key_column_reader: Arc<BlockReader>,
// reader that reads the `remain_column_field_ids`
remain_column_reader: Option<Arc<BlockReader>>,
data_accessor: Operator,
write_settings: WriteSettings,
read_settings: ReadSettings,
@@ -94,22 +103,61 @@ impl MergeIntoOperationAggregator {
let deletion_accumulator = DeletionAccumulator::default();
let segment_reader =
MetaReaders::segment_info_reader(data_accessor.clone(), table_schema.clone());
-let indices = (0..table_schema.fields().len()).collect::<Vec<usize>>();
-let projection = Projection::Columns(indices);
-let block_reader = BlockReader::create(
-data_accessor.clone(),
-table_schema,
-projection,
-ctx.clone(),
-false,
-)?;

// order matters: the projection used by the block readers below depends on this order
let key_column_field_indexes: Vec<FieldIndex> =
on_conflict_fields.iter().map(|i| i.field_index).collect();

let remain_column_field_ids: Vec<FieldIndex> = {
let all_field_indexes = table_schema
.fields()
.iter()
.enumerate()
.filter(|(_, f)| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_))))
.map(|(i, _)| i)
.collect::<Vec<FieldIndex>>();

all_field_indexes
.into_iter()
.filter(|index| !key_column_field_indexes.contains(index))
.collect()
};

let key_column_reader = {
let projection = Projection::Columns(key_column_field_indexes);
BlockReader::create(
data_accessor.clone(),
table_schema.clone(),
projection,
ctx.clone(),
false,
)
}?;

let remain_column_reader = {
if remain_column_field_ids.is_empty() {
None
} else {
let projection = Projection::Columns(remain_column_field_ids.clone());
let reader = BlockReader::create(
data_accessor.clone(),
table_schema,
projection,
ctx.clone(),
false,
)?;
Some(reader)
}
};

Ok(Self {
deletion_accumulator,
aggregation_ctx: Arc::new(AggregationContext {
segment_locations: HashMap::from_iter(segment_locations.into_iter()),
on_conflict_fields,
-block_reader,
remain_column_field_ids,
key_column_reader,
remain_column_reader,
data_accessor,
write_settings,
read_settings,
@@ -245,51 +293,25 @@ impl AggregationContext {
deleted_key_hashes: &HashSet<UniqueKeyDigest>,
) -> Result<Option<ReplacementLogEntry>> {
info!(
"apply delete to segment idx {}, block idx {}",
segment_index, block_index
"apply delete to segment idx {}, block idx {}, num of deletion key hashes: {}",
segment_index,
block_index,
deleted_key_hashes.len()
);

if block_meta.row_count == 0 {
return Ok(None);
}

-let reader = &self.block_reader;
-let on_conflict_fields = &self.on_conflict_fields;
let key_columns_data = self.read_block(&self.key_column_reader, block_meta).await?;

-let merged_io_read_result = reader
-.read_columns_data_by_merge_io(
-&self.read_settings,
-&block_meta.location.0,
-&block_meta.col_metas,
-)
-.await?;

-// deserialize block data
-// cpu intensive task, send them to dedicated thread pool
-let data_block = {
-let storage_format = self.write_settings.storage_format;
-let block_meta_ptr = block_meta.clone();
-let reader = reader.clone();
-GlobalIORuntime::instance()
-.spawn_blocking(move || {
-let column_chunks = merged_io_read_result.columns_chunks()?;
-reader.deserialize_chunks(
-block_meta_ptr.location.0.as_str(),
-block_meta_ptr.row_count as usize,
-&block_meta_ptr.compression,
-&block_meta_ptr.col_metas,
-column_chunks,
-&storage_format,
-)
-})
-.await?
-};

-let num_rows = data_block.num_rows();
let num_rows = key_columns_data.num_rows();

let on_conflict_fields = &self.on_conflict_fields;
let mut columns = Vec::with_capacity(on_conflict_fields.len());
-for field in on_conflict_fields {
-let on_conflict_field_index = field.field_index;
-let key_column = data_block
for (field, _) in on_conflict_fields.iter().enumerate() {
let on_conflict_field_index = field;
let key_column = key_columns_data
.columns()
.get(on_conflict_field_index)
.ok_or_else(|| {
Expand Down Expand Up @@ -322,7 +344,9 @@ impl AggregationContext {
}

let delete_nums = bitmap.unset_bits();
-// shortcuts
info!("number of row deleted: {}", delete_nums);

// shortcut: nothing to be deleted
if delete_nums == 0 {
info!("nothing deleted");
// nothing to be deleted
@@ -334,11 +358,13 @@
// ignore bytes.
bytes: 0,
};

self.block_builder
.ctx
.get_write_progress()
.incr(&progress_values);

// shortcut: the whole block is deleted
if delete_nums == block_meta.row_count as usize {
info!("whole block deletion");
// whole block deletion
@@ -355,8 +381,38 @@ }
}

let bitmap = bitmap.into();
-let new_block = data_block.filter_with_bitmap(&bitmap)?;
-info!("number of row deleted: {}", delete_nums);
let mut key_columns_data_after_deletion = key_columns_data.filter_with_bitmap(&bitmap)?;

let new_block = match &self.remain_column_reader {
None => key_columns_data_after_deletion,
Some(remain_columns_reader) => {
// read the remaining columns
let remain_columns_data =
self.read_block(remain_columns_reader, block_meta).await?;

// remove the deleted rows
let remain_columns_data_after_deletion =
remain_columns_data.filter_with_bitmap(&bitmap)?;

// merge the remaining columns
for col in remain_columns_data_after_deletion.columns() {
key_columns_data_after_deletion.add_column(col.clone());
}

// re-sort the columns back into table schema order
let col_indexes = self
.on_conflict_fields
.iter()
.map(|f| f.field_index)
.chain(self.remain_column_field_ids.iter().copied())
.collect::<Vec<_>>();
let mut projection = (0..col_indexes.len()).collect::<Vec<_>>();
projection.sort_by_key(|&i| col_indexes[i]);
let func_ctx = self.block_builder.ctx.get_function_context()?;
BlockOperator::Project { projection }
.execute(&func_ctx, key_columns_data_after_deletion)?
}
};

// serialization and compression are cpu intensive, send them to the dedicated thread pool
// and wait (asynchronously, which will NOT block the executor thread)
@@ -446,6 +502,35 @@ impl AggregationContext {
false
}
}

async fn read_block(&self, reader: &BlockReader, block_meta: &BlockMeta) -> Result<DataBlock> {
let merged_io_read_result = reader
.read_columns_data_by_merge_io(
&self.read_settings,
&block_meta.location.0,
&block_meta.col_metas,
)
.await?;

// deserialize block data
// cpu intensive task, send them to dedicated thread pool
let storage_format = self.write_settings.storage_format;
let block_meta_ptr = block_meta.clone();
let reader = reader.clone();
GlobalIORuntime::instance()
.spawn_blocking(move || {
let column_chunks = merged_io_read_result.columns_chunks()?;
reader.deserialize_chunks(
block_meta_ptr.location.0.as_str(),
block_meta_ptr.row_count as usize,
&block_meta_ptr.compression,
&block_meta_ptr.col_metas,
column_chunks,
&storage_format,
)
})
.await
}
}

#[cfg(test)]
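One subtlety in the re-sort step of the diff above: after the merge, the block's columns sit in [key columns..., remaining columns...] order, while downstream code expects table-schema order. Sorting column positions by their table field index yields the projection that BlockOperator::Project applies. Below is a small stand-alone illustration of just that permutation logic; the field indexes are made up for the example.

fn main() {
    // e.g. ON CONFLICT (c1, c3) on a table (id, c1, c2, c3, c4):
    // the merged block's column order is [c1, c3, id, c2, c4]
    let key_field_indexes = vec![1, 3]; // table field indexes of c1, c3
    let remain_field_indexes = vec![0, 2, 4]; // table field indexes of id, c2, c4

    // col_indexes[i] = the table field index of the column at position i
    let col_indexes: Vec<usize> = key_field_indexes
        .into_iter()
        .chain(remain_field_indexes)
        .collect();

    // argsort: positions ordered by field index restore schema order
    let mut projection: Vec<usize> = (0..col_indexes.len()).collect();
    projection.sort_by_key(|&i| col_indexes[i]);

    // picking columns 2, 0, 3, 1, 4 yields [id, c1, c2, c3, c4]
    assert_eq!(projection, vec![2, 0, 3, 1, 4]);
}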
@@ -377,6 +377,53 @@ statement ok
DROP TABLE test



###########################################
# test cases for "prewhere" optimization #
###########################################

statement ok
drop table if exists t;

statement ok
create table t (id int, c1 int, c2 int, c3 int, c4 int) row_per_block=3;

statement ok
insert into t select number, number * 3, number * 5, number * 7, number * 9 from numbers(100);

query IIIII
select sum(id), sum(c1), sum(c2), sum(c3), sum(c4) from t;
----
4950 14850 24750 34650 44550

statement ok
replace into t on(c1, c3) select * from t;

# verify that the replace into is idempotent
query IIIII
select sum(id), sum(c1), sum(c2), sum(c3), sum(c4) from t;
----
4950 14850 24750 34650 44550


# update half of the rows, set columns other than c1 and c3 to 0
statement ok
replace into t on(c3, c1) select 0 , c1, 0, c3, 0 from t where t.id % 2 = 0;

# verify the result is as expected
query IIIII
select sum(t.id), sum(t.c1), sum(t.c2), sum(t.c3), sum(t.c4) from (select 0 as id , c1, 0 as c2, c3, 0 as c4 from t where t.id % 2 = 0 union select * from t where t.id % 2 != 0) t;
----
2500 14850 12500 34650 22500

query IIIII
select sum(id), sum(c1), sum(c2), sum(c3), sum(c4) from t;
----
2500 14850 12500 34650 22500
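
# arithmetic behind the expected sums: after the second replace, the even-id
# rows have id = c2 = c4 = 0, so only the odd ids 1, 3, ..., 99 contribute:
# sum(id) = 2500, sum(c2) = 5 * 2500 = 12500, sum(c4) = 9 * 2500 = 22500,
# while the ON CONFLICT key columns keep all original values:
# sum(c1) = 3 * 4950 = 14850, sum(c3) = 7 * 4950 = 34650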

statement ok
drop table t;

statement ok
DROP DATABASE db_09_0023

