Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): union all panic in mysql client #17095

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 57 additions & 41 deletions src/query/sql/src/executor/physical_plans/physical_union_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use databend_common_exception::Result;
use databend_common_expression::DataField;
use databend_common_expression::DataSchema;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::RemoteExpr;
Expand All @@ -24,6 +27,7 @@ use crate::executor::PhysicalPlanBuilder;
use crate::optimizer::SExpr;
use crate::ColumnSet;
use crate::IndexType;
use crate::ScalarExpr;
use crate::TypeCheck;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -52,32 +56,49 @@ impl PhysicalPlanBuilder {
&mut self,
s_expr: &SExpr,
union_all: &crate::plans::UnionAll,
required: ColumnSet,
mut required: ColumnSet,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
// 1. Prune unused Columns.
let left_required = union_all
let metadata = self.metadata.read().clone();
let lazy_columns = metadata.lazy_columns();
required.extend(lazy_columns.clone());
let indices: Vec<_> = union_all
.left_outputs
.iter()
.fold(required.clone(), |mut acc, v| {
acc.insert(v.0);
acc
});
let right_required = union_all.right_outputs.iter().fold(required, |mut acc, v| {
acc.insert(v.0);
acc
});
.enumerate()
.filter_map(|(index, v)| required.contains(&v.0).then_some(index))
.collect();
let (left_required, right_required) = if indices.is_empty() {
(
HashSet::from([union_all.left_outputs[0].0]),
HashSet::from([union_all.right_outputs[0].0]),
)
} else {
indices.iter().fold(
(
HashSet::with_capacity(indices.len()),
HashSet::with_capacity(indices.len()),
),
|(mut left, mut right), &index| {
left.insert(union_all.left_outputs[index].0);
right.insert(union_all.right_outputs[index].0);
(left, right)
},
)
};

// 2. Build physical plan.
let left_plan = self.build(s_expr.child(0)?, left_required).await?;
let right_plan = self.build(s_expr.child(1)?, right_required).await?;
let left_plan = self.build(s_expr.child(0)?, left_required.clone()).await?;
let right_plan = self.build(s_expr.child(1)?, right_required.clone()).await?;

let left_schema = left_plan.output_schema()?;
let right_schema = right_plan.output_schema()?;

let fields = union_all
.left_outputs
.iter()
.filter(|(index, _)| left_required.contains(index))
.map(|(index, expr)| {
if let Some(expr) = expr {
Ok(DataField::new(&index.to_string(), expr.data_type()?))
Expand All @@ -87,35 +108,9 @@ impl PhysicalPlanBuilder {
})
.collect::<Result<Vec<_>>>()?;

let left_outputs = union_all
.left_outputs
.iter()
.map(|(index, scalar_expr)| {
if let Some(scalar_expr) = scalar_expr {
let expr = scalar_expr
.type_check(left_schema.as_ref())?
.project_column_ref(|idx| left_schema.index_of(&idx.to_string()).unwrap());
Ok((*index, Some(expr.as_remote_expr())))
} else {
Ok((*index, None))
}
})
.collect::<Result<Vec<_>>>()?;

let right_outputs = union_all
.right_outputs
.iter()
.map(|(index, scalar_expr)| {
if let Some(scalar_expr) = scalar_expr {
let expr = scalar_expr
.type_check(right_schema.as_ref())?
.project_column_ref(|idx| right_schema.index_of(&idx.to_string()).unwrap());
Ok((*index, Some(expr.as_remote_expr())))
} else {
Ok((*index, None))
}
})
.collect::<Result<Vec<_>>>()?;
let left_outputs = process_outputs(&union_all.left_outputs, &left_required, &left_schema)?;
let right_outputs =
process_outputs(&union_all.right_outputs, &right_required, &right_schema)?;

Ok(PhysicalPlan::UnionAll(UnionAll {
plan_id: 0,
Expand All @@ -130,3 +125,24 @@ impl PhysicalPlanBuilder {
}))
}
}

/// Converts the union outputs that survive column pruning into remote expressions.
///
/// Entries of `outputs` whose column index is not contained in `required` are dropped;
/// the remaining entries keep their original order. For each kept entry:
///
/// * if a scalar expression is present, it is type-checked against `schema`, every
///   column reference is re-mapped through `schema.index_of` (looked up by the
///   stringified column index), and the result is converted with `as_remote_expr`;
/// * if no scalar expression is present, the entry is passed through as `None`.
///
/// # Errors
///
/// Returns the first error produced by `type_check`; later entries are not processed.
///
/// # Panics
///
/// Panics if `schema.index_of` fails for a column referenced by a checked expression.
fn process_outputs(
    outputs: &[(IndexType, Option<ScalarExpr>)],
    required: &ColumnSet,
    schema: &DataSchema,
) -> Result<Vec<(IndexType, Option<RemoteExpr>)>> {
    let mut processed = Vec::new();

    for (column_index, maybe_scalar) in outputs {
        // Skip outputs that were pruned away.
        if !required.contains(column_index) {
            continue;
        }

        let remote = match maybe_scalar {
            Some(scalar) => {
                let checked = scalar.type_check(schema)?;
                // Column references are resolved by name, where the name is the
                // stringified column index.
                let projected = checked
                    .project_column_ref(|col| schema.index_of(&col.to_string()).unwrap());
                Some(projected.as_remote_expr())
            }
            None => None,
        };

        processed.push((*column_index, remote));
    }

    Ok(processed)
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,12 @@ select a, b, change$action, change$is_update from s3 as _change_delete order by
4 4 INSERT 0
6 5 INSERT 0

# ISSUE 17085
query I
select a from s3 where a > 5
----
6

statement ok
create table t4(a int, b int)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,93 +61,81 @@ AggregateFinal
├── aggregate functions: []
├── estimated rows: 0.00
└── UnionAll
├── output columns: [t.id (#0), de (#6)]
├── output columns: [t.id (#0)]
├── estimated rows: 0.00
├── EvalScalar
│ ├── output columns: [t.id (#0), de (#6)]
│ ├── expressions: [if(CAST(is_not_null(sum(tb.de) (#5)) AS Boolean NULL), CAST(assume_not_null(sum(tb.de) (#5)) AS Int64 NULL), true, 0, NULL)]
├── AggregateFinal
│ ├── output columns: [t.id (#0)]
│ ├── group by: [id]
│ ├── aggregate functions: []
│ ├── estimated rows: 0.00
│ └── AggregateFinal
│ ├── output columns: [sum(tb.de) (#5), t.id (#0)]
│ └── AggregatePartial
│ ├── group by: [id]
│ ├── aggregate functions: [sum(sum(coalesce(t3.val, 0)))]
│ ├── aggregate functions: []
│ ├── estimated rows: 0.00
│ └── AggregatePartial
│ ├── group by: [id]
│ ├── aggregate functions: [sum(sum(coalesce(t3.val, 0)))]
│ └── HashJoin
│ ├── output columns: [t.id (#0)]
│ ├── join type: LEFT OUTER
│ ├── build keys: [tb.sid (#1)]
│ ├── probe keys: [t.id (#0)]
│ ├── filters: []
│ ├── estimated rows: 0.00
│ └── HashJoin
│ ├── output columns: [t.id (#0), sum(coalesce(t3.val, 0)) (#4)]
│ ├── join type: LEFT OUTER
│ ├── build keys: [tb.sid (#1)]
│ ├── probe keys: [t.id (#0)]
│ ├── filters: []
│ ├── AggregateFinal(Build)
│ │ ├── output columns: [t2.sid (#1)]
│ │ ├── group by: [sid]
│ │ ├── aggregate functions: []
│ │ ├── estimated rows: 0.00
│ │ └── AggregatePartial
│ │ ├── group by: [sid]
│ │ ├── aggregate functions: []
│ │ ├── estimated rows: 0.00
│ │ └── Filter
│ │ ├── output columns: [t2.sid (#1)]
│ │ ├── filters: [is_true(t3.sid (#1) = 1)]
│ │ ├── estimated rows: 0.00
│ │ └── TableScan
│ │ ├── table: default.default.t2
│ │ ├── output columns: [sid (#1)]
│ │ ├── read rows: 0
│ │ ├── read size: 0
│ │ ├── partitions total: 0
│ │ ├── partitions scanned: 0
│ │ ├── push downs: [filters: [is_true(t2.sid (#1) = 1)], limit: NONE]
│ │ └── estimated rows: 0.00
│ └── Filter(Probe)
│ ├── output columns: [t.id (#0)]
│ ├── filters: [is_true(t.id (#0) = 1)]
│ ├── estimated rows: 0.00
│ ├── AggregateFinal(Build)
│ │ ├── output columns: [sum(coalesce(t3.val, 0)) (#4), t2.sid (#1)]
│ │ ├── group by: [sid]
│ │ ├── aggregate functions: [sum(sum_arg_0)]
│ │ ├── estimated rows: 0.00
│ │ └── AggregatePartial
│ │ ├── group by: [sid]
│ │ ├── aggregate functions: [sum(sum_arg_0)]
│ │ ├── estimated rows: 0.00
│ │ └── EvalScalar
│ │ ├── output columns: [t2.sid (#1), sum_arg_0 (#3)]
│ │ ├── expressions: [if(CAST(is_not_null(t3.val (#2)) AS Boolean NULL), CAST(assume_not_null(t3.val (#2)) AS Int32 NULL), true, 0, NULL)]
│ │ ├── estimated rows: 0.00
│ │ └── Filter
│ │ ├── output columns: [t2.sid (#1), t2.val (#2)]
│ │ ├── filters: [is_true(t3.sid (#1) = 1)]
│ │ ├── estimated rows: 0.00
│ │ └── TableScan
│ │ ├── table: default.default.t2
│ │ ├── output columns: [sid (#1), val (#2)]
│ │ ├── read rows: 0
│ │ ├── read size: 0
│ │ ├── partitions total: 0
│ │ ├── partitions scanned: 0
│ │ ├── push downs: [filters: [is_true(t2.sid (#1) = 1)], limit: NONE]
│ │ └── estimated rows: 0.00
│ └── Filter(Probe)
│ ├── output columns: [t.id (#0)]
│ ├── filters: [is_true(t.id (#0) = 1)]
│ ├── estimated rows: 0.00
│ └── TableScan
│ ├── table: default.default.t1
│ ├── output columns: [id (#0)]
│ ├── read rows: 0
│ ├── read size: 0
│ ├── partitions total: 0
│ ├── partitions scanned: 0
│ ├── push downs: [filters: [is_true(t1.id (#0) = 1)], limit: NONE]
│ └── estimated rows: 0.00
└── EvalScalar
├── output columns: [t.id (#7), de (#8)]
├── expressions: [0]
│ └── TableScan
│ ├── table: default.default.t1
│ ├── output columns: [id (#0)]
│ ├── read rows: 0
│ ├── read size: 0
│ ├── partitions total: 0
│ ├── partitions scanned: 0
│ ├── push downs: [filters: [is_true(t1.id (#0) = 1)], limit: NONE]
│ └── estimated rows: 0.00
└── AggregateFinal
├── output columns: [t.id (#7)]
├── group by: [id]
├── aggregate functions: []
├── estimated rows: 0.00
└── AggregateFinal
├── output columns: [t.id (#7)]
└── AggregatePartial
├── group by: [id]
├── aggregate functions: []
├── estimated rows: 0.00
└── AggregatePartial
├── group by: [id]
├── aggregate functions: []
└── Filter
├── output columns: [t.id (#7)]
├── filters: [is_true(t.id (#7) = 1)]
├── estimated rows: 0.00
└── Filter
├── output columns: [t.id (#7)]
├── filters: [is_true(t.id (#7) = 1)]
├── estimated rows: 0.00
└── TableScan
├── table: default.default.t1
├── output columns: [id (#7)]
├── read rows: 0
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [is_true(t1.id (#7) = 1)], limit: NONE]
└── estimated rows: 0.00
└── TableScan
├── table: default.default.t1
├── output columns: [id (#7)]
├── read rows: 0
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [is_true(t1.id (#7) = 1)], limit: NONE]
└── estimated rows: 0.00

statement ok
drop table if exists t1;
Expand Down
36 changes: 36 additions & 0 deletions tests/sqllogictests/suites/mode/standalone/explain/union.test
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,42 @@ Limit
├── push downs: [filters: [], limit: 1]
└── estimated rows: 2.00

# ISSUE 17085
query T
explain select b from (select * from t1 where a>1 union all select * from t2 where b>2)
----
UnionAll
├── output columns: [t1.b (#1)]
├── estimated rows: 2.00
├── Filter
│ ├── output columns: [t1.b (#1)]
│ ├── filters: [is_true(t1.a (#0) > 1)]
│ ├── estimated rows: 1.00
│ └── TableScan
│ ├── table: default.default.t1
│ ├── output columns: [a (#0), b (#1)]
│ ├── read rows: 2
│ ├── read size: < 1 KiB
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
│ └── estimated rows: 2.00
└── Filter
├── output columns: [t2.b (#3)]
├── filters: [is_true(t2.b (#3) > 2)]
├── estimated rows: 1.00
└── TableScan
├── table: default.default.t2
├── output columns: [b (#3)]
├── read rows: 2
├── read size: < 1 KiB
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [is_true(t2.b (#3) > 2)], limit: NONE]
└── estimated rows: 2.00

statement ok
drop table t1

Expand Down
Loading
Loading