diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs index 5e1adddb05..8bba1b41a2 100644 --- a/analytic_engine/src/sst/parquet/writer.rs +++ b/analytic_engine/src/sst/parquet/writer.rs @@ -152,8 +152,8 @@ impl RecordBatchGroupWriter { for partial_batch in row_group_batch { for (col_idx, column) in partial_batch.columns().iter().enumerate() { for row in 0..column.num_rows() { - let datum = column.datum(row); - datum.do_with_bytes(|bytes| { + let datum_view = column.datum_view(row); + datum_view.do_with_bytes(|bytes| { builder.add_key(col_idx, bytes); }); } diff --git a/common_types/src/column.rs b/common_types/src/column.rs index 7de424aeec..4c09a84644 100644 --- a/common_types/src/column.rs +++ b/common_types/src/column.rs @@ -235,6 +235,14 @@ macro_rules! impl_column { Some(self.datum(index)) } + pub fn datum_view_opt(&self, index: usize) -> Option { + if index >= self.0.len() { + return None; + } + + Some(self.datum_view(index)) + } + pub fn datum_view(&self, index: usize) -> DatumView { // If this datum is null. if self.0.is_null(index) { @@ -545,6 +553,12 @@ macro_rules! impl_column_block { } } + pub fn datum_view_opt(&self, index: usize) -> Option { + match self { + $(ColumnBlock::$Kind(col) => col.datum_view_opt(index),)* + } + } + /// Panic if index is out fo bound. pub fn datum_view(&self, index: usize) -> DatumView { match self { diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs index bf694a5c6d..6f01baa8d8 100644 --- a/common_types/src/datum.rs +++ b/common_types/src/datum.rs @@ -1010,6 +1010,76 @@ impl<'a> DatumView<'a> { DatumView::Time(_) => DatumKind::Time, } } + + pub fn do_with_bytes(&self, mut f: F) + where + F: FnMut(&[u8]), + { + match self { + DatumView::Double(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Float(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::UInt64(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::UInt32(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::UInt16(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::UInt8(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Int64(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Int32(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Int16(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Int8(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Boolean(v) => { + if *v { + f(&[1]) + } else { + f(&[0]) + } + } + DatumView::Null => f(&[0]), + DatumView::Timestamp(v) => { + let arr = v.as_i64().to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Varbinary(v) => f(v), + DatumView::String(v) => f(v.as_bytes()), + DatumView::Date(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + DatumView::Time(v) => { + let arr = v.to_le_bytes(); + f(arr.as_slice()) + } + } + } } #[cfg(feature = "arrow")] diff --git a/common_types/src/record_batch.rs b/common_types/src/record_batch.rs index 80386a895d..a7a73c9381 100644 --- a/common_types/src/record_batch.rs +++ b/common_types/src/record_batch.rs @@ -535,9 +535,9 @@ impl RecordBatchWithKeyBuilder { /// /// REQUIRE: The `row_view` and the builder must have the same schema. pub fn append_row_view(&mut self, row_view: &RowViewOnBatch) -> Result<()> { - for (builder, datum) in self.builders.iter_mut().zip(row_view.iter_columns()) { - let datum = datum.context(IterateDatum)?; - builder.append(datum).context(AppendDatum)?; + for (builder, datum_view) in self.builders.iter_mut().zip(row_view.iter_columns()) { + let datum_view = datum_view.context(IterateDatum)?; + builder.append_view(datum_view).context(AppendDatum)?; } Ok(()) diff --git a/common_types/src/row/mod.rs b/common_types/src/row/mod.rs index a6d17b8546..9fb93e56c3 100644 --- a/common_types/src/row/mod.rs +++ b/common_types/src/row/mod.rs @@ -11,7 +11,7 @@ use snafu::{ensure, Backtrace, OptionExt, Snafu}; use crate::{ column_schema::ColumnSchema, - datum::{Datum, DatumKind}, + datum::{Datum, DatumKind, DatumView}, record_batch::RecordBatchWithKey, schema::{RecordSchemaWithKey, Schema}, time::Timestamp, @@ -607,7 +607,7 @@ impl<'a> RowView for RowViewOnBatch<'a> { } impl<'a> Iterator for RowViewOnBatchColumnIter<'a> { - type Item = Result; + type Item = Result>; fn next(&mut self) -> Option { if self.next_column_idx >= self.record_batch.num_columns() { @@ -616,11 +616,11 @@ impl<'a> Iterator for RowViewOnBatchColumnIter<'a> { let curr_column_idx = self.next_column_idx; let column = self.record_batch.column(curr_column_idx); - let datum = column.datum_opt(self.row_idx).map(Ok); + let datum_view = column.datum_view_opt(self.row_idx).map(Ok); self.next_column_idx += 1; - datum + datum_view } }