Skip to content

Commit

Permalink
perf(executor): decode row datums from pk (#2957)
Browse files Browse the repository at this point in the history
* add DedupPkCellBaseTableIter

* init row seq scan with pk DedupPkCellBaseTableIter

* pass pk descriptors to row seq scan

* pass pk descs to DedupPkCellBaseTableIter

* skip pk fields not in row

* test DedupPkCellBaseTableIter

* fix memcomparable failures

* refactor collect_data_chunk

* test memcomparable values
  • Loading branch information
kwannoel authored Jun 7, 2022
1 parent 2e4924c commit 109e7d0
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 53 deletions.
14 changes: 10 additions & 4 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{ColumnDesc, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_storage::table::cell_based_table::{CellBasedTable, CellBasedTableRowIter};
use risingwave_storage::table::cell_based_table::{
CellBasedTable, CellTableChunkIter, DedupPkCellBasedTableRowIter,
};
use risingwave_storage::{dispatch_state_store, Keyspace, StateStore, StateStoreImpl};

use crate::executor::monitor::BatchMetrics;
Expand All @@ -35,13 +37,13 @@ pub struct RowSeqScanExecutor<S: StateStore> {
schema: Schema,
identity: String,
stats: Arc<BatchMetrics>,
row_iter: CellBasedTableRowIter<S>,
row_iter: DedupPkCellBasedTableRowIter<S>,
}

impl<S: StateStore> RowSeqScanExecutor<S> {
pub fn new(
schema: Schema,
row_iter: CellBasedTableRowIter<S>,
row_iter: DedupPkCellBasedTableRowIter<S>,
chunk_size: usize,
primary: bool,
identity: String,
Expand Down Expand Up @@ -100,7 +102,11 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
let storage_stats = state_store.stats();
let batch_stats = source.context().stats();
let table = CellBasedTable::new_adhoc(keyspace, column_descs, storage_stats);
let iter = table.iter(source.epoch).await?;

let pk_descs_proto = &seq_scan_node.table_desc.as_ref().unwrap().pk;
let pk_descs = pk_descs_proto.iter().map(|d| d.into()).collect();
let iter = table.iter_with_pk(source.epoch, pk_descs).await?;

Ok(Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
iter,
Expand Down
15 changes: 15 additions & 0 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,21 @@ impl From<ProstOrderedColumnDesc> for OrderedColumnDesc {
}
}

impl From<&ProstOrderedColumnDesc> for OrderedColumnDesc {
fn from(prost: &ProstOrderedColumnDesc) -> Self {
prost.clone().into()
}
}

impl From<&OrderedColumnDesc> for ProstOrderedColumnDesc {
fn from(c: &OrderedColumnDesc) -> Self {
Self {
column_desc: Some((&c.column_desc).into()),
order: c.order.to_prost().into(),
}
}
}

#[cfg(test)]
pub mod tests {
use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc;
Expand Down
12 changes: 12 additions & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,18 @@ impl DataType {
| DataType::Decimal
)
}

/// Checks if memcomparable encoding of datatype is equivalent to its value encoding.
pub fn mem_cmp_eq_value_enc(&self) -> bool {
use DataType::*;
match self {
Boolean | Int16 | Int32 | Int64 => true,
Float32 | Float64 | Decimal | Date | Varchar | Time | Timestamp | Timestampz
| Interval => false,
Struct { fields } => fields.iter().all(|dt| dt.mem_cmp_eq_value_enc()),
List { datatype } => datatype.mem_cmp_eq_value_enc(),
}
}
}

/// `Scalar` is a trait over all possible owned types in the evaluation
Expand Down
11 changes: 7 additions & 4 deletions src/common/src/util/ordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ impl OrderedRow {
)
}

pub fn into_row(self) -> Row {
Row(self
.0
pub fn into_vec(self) -> Vec<Datum> {
self.0
.into_iter()
.map(|ordered_datum| match ordered_datum {
NormalOrder(datum) => datum,
ReversedOrder(datum) => datum.0,
})
.collect::<Vec<_>>())
.collect::<Vec<_>>()
}

pub fn into_row(self) -> Row {
Row(self.into_vec())
}

/// Serialize the row into a memcomparable bytes.
Expand Down
38 changes: 31 additions & 7 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_batch::executor::{
RowSeqScanExecutor,
};
use risingwave_common::array::{Array, DataChunk, F64Array, I64Array, Row};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, OrderedColumnDesc, Schema, TableId};
use risingwave_common::column_nonnull;
use risingwave_common::error::{Result, RwError};
use risingwave_common::test_prelude::DataChunkTestExt;
Expand Down Expand Up @@ -205,13 +205,24 @@ async fn test_table_v2_materialize() -> Result<()> {
let keyspace = Keyspace::table_root(memory_state_store, &source_table_id);
let table = CellBasedTable::new_adhoc(
keyspace,
column_descs,
column_descs.clone(),
Arc::new(StateStoreMetrics::unused()),
);

let ordered_column_descs: Vec<OrderedColumnDesc> = column_descs
.iter()
.take(1)
.map(|d| OrderedColumnDesc {
column_desc: d.clone(),
order: OrderType::Ascending,
})
.collect();

let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
table.iter(u64::MAX).await?,
table
.iter_with_pk(u64::MAX, ordered_column_descs.clone())
.await?,
1024,
true,
"RowSeqExecutor2".to_string(),
Expand Down Expand Up @@ -270,7 +281,9 @@ async fn test_table_v2_materialize() -> Result<()> {
// Scan the table again, we are able to get the data now!
let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
table.iter(u64::MAX).await?,
table
.iter_with_pk(u64::MAX, ordered_column_descs.clone())
.await?,
1024,
true,
"RowSeqScanExecutor2".to_string(),
Expand Down Expand Up @@ -338,7 +351,9 @@ async fn test_table_v2_materialize() -> Result<()> {
// Scan the table again, we are able to see the deletion now!
let scan = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
table.iter(u64::MAX).await?,
table
.iter_with_pk(u64::MAX, ordered_column_descs.clone())
.await?,
1024,
true,
"RowSeqScanExecutor2".to_string(),
Expand Down Expand Up @@ -382,7 +397,7 @@ async fn test_row_seq_scan() -> Result<()> {
);
let table = CellBasedTable::new_adhoc(
keyspace,
column_descs,
column_descs.clone(),
Arc::new(StateStoreMetrics::unused()),
);

Expand Down Expand Up @@ -410,9 +425,18 @@ async fn test_row_seq_scan() -> Result<()> {
.unwrap();
state.commit(epoch).await.unwrap();

let pk_descs: Vec<OrderedColumnDesc> = column_descs
.iter()
.take(1)
.map(|d| OrderedColumnDesc {
column_desc: d.clone(),
order: OrderType::Ascending,
})
.collect();

let executor = Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
table.iter(u64::MAX).await.unwrap(),
table.iter_with_pk(u64::MAX, pk_descs).await.unwrap(),
1,
true,
"RowSeqScanExecutor2".to_string(),
Expand Down
8 changes: 7 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ impl ToBatchProst for BatchSeqScan {
NodeBody::RowSeqScan(RowSeqScanNode {
table_desc: Some(CellBasedTableDesc {
table_id: self.logical.table_desc().table_id.into(),
pk: vec![], // TODO:
pk: self
.logical
.table_desc()
.order_desc
.iter()
.map(|v| v.into())
.collect(),
}),
column_descs,
})
Expand Down
Loading

0 comments on commit 109e7d0

Please sign in to comment.