diff --git a/src/frontend/planner_test/tests/testdata/watermark.yaml b/src/frontend/planner_test/tests/testdata/watermark.yaml index d525c4e1211b5..2888ee086dcec 100644 --- a/src/frontend/planner_test/tests/testdata/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/watermark.yaml @@ -64,6 +64,18 @@ | └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: HashShard(t2.ts) } └─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } +- name: left semi hash join + sql: | + create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; + create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; + select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2 from t1 where exists (select * from t2 where t1.ts = t2.ts); + stream_plan: | + StreamMaterialize { columns: [t1_ts, t1_v1, t1_v2, t1._row_id(hidden)], pk_columns: [t1._row_id, t1_ts], pk_conflict: "no check", watermark_columns: [t1_ts] } + └─StreamHashJoin { type: LeftSemi, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts], output: all } + ├─StreamExchange { dist: HashShard(t1.ts) } + | └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamExchange { dist: HashShard(t2.ts) } + └─StreamTableScan { table: t2, columns: [t2.ts, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } - name: union all sql: | create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index 95395325fd19b..0f5b1c6ff8600 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -74,8 +74,12 @@ impl StreamHashJoin { if logical.left().watermark_columns().contains(left_key) && logical.right().watermark_columns().contains(right_key) { - watermark_columns.insert(l2i.map(left_key)); - watermark_columns.insert(r2i.map(right_key)); + if let Some(internal) = l2i.try_map(left_key) { + watermark_columns.insert(internal); + } + if let Some(internal) = r2i.try_map(right_key) { + watermark_columns.insert(internal); + } } } logical.i2o_col_mapping().rewrite_bitset(&watermark_columns)