fix(optimizer): fix hash join distribution (risingwavelabs#8598)
chenzl25 authored Mar 16, 2023
1 parent 582307d commit 632423a
Showing 4 changed files with 37 additions and 2 deletions.
24 changes: 24 additions & 0 deletions src/frontend/planner_test/tests/testdata/join.yaml
@@ -677,3 +677,27 @@
Table 6 { columns: [t_src, t_dst, t__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 1 }
Table 7 { columns: [t_src, t__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 }
Table 4294967294 { columns: [p1, p2, p3, t._row_id, t._row_id#1, t.src, t._row_id#2], primary key: [$3 ASC, $4 ASC, $1 ASC, $6 ASC, $5 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [0], read pk prefix len hint: 6 }
- name: Fix hash join distribution key (https://github.com/risingwavelabs/risingwave/issues/8537)
sql: |
CREATE TABLE part (
p INTEGER,
c VARCHAR,
PRIMARY KEY (p)
);
CREATE TABLE B (
b INTEGER,
d VARCHAR,
PRIMARY KEY (b)
);
select B.* from part join B on part.c = B.d join part p1 on p1.p = part.p and p1.p = B.b;
stream_plan: |
StreamMaterialize { columns: [b, d, part.p(hidden), part.c(hidden), part.p#1(hidden)], pk_columns: [part.p, b, part.c, part.p#1], pk_conflict: "no check" }
└─StreamHashJoin { type: Inner, predicate: part.p = part.p AND b.b = part.p, output: [b.b, b.d, part.p, part.c, part.p] }
├─StreamExchange { dist: HashShard(part.p, b.b) }
| └─StreamHashJoin { type: Inner, predicate: part.c = b.d, output: [part.p, b.b, b.d, part.c] }
| ├─StreamExchange { dist: HashShard(part.c) }
| | └─StreamTableScan { table: part, columns: [part.p, part.c], pk: [part.p], dist: UpstreamHashShard(part.p) }
| └─StreamExchange { dist: HashShard(b.d) }
| └─StreamTableScan { table: b, columns: [b.b, b.d], pk: [b.b], dist: UpstreamHashShard(b.b) }
└─StreamExchange { dist: HashShard(part.p, part.p) }
└─StreamTableScan { table: part, columns: [part.p], pk: [part.p], dist: UpstreamHashShard(part.p) }
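The point of this expected plan is that the two exchanges hash on column lists that are pairwise equal under the join predicate: the left side on (part.p, b.b), the right side on (p, p). The following is a minimal, standalone Rust sketch of why that co-partitioning makes the hash join correct; it is not RisingWave's vnode hashing, and the hasher, shard count, and values are illustrative assumptions. For rows that actually match, the two key tuples are equal, so both rows land on the same shard.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy shard assignment: hash the join-key tuple into one of `shards` buckets.
fn shard_of<T: Hash>(key: &T, shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % shards
}

fn main() {
    // For rows that satisfy `part.p = p1.p AND b.b = p1.p`, the left key
    // (part.p, b.b) and the right key (p1.p, p1.p) are equal tuples, so they
    // hash to the same shard and the join can be evaluated shard-locally.
    let left_key = (42i32, 42i32); // (part.p, b.b)
    let right_key = (42i32, 42i32); // (p1.p, p1.p)
    assert_eq!(shard_of(&left_key, 8), shard_of(&right_key, 8));

    // If the left exchange hashed on only one of its two eq columns (the
    // symptom of the bug this commit fixes), the two sides would hash
    // different tuples and matching rows could end up on different shards.
    let lossy_left_key = (42i32,); // only b.b survives the lossy mapping
    let _shards_may_differ = shard_of(&lossy_left_key, 8) != shard_of(&right_key, 8);
}
```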
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_hash_join.rs
@@ -173,7 +173,9 @@ impl ToDistributedBatch for BatchHashJoin {
         let r2l = self
             .eq_join_predicate()
             .r2l_eq_columns_mapping(left.schema().len(), right.schema().len());
-        let l2r = r2l.inverse();
+        let l2r = self
+            .eq_join_predicate()
+            .l2r_eq_columns_mapping(left.schema().len());

         let right_dist = right.distribution();
         match right_dist {
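Why `r2l.inverse()` was wrong: `r2l_eq_columns_mapping` records, for each right eq column, one left column it is equated with, so when a single right column appears in several equality conditions (as `p1.p` does in the test above) the earlier entry is overwritten, and inverting that lossy map loses a left eq column. Building the left-to-right map directly keeps every left eq column; the same one-line replacement appears in `logical_join.rs` further down for the stream path. A minimal sketch of the difference, using plain `Vec<Option<usize>>` in place of `ColIndexMapping` and right-relative column indices, with the column numbering taken from the test plan above (left = [part.p, b.b, b.d, part.c], right = [p]):

```rust
fn main() {
    // Equality keys of the top join in the test: part.p = p1.p and b.b = p1.p,
    // as (left column index, right column index relative to the right input).
    let eq_keys = [(0usize, 0usize), (1, 0)];
    let (left_cols_num, right_cols_num) = (4usize, 1usize);

    // r2l: one left column per right column. The second eq key overwrites the
    // first, so only b.b (index 1) is remembered for p1.p.
    let mut r2l: Vec<Option<usize>> = vec![None; right_cols_num];
    for &(left, right) in &eq_keys {
        r2l[right] = Some(left);
    }
    assert_eq!(r2l, vec![Some(1)]);

    // Inverting the already-lossy r2l drops part.p from the left-to-right map.
    let mut l2r_via_inverse: Vec<Option<usize>> = vec![None; left_cols_num];
    for (right, left) in r2l.iter().enumerate() {
        if let Some(left) = *left {
            l2r_via_inverse[left] = Some(right);
        }
    }
    assert_eq!(l2r_via_inverse, vec![None, Some(0), None, None]); // part.p is missing

    // Building l2r directly from the eq keys (what the fix does) keeps both
    // left eq columns, so the left exchange hashes on (part.p, b.b).
    let mut l2r_direct: Vec<Option<usize>> = vec![None; left_cols_num];
    for &(left, right) in &eq_keys {
        l2r_direct[left] = Some(right);
    }
    assert_eq!(l2r_direct, vec![Some(0), Some(0), None, None]);
}
```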
9 changes: 9 additions & 0 deletions src/frontend/src/optimizer/plan_node/eq_join_predicate.rs
@@ -227,6 +227,15 @@ impl EqJoinPredicate {
         ColIndexMapping::new(map)
     }

+    /// return the eq columns index mapping from left inputs to right inputs
+    pub fn l2r_eq_columns_mapping(&self, left_cols_num: usize) -> ColIndexMapping {
+        let mut map = vec![None; left_cols_num];
+        for (left, right, _) in self.eq_keys() {
+            map[left.index] = Some(right.index - left_cols_num);
+        }
+        ColIndexMapping::new(map)
+    }
+
     /// Reorder the `eq_keys` according to the `reorder_idx`.
     pub fn reorder(self, reorder_idx: &[usize]) -> Self {
         assert!(reorder_idx.len() <= self.eq_keys.len());
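The `right.index - left_cols_num` shift is there because the eq keys index into the joined schema (left columns first, then right), while the mapping targets the right input's own schema. A standalone sketch of that convention, reusing the test's column numbering; the `Vec<Option<usize>>` again stands in for `ColIndexMapping`, and the free function is a hypothetical mirror of the method above, not the method itself:

```rust
/// Hypothetical free-function mirror of the new method, with eq keys given as
/// (left index, right index in the joined schema) pairs.
fn l2r_eq_columns_mapping(eq_keys: &[(usize, usize)], left_cols_num: usize) -> Vec<Option<usize>> {
    let mut map = vec![None; left_cols_num];
    for &(left, right) in eq_keys {
        // Right-side indices count from the end of the left schema, so shift
        // them down to index the right input directly.
        map[left] = Some(right - left_cols_num);
    }
    map
}

fn main() {
    // Left schema: [part.p(0), b.b(1), b.d(2), part.c(3)]; the right input's
    // only column p1.p is column 4 of the joined schema.
    let l2r = l2r_eq_columns_mapping(&[(0, 4), (1, 4)], 4);
    // Both left eq columns map to right column 0, which is consistent with the
    // HashShard(part.p, b.b) / HashShard(part.p, part.p) exchange pair in the
    // expected stream plan above.
    assert_eq!(l2r, vec![Some(0), Some(0), None, None]);
}
```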
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_join.rs
@@ -891,7 +891,7 @@ impl LogicalJoin {
         let mut left = self.left();

         let r2l = predicate.r2l_eq_columns_mapping(left.schema().len(), right.schema().len());
-        let l2r = r2l.inverse();
+        let l2r = predicate.l2r_eq_columns_mapping(left.schema().len());

         let right_dist = right.distribution();
         match right_dist {
