Skip to content

Commit

Permalink
chore: test Mergeinto runtime filter performance and wizard (#14212)
Browse files Browse the repository at this point in the history
* add optimize comment

* add join optimizer top rule

* add distributed optmize rule

* add change join order flag

* fix not append plan

* fix typo

* add more cluster test

* fix test

* finish new distributed pipeline

* fix typo

* fix lint

* fix bugs

* fix static filter bug

* fix

* fix test

* fix test

* rollback

* remove static filter

* fix typo

* resolve conflict

* fix conflict

* remove static filter

* fix optimizer

* fix rowid split for new distributed execution

* fix lint

* fix fragment type

* add rule comment

* fix conflict

* add more tests and fix target build bug when not match merge into optimizer

* extract merge into branch

* fix lint

* remove error test

* rollback random exchange, cc @leiSky

* use distributed optimizer result when change joind order

* fix bad optimizer

* change as broadcast join

* fix lint

* fix lint

* fix lint

* remove static filter file

---------

Co-authored-by: BohuTANG <overred.shuttler@gmail.com>
Co-authored-by: dantengsky <dantengsky@gmail.com>
  • Loading branch information
3 people authored Jan 4, 2024
1 parent 5446f61 commit 8ba2c12
Show file tree
Hide file tree
Showing 25 changed files with 805 additions and 832 deletions.
78 changes: 57 additions & 21 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ use databend_common_exception::Result;
use databend_common_expression::types::UInt32Type;
use databend_common_expression::ConstantFolder;
use databend_common_expression::DataBlock;
use databend_common_expression::DataField;
use databend_common_expression::DataSchema;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::FieldIndex;
use databend_common_expression::FromData;
use databend_common_expression::RemoteExpr;
use databend_common_expression::SendableDataBlockStream;
use databend_common_expression::ROW_ID_COL_NAME;
use databend_common_expression::ROW_NUMBER_COL_NAME;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_meta_app::schema::TableInfo;
Expand Down Expand Up @@ -162,6 +164,7 @@ impl MergeIntoInterpreter {
field_index_map,
merge_type,
distributed,
change_join_order,
..
} = &self.plan;

Expand All @@ -181,20 +184,17 @@ impl MergeIntoInterpreter {
let table_name = table_name.clone();
let input = input.clone();

let input = if let RelOperator::Exchange(_) = input.plan() {
Box::new(input.child(0)?.clone())
// we need to extract join plan, but we need to give this exchange
// back at last.
let (input, extract_exchange) = if let RelOperator::Exchange(_) = input.plan() {
(Box::new(input.child(0)?.clone()), true)
} else {
input
(input, false)
};

let optimized_input =
Self::build_static_filter(&input, meta_data, self.ctx.clone(), check_table).await?;
let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false);

// build source for MergeInto
let join_input = builder
.build(&optimized_input, *columns_set.clone())
.await?;
let join_input = builder.build(&input, *columns_set.clone()).await?;

// find row_id column index
let join_output_schema = join_input.output_schema()?;
Expand Down Expand Up @@ -227,7 +227,7 @@ impl MergeIntoInterpreter {
}
}

if *distributed {
if *distributed && !*change_join_order {
row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?);
}

Expand All @@ -238,7 +238,7 @@ impl MergeIntoInterpreter {
));
}

if *distributed && row_number_idx.is_none() {
if *distributed && row_number_idx.is_none() && !*change_join_order {
return Err(ErrorCode::InvalidRowIdIndex(
"can't get internal row_number_idx when running merge into",
));
Expand All @@ -258,11 +258,28 @@ impl MergeIntoInterpreter {

// merge_into_source is used to recv join's datablocks and split them into macthed and not matched
// datablocks.
let merge_into_source = PhysicalPlan::MergeIntoSource(MergeIntoSource {
input: Box::new(join_input),
row_id_idx: row_id_idx as u32,
merge_type: merge_type.clone(),
});
let merge_into_source = if !*distributed && extract_exchange {
// if we doesn't support distributed merge into, we should give the exchange merge back.
let rollback_join_input = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(join_input),
kind: FragmentKind::Merge,
keys: vec![],
allow_adjust_parallelism: true,
ignore_exchange: false,
});
PhysicalPlan::MergeIntoSource(MergeIntoSource {
input: Box::new(rollback_join_input),
row_id_idx: row_id_idx as u32,
merge_type: merge_type.clone(),
})
} else {
PhysicalPlan::MergeIntoSource(MergeIntoSource {
input: Box::new(join_input),
row_id_idx: row_id_idx as u32,
merge_type: merge_type.clone(),
})
};

// transform unmatched for insert
// reference to func `build_eval_scalar`
Expand Down Expand Up @@ -399,6 +416,7 @@ impl MergeIntoInterpreter {
distributed: false,
output_schema: DataSchemaRef::default(),
merge_type: merge_type.clone(),
change_join_order: *change_join_order,
}))
} else {
let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
Expand All @@ -409,14 +427,30 @@ impl MergeIntoInterpreter {
matched,
field_index_of_input_schema,
row_id_idx,
segments,
segments: segments.clone(),
distributed: true,
output_schema: DataSchemaRef::new(DataSchema::new(vec![
join_output_schema.fields[row_number_idx.unwrap()].clone(),
])),
output_schema: match *change_join_order {
false => DataSchemaRef::new(DataSchema::new(vec![
join_output_schema.fields[row_number_idx.unwrap()].clone(),
])),
true => DataSchemaRef::new(DataSchema::new(vec![DataField::new(
ROW_ID_COL_NAME,
databend_common_expression::types::DataType::Number(
databend_common_expression::types::NumberDataType::UInt64,
),
)])),
},
merge_type: merge_type.clone(),
change_join_order: *change_join_order,
}));

// if change_join_order = true, it means the target is build side,
// in this way, we will do matched operation and not matched operation
// locally in every node, and the main node just receive rowids to apply.
let segments = if *change_join_order {
segments.clone()
} else {
vec![]
};
PhysicalPlan::MergeIntoAppendNotMatched(Box::new(MergeIntoAppendNotMatched {
input: Box::new(PhysicalPlan::Exchange(Exchange {
plan_id: 0,
Expand All @@ -431,6 +465,8 @@ impl MergeIntoInterpreter {
unmatched: unmatched.clone(),
input_schema: merge_into_source.output_schema()?,
merge_type: merge_type.clone(),
change_join_order: *change_join_order,
segments,
}))
};

Expand Down
Loading

0 comments on commit 8ba2c12

Please sign in to comment.