Skip to content

Commit

Permalink
refactor(frontend): replace dist_key_indices with dist_key_in_pk_indi…
Browse files Browse the repository at this point in the history
…ces in frontend (risingwavelabs#8617)
  • Loading branch information
wcy-fdu authored Mar 17, 2023
1 parent 961e342 commit 18863e0
Show file tree
Hide file tree
Showing 15 changed files with 75 additions and 51 deletions.
20 changes: 12 additions & 8 deletions dashboard/proto/gen/plan_common.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ message StorageTableDesc {
repeated ColumnDesc columns = 2;
// TODO: may refactor primary key representations
repeated common.ColumnOrder pk = 3;
repeated uint32 dist_key_indices = 4;
repeated uint32 dist_key_in_pk_indices = 4;
uint32 retention_seconds = 5;
repeated uint32 value_indices = 6;
uint32 read_prefix_len_hint = 7;
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,13 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.map(|k| k.column_index as usize)
.collect_vec();

let dist_key_indices = table_desc
.dist_key_indices
let dist_key_in_pk_indices = table_desc
.dist_key_in_pk_indices
.iter()
.map(|&k| k as usize)
.collect_vec();
// Lookup Join always contains distribution key, so we don't need vnode bitmap
let distribution = Distribution::all_vnodes(dist_key_indices);
let distribution = Distribution::all_vnodes(dist_key_in_pk_indices);
let table_option = TableOption {
retention_seconds: if table_desc.retention_seconds > 0 {
Some(table_desc.retention_seconds)
Expand Down
13 changes: 4 additions & 9 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,15 @@ pub type BoxedLookupExecutorBuilder = Box<dyn LookupExecutorBuilder>;
impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
/// Gets the virtual node based on the given `scan_range`
fn get_virtual_node(&self, scan_range: &ScanRange) -> Result<VirtualNode> {
let dist_keys = self
let dist_key_in_pk_indices = self
.table_desc
.dist_key_indices
.dist_key_in_pk_indices
.iter()
.map(|&k| k as usize)
.collect_vec();
let pk_indices = self
.table_desc
.pk
.iter()
.map(|col| col.column_index as usize)
.collect_vec();

let virtual_node = scan_range.try_compute_vnode(&dist_keys, &pk_indices);
let virtual_node =
scan_range.try_compute_vnode_with_dist_key_in_pk_indices(&dist_key_in_pk_indices);
virtual_node.ok_or_else(|| internal_error("Could not compute vnode for lookup join"))
}

Expand Down
8 changes: 4 additions & 4 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,19 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.map(|k| k.column_index as usize)
.collect_vec();

let dist_key_indices = table_desc
.dist_key_indices
let dist_key_in_pk_indices = table_desc
.dist_key_in_pk_indices
.iter()
.map(|&k| k as usize)
.collect_vec();
let distribution = match &seq_scan_node.vnode_bitmap {
Some(vnodes) => Distribution {
vnodes: Bitmap::from(vnodes).into(),
dist_key_indices,
dist_key_in_pk_indices,
},
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
None => Distribution::all_vnodes(dist_key_indices),
None => Distribution::all_vnodes(dist_key_in_pk_indices),
};

let table_option = TableOption {
Expand Down
24 changes: 23 additions & 1 deletion src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_pb::common::PbColumnOrder;
use risingwave_pb::plan_common::StorageTableDesc;

Expand Down Expand Up @@ -77,11 +78,32 @@ impl TableDesc {
}

pub fn to_protobuf(&self) -> StorageTableDesc {
let dist_key_indices: Vec<u32> = self.distribution_key.iter().map(|&k| k as u32).collect();
let pk_indices: Vec<u32> = self
.pk
.iter()
.map(|v| v.to_protobuf().column_index)
.collect();
let dist_key_in_pk_indices = dist_key_indices
.iter()
.map(|&di| {
pk_indices
.iter()
.position(|&pi| di == pi)
.unwrap_or_else(|| {
panic!(
"distribution key {:?} must be a subset of primary key {:?}",
dist_key_indices, pk_indices
)
})
})
.map(|d| d as u32)
.collect_vec();
StorageTableDesc {
table_id: self.table_id.into(),
columns: self.columns.iter().map(Into::into).collect(),
pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
dist_key_indices: self.distribution_key.iter().map(|&k| k as u32).collect(),
dist_key_in_pk_indices,
retention_seconds: self.retention_seconds,
value_indices: self.value_indices.iter().map(|&v| v as u32).collect(),
read_prefix_len_hint: self.read_prefix_len_hint as u32,
Expand Down
9 changes: 8 additions & 1 deletion src/common/src/util/scan_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,21 @@ impl ScanRange {
}

let dist_key_in_pk_indices = get_dist_key_in_pk_indices(dist_key_indices, pk_indices);
self.try_compute_vnode_with_dist_key_in_pk_indices(&dist_key_in_pk_indices)
}

pub fn try_compute_vnode_with_dist_key_in_pk_indices(
&self,
dist_key_in_pk_indices: &[usize],
) -> Option<VirtualNode> {
let pk_prefix_len = self.eq_conds.len();
if dist_key_in_pk_indices.iter().any(|&i| i >= pk_prefix_len) {
return None;
}

let pk_prefix_value = &self.eq_conds;
let vnode = pk_prefix_value
.project(&dist_key_in_pk_indices)
.project(dist_key_in_pk_indices)
.hash(Crc32FastBuilder)
.to_vnode();
Some(vnode)
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ pub(crate) mod tests {
field_descs: vec![],
},
],
distribution_key: vec![2],
distribution_key: vec![],
append_only: false,
retention_seconds: TABLE_OPTION_DUMMY_RETENTION_SECOND,
value_indices: vec![0, 1, 2],
Expand Down
7 changes: 2 additions & 5 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::{Either, Itertools};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{
get_dist_key_in_pk_indices, ColumnDesc, ColumnId, Schema, TableId, TableOption,
};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::util::ordered::*;
Expand Down Expand Up @@ -183,7 +181,7 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
Distribution {
dist_key_indices,
dist_key_in_pk_indices,
vnodes,
}: Distribution,
table_option: TableOption,
Expand Down Expand Up @@ -244,7 +242,6 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
}
};

let dist_key_in_pk_indices = get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices);
let key_output_indices = match key_output_indices.is_empty() {
true => None,
false => Some(key_output_indices),
Expand Down
8 changes: 4 additions & 4 deletions src/storage/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub const DEFAULT_VNODE: VirtualNode = VirtualNode::ZERO;
#[derive(Debug)]
pub struct Distribution {
/// Indices of distribution key for computing vnode, based on the all columns of the table.
pub dist_key_indices: Vec<usize>,
pub dist_key_in_pk_indices: Vec<usize>,

/// Virtual nodes that the table is partitioned into.
pub vnodes: Arc<Bitmap>,
Expand All @@ -49,7 +49,7 @@ impl Distribution {
vnodes.finish().into()
});
Self {
dist_key_indices: vec![],
dist_key_in_pk_indices: vec![],
vnodes: FALLBACK_VNODES.clone(),
}
}
Expand All @@ -66,12 +66,12 @@ impl Distribution {
}

/// Distribution that accesses all vnodes, mainly used for tests.
pub fn all_vnodes(dist_key_indices: Vec<usize>) -> Self {
pub fn all_vnodes(dist_key_in_pk_indices: Vec<usize>) -> Self {
/// A bitmap that all vnodes are set.
static ALL_VNODES: LazyLock<Arc<Bitmap>> =
LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into());
Self {
dist_key_indices,
dist_key_in_pk_indices,
vnodes: ALL_VNODES.clone(),
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ where
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
Distribution {
dist_key_indices,
dist_key_in_pk_indices,
vnodes,
}: Distribution,
value_indices: Option<Vec<usize>>,
Expand Down Expand Up @@ -439,7 +439,6 @@ where
.collect_vec(),
None => table_columns.iter().map(|c| c.column_id).collect_vec(),
};
let dist_key_in_pk_indices = get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices);
Self {
table_id,
local_store: local_state_store,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/from_proto/batch_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ impl ExecutorBuilder for BatchQueryExecutorBuilder {
.map(|k| k.column_index as usize)
.collect_vec();

let dist_key_indices = table_desc
.dist_key_indices
let dist_key_in_pk_indices = table_desc
.dist_key_in_pk_indices
.iter()
.map(|&k| k as usize)
.collect_vec();
let distribution = match params.vnode_bitmap {
Some(vnodes) => Distribution {
dist_key_indices,
dist_key_in_pk_indices,
vnodes: vnodes.into(),
},
None => Distribution::fallback(),
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/from_proto/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ impl ExecutorBuilder for ChainExecutorBuilder {
.map(|k| k.column_index as usize)
.collect_vec();

let dist_key_indices = table_desc
.dist_key_indices
let dist_key_in_pk_indices = table_desc
.dist_key_in_pk_indices
.iter()
.map(|&k| k as usize)
.collect_vec();
let distribution = match params.vnode_bitmap {
Some(vnodes) => Distribution {
dist_key_indices,
dist_key_in_pk_indices,
vnodes: vnodes.into(),
},
None => Distribution::fallback(),
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/from_proto/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ impl ExecutorBuilder for LookupExecutorBuilder {
.map(|k| k.column_index as usize)
.collect_vec();

let dist_key_indices = table_desc
.dist_key_indices
let dist_key_in_pk_indices = table_desc
.dist_key_in_pk_indices
.iter()
.map(|&k| k as usize)
.collect_vec();
let distribution = match params.vnode_bitmap {
Some(vnodes) => Distribution {
dist_key_indices,
dist_key_in_pk_indices,
vnodes: vnodes.into(),
},
None => Distribution::fallback(),
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/from_proto/temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder {
.map(|k| k.column_index as usize)
.collect_vec();

let dist_key_indices = table_desc
.dist_key_indices
let dist_key_in_pk_indices = table_desc
.dist_key_in_pk_indices
.iter()
.map(|&k| k as usize)
.collect_vec();
let distribution = match params.vnode_bitmap.clone() {
Some(vnodes) => Distribution {
dist_key_indices,
dist_key_in_pk_indices,
vnodes: vnodes.into(),
},
None => Distribution::fallback(),
Expand Down

0 comments on commit 18863e0

Please sign in to comment.