Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): avoid pk duplication #7073

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,8 @@

Table 0 { columns: [ak1.k1, ak1.v, ak1.a._row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 1 { columns: [ak1.k1, ak1.a._row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 2 { columns: [count, a.k1], primary key: [$1 ASC, $1 ASC], value indices: [0, 1], distribution key: [1] }
Table 3 { columns: [a.k1, a.k1_0, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 2 { columns: [count, a.k1], primary key: [$1 ASC], value indices: [0, 1], distribution key: [1] }
Table 3 { columns: [a.k1, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0] }
Table 4 { columns: [a.k1, count, count_0], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0] }
Table 4294967294 { columns: [v, bv, ak1.a._row_id, ak1.k1, a.k1], primary key: [$2 ASC, $4 ASC, $3 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [3] }
- id: aggk1_join_Ak1_onk1
Expand Down Expand Up @@ -680,8 +680,8 @@
Upstream
BatchPlanNode

Table 0 { columns: [count, a.k1], primary key: [$1 ASC, $1 ASC], value indices: [0, 1], distribution key: [1] }
Table 1 { columns: [a.k1, a.k1_0, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 0 { columns: [count, a.k1], primary key: [$1 ASC], value indices: [0, 1], distribution key: [1] }
Table 1 { columns: [a.k1, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0] }
Table 2 { columns: [ak1.k1, ak1.v, ak1.a._row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 3 { columns: [ak1.k1, ak1.a._row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 4 { columns: [a.k1, count, count_0], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0] }
Expand Down Expand Up @@ -749,10 +749,10 @@
Upstream
BatchPlanNode

Table 0 { columns: [count, a.k1], primary key: [$1 ASC, $1 ASC], value indices: [0, 1], distribution key: [1] }
Table 1 { columns: [a.k1, a.k1_0, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 2 { columns: [count, b.k1], primary key: [$1 ASC, $1 ASC], value indices: [0, 1], distribution key: [1] }
Table 3 { columns: [b.k1, b.k1_0, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 0 { columns: [count, a.k1], primary key: [$1 ASC], value indices: [0, 1], distribution key: [1] }
Table 1 { columns: [a.k1, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0] }
Table 2 { columns: [count, b.k1], primary key: [$1 ASC], value indices: [0, 1], distribution key: [1] }
Table 3 { columns: [b.k1, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0] }
Table 4 { columns: [a.k1, count, count_0], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0] }
Table 5 { columns: [b.k1, count, count_0], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0] }
Table 4294967294 { columns: [num, bv, a.k1, b.k1], primary key: [$2 ASC, $3 ASC], value indices: [0, 1, 2, 3], distribution key: [2] }
Expand Down
64 changes: 32 additions & 32 deletions src/frontend/planner_test/tests/testdata/nexmark.yaml

Large diffs are not rendered by default.

368 changes: 184 additions & 184 deletions src/frontend/planner_test/tests/testdata/tpch.yaml

Large diffs are not rendered by default.

20 changes: 15 additions & 5 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
let table_col_idx =
internal_table_catalog_builder.add_column(&in_fields[upstream_idx]);
if let Some(order_type) = order_type {
internal_table_catalog_builder.add_order_column(table_col_idx, order_type);
internal_table_catalog_builder.add_order_column(
table_col_idx,
order_type,
false,
);
}
included_upstream_indices.push(upstream_idx);
table_col_idx
Expand Down Expand Up @@ -211,8 +215,11 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
let mut included_upstream_indices = vec![];
for &idx in &self.group_key {
let tb_column_idx = internal_table_catalog_builder.add_column(&in_fields[idx]);
internal_table_catalog_builder
.add_order_column(tb_column_idx, OrderType::Ascending);
internal_table_catalog_builder.add_order_column(
tb_column_idx,
OrderType::Ascending,
true,
);
included_upstream_indices.push(idx);
}

Expand Down Expand Up @@ -335,8 +342,11 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
for field in out_fields.iter() {
let tb_column_idx = internal_table_catalog_builder.add_column(field);
if tb_column_idx < self.group_key.len() {
internal_table_catalog_builder
.add_order_column(tb_column_idx, OrderType::Ascending);
internal_table_catalog_builder.add_order_column(
tb_column_idx,
OrderType::Ascending,
true,
);
}
}
internal_table_catalog_builder.set_read_prefix_len_hint(self.group_key.len());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn infer_left_internal_table_catalog(
});

pk_indices.iter().for_each(|idx| {
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending)
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending, true)
});

internal_table_catalog_builder.build(dist_keys)
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Source {

let ordered_col_idx = builder.add_column(&key);
builder.add_column(&value);
builder.add_order_column(ordered_col_idx, OrderType::Ascending);
builder.add_order_column(ordered_col_idx, OrderType::Ascending, true);

builder.build(vec![])
}
Expand Down
11 changes: 7 additions & 4 deletions src/frontend/src/optimizer/plan_node/generic/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,24 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
// does a prefix scanning with the group key, we can fetch the data in the
// desired order.
self.group_key.iter().for_each(|&idx| {
internal_table_catalog_builder.add_order_column(idx, OrderType::Ascending);
internal_table_catalog_builder.add_order_column(idx, OrderType::Ascending, true);
order_cols.insert(idx);
});

field_order.iter().for_each(|field_order| {
if !order_cols.contains(&field_order.index) {
internal_table_catalog_builder
.add_order_column(field_order.index, OrderType::from(field_order.direct));
internal_table_catalog_builder.add_order_column(
field_order.index,
OrderType::from(field_order.direct),
false,
);
order_cols.insert(field_order.index);
}
});

pk_indices.iter().for_each(|idx| {
if !order_cols.contains(idx) {
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending);
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending, true);
order_cols.insert(*idx);
}
});
Expand Down
19 changes: 13 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,13 @@ impl HashJoin {
// The pk of hash join internal and degree table should be join_key + input_pk.
let join_key_len = join_key_indices.len();
let mut pk_indices = join_key_indices;

// TODO(yuhao): dedup the dist key and pk.
pk_indices.extend(input.logical_pk());
for input_pk_index in input.logical_pk() {
if !pk_indices.contains(input_pk_index) {
pk_indices.push(*input_pk_index);
}
}

// Build internal table
let mut internal_table_catalog_builder =
Expand All @@ -252,9 +257,12 @@ impl HashJoin {
internal_columns_fields.iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
});

pk_indices.iter().for_each(|idx| {
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending)
pk_indices.iter().enumerate().for_each(|(order_idx, idx)| {
if order_idx < join_key_len {
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending, false)
} else {
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending, true)
}
});

// Build degree table.
Expand All @@ -265,15 +273,14 @@ impl HashJoin {

pk_indices.iter().enumerate().for_each(|(order_idx, idx)| {
degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]);
degree_table_catalog_builder.add_order_column(order_idx, OrderType::Ascending)
degree_table_catalog_builder.add_order_column(order_idx, OrderType::Ascending, false);
});
degree_table_catalog_builder.add_column(&degree_column_field);
degree_table_catalog_builder
.set_value_indices(vec![degree_table_catalog_builder.columns().len() - 1]);

internal_table_catalog_builder.set_read_prefix_len_hint(join_key_len);
degree_table_catalog_builder.set_read_prefix_len_hint(join_key_len);

(
internal_table_catalog_builder.build(internal_table_dist_keys),
degree_table_catalog_builder.build(degree_table_dist_keys),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl StreamMaterialize {
let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
let base = PlanBase::derive_stream_plan_base(&input);
let schema = &base.schema;
let pk_indices = &base.logical_pk;
let pk_indices = &base.logical_pk.iter().copied().unique().collect_vec();

let mut col_names = HashSet::new();
for name in &out_names {
Expand Down
32 changes: 23 additions & 9 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,29 @@ impl TableCatalogBuilder {
}

Check whether an order column needs to be added. Unlike value columns, order descriptions are equal to the pk in
/// semantics and they are encoded as storage key.
pub fn add_order_column(&mut self, index: usize, order_type: OrderType) {
self.pk.push(FieldOrder {
index,
direct: match order_type {
OrderType::Ascending => Direction::Asc,
OrderType::Descending => Direction::Desc,
},
});
/// semantics and they are encoded as the storage key; when `dedup` is true, columns already present in the pk are ignored.
pub fn add_order_column(&mut self, index: usize, order_type: OrderType, dedup: bool) {
    // When deduplicating, skip an index that is already part of the pk.
    // `iter().any` replaces the previous `collect_vec()` + `contains` pair,
    // which allocated an intermediate Vec just to do a membership test.
    if dedup && self.pk.iter().any(|fo| fo.index == index) {
        return;
    }
    // Single push shared by both the dedup and non-dedup paths; the old
    // `match dedup { true => …, false => … }` (clippy::match_bool) duplicated
    // this `FieldOrder` construction in both arms.
    self.pk.push(FieldOrder {
        index,
        direct: match order_type {
            OrderType::Ascending => Direction::Asc,
            OrderType::Descending => Direction::Desc,
        },
    });
}

pub fn set_read_prefix_len_hint(&mut self, read_prefix_len_hint: usize) {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/stream_fragmenter/rewrite/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ fn infer_internal_table_catalog(
internal_table_catalog_builder.add_order_column(
order.index as usize,
OrderType::from_prost(&ProstOrderType::from_i32(order.order_type).unwrap()),
true,
);
}

Expand Down