diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 3b6f98e919d15..5a076d2ad4a48 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -13,11 +13,9 @@ // limitations under the License. #![allow(rustdoc::private_intra_doc_links)] -#![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] #![feature(binary_heap_drain_sorted)] #![feature(is_sorted)] -#![feature(fn_traits)] #![feature(type_alias_impl_trait)] #![feature(test)] #![feature(trusted_len)] diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index e764a9a9728c6..a795902f38871 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -28,6 +28,7 @@ use crate::error::BoxedError; mod native_type; mod ops; mod scalar_impl; +mod successor; use std::fmt::Debug; use std::io::Cursor; @@ -37,6 +38,7 @@ pub use native_type::*; use risingwave_pb::data::data_type::IntervalType::*; use risingwave_pb::data::data_type::{IntervalType, TypeName}; pub use scalar_impl::*; +pub use successor::*; pub mod chrono_wrapper; pub mod decimal; pub mod interval; diff --git a/src/common/src/types/successor.rs b/src/common/src/types/successor.rs new file mode 100644 index 0000000000000..20b729f4df2a5 --- /dev/null +++ b/src/common/src/types/successor.rs @@ -0,0 +1,84 @@ +// Copyright 2023 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::Duration; + +use super::{NaiveDateTimeWrapper, NaiveDateWrapper, ScalarImpl}; + +/// A successor is a term that comes right after a particular value. Suppose n is a number (where n +/// belongs to any whole number), then the successor of n is 'n+1'. The other terminologies used for +/// a successor are just after, immediately after, and next value. +pub trait Successor { + /// Returns the successor of the current value if it exists, otherwise returns None. + fn successor(&self) -> Option + where + Self: Sized, + { + None + } +} + +impl Successor for i16 { + fn successor(&self) -> Option { + self.checked_add(1) + } +} + +impl Successor for i32 { + fn successor(&self) -> Option { + self.checked_add(1) + } +} + +impl Successor for i64 { + fn successor(&self) -> Option { + self.checked_add(1) + } +} + +impl Successor for NaiveDateTimeWrapper { + fn successor(&self) -> Option { + self.0 + .checked_add_signed(Duration::nanoseconds(1)) + .map(NaiveDateTimeWrapper) + } +} + +impl Successor for NaiveDateWrapper { + fn successor(&self) -> Option { + self.0 + .checked_add_signed(Duration::days(1)) + .map(NaiveDateWrapper) + } +} + +impl ScalarImpl { + /// Returns the successor of the current value if it exists. + /// + /// See also [`Successor`]. + /// + /// The function may return None when: + /// 1. The current value is the maximum value of the type. + /// 2. The successor value of the type is not well-defined. + pub fn successor(&self) -> Option { + match self { + ScalarImpl::Int16(v) => v.successor().map(ScalarImpl::Int16), + ScalarImpl::Int32(v) => v.successor().map(ScalarImpl::Int32), + ScalarImpl::Int64(v) => v.successor().map(ScalarImpl::Int64), + ScalarImpl::NaiveDateTime(v) => v.successor().map(ScalarImpl::NaiveDateTime), + ScalarImpl::NaiveDate(v) => v.successor().map(ScalarImpl::NaiveDate), + _ => None, + } + } +} diff --git a/src/frontend/planner_test/tests/testdata/subquery.yaml b/src/frontend/planner_test/tests/testdata/subquery.yaml index 6551564c73e02..69c87d032b751 100644 --- a/src/frontend/planner_test/tests/testdata/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/subquery.yaml @@ -106,11 +106,11 @@ ) as t0 ); optimized_logical_plan: | - LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(ab.a, bc.b), output: all } + LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(ab.a, t.v1), output: all } ├─LogicalScan { table: ab, columns: [ab.a, ab.b] } └─LogicalJoin { type: Inner, on: true, output: all } - ├─LogicalScan { table: bc, columns: [bc.b] } - └─LogicalScan { table: t, output_columns: [], required_columns: [t.v1], predicate: IsNotNull(t.v1) } + ├─LogicalScan { table: bc, columns: [] } + └─LogicalScan { table: t, columns: [t.v1], predicate: IsNotNull(t.v1) } - name: We cannot reference columns in left table if not lateral sql: | create table ab (a int, b int); @@ -322,21 +322,20 @@ └─LogicalFilter { predicate: (b.b1 = CorrelatedInputRef { index: 0, correlated_id: 1 }) } └─LogicalScan { table: b, columns: [b.b1, b.b2, b._row_id] } optimized_logical_plan: | - LogicalJoin { type: Inner, on: IsNotDistinctFrom(a.a1, a.a1) AND (a.a1 = min(b.b2)), output: [a.a1, a.a2] } + LogicalJoin { type: Inner, on: IsNotDistinctFrom(a.a1, a.a1) AND (a.a1 = min(b.b1)), output: [a.a1, a.a2] } ├─LogicalScan { table: a, columns: [a.a1, a.a2] } - └─LogicalAgg { group_key: [a.a1], aggs: [min(b.b2)] } - └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.a1, b.b1), output: [a.a1, b.b2] } + └─LogicalAgg { group_key: [a.a1], aggs: [min(b.b1)] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.a1, a.a1), output: [a.a1, b.b1] } ├─LogicalAgg { group_key: [a.a1], aggs: [] } | └─LogicalScan { table: a, columns: [a.a1] } - └─LogicalJoin { type: Inner, on: (b.b2 = min(b.b1)), output: [b.b1, b.b2] } + └─LogicalJoin { type: Inner, on: (b.b2 = min(b.b1)), output: [a.a1, b.b1] } ├─LogicalScan { table: b, columns: [b.b1, b.b2] } - └─LogicalProject { exprs: [min(b.b1)] } - └─LogicalAgg { group_key: [a.a1], aggs: [min(b.b1)] } - └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.a1, b.b1), output: [a.a1, b.b1] } - ├─LogicalAgg { group_key: [a.a1], aggs: [] } - | └─LogicalScan { table: a, columns: [a.a1] } - └─LogicalProject { exprs: [b.b1, b.b1] } - └─LogicalScan { table: b, columns: [b.b1], predicate: IsNotNull(b.b1) } + └─LogicalAgg { group_key: [a.a1], aggs: [min(b.b1)] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.a1, b.b1), output: [a.a1, b.b1] } + ├─LogicalAgg { group_key: [a.a1], aggs: [] } + | └─LogicalScan { table: a, columns: [a.a1] } + └─LogicalProject { exprs: [b.b1, b.b1] } + └─LogicalScan { table: b, columns: [b.b1], predicate: IsNotNull(b.b1) } - name: test subquery in join on condition sql: | create table a (v1 int, v2 int); diff --git a/src/frontend/planner_test/tests/testdata/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/subquery_expr_correlated.yaml index e9ed6911aac28..54ef8d3c35b4a 100644 --- a/src/frontend/planner_test/tests/testdata/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/subquery_expr_correlated.yaml @@ -624,9 +624,9 @@ create table t3(x int, y int); select * from t1 where exists(select t2.x from t2 right join t3 on t2.x = t3.x and t1.y = t2.y and t1.y = t3.y); optimized_logical_plan: | - LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(t1.y, t1.y), output: all } + LogicalJoin { type: LeftSemi, on: IsNotDistinctFrom(t1.y, t2.y), output: all } ├─LogicalScan { table: t1, columns: [t1.x, t1.y] } - └─LogicalJoin { type: LeftOuter, on: (t2.x = t3.x) AND (t2.y = t3.y) AND IsNotDistinctFrom(t2.y, t1.y), output: [t1.y] } + └─LogicalJoin { type: LeftOuter, on: (t2.x = t3.x) AND (t2.y = t3.y) AND IsNotDistinctFrom(t2.y, t1.y), output: [t2.y] } ├─LogicalJoin { type: Inner, on: true, output: all } | ├─LogicalAgg { group_key: [t1.y], aggs: [] } | | └─LogicalScan { table: t1, columns: [t1.y] } @@ -737,3 +737,31 @@ ├─LogicalAgg { group_key: [strings.v1], aggs: [] } | └─LogicalScan { table: strings, columns: [strings.v1] } └─LogicalScan { table: strings, columns: [strings.v1] } +- name: Existential join on outer join with correlated condition + sql: | + create table t1(x int, y int); + create table t2(x int, y int); + create table t3(a varchar, z int); + select x from t1 where y in (select y from t3 full join t2 where t1.x = t2.x and z IS NOT DISTINCT FROM t2.x); + optimized_logical_plan: | + LogicalJoin { type: LeftSemi, on: (t1.y = t2.y) AND (t1.x = t2.x), output: [t1.x] } + ├─LogicalScan { table: t1, columns: [t1.x, t1.y] } + └─LogicalProject { exprs: [t2.y, t2.x] } + └─LogicalFilter { predicate: IsNotDistinctFrom(t3.z, t2.x) } + └─LogicalJoin { type: FullOuter, on: true, output: all } + ├─LogicalScan { table: t3, columns: [t3.z] } + └─LogicalScan { table: t2, columns: [t2.x, t2.y] } +- name: Correlated condition in RHS of right outer join + sql: | + create table t1(x int, y int); + create table t2(x int, y int); + create table t3(a varchar, z int); + select x from t1 where y in (select y from t3 right join t2 where t1.x = t2.x and z IS NOT DISTINCT FROM t2.x); + optimized_logical_plan: | + LogicalJoin { type: LeftSemi, on: (t1.y = t2.y) AND (t1.x = t2.x), output: [t1.x] } + ├─LogicalScan { table: t1, columns: [t1.x, t1.y] } + └─LogicalProject { exprs: [t2.y, t2.x] } + └─LogicalFilter { predicate: IsNotDistinctFrom(t3.z, t2.x) } + └─LogicalJoin { type: LeftOuter, on: true, output: [t3.z, t2.x, t2.y] } + ├─LogicalScan { table: t2, columns: [t2.x, t2.y] } + └─LogicalScan { table: t3, columns: [t3.z] } diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs index 09ddf7e300281..15905c6b623d9 100644 --- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs @@ -23,9 +23,7 @@ use crate::expr::{ CollectInputRef, CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef, }; -use crate::optimizer::plan_node::{ - LogicalApply, LogicalFilter, LogicalJoin, LogicalProject, PlanTreeNodeBinary, -}; +use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary}; use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder}; use crate::optimizer::PlanRef; use crate::utils::{ColIndexMapping, Condition}; @@ -157,36 +155,38 @@ impl Rule for ApplyJoinTransposeRule { JoinType::Unspecified => unreachable!(), }; - if push_left && push_right { - Some(self.push_apply_both_side( + let out = if push_left && push_right { + self.push_apply_both_side( apply_left, join, apply_on, apply_join_type, correlated_id, correlated_indices, - )) + ) } else if push_left { - Some(self.push_apply_left_side( + self.push_apply_left_side( apply_left, join, apply_on, apply_join_type, correlated_id, correlated_indices, - )) + ) } else if push_right { - Some(self.push_apply_right_side( + self.push_apply_right_side( apply_left, join, apply_on, apply_join_type, correlated_id, correlated_indices, - )) + ) } else { unreachable!(); - } + }; + assert_eq!(out.schema(), plan.schema()); + Some(out) } } @@ -365,19 +365,41 @@ impl ApplyJoinTransposeRule { correlated_indices, false, ); + let output_indices: Vec<_> = { + let (apply_left_len, join_right_len) = match apply_join_type { + JoinType::LeftSemi | JoinType::LeftAnti => (apply_left_len, 0), + JoinType::RightSemi | JoinType::RightAnti => (0, join.right().schema().len()), + _ => (apply_left_len, join.right().schema().len()), + }; + + let left_iter = join_left_len..join_left_len + apply_left_len; + let right_iter = (0..join_left_len).chain( + join_left_len + apply_left_len..join_left_len + apply_left_len + join_right_len, + ); + + match join.join_type() { + JoinType::LeftSemi | JoinType::LeftAnti => left_iter.collect(), + JoinType::RightSemi | JoinType::RightAnti => right_iter.collect(), + _ => left_iter.chain(right_iter).collect(), + } + }; + let mut output_indices_mapping = + ColIndexMapping::new(output_indices.iter().map(|x| Some(*x)).collect()); let new_join = LogicalJoin::new( join.left().clone(), new_join_right.clone(), join.join_type(), new_join_condition, - ); + ) + .clone_with_output_indices(output_indices); // Leave other condition for predicate push down to deal with LogicalFilter::create( new_join.into(), Condition { conjunctions: other_condition, - }, + } + .rewrite_expr(&mut output_indices_mapping), ) } @@ -511,78 +533,46 @@ impl ApplyJoinTransposeRule { correlated_indices, false, ); + + let output_indices: Vec<_> = { + let (apply_left_len, join_right_len) = match apply_join_type { + JoinType::LeftSemi | JoinType::LeftAnti => (apply_left_len, 0), + JoinType::RightSemi | JoinType::RightAnti => (0, join.right().schema().len()), + _ => (apply_left_len, join.right().schema().len()), + }; + + let left_iter = 0..join_left_len + apply_left_len; + let right_iter = join_left_len + apply_left_len * 2 + ..join_left_len + apply_left_len * 2 + join_right_len; + + match join.join_type() { + JoinType::LeftSemi | JoinType::LeftAnti => left_iter.collect(), + JoinType::RightSemi | JoinType::RightAnti => right_iter.collect(), + _ => left_iter.chain(right_iter).collect(), + } + }; let new_join = LogicalJoin::new( new_join_left.clone(), new_join_right.clone(), join.join_type(), new_join_condition, - ); + ) + .clone_with_output_indices(output_indices.clone()); match join.join_type() { JoinType::LeftSemi | JoinType::LeftAnti | JoinType::RightSemi | JoinType::RightAnti => { new_join.into() } JoinType::Inner | JoinType::LeftOuter | JoinType::RightOuter | JoinType::FullOuter => { - // Use project to provide a natural join - let mut project_exprs: Vec = vec![]; - - let d_offset = if join.join_type() == JoinType::RightOuter { - new_join_left.schema().len() - } else { - 0 - }; - - project_exprs.extend( - apply_left - .schema() - .fields - .iter() - .enumerate() - .map(|(i, field)| { - ExprImpl::InputRef(Box::new(InputRef::new( - i + d_offset, - field.data_type.clone(), - ))) - }) - .collect_vec(), - ); - - project_exprs.extend( - new_join_left - .schema() - .fields - .iter() - .enumerate() - .skip(apply_left_len) - .map(|(i, field)| { - ExprImpl::InputRef(Box::new(InputRef::new(i, field.data_type.clone()))) - }) - .collect_vec(), - ); - project_exprs.extend( - new_join_right - .schema() - .fields - .iter() - .enumerate() - .skip(apply_left_len) - .map(|(i, field)| { - ExprImpl::InputRef(Box::new(InputRef::new( - i + new_join_left.schema().len(), - field.data_type.clone(), - ))) - }) - .collect_vec(), - ); - - let new_project = LogicalProject::create(new_join.into(), project_exprs); - + let mut output_indices_mapping = + ColIndexMapping::new(output_indices.iter().map(|x| Some(*x)).collect()); // Leave other condition for predicate push down to deal with LogicalFilter::create( - new_project, + new_join.into(), Condition { conjunctions: other_condition, - }, + } + .rewrite_expr(&mut output_indices_mapping), ) } JoinType::Unspecified => unreachable!(), diff --git a/src/sqlparser/src/lib.rs b/src/sqlparser/src/lib.rs index 3b47984488551..988b218fd7f66 100644 --- a/src/sqlparser/src/lib.rs +++ b/src/sqlparser/src/lib.rs @@ -33,7 +33,6 @@ #![cfg_attr(not(feature = "std"), no_std)] #![feature(lint_reasons)] #![feature(let_chains)] -#![expect(clippy::derive_partial_eq_without_eq)] #![expect(clippy::doc_markdown)] #![expect(clippy::upper_case_acronyms)] diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index 2c503b923f594..79c271ee43df2 100644 --- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -140,36 +140,25 @@ impl SortBuffer { // Only records with timestamp greater than the last watermark will be output, so // records will only be emitted exactly once unless recovery. let start_bound = if let Some(last_watermark) = last_watermark.clone() { - // TODO: `start_bound` is wrong here, only values with `val.0 > last_watermark` - // should be output, but it's hard to represent `OwnedRow::MAX`. A possible - // implementation is introducing `next_unit` on a subset of `ScalarImpl` variants. - // Currently, we can skip some values explicitly. - Bound::Excluded((last_watermark, OwnedRow::empty().into())) + Bound::Excluded(( + // TODO: unsupported type or watermark overflow. Do we have better ways instead + // of unwrap? + last_watermark.successor().unwrap(), + OwnedRow::empty().into(), + )) } else { Bound::Unbounded }; - // TODO: `end_bound` = `Bound::Inclusive((watermark_value + 1, OwnedRow::empty()))`, but - // it's hard to represent now, so we end the loop by an explicit break. - let end_bound = Bound::Unbounded; - - for ((time_col, _), (row, _)) in self.buffer.range((start_bound, end_bound)) { - if let Some(ref last_watermark) = &last_watermark && time_col == last_watermark { - continue; - } - // Only when a record's timestamp is prior to the watermark should it be - // sent to downstream. - if time_col <= watermark_val { - // Add the record to stream chunk data. Note that we retrieve the - // record from a BTreeMap, so data in this chunk should be ordered - // by timestamp and pk. - if let Some(data_chunk) = data_chunk_builder.append_one_row(row) { - // When the chunk size reaches its maximum, we construct a data chunk and - // send it to downstream. - yield data_chunk; - } - } else { - // We have collected all data below watermark. - break; + let end_bound = Bound::Excluded(( + (watermark_val.successor().unwrap()), + OwnedRow::empty().into(), + )); + + for (_, (row, _)) in self.buffer.range((start_bound, end_bound)) { + if let Some(data_chunk) = data_chunk_builder.append_one_row(row) { + // When the chunk size reaches its maximum, we construct a data chunk and + // send it to downstream. + yield data_chunk; } }