Skip to content

Commit

Permalink
feat(StorageTableIter): parameterize on decoding (risingwavelabs#3800)
Browse files Browse the repository at this point in the history
* refactor deserializer constructor

* parameterize on deserializer

* rerun ci

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
kwannoel and mergify[bot] authored Jul 12, 2022
1 parent 27a7ea8 commit 6dadbe0
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 33 deletions.
50 changes: 26 additions & 24 deletions src/storage/src/encoding/cell_based_row_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,9 @@ use risingwave_common::types::{Datum, VirtualNode, VIRTUAL_NODE_SIZE};
use risingwave_common::util::value_encoding::deserialize_cell;

use super::cell_based_encoding_util::deserialize_column_id;
use crate::encoding::{ColumnDescMapping, Decoding};
use crate::table::storage_table::DEFAULT_VNODE;

/// Record mapping from [`ColumnDesc`], [`ColumnId`], and output index of columns in a table.
pub struct ColumnDescMapping {
pub output_columns: Vec<ColumnDesc>,

pub id_to_column_index: HashMap<ColumnId, usize>,
}

#[allow(clippy::len_without_is_empty)]
impl ColumnDescMapping {
/// Create a mapping with given `output_columns`.
Expand Down Expand Up @@ -163,43 +157,50 @@ impl<Desc: Deref<Target = ColumnDescMapping>> CellBasedRowDeserializer<Desc> {
Ok(result)
}

/// When we encounter a new key, we can be sure that the previous row has been fully
/// deserialized. Then we return the key and the value of the previous row.
pub fn deserialize(
// TODO: remove this once we refactored lookup in delta join with cell-based table
pub fn deserialize_without_vnode(
&mut self,
raw_key: impl AsRef<[u8]>,
cell: impl AsRef<[u8]>,
) -> Result<Option<(VirtualNode, Vec<u8>, Row)>> {
self.deserialize_inner::<true>(raw_key, cell)
self.deserialize_inner::<false>(raw_key, cell)
}

// TODO: remove this once we refactored lookup in delta join with cell-based table
pub fn deserialize_without_vnode(
/// Since [`CellBasedRowDeserializer`] can be repetitively used with different inputs,
/// it needs to be reset so that pk and data are both cleared for the next use.
pub fn reset(&mut self) {
self.data.iter_mut().for_each(|datum| {
datum.take();
});
self.current_key.take();
}
}

impl<Desc: Deref<Target = ColumnDescMapping>> Decoding<Desc> for CellBasedRowDeserializer<Desc> {
/// Constructs a new serializer.
fn create_cell_based_deserializer(column_mapping: Desc) -> Self {
Self::new(column_mapping)
}

/// When we encounter a new key, we can be sure that the previous row has been fully
/// deserialized. Then we return the key and the value of the previous row.
fn deserialize(
&mut self,
raw_key: impl AsRef<[u8]>,
cell: impl AsRef<[u8]>,
) -> Result<Option<(VirtualNode, Vec<u8>, Row)>> {
self.deserialize_inner::<false>(raw_key, cell)
self.deserialize_inner::<true>(raw_key, cell)
}

/// Take the remaining data out of the deserializer.
pub fn take(&mut self) -> Option<(VirtualNode, Vec<u8>, Row)> {
fn take(&mut self) -> Option<(VirtualNode, Vec<u8>, Row)> {
let (vnode, cur_pk_bytes) = self.current_key.take()?;
let row = Row(std::mem::replace(
&mut self.data,
vec![None; self.columns.len()],
));
Some((vnode, cur_pk_bytes, row))
}

/// Since [`CellBasedRowDeserializer`] can be repetitively used with different inputs,
/// it needs to be reset so that pk and data are both cleared for the next use.
pub fn reset(&mut self) {
self.data.iter_mut().for_each(|datum| {
datum.take();
});
self.current_key.take();
}
}

#[cfg(test)]
Expand All @@ -212,6 +213,7 @@ mod tests {

use super::make_cell_based_row_deserializer;
use crate::encoding::cell_based_encoding_util::serialize_pk_and_row_state;
use crate::encoding::Decoding;

#[test]
fn test_cell_based_deserializer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use risingwave_common::error::Result;
use risingwave_common::types::{Datum, VirtualNode};
use risingwave_common::util::ordered::OrderedRowDeserializer;

use super::cell_based_row_deserializer::{CellBasedRowDeserializer, ColumnDescMapping};
use super::cell_based_row_deserializer::CellBasedRowDeserializer;
use crate::encoding::{ColumnDescMapping, Decoding};

/// Similar to [`CellBasedRowDeserializer`], but for dedup pk cell encoding.
#[derive(Clone)]
Expand Down Expand Up @@ -149,7 +150,7 @@ mod tests {

use super::DedupPkCellBasedRowDeserializer;
use crate::encoding::cell_based_encoding_util::serialize_pk_and_row;
use crate::encoding::cell_based_row_deserializer::ColumnDescMapping;
use crate::encoding::ColumnDescMapping;

#[test]
fn test_cell_based_deserializer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ mod tests {

use super::*;
use crate::encoding::cell_based_row_deserializer::make_cell_based_row_deserializer;
use crate::encoding::Decoding;
use crate::table::storage_table::DEFAULT_VNODE;

#[test]
Expand Down
29 changes: 29 additions & 0 deletions src/storage/src/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Deref;

use risingwave_common::array::Row;
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::error::Result;
Expand All @@ -25,6 +28,8 @@ pub mod dedup_pk_cell_based_row_serializer;

pub type KeyBytes = Vec<u8>;
pub type ValueBytes = Vec<u8>;

/// `Encoding` defines an interface for encoding a key row into kv storage.
pub trait Encoding {
/// Constructs a new serializer.
fn create_cell_based_serializer(
Expand Down Expand Up @@ -55,3 +60,27 @@ pub trait Encoding {
/// TODO: This should probably not be exposed to user.
fn column_ids(&self) -> &[ColumnId];
}

/// Record mapping from [`ColumnDesc`], [`ColumnId`], and output index of columns in a table.
pub struct ColumnDescMapping {
pub output_columns: Vec<ColumnDesc>,

pub id_to_column_index: HashMap<ColumnId, usize>,
}

/// `Decoding` defines an interface for decoding a key row from kv storage.
pub trait Decoding<Desc: Deref<Target = ColumnDescMapping>> {
/// Constructs a new serializer.
fn create_cell_based_deserializer(column_mapping: Desc) -> Self;

/// When we encounter a new key, we can be sure that the previous row has been fully
/// deserialized. Then we return the key and the value of the previous row.
fn deserialize(
&mut self,
raw_key: impl AsRef<[u8]>,
cell: impl AsRef<[u8]>,
) -> Result<Option<(VirtualNode, Vec<u8>, Row)>>;

/// Take the remaining data out of the deserializer.
fn take(&mut self) -> Option<(VirtualNode, Vec<u8>, Row)>;
}
16 changes: 9 additions & 7 deletions src/storage/src/table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ use risingwave_hummock_sdk::key::{end_bound_of_prefix, next_key, prefixed_range,
use super::mem_table::RowOp;
use super::{Distribution, TableIter};
use crate::encoding::cell_based_encoding_util::{serialize_pk, serialize_pk_and_column_id};
use crate::encoding::cell_based_row_deserializer::{CellBasedRowDeserializer, ColumnDescMapping};
use crate::encoding::cell_based_row_deserializer::{
CellBasedRowDeserializer, GeneralCellBasedRowDeserializer,
};
use crate::encoding::cell_based_row_serializer::CellBasedRowSerializer;
use crate::encoding::dedup_pk_cell_based_row_serializer::DedupPkCellBasedRowSerializer;
use crate::encoding::Encoding;
use crate::encoding::{ColumnDescMapping, Decoding, Encoding};
use crate::error::{StorageError, StorageResult};
use crate::keyspace::StripPrefixIterator;
use crate::storage_value::StorageValue;
Expand Down Expand Up @@ -509,7 +511,7 @@ impl<S: StateStore, E: Encoding, const T: AccessType> StorageTableBase<S, E, T>
let iterators: Vec<_> = try_join_all(vnodes.map(|vnode| {
let raw_key_range = prefixed_range(encoded_key_range.clone(), &vnode.to_be_bytes());
async move {
let iter = StorageTableIterInner::new(
let iter = StorageTableIterInner::<S, GeneralCellBasedRowDeserializer>::new(
&self.keyspace,
self.mapping.clone(),
raw_key_range,
Expand Down Expand Up @@ -676,15 +678,15 @@ impl<S: StateStore, E: Encoding, const T: AccessType> StorageTableBase<S, E, T>
}

/// [`StorageTableIterInner`] iterates on the storage table.
struct StorageTableIterInner<S: StateStore> {
struct StorageTableIterInner<S: StateStore, D: Decoding<Arc<ColumnDescMapping>>> {
/// An iterator that returns raw bytes from storage.
iter: StripPrefixIterator<S::Iter>,

/// Cell-based row deserializer
cell_based_row_deserializer: CellBasedRowDeserializer<Arc<ColumnDescMapping>>,
cell_based_row_deserializer: D, // CellBasedRowDeserializer<Arc<ColumnDescMapping>>,
}

impl<S: StateStore> StorageTableIterInner<S> {
impl<S: StateStore, D: Decoding<Arc<ColumnDescMapping>>> StorageTableIterInner<S, D> {
/// If `wait_epoch` is true, it will wait for the given epoch to be committed before iteration.
async fn new<R, B>(
keyspace: &Keyspace<S>,
Expand All @@ -701,7 +703,7 @@ impl<S: StateStore> StorageTableIterInner<S> {
keyspace.state_store().wait_epoch(epoch).await?;
}

let cell_based_row_deserializer = CellBasedRowDeserializer::new(table_descs);
let cell_based_row_deserializer = D::create_cell_based_deserializer(table_descs);

let iter = keyspace.iter_with_range(raw_key_range, epoch).await?;
let iter = Self {
Expand Down

0 comments on commit 6dadbe0

Please sign in to comment.