fix predicate_index and not matched null cast
JackTan25 committed Oct 11, 2023
1 parent 0afc3f3 commit a26efba
Showing 4 changed files with 115 additions and 5 deletions.
8 changes: 6 additions & 2 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -15,6 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Instant;
+use std::usize::MAX;
 
 use common_base::runtime::GlobalIORuntime;
 use common_exception::ErrorCode;
@@ -33,6 +34,7 @@ use common_sql::executor::PhysicalPlan;
 use common_sql::executor::PhysicalPlanBuilder;
 use common_sql::plans::MergeInto as MergePlan;
 use common_sql::plans::UpdatePlan;
+use common_sql::IndexType;
 use common_sql::ScalarExpr;
 use common_sql::TypeCheck;
 use common_storages_factory::Table;
@@ -51,6 +53,8 @@ use crate::pipelines::PipelineBuildResult;
 use crate::schedulers::build_query_pipeline_without_render_result_set;
 use crate::sessions::QueryContext;
 
+// The predicate index must not conflict with any update expression's column-binding index.
+pub const PREDICATE_COLUMN_INDEX: IndexType = MAX;
 const DUMMY_COL_INDEX: usize = 1;
 pub struct MergeIntoInterpreter {
     ctx: Arc<QueryContext>,
@@ -260,7 +264,7 @@ impl MergeIntoInterpreter {
                 self.ctx.clone(),
                 fuse_table.schema().into(),
                 col_indices,
-                Some(join_output_schema.num_fields()),
+                Some(PREDICATE_COLUMN_INDEX),
                 target_alias.is_some(),
             )?;
             let update_list = update_list
@@ -274,7 +278,7 @@
                     // A predicate column is added when we process matched clauses,
                     // so it is not in join_output_schema yet; it is always appended
                     // at the tail, hence the special case below.
-                    if name == &join_output_schema.num_fields().to_string() {
+                    if *name == PREDICATE_COLUMN_INDEX.to_string() {
                         join_output_schema.num_fields()
                     } else {
                         join_output_schema.index_of(name).unwrap()
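
Why usize::MAX is a safe sentinel: column-binding indexes are allocated upward from zero, so the old sentinel join_output_schema.num_fields() could coincide with a real update-expression binding, while MAX cannot. Below is a minimal, self-contained sketch of the changed lookup; the names and signatures are illustrative stand-ins, not Databend's actual types:

// Hypothetical sketch: resolve a column "name" (binding indexes are stored
// as strings here, mirroring the hunk above) to a position in the join output.
const PREDICATE_COLUMN_INDEX: usize = usize::MAX;

fn resolve_output_index(name: &str, num_fields: usize, field_names: &[&str]) -> Option<usize> {
    if name == PREDICATE_COLUMN_INDEX.to_string() {
        // The predicate column is appended after all join-output fields.
        Some(num_fields)
    } else {
        // Every real column resolves by ordinary schema lookup.
        field_names.iter().position(|f| *f == name)
    }
}

fn main() {
    let fields = ["a", "b"];
    assert_eq!(resolve_output_index("b", fields.len(), &fields), Some(1));
    // The sentinel can never collide with a real binding index.
    let sentinel = PREDICATE_COLUMN_INDEX.to_string();
    assert_eq!(resolve_output_index(&sentinel, fields.len(), &fields), Some(2));
}
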
10 changes: 8 additions & 2 deletions src/query/sql/src/planner/binder/merge_into.rs
@@ -408,8 +408,15 @@ impl Binder {
             let mut values = Vec::with_capacity(default_schema.num_fields());
             let update_columns_star = update_columns_star.unwrap();
             for idx in 0..default_schema.num_fields() {
-                values.push(update_columns_star.get(&idx).unwrap().clone());
+                let scalar = update_columns_star.get(&idx).unwrap().clone();
+                // Cast the expression to the target field's data type.
+                values.push(wrap_cast_scalar(
+                    &scalar,
+                    &scalar.data_type()?,
+                    &DataType::from(default_schema.field(idx).data_type()),
+                )?);
             }
+
             Ok(UnmatchedEvaluator {
                 source_schema: Arc::new(Arc::new(default_schema).into()),
                 condition,
@@ -423,7 +430,6 @@
         }
 
         let mut values = Vec::with_capacity(clause.insert_operation.values.len());
-
         // we need to get source schema, and use it for filling columns.
         let source_schema = if let Some(fields) = clause.insert_operation.columns.clone() {
            self.schema_project(&table_schema, &fields)?
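
The binder change above wraps every `insert *` value in a cast to the exact target field type instead of pushing the source scalar as-is. A minimal sketch of that idea, with hypothetical stand-ins for Databend's ScalarExpr, DataType, and wrap_cast_scalar:

// Hypothetical stand-in types; Databend's real DataType/ScalarExpr are richer.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int64,
    Nullable(Box<DataType>),
}

#[derive(Clone, Debug)]
struct Scalar {
    ty: DataType,
}

// Stand-in for wrap_cast_scalar: wrap `s` in a cast when the source and
// destination types differ, otherwise return it unchanged.
fn wrap_cast(s: &Scalar, dest: &DataType) -> Scalar {
    if &s.ty == dest {
        s.clone()
    } else {
        // A real planner would emit a CastExpr node here.
        Scalar { ty: dest.clone() }
    }
}

fn main() {
    // The join output reports the column as nullable, but the target
    // field is declared NOT NULL, so a cast is required.
    let from_join = Scalar { ty: DataType::Nullable(Box::new(DataType::Int64)) };
    let casted = wrap_cast(&from_join, &DataType::Int64);
    assert_eq!(casted.ty, DataType::Int64);
}
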
1 change: 1 addition & 0 deletions src/query/sql/src/planner/plans/update.rs
@@ -114,6 +114,7 @@ impl UpdatePlan {
 
         let mut right = right.ok_or_else(|| ErrorCode::Internal("It's a bug"))?;
         let right_data_type = right.data_type()?;
+
         // Corner case: in MERGE INTO, a target-table field declared NOT NULL
         // becomes nullable after bind_join, so we must cast to the target type.
         right = wrap_cast_scalar(&right, &right_data_type, target_type)?;
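
The corner-case comment in this hunk names the type mismatch that makes the cast necessary: a column declared NOT NULL in the target table is reported as nullable once bind_join has run. A small sketch of that type transformation, again with a hypothetical DataType:

// Hypothetical DataType; the point is only the nullability wrapping.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int64,
    Nullable(Box<DataType>),
}

// What (per the comment) happens to a target column's type after bind_join:
// a non-nullable type comes back wrapped in Nullable, so the update
// expression's type no longer matches the declared target type.
fn after_bind_join(declared: &DataType) -> DataType {
    match declared {
        DataType::Nullable(_) => declared.clone(),
        other => DataType::Nullable(Box::new(other.clone())),
    }
}

fn main() {
    let declared = DataType::Int64; // NOT NULL in the table definition
    let reported = after_bind_join(&declared);
    assert_eq!(reported, DataType::Nullable(Box::new(DataType::Int64)));
    // Hence wrap_cast_scalar(&right, &right_data_type, target_type) above.
}
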
101 changes: 100 additions & 1 deletion tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into
@@ -545,6 +545,105 @@ select * from target_test order by a;
 3 f
 5 f
 
+## test not match cast and predicate index
+statement ok
+drop table if exists test_order;
+
+statement ok
+drop table if exists random_source;
+
+statement ok
+create table test_order (
+id bigint,
+id1 bigint,
+id2 bigint,
+id3 bigint,
+id4 bigint,
+id5 bigint,
+id6 bigint,
+id7 bigint,
+
+s1 varchar,
+s2 varchar,
+s3 varchar,
+s4 varchar,
+s5 varchar,
+s6 varchar,
+s7 varchar,
+s8 varchar,
+s9 varchar,
+s10 varchar,
+s11 varchar,
+s12 varchar,
+s13 varchar,
+
+d1 DECIMAL(20, 8),
+d2 DECIMAL(20, 8),
+d3 DECIMAL(20, 8),
+d4 DECIMAL(20, 8),
+d5 DECIMAL(20, 8),
+d6 DECIMAL(30, 8),
+d7 DECIMAL(30, 8),
+d8 DECIMAL(30, 8),
+d9 DECIMAL(30, 8),
+d10 DECIMAL(30, 8),
+
+insert_time datetime,
+insert_time1 datetime,
+insert_time2 datetime,
+insert_time3 datetime,
+
+i int
+
+) CLUSTER BY(to_yyyymmdd(insert_time), id) bloom_index_columns='insert_time,id';
+
+statement ok
+create table random_source(
+id bigint not null,
+id1 bigint,
+id2 bigint,
+id3 bigint,
+id4 bigint,
+id5 bigint,
+id6 bigint,
+id7 bigint,
+
+s1 varchar,
+s2 varchar,
+s3 varchar,
+s4 varchar,
+s5 varchar,
+s6 varchar,
+s7 varchar,
+s8 varchar,
+s9 varchar,
+s10 varchar,
+s11 varchar,
+s12 varchar,
+s13 varchar,
+
+d1 DECIMAL(20, 8),
+d2 DECIMAL(20, 8),
+d3 DECIMAL(20, 8),
+d4 DECIMAL(20, 8),
+d5 DECIMAL(20, 8),
+d6 DECIMAL(30, 8),
+d7 DECIMAL(30, 8),
+d8 DECIMAL(30, 8),
+d9 DECIMAL(30, 8),
+d10 DECIMAL(30, 8),
+
+insert_time datetime not null,
+insert_time1 datetime,
+insert_time2 datetime,
+insert_time3 datetime,
+
+i int
+
+) Engine = Random;
+
+statement ok
+merge into test_order as t using (
+    select id, 34 as id1, 238 as id2, id3, id4, id5, id6, id7,
+           s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13,
+           d1, d2, d3, d4, d5, d6, d7, d8, d9, d10,
+           insert_time, insert_time1, insert_time2, insert_time3, i
+    from random_source limit 1
+) as s on t.id = s.id and t.insert_time = s.insert_time
+when matched then update *
+when not matched then insert *;
+
 statement ok
 set enable_experimental_merge_into = 0;
