From 71408837979daa88c2a7b749ac14be66a4a3e91c Mon Sep 17 00:00:00 2001 From: zouxiang Date: Wed, 7 Jun 2023 11:14:33 +0800 Subject: [PATCH 1/3] refactor: add do_with_bytes to DatumView, and use DatumView to optimize sst filter build --- analytic_engine/src/sst/parquet/writer.rs | 4 +- common_types/src/datum.rs | 70 +++++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index 5e1adddb05..8bba1b41a2 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -152,8 +152,8 @@ impl RecordBatchGroupWriter { for partial_batch in row_group_batch { for (col_idx, column) in partial_batch.columns().iter().enumerate() { for row in 0..column.num_rows() { - let datum = column.datum(row); - datum.do_with_bytes(|bytes| { + let datum_view = column.datum_view(row); + datum_view.do_with_bytes(|bytes| { builder.add_key(col_idx, bytes); }); } diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index bf694a5c6d..6575c3feb9 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -1010,6 +1010,76 @@ impl<'a> DatumView<'a> { DatumView::Time(_) => DatumKind::Time, } } + + pub fn do_with_bytes(&self, mut f: F) + where + F: FnMut(&[u8]), + { + match self { + DatumView::Double(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Float(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::UInt64(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::UInt32(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::UInt16(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::UInt8(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Int64(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Int32(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + 
DatumView::Int16(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Int8(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Boolean(v) => { + if *v { + f(&[1]) + } else { + f(&[0]) + } + } + DatumView::Null => f(&[0]), + DatumView::Timestamp(v) => { + let arr = v.as_i64().to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Varbinary(v) => f(v.as_ref()), + DatumView::String(v) => f(v.as_bytes()), + DatumView::Date(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Time(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + } + } } #[cfg(feature = "arrow")] From 37cf7c06697e031413f11949c0cf104447387f8e Mon Sep 17 00:00:00 2001 From: zouxiang Date: Wed, 7 Jun 2023 11:28:17 +0800 Subject: [PATCH 2/3] refactor: when iterating a RowViewOnBatch, return a DatumView instead of a Datum. --- common_types/src/column.rs | 14 ++++++++++++++ common_types/src/record_batch.rs | 6 +++--- common_types/src/row/mod.rs | 7 ++++--- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/common_types/src/column.rs b/common_types/src/column.rs index 7de424aeec..4c09a84644 100644 --- a/common_types/src/column.rs +++ b/common_types/src/column.rs @@ -235,6 +235,14 @@ macro_rules! impl_column { Some(self.datum(index)) } + pub fn datum_view_opt(&self, index: usize) -> Option { + if index >= self.0.len() { + return None; + } + + Some(self.datum_view(index)) + } + pub fn datum_view(&self, index: usize) -> DatumView { // If this datum is null. if self.0.is_null(index) { @@ -545,6 +553,12 @@ macro_rules! impl_column_block { } } + pub fn datum_view_opt(&self, index: usize) -> Option { + match self { + $(ColumnBlock::$Kind(col) => col.datum_view_opt(index),)* + } + } + /// Panic if index is out fo bound.
pub fn datum_view(&self, index: usize) -> DatumView { match self { diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs index 80386a895d..a7a73c9381 100644 --- a/common_types/src/record_batch.rs +++ b/common_types/src/record_batch.rs @@ -535,9 +535,9 @@ impl RecordBatchWithKeyBuilder { /// /// REQUIRE: The `row_view` and the builder must have the same schema. pub fn append_row_view(&mut self, row_view: &RowViewOnBatch) -> Result<()> { - for (builder, datum) in self.builders.iter_mut().zip(row_view.iter_columns()) { - let datum = datum.context(IterateDatum)?; - builder.append(datum).context(AppendDatum)?; + for (builder, datum_view) in self.builders.iter_mut().zip(row_view.iter_columns()) { + let datum_view = datum_view.context(IterateDatum)?; + builder.append_view(datum_view).context(AppendDatum)?; } Ok(()) diff --git a/common_types/src/row/mod.rs b/common_types/src/row/mod.rs index a6d17b8546..305e6aa075 100644 --- a/common_types/src/row/mod.rs +++ b/common_types/src/row/mod.rs @@ -16,6 +16,7 @@ use crate::{ schema::{RecordSchemaWithKey, Schema}, time::Timestamp, }; +use crate::datum::DatumView; pub mod contiguous; @@ -607,7 +608,7 @@ impl<'a> RowView for RowViewOnBatch<'a> { } impl<'a> Iterator for RowViewOnBatchColumnIter<'a> { - type Item = Result; + type Item = Result>; fn next(&mut self) -> Option { if self.next_column_idx >= self.record_batch.num_columns() { @@ -616,11 +617,11 @@ impl<'a> Iterator for RowViewOnBatchColumnIter<'a> { let curr_column_idx = self.next_column_idx; let column = self.record_batch.column(curr_column_idx); - let datum = column.datum_opt(self.row_idx).map(Ok); + let datum_view = column.datum_view_opt(self.row_idx).map(Ok); self.next_column_idx += 1; - datum + datum_view } } From 604da01d436434a494bcdca855f778d5967f3c14 Mon Sep 17 00:00:00 2001 From: zouxiang Date: Wed, 7 Jun 2023 11:51:02 +0800 Subject: [PATCH 3/3] style: fix style check --- common_types/src/datum.rs | 6 +++--- common_types/src/row/mod.rs | 
3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index 6575c3feb9..6f01baa8d8 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -1012,8 +1012,8 @@ impl<'a> DatumView<'a> { } pub fn do_with_bytes(&self, mut f: F) - where - F: FnMut(&[u8]), + where + F: FnMut(&[u8]), { match self { DatumView::Double(v) => { @@ -1068,7 +1068,7 @@ impl<'a> DatumView<'a> { let arr = v.as_i64().to_le_bytes(); f(arr.as_slice()) } - DatumView::Varbinary(v) => f(v.as_ref()), + DatumView::Varbinary(v) => f(v), DatumView::String(v) => f(v.as_bytes()), DatumView::Date(v) => { let arr = v.to_le_bytes(); diff --git a/common_types/src/row/mod.rs b/common_types/src/row/mod.rs index 305e6aa075..9fb93e56c3 100644 --- a/common_types/src/row/mod.rs +++ b/common_types/src/row/mod.rs @@ -11,12 +11,11 @@ use snafu::{ensure, Backtrace, OptionExt, Snafu}; use crate::{ column_schema::ColumnSchema, - datum::{Datum, DatumKind}, + datum::{Datum, DatumKind, DatumView}, record_batch::RecordBatchWithKey, schema::{RecordSchemaWithKey, Schema}, time::Timestamp, }; -use crate::datum::DatumView; pub mod contiguous;