feat(batch): prune scan partition according to scan_range (#3698)
* prune vnode according to scan_range

* disallow accessing other vnodes from batch scan

* improve style

* add ut

* fix doc about dist key

* fix

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
xxchan and mergify[bot] authored Jul 7, 2022
1 parent ed4cb79 commit e1fd06b
Showing 5 changed files with 160 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/table_catalog.rs
@@ -50,7 +50,7 @@ use crate::optimizer::property::FieldOrder;
///
/// If there's no `ORDER BY` clause, the order key will be the same as pk.
///
/// - **Distribution Key**: the columns used to partition the data. It must be a prefix of the order
/// - **Distribution Key**: the columns used to partition the data. It must be a subset of the order
/// key.
#[derive(Clone, Debug, PartialEq)]
pub struct TableCatalog {
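To illustrate the corrected constraint (the distribution key must be a subset of the order key, not necessarily a prefix), here is a minimal sketch; the column indices and the check itself are purely illustrative:

fn main() {
    // Order key on columns [1, 3, 2], distribution key on columns [2, 3]:
    // a valid subset, but not a prefix ([1] and [1, 3] are the only prefixes).
    let order_key = [1usize, 3, 2];
    let dist_key = [2usize, 3];
    assert!(dist_key.iter().all(|k| order_key.contains(k)));
    assert!(!order_key.starts_with(&dist_key));
}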
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
@@ -81,6 +81,10 @@ impl BatchSeqScan {
pub fn logical(&self) -> &LogicalScan {
&self.logical
}

pub fn scan_range(&self) -> &ScanRange {
&self.scan_range
}
}

impl_plan_tree_node_for_leaf! { BatchSeqScan }
55 changes: 44 additions & 11 deletions src/frontend/src/scheduler/plan_fragmenter.rs
@@ -16,15 +16,15 @@ use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::types::ParallelUnitId;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ExchangeInfo;
use risingwave_pb::common::Buffer;
use risingwave_pb::plan_common::Field as FieldProst;
use uuid::Uuid;

use crate::optimizer::plan_node::{PlanNodeId, PlanNodeType};
use crate::optimizer::plan_node::{BatchSeqScan, PlanNodeId, PlanNodeType};
use crate::optimizer::property::Distribution;
use crate::optimizer::PlanRef;
use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
@@ -164,7 +164,8 @@ impl Query {

#[derive(Clone)]
pub struct TableScanInfo {
/// Indicates data distribution and partition of the table.
/// Indicates the table partitions to be read by scan tasks. Unnecessary partitions are already
/// pruned.
///
/// `None` if the table is not partitioned (system table).
pub vnode_bitmaps: Option<HashMap<ParallelUnitId, Buffer>>,
@@ -391,9 +392,9 @@ impl BatchPlanFragmenter {
builder.root = Some(Arc::new(execution_plan_node));
}
// Check out the comments for `has_table_scan` in `QueryStage`.
if let Some(scan_node) = node.as_batch_seq_scan() {
let table_desc = scan_node.logical().table_desc();

let scan_node: Option<&BatchSeqScan> = node.as_batch_seq_scan();
if let Some(scan_node) = scan_node {
// TODO: handle multiple table scan inside a stage
assert!(
builder.table_scan_info.is_none()
|| builder
@@ -404,11 +405,38 @@
.is_none(),
"multiple table scan inside a stage"
);
builder.table_scan_info = Some(TableScanInfo {
vnode_bitmaps: table_desc
.vnode_mapping
.clone()
.map(vnode_mapping_to_owner_mapping),

builder.table_scan_info = Some({
let table_desc = scan_node.logical().table_desc();

let vnode_bitmaps =
table_desc.vnode_mapping.as_ref().map(|vnode_mapping| {
let num_vnodes = vnode_mapping.len();
let scan_range = scan_node.scan_range();
// Try to derive the partition to read from the scan range.
// It can be derived if the value of the distribution key is already
// known.
match scan_range.try_compute_vnode(
&table_desc.distribution_key,
&table_desc.order_column_indices(),
) {
// scan all partitions
None => vnode_mapping_to_owner_mapping(vnode_mapping.clone()),
// scan a single partition
Some(vnode) => {
let parallel_unit_id = vnode_mapping[vnode as usize];
let mut vnode_bitmaps = HashMap::new();
vnode_bitmaps.insert(
parallel_unit_id,
bitmap_with_single_vnode(vnode as usize, num_vnodes)
.to_protobuf(),
);
vnode_bitmaps
}
}
});

TableScanInfo { vnode_bitmaps }
});
}
}
@@ -453,6 +481,11 @@ fn vnode_mapping_to_owner_mapping(
.collect()
}

fn bitmap_with_single_vnode(vnode: usize, num_vnodes: usize) -> Bitmap {
let mut bitmap = BitmapBuilder::zeroed(num_vnodes);
bitmap.set(vnode as usize, true);
bitmap.finish()
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
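The hunk above has two outcomes: either every owning parallel unit keeps its full vnode bitmap (via vnode_mapping_to_owner_mapping), or a single parallel unit receives a one-bit bitmap (via bitmap_with_single_vnode). A condensed, self-contained sketch of that decision, where the function name prune_partitions and the use of Vec<bool> in place of risingwave's Bitmap/Buffer types are illustrative simplifications:

use std::collections::HashMap;

type ParallelUnitId = u32;

// Given the table's vnode -> parallel-unit mapping and an optionally
// derived vnode, either every owning parallel unit receives its full
// bitmap, or only the owner of the derived vnode receives a one-bit bitmap.
fn prune_partitions(
    vnode_mapping: &[ParallelUnitId],
    derived_vnode: Option<usize>,
) -> HashMap<ParallelUnitId, Vec<bool>> {
    let num_vnodes = vnode_mapping.len();
    match derived_vnode {
        // The scan range does not pin the distribution key: scan all partitions.
        None => {
            let mut owners: HashMap<ParallelUnitId, Vec<bool>> = HashMap::new();
            for (vnode, &unit) in vnode_mapping.iter().enumerate() {
                owners.entry(unit).or_insert_with(|| vec![false; num_vnodes])[vnode] = true;
            }
            owners
        }
        // The scan range pins the distribution key: one partition suffices.
        Some(vnode) => {
            let mut bitmap = vec![false; num_vnodes];
            bitmap[vnode] = true;
            HashMap::from([(vnode_mapping[vnode], bitmap)])
        }
    }
}

fn main() {
    // Four vnodes owned by parallel units 0, 0, 1, 1.
    let mapping = [0, 0, 1, 1];
    assert_eq!(prune_partitions(&mapping, None).len(), 2); // all partitions
    assert_eq!(prune_partitions(&mapping, Some(2)).len(), 1); // pruned to one
}

Since only the parallel units present in vnode_bitmaps get scan tasks, a point lookup ends up scheduled on exactly one worker.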
97 changes: 96 additions & 1 deletion src/frontend/src/utils/scan_range.rs
@@ -14,7 +14,10 @@

use std::ops::{Bound, RangeBounds};

use risingwave_common::types::ScalarImpl;
use itertools::Itertools;
use risingwave_common::array::Row;
use risingwave_common::types::{ScalarImpl, VirtualNode};
use risingwave_common::util::hash_util::CRC32FastBuilder;
use risingwave_pb::batch_plan::scan_range::Bound as BoundProst;
use risingwave_pb::batch_plan::ScanRange as ScanRangeProst;

@@ -58,6 +61,45 @@ impl ScanRange {
range: full_range(),
}
}

pub fn try_compute_vnode(
&self,
dist_key_indices: &[usize],
pk_indices: &[usize],
) -> Option<VirtualNode> {
if dist_key_indices.is_empty() {
return None;
}

let dist_key_in_pk_indices = dist_key_indices
.iter()
.map(|&di| {
pk_indices
.iter()
.position(|&pi| di == pi)
.unwrap_or_else(|| {
panic!(
"distribution keys {:?} must be a subset of primary keys {:?}",
dist_key_indices, pk_indices
)
})
})
.collect_vec();
let pk_prefix_len = self.eq_conds.len();
if dist_key_in_pk_indices.iter().any(|&i| i >= pk_prefix_len) {
return None;
}

let pk_prefix_value = Row(self
.eq_conds
.iter()
.map(|scalar| Some(scalar.clone()))
.collect());
let vnode = pk_prefix_value
.hash_by_indices(&dist_key_in_pk_indices, &CRC32FastBuilder {})
.to_vnode();
Some(vnode)
}
}

pub const fn full_range<T>() -> (Bound<T>, Bound<T>) {
@@ -68,3 +110,56 @@ pub fn is_full_range<T>(bounds: &impl RangeBounds<T>) -> bool {
matches!(bounds.start_bound(), Bound::Unbounded)
&& matches!(bounds.end_bound(), Bound::Unbounded)
}

#[cfg(test)]
mod tests {
use super::*;

// dist_key is prefix of pk
#[test]
fn test_vnode_prefix() {
let dist_key = vec![1, 3];
let pk = vec![1, 3, 2];

let mut scan_range = ScanRange::full_table_scan();
assert!(scan_range.try_compute_vnode(&dist_key, &pk).is_none());

scan_range.eq_conds.push(ScalarImpl::from(114));
assert!(scan_range.try_compute_vnode(&dist_key, &pk).is_none());

scan_range.eq_conds.push(ScalarImpl::from(514));
let vnode = Row(vec![
Some(ScalarImpl::from(114)),
Some(ScalarImpl::from(514)),
])
.hash_by_indices(&[0, 1], &CRC32FastBuilder {})
.to_vnode();
assert_eq!(scan_range.try_compute_vnode(&dist_key, &pk), Some(vnode));
}

// dist_key is not prefix of pk
#[test]
fn test_vnode_not_prefix() {
let dist_key = vec![2, 3];
let pk = vec![1, 3, 2];

let mut scan_range = ScanRange::full_table_scan();
assert!(scan_range.try_compute_vnode(&dist_key, &pk).is_none());

scan_range.eq_conds.push(ScalarImpl::from(114));
assert!(scan_range.try_compute_vnode(&dist_key, &pk).is_none());

scan_range.eq_conds.push(ScalarImpl::from(514));
assert!(scan_range.try_compute_vnode(&dist_key, &pk).is_none());

scan_range.eq_conds.push(ScalarImpl::from(114514));
let vnode = Row(vec![
Some(ScalarImpl::from(114)),
Some(ScalarImpl::from(514)),
Some(ScalarImpl::from(114514)),
])
.hash_by_indices(&[2, 1], &CRC32FastBuilder {})
.to_vnode();
assert_eq!(scan_range.try_compute_vnode(&dist_key, &pk), Some(vnode));
}
}
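Both tests follow from the coverage rule in try_compute_vnode: the vnode is derivable only once every distribution-key column lies within the pk prefix pinned by the equality conditions. A simplified, self-contained sketch of just that check (the name dist_key_covered is illustrative; the real code goes on to hash the matched values with CRC32FastBuilder and map the hash to a vnode with to_vnode()):

// Returns true iff the vnode can be computed from the eq-condition prefix.
fn dist_key_covered(dist_key: &[usize], pk: &[usize], num_eq_conds: usize) -> bool {
    dist_key.iter().all(|di| {
        match pk.iter().position(|pi| pi == di) {
            // Covered only if this dist-key column sits inside the pk prefix
            // pinned by the equality conditions.
            Some(pos) => pos < num_eq_conds,
            // The planner guarantees the dist key is a subset of the pk.
            None => panic!("distribution key {di} not in pk {pk:?}"),
        }
    })
}

fn main() {
    let pk = [1, 3, 2];
    assert!(dist_key_covered(&[1, 3], &pk, 2)); // prefix: two eq conds suffice
    assert!(!dist_key_covered(&[2, 3], &pk, 2)); // subset, not prefix: not yet
    assert!(dist_key_covered(&[2, 3], &pk, 3)); // all pk columns pinned
}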
36 changes: 15 additions & 21 deletions src/storage/src/table/storage_table.rs
@@ -275,25 +275,13 @@ impl<S: StateStore, E: Encoding, const T: AccessType> StorageTableBase<S, E, T>
/// Get
impl<S: StateStore, E: Encoding, const T: AccessType> StorageTableBase<S, E, T> {
/// Check whether the given `vnode` is set in the `vnodes` of this table.
///
/// - For `READ_WRITE` or streaming usages, this will panic on `false` and always return `true`
/// since the table should only be used to access entries with vnode specified in
/// `self.vnodes`.
/// - For `READ_ONLY` or batch usages, this will return the result verbatim. The caller may
/// filter out the scanned row according to the result.
fn check_vnode_is_set(&self, vnode: VirtualNode) -> bool {
fn check_vnode_is_set(&self, vnode: VirtualNode) {
let is_set = self.vnodes.is_set(vnode as usize).unwrap();
match T {
READ_WRITE => {
assert!(
is_set,
"vnode {} should not be accessed by this table: {:#?}, dist key {:?}",
vnode, self.table_columns, self.dist_key_indices
);
}
READ_ONLY => {}
}
is_set
assert!(
is_set,
"vnode {} should not be accessed by this table: {:#?}, dist key {:?}",
vnode, self.table_columns, self.dist_key_indices
);
}

/// Get vnode value with `indices` on the given `row`. Should not be used directly.
@@ -307,7 +295,7 @@ impl<S: StateStore, E: Encoding, const T: AccessType> StorageTableBase<S, E, T>

tracing::trace!(target: "events::storage::storage_table", "compute vnode: {:?} keys {:?} => {}", row, indices, vnode);

let _ = self.check_vnode_is_set(vnode);
self.check_vnode_is_set(vnode);
vnode
}

@@ -355,7 +343,10 @@ impl<S: StateStore, E: Encoding, const T: AccessType> StorageTableBase<S, E, T>
}

let result = deserializer.take();
Ok(result.and_then(|(vnode, _pk, row)| self.check_vnode_is_set(vnode).then_some(row)))
Ok(result.map(|(vnode, _pk, row)| {
self.check_vnode_is_set(vnode);
row
}))
}

/// Get a single row by range scan
@@ -374,7 +365,10 @@ impl<S: StateStore, E: Encoding, const T: AccessType> StorageTableBase<S, E, T>
}

let result = deserializer.take();
Ok(result.and_then(|(vnode, _pk, row)| self.check_vnode_is_set(vnode).then_some(row)))
Ok(result.map(|(vnode, _pk, row)| {
self.check_vnode_is_set(vnode);
row
}))
}
}

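The behavioral change in this file: a row landing on a vnode outside self.vnodes used to be silently filtered out on the READ_ONLY (batch) path, and now trips an assertion on every path, since batch scans are no longer allowed to touch other vnodes at all. A minimal sketch of the two shapes, with plain std types and illustrative names standing in for the real ones:

type VirtualNode = u16; // stand-in for risingwave_common::types::VirtualNode
type Row = Vec<i64>; // stand-in for the real row type

fn vnode_is_set(vnodes: &[bool], vnode: VirtualNode) -> bool {
    vnodes[vnode as usize]
}

// Old READ_ONLY behavior: rows on unset vnodes were silently dropped.
fn get_row_filtering(result: Option<(VirtualNode, Row)>, vnodes: &[bool]) -> Option<Row> {
    result.and_then(|(vnode, row)| vnode_is_set(vnodes, vnode).then_some(row))
}

// New behavior on all paths: accessing an unset vnode is a bug, so assert.
fn get_row_asserting(result: Option<(VirtualNode, Row)>, vnodes: &[bool]) -> Option<Row> {
    result.map(|(vnode, row)| {
        assert!(vnode_is_set(vnodes, vnode), "vnode {vnode} should not be accessed");
        row
    })
}

fn main() {
    let vnodes = [true, false];
    assert_eq!(get_row_filtering(Some((1, vec![42])), &vnodes), None); // dropped
    assert!(get_row_asserting(Some((0, vec![42])), &vnodes).is_some());
}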
