Skip to content

Commit

Permalink
refactor(streaming): use table catalog in hash join (risingwavelabs#3707)
Browse files Browse the repository at this point in the history

* use tablecatalog

* fix infer_internal_table_catalog

* cargo fix

* fix tests

* keys -> key

* fix build executor

* minor

* fix incorrect order keys

* fix comments

* fix typo

* add comment
  • Loading branch information
yuhao-su authored Jul 8, 2022
1 parent 2dc7992 commit 17c0417
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 219 deletions.
10 changes: 4 additions & 6 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,11 @@ message HashJoinNode {
catalog.Table left_table = 6;
// Used for internal table states.
catalog.Table right_table = 7;
repeated uint32 dist_key_l = 8;
repeated uint32 dist_key_r = 9;
// It is true when the input is append-only
bool is_append_only = 10;
// Whether to optimize for append only stream.
// the output indices of current node
repeated uint32 output_indices = 11;
// It is true when the input is append-only
bool is_append_only = 8;
// The output indices of current node
repeated uint32 output_indices = 9;
}

message DynamicFilterNode {
Expand Down
141 changes: 53 additions & 88 deletions src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, DatabaseId, SchemaId, TableId};
use risingwave_common::catalog::{DatabaseId, Field, SchemaId};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::HashJoinNode;

use super::utils::TableCatalogBuilder;
use super::{LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, ToStreamProst};
use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::table_catalog::TableCatalog;
use crate::expr::Expr;
use crate::optimizer::plan_node::EqJoinPredicate;
use crate::optimizer::property::{Direction, Distribution, FieldOrder};
use crate::optimizer::property::Distribution;
use crate::utils::ColIndexMapping;

/// [`StreamHashJoin`] implements [`super::LogicalJoin`] with hash table. It builds a hash table
Expand All @@ -46,8 +47,6 @@ pub struct StreamHashJoin {
/// only. Will be removed after we fully support shared state and index.
is_delta: bool,

dist_key_l: Distribution,
dist_key_r: Distribution,
/// Whether we can optimize for an append-only stream.
/// It is true if the inputs of both sides are append-only.
is_append_only: bool,
Expand All @@ -70,9 +69,6 @@ impl StreamHashJoin {
.composite(&logical.i2o_col_mapping()),
);

let dist_l = logical.left().distribution().clone();
let dist_r = logical.right().distribution().clone();

let force_delta = ctx.inner().session_ctx.config().get_delta_join();

// TODO: derive from input
Expand All @@ -89,8 +85,6 @@ impl StreamHashJoin {
logical,
eq_join_predicate,
is_delta: force_delta,
dist_key_l: dist_l,
dist_key_r: dist_r,
is_append_only: append_only,
}
}
Expand Down Expand Up @@ -181,46 +175,35 @@ impl_plan_tree_node_for_binary! { StreamHashJoin }

impl ToStreamProst for StreamHashJoin {
fn to_stream_prost_body(&self) -> NodeBody {
let left_key_indices = self.eq_join_predicate.left_eq_indexes();
let right_key_indices = self.eq_join_predicate.right_eq_indexes();
let left_key_indices_prost = left_key_indices.iter().map(|idx| *idx as i32).collect_vec();
let right_key_indices_prost = right_key_indices
.iter()
.map(|idx| *idx as i32)
.collect_vec();
NodeBody::HashJoin(HashJoinNode {
join_type: self.logical.join_type() as i32,
left_key: self
.eq_join_predicate
.left_eq_indexes()
.iter()
.map(|v| *v as i32)
.collect(),
right_key: self
.eq_join_predicate
.right_eq_indexes()
.iter()
.map(|v| *v as i32)
.collect(),
left_key: left_key_indices_prost,
right_key: right_key_indices_prost,
condition: self
.eq_join_predicate
.other_cond()
.as_expr_unless_true()
.map(|x| x.to_expr_proto()),
dist_key_l: self
.dist_key_l
.dist_column_indices()
.iter()
.map(|idx| *idx as u32)
.collect_vec(),
dist_key_r: self
.dist_key_r
.dist_column_indices()
.iter()
.map(|idx| *idx as u32)
.collect_vec(),
is_delta_join: self.is_delta,
left_table: Some(infer_internal_table_catalog(self.left()).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)),
right_table: Some(infer_internal_table_catalog(self.right()).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
)),
left_table: Some(
infer_internal_table_catalog(self.left(), left_key_indices).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
),
),
right_table: Some(
infer_internal_table_catalog(self.right(), right_key_indices).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
),
),
output_indices: self
.logical
.output_indices()
Expand All @@ -232,51 +215,33 @@ impl ToStreamProst for StreamHashJoin {
}
}

fn infer_internal_table_catalog(input: PlanRef) -> TableCatalog {
fn infer_internal_table_catalog(input: PlanRef, join_key_indices: Vec<usize>) -> TableCatalog {
let base = input.plan_base();
let schema = &base.schema;
let pk_indices = &base.pk_indices;
let mut col_names = HashMap::new();
// FIXME: temp fix, use TableCatalogBuilder to avoid_duplicate_col_name in the future (https://github.com/singularity-data/risingwave/issues/3657)
let columns = schema
.fields()
.iter()
.enumerate()
.map(|(i, field)| {
let mut c = ColumnCatalog {
column_desc: ColumnDesc::from_field_with_column_id(field, i as i32),
is_hidden: false,
};
c.column_desc.name = match col_names.try_insert(field.name.clone(), 0) {
Ok(_) => field.name.clone(),
Err(mut err) => {
let cnt = err.entry.get_mut();
*cnt += 1;
field.name.clone() + "#" + &cnt.to_string()
}
};
c
})
.collect_vec();
let mut order_desc = vec![];
for &index in pk_indices {
order_desc.push(FieldOrder {
index,
direct: Direction::Asc,
});
}
TableCatalog {
id: TableId::placeholder(),
associated_source_id: None,
name: String::new(),
columns,
order_key: order_desc,
pk: pk_indices.clone(),
distribution_key: base.dist.dist_column_indices().to_vec(),
is_index_on: None,
appendonly: input.append_only(),
owner: risingwave_common::catalog::DEFAULT_SUPPER_USER.to_string(),
vnode_mapping: None,
properties: HashMap::default(),
}

let append_only = input.append_only();
let dist_keys = base.dist.dist_column_indices().to_vec();

// The pk of hash join internal table should be join_key + input_pk.
let mut pk_indices = join_key_indices;
// TODO(yuhao): dedup the dist key and pk.
pk_indices.extend(&base.pk_indices);

let mut columns_fields = schema.fields().to_vec();

// The join degree at the end of internal table.
let degree_column_field = Field::with_name(DataType::Int64, "_degree");
columns_fields.push(degree_column_field);

let mut internal_table_catalog_builder = TableCatalogBuilder::new();

columns_fields.iter().for_each(|field| {
internal_table_catalog_builder.add_column_desc_from_field_without_order_type(field)
});

pk_indices.iter().for_each(|idx| {
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending)
});

internal_table_catalog_builder.build(dist_keys, append_only)
}
49 changes: 35 additions & 14 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,27 @@ impl TableCatalogBuilder {
});

// Ordered column desc must be a pk.
self.add_order_column(column_desc, order_type);
if let Some(order) = order_type {
self.add_order_column(i32::from(column_desc.column_id) as usize, order);
}
}

/// Add a column built from `field` without registering it as an order key.
///
/// The column id is taken from `self.cur_col_id()`, and the column desc is passed through
/// `avoid_duplicate_col_name` before it is stored. Order keys must be added separately with
/// `add_order_column`.
///
/// Note that the caller must make sure the `column_id` of the columns and the indices used for
/// the order keys are a 1-1 mapping (e.g. in hash join).
pub fn add_column_desc_from_field_without_order_type(&mut self, field: &Field) {
    let column_id = self.cur_col_id();
    // Build the column desc from the field and the next column id.
    let mut column_desc = ColumnDesc::from_field_with_column_id(field, column_id);

    // Avoid column name duplicate.
    self.avoid_duplicate_col_name(&mut column_desc);

    // `column_desc` is not used after this point, so it is moved rather than cloned.
    self.columns.push(ColumnCatalog {
        column_desc,
        // All columns in internal table are invisible to batch query.
        is_hidden: false,
    });
}

/// Add an unnamed column.
Expand All @@ -73,23 +93,24 @@ impl TableCatalogBuilder {
is_hidden: false,
});

self.add_order_column(column_desc, order_type);
if let Some(order) = order_type {
self.add_order_column(i32::from(column_desc.column_id) as usize, order);
}
}

/// Check whether we need to add an ordered column. Unlike values, order descs are equal to the
/// pk in semantics, and they are encoded as the storage key.
fn add_order_column(&mut self, column_desc: ColumnDesc, order_type: Option<OrderType>) {
let index = i32::from(column_desc.column_id) as usize;
if let Some(order) = order_type {
self.pk_indices.push(index);
self.order_key.push(FieldOrder {
index,
direct: match order {
OrderType::Ascending => Direction::Asc,
OrderType::Descending => Direction::Desc,
},
});
}
/// WARNING: This should only be called by user when building internal table of hash join (after
/// `add_column_desc_from_field_without_order_type`).
pub fn add_order_column(&mut self, index: usize, order_type: OrderType) {
self.pk_indices.push(index);
self.order_key.push(FieldOrder {
index,
direct: match order_type {
OrderType::Ascending => Direction::Asc,
OrderType::Descending => Direction::Desc,
},
});
}

/// Check whether the column name already exists. If true, record the occurrence and change the name
Expand Down
50 changes: 50 additions & 0 deletions src/storage/src/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ use std::borrow::Cow;
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::ops::Index;
use std::sync::Arc;

use futures::{pin_mut, Stream, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::Row;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, TableId};
use risingwave_common::util::ordered::OrderedRowSerializer;
use risingwave_common::util::sort_util::OrderType;
use risingwave_hummock_sdk::key::range_of_prefix;
use risingwave_pb::catalog::Table;

use super::mem_table::{MemTable, RowOp};
use super::storage_table::{StorageTableBase, READ_WRITE};
Expand Down Expand Up @@ -222,6 +225,53 @@ impl<S: StateStore> StateTable<S> {

Ok(StateTableRowIter::new(mem_table_iter, storage_table_iter).into_stream())
}

/// Create state table from table catalog and store.
pub fn from_table_catalog(
table_catalog: &Table,
store: S,
vnodes: Option<Arc<Bitmap>>,
) -> Self {
let table_columns = table_catalog
.columns
.iter()
.map(|col| col.column_desc.as_ref().unwrap().into())
.collect();
let order_types = table_catalog
.order_key
.iter()
.map(|col_order| {
OrderType::from_prost(
&risingwave_pb::plan_common::OrderType::from_i32(col_order.order_type).unwrap(),
)
})
.collect();
let dist_key_indices = table_catalog
.distribution_key
.iter()
.map(|dist_index| *dist_index as usize)
.collect();
let pk_indices = table_catalog
.order_key
.iter()
.map(|col_order| col_order.index as usize)
.collect();
let distribution = match vnodes {
Some(vnodes) => Distribution {
dist_key_indices,
vnodes,
},
None => Distribution::fallback(),
};
StateTable::new_with_distribution(
store,
TableId::new(table_catalog.id),
table_columns,
order_types,
pk_indices,
distribution,
)
}
}

pub type RowStream<'a, S: StateStore> = impl Stream<Item = StorageResult<Cow<'a, Row>>>;
Expand Down
Loading

0 comments on commit 17c0417

Please sign in to comment.