refactor: optimize replace into data block reading #11780

Merged · 4 commits · Jun 18, 2023
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/replace_into/mod.rs
@@ -20,7 +20,7 @@ pub use processors::BroadcastProcessor;
pub use processors::MergeIntoOperationAggregator;
pub use processors::ReplaceIntoProcessor;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct OnConflictField {
pub table_field: common_expression::TableField,
pub field_index: common_expression::FieldIndex,
@@ -28,8 +28,12 @@ use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::ColumnId;
use common_expression::ComputedExpr;
use common_expression::DataBlock;
use common_expression::FieldIndex;
use common_expression::Scalar;
use common_expression::TableSchema;
use common_sql::evaluator::BlockOperator;
use opendal::Operator;
use siphasher::sip128;
use siphasher::sip128::Hasher128;
@@ -59,11 +63,16 @@ use crate::operations::replace_into::meta::merge_into_operation_meta::MergeIntoO
use crate::operations::replace_into::meta::merge_into_operation_meta::UniqueKeyDigest;
use crate::operations::replace_into::mutator::deletion_accumulator::DeletionAccumulator;
use crate::operations::replace_into::OnConflictField;

struct AggregationContext {
segment_locations: HashMap<SegmentIndex, Location>,
// the fields specified in ON CONFLICT clause
on_conflict_fields: Vec<OnConflictField>,
block_reader: Arc<BlockReader>,
// table fields, excluding the `on_conflict_fields`
remain_column_field_ids: Vec<FieldIndex>,
// reader that reads the ON CONFLICT key fields
key_column_reader: Arc<BlockReader>,
// reader that reads the `remain_column_field_ids`
remain_column_reader: Option<Arc<BlockReader>>,
data_accessor: Operator,
write_settings: WriteSettings,
read_settings: ReadSettings,
@@ -94,22 +103,61 @@ impl MergeIntoOperationAggregator {
let deletion_accumulator = DeletionAccumulator::default();
let segment_reader =
MetaReaders::segment_info_reader(data_accessor.clone(), table_schema.clone());
let indices = (0..table_schema.fields().len()).collect::<Vec<usize>>();
let projection = Projection::Columns(indices);
let block_reader = BlockReader::create(
data_accessor.clone(),
table_schema,
projection,
ctx.clone(),
false,
)?;

// order matters: the projections used by the block readers below depend on this order
let key_column_field_indexes: Vec<FieldIndex> =
on_conflict_fields.iter().map(|i| i.field_index).collect();

let remain_column_field_ids: Vec<FieldIndex> = {
let all_field_indexes = table_schema
.fields()
.iter()
.enumerate()
.filter(|(_, f)| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_))))
.map(|(i, _)| i)
.collect::<Vec<FieldIndex>>();

all_field_indexes
.into_iter()
.filter(|index| !key_column_field_indexes.contains(index))
.collect()
};

let key_column_reader = {
let projection = Projection::Columns(key_column_field_indexes);
BlockReader::create(
data_accessor.clone(),
table_schema.clone(),
projection,
ctx.clone(),
false,
)
}?;

let remain_column_reader = {
if remain_column_field_ids.is_empty() {
None
} else {
let projection = Projection::Columns(remain_column_field_ids.clone());
let reader = BlockReader::create(
data_accessor.clone(),
table_schema,
projection,
ctx.clone(),
false,
)?;
Some(reader)
}
};

Ok(Self {
deletion_accumulator,
aggregation_ctx: Arc::new(AggregationContext {
segment_locations: HashMap::from_iter(segment_locations.into_iter()),
on_conflict_fields,
block_reader,
remain_column_field_ids,
key_column_reader,
remain_column_reader,
data_accessor,
write_settings,
read_settings,
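
The constructor above now builds two readers instead of one full-schema reader: `key_column_reader` projects only the ON CONFLICT fields, and `remain_column_reader` (absent when every field is a key) projects everything else except virtual computed columns. A minimal sketch of the index partitioning, as a hypothetical standalone helper (not part of this PR), assuming plain `usize` field indexes and a caller-supplied `is_virtual` predicate standing in for the `ComputedExpr::Virtual` check:

/// Split field indexes into (key fields, remaining fields).
/// Hypothetical helper for illustration only.
fn split_projection(
    num_fields: usize,
    key_field_indexes: &[usize],
    is_virtual: impl Fn(usize) -> bool,
) -> (Vec<usize>, Vec<usize>) {
    // the key projection keeps the ON CONFLICT order; the readers depend on it
    let keys = key_field_indexes.to_vec();
    let remain = (0..num_fields)
        .filter(|i| !is_virtual(*i)) // virtual computed columns are never read
        .filter(|i| !keys.contains(i)) // every non-key field goes to the second reader
        .collect();
    (keys, remain)
}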
@@ -245,51 +293,25 @@ impl AggregationContext {
deleted_key_hashes: &HashSet<UniqueKeyDigest>,
) -> Result<Option<ReplacementLogEntry>> {
info!(
"apply delete to segment idx {}, block idx {}",
segment_index, block_index
"apply delete to segment idx {}, block idx {}, num of deletion key hashes: {}",
segment_index,
block_index,
deleted_key_hashes.len()
);

if block_meta.row_count == 0 {
return Ok(None);
}

let reader = &self.block_reader;
let on_conflict_fields = &self.on_conflict_fields;
let key_columns_data = self.read_block(&self.key_column_reader, block_meta).await?;

let merged_io_read_result = reader
.read_columns_data_by_merge_io(
&self.read_settings,
&block_meta.location.0,
&block_meta.col_metas,
)
.await?;

// deserialize block data
// cpu intensive task, send them to dedicated thread pool
let data_block = {
let storage_format = self.write_settings.storage_format;
let block_meta_ptr = block_meta.clone();
let reader = reader.clone();
GlobalIORuntime::instance()
.spawn_blocking(move || {
let column_chunks = merged_io_read_result.columns_chunks()?;
reader.deserialize_chunks(
block_meta_ptr.location.0.as_str(),
block_meta_ptr.row_count as usize,
&block_meta_ptr.compression,
&block_meta_ptr.col_metas,
column_chunks,
&storage_format,
)
})
.await?
};

let num_rows = data_block.num_rows();
let num_rows = key_columns_data.num_rows();

let on_conflict_fields = &self.on_conflict_fields;
let mut columns = Vec::with_capacity(on_conflict_fields.len());
for field in on_conflict_fields {
let on_conflict_field_index = field.field_index;
let key_column = data_block
for (on_conflict_field_index, _) in on_conflict_fields.iter().enumerate() {
let key_column = key_columns_data
.columns()
.get(on_conflict_field_index)
.ok_or_else(|| {
@@ -322,7 +344,9 @@ impl AggregationContext {
}

let delete_nums = bitmap.unset_bits();
// shortcuts
info!("number of row deleted: {}", delete_nums);

// shortcut: nothing to be deleted
if delete_nums == 0 {
info!("nothing deleted");
// nothing to be deleted
@@ -334,11 +358,13 @@
// ignore bytes.
bytes: 0,
};

self.block_builder
.ctx
.get_write_progress()
.incr(&progress_values);

// shortcut: the whole block is deleted
if delete_nums == block_meta.row_count as usize {
info!("whole block deletion");
// whole block deletion
Expand All @@ -355,8 +381,38 @@ impl AggregationContext {
}

let bitmap = bitmap.into();
let new_block = data_block.filter_with_bitmap(&bitmap)?;
info!("number of row deleted: {}", delete_nums);
let mut key_columns_data_after_deletion = key_columns_data.filter_with_bitmap(&bitmap)?;

let new_block = match &self.remain_column_reader {
None => key_columns_data_after_deletion,
Some(remain_columns_reader) => {
// read the remaining columns
let remain_columns_data =
self.read_block(remain_columns_reader, block_meta).await?;

// remove the deleted rows
let remain_columns_data_after_deletion =
remain_columns_data.filter_with_bitmap(&bitmap)?;

// merge the remaining columns
for col in remain_columns_data_after_deletion.columns() {
key_columns_data_after_deletion.add_column(col.clone());
}

// re-order the columns back to the schema's field order
let col_indexes = self
.on_conflict_fields
.iter()
.map(|f| f.field_index)
.chain(self.remain_column_field_ids.iter().copied())
.collect::<Vec<_>>();
let mut projection = (0..col_indexes.len()).collect::<Vec<_>>();
projection.sort_by_key(|&i| col_indexes[i]);
let func_ctx = self.block_builder.ctx.get_function_context()?;
BlockOperator::Project { projection }
.execute(&func_ctx, key_columns_data_after_deletion)?
}
};

// serialization and compression are CPU-intensive; send them to a dedicated thread pool
// and wait asynchronously (which will NOT block the executor thread)
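
After the filtered remaining columns are appended, the block holds the key columns first and the remaining columns after them, so the code above builds a projection that restores the schema's field order. A self-contained sketch of that sort-by-field-index trick (a hypothetical illustration, not part of the PR), assuming `col_indexes[i]` holds the original field index of the block's i-th column:

fn schema_order_projection(col_indexes: &[usize]) -> Vec<usize> {
    let mut projection: Vec<usize> = (0..col_indexes.len()).collect();
    // after the sort, projection[j] is the current position of the column
    // with the j-th smallest original field index
    projection.sort_by_key(|&i| col_indexes[i]);
    projection
}

fn main() {
    // e.g. key fields [2, 0] followed by remaining fields [1, 3]:
    // schema order is restored by picking columns 1, 2, 0, 3
    assert_eq!(schema_order_projection(&[2, 0, 1, 3]), vec![1, 2, 0, 3]);
}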
@@ -446,6 +502,35 @@ impl AggregationContext {
false
}
}

async fn read_block(&self, reader: &BlockReader, block_meta: &BlockMeta) -> Result<DataBlock> {
let merged_io_read_result = reader
.read_columns_data_by_merge_io(
&self.read_settings,
&block_meta.location.0,
&block_meta.col_metas,
)
.await?;

// deserialize block data
// CPU-intensive task; send it to the dedicated thread pool
let storage_format = self.write_settings.storage_format;
let block_meta_ptr = block_meta.clone();
let reader = reader.clone();
GlobalIORuntime::instance()
.spawn_blocking(move || {
let column_chunks = merged_io_read_result.columns_chunks()?;
reader.deserialize_chunks(
block_meta_ptr.location.0.as_str(),
block_meta_ptr.row_count as usize,
&block_meta_ptr.compression,
&block_meta_ptr.col_metas,
column_chunks,
&storage_format,
)
})
.await
}
}

#[cfg(test)]
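The extracted `read_block` helper keeps the merge-IO read on the async runtime and moves the CPU-heavy deserialization onto a blocking thread pool via `GlobalIORuntime::instance().spawn_blocking`. A minimal sketch of the same pattern using plain tokio (an assumption made for self-containment; the actual code goes through Databend's own runtime and `deserialize_chunks`):

use tokio::task;

// Decode little-endian u32s from raw bytes without occupying an executor thread.
async fn read_and_decode(raw: Vec<u8>) -> Result<Vec<u32>, task::JoinError> {
    task::spawn_blocking(move || {
        // CPU-intensive work runs on the blocking pool
        raw.chunks_exact(4)
            .map(|c| u32::from_le_bytes(c.try_into().unwrap())) // chunks are exactly 4 bytes
            .collect::<Vec<u32>>()
    })
    .await // awaiting the JoinHandle suspends; it does not block the executor
}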
@@ -377,6 +377,53 @@ statement ok
DROP TABLE test



###########################################
# test cases for "prewhere" optimization #
###########################################

statement ok
drop table if exists t;

statement ok
create table t (id int, c1 int, c2 int, c3 int, c4 int) row_per_block=3;

statement ok
insert into t select number, number * 3, number * 5, number * 7, number * 9 from numbers(100);

query IIIII
select sum(id), sum(c1), sum(c2), sum(c3), sum(c4) from t;
----
4950 14850 24750 34650 44550

statement ok
replace into t on(c1, c3) select * from t;

# verify the replace into is idempotent
query IIIII
select sum(id), sum(c1), sum(c2), sum(c3), sum(c4) from t;
----
4950 14850 24750 34650 44550


# update half of the rows, set columns other than c1 and c3 to 0
statement ok
replace into t on(c3, c1) select 0 , c1, 0, c3, 0 from t where t.id % 2 = 0;

# verify the result is as expected
query IIIII
select sum(t.id), sum(t.c1), sum(t.c2), sum(t.c3), sum(t.c4) from (select 0 as id , c1, 0 as c2, c3, 0 as c4 from t where t.id % 2 = 0 union select * from t where t.id % 2 != 0) t;
----
2500 14850 12500 34650 22500

query IIIII
select sum(id), sum(c1), sum(c2), sum(c3), sum(c4) from t;
----
2500 14850 12500 34650 22500
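
# sanity check of the sums above: after the second replace, rows whose original
# id was even have id, c2 and c4 zeroed out, so sum(id) = 1 + 3 + ... + 99 = 2500,
# sum(c2) = 5 * 2500 = 12500 and sum(c4) = 9 * 2500 = 22500, while c1 and c3 are
# untouched: 3 * 4950 = 14850 and 7 * 4950 = 34650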

statement ok
drop table t;

statement ok
DROP DATABASE db_09_0023
