feat: use kernel expression evaluator
roeap committed Aug 14, 2024
1 parent ac5225c commit cdbd392
Showing 6 changed files with 63 additions and 96 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -27,7 +27,7 @@ debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "0.3.0" }
# delta_kernel = { path = "../delta-kernel-rs/kernel" }
# delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" }

# arrow
arrow = { version = "52" }
5 changes: 5 additions & 0 deletions crates/core/src/kernel/mod.rs
@@ -1,6 +1,7 @@
//! Delta Kernel module
//!
//! The Kernel module contains all the logic for reading and processing the Delta Lake transaction log.
use delta_kernel::engine::arrow_expression::ArrowExpressionHandler;

pub mod arrow;
pub mod error;
@@ -19,3 +20,7 @@ pub trait DataCheck {
/// The SQL expression to use for the check
fn get_expression(&self) -> &str;
}

lazy_static::lazy_static! {
static ref ARROW_HANDLER: ArrowExpressionHandler = ArrowExpressionHandler {};
}
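
For context, `ArrowExpressionHandler` implements the kernel's `ExpressionHandler` trait, which turns an input schema, an `Expression`, and an output type into a reusable `ExpressionEvaluator`. A minimal sketch of that flow, assuming the delta_kernel 0.3 API used elsewhere in this diff (the `stats_schema` and `num_records_evaluator` names are illustrative, not part of the commit):

```rust
use std::sync::Arc;

use delta_kernel::engine::arrow_expression::ArrowExpressionHandler;
use delta_kernel::expressions::Expression;
use delta_kernel::schema::{DataType, PrimitiveType, StructField, StructType};
use delta_kernel::{ExpressionEvaluator, ExpressionHandler};

fn num_records_evaluator() -> Arc<dyn ExpressionEvaluator> {
    // Illustrative input schema; the real code below passes log_schema_ref().
    let stats_schema = Arc::new(StructType::new(vec![StructField::new(
        "numRecords",
        DataType::Primitive(PrimitiveType::Long),
        true,
    )]));
    let handler = ArrowExpressionHandler {};
    // One evaluator per (input schema, expression, output type) triple;
    // the evaluator can then be reused across any number of batches.
    handler.get_evaluator(
        stats_schema,
        Expression::column("numRecords"),
        DataType::Primitive(PrimitiveType::Long),
    )
}
```

Building the handler once, as the new `ARROW_HANDLER` static does, avoids reconstructing it at every call site.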
8 changes: 8 additions & 0 deletions crates/core/src/kernel/models/fields.rs
@@ -1,4 +1,5 @@
//! Schema definitions for action types
use std::sync::Arc;

use delta_kernel::schema::{ArrayType, DataType, MapType, StructField, StructType};
use lazy_static::lazy_static;
@@ -271,3 +272,10 @@ fn deletion_vector_field() -> StructField {
pub(crate) fn log_schema() -> &'static StructType {
&LOG_SCHEMA
}

pub(crate) fn log_schema_ref() -> &'static Arc<StructType> {
lazy_static! {
static ref LOG_SCHEMA_REF: Arc<StructType> = Arc::new(LOG_SCHEMA.clone());
}
&LOG_SCHEMA_REF
}
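
The function-local `lazy_static!` here is a small idiom worth noting: the `Arc` is allocated once on first call and lives for the life of the process, so every caller gets the same `'static` reference to clone cheaply. A self-contained sketch of the pattern, with illustrative names:

```rust
use std::sync::Arc;

use lazy_static::lazy_static;

lazy_static! {
    static ref GREETING: String = String::from("hello");
}

// The static lives inside the function, so it is initialized lazily on
// first call, and all callers share the same 'static Arc.
fn greeting_ref() -> &'static Arc<String> {
    lazy_static! {
        static ref GREETING_REF: Arc<String> = Arc::new(GREETING.clone());
    }
    &GREETING_REF
}

fn main() {
    let a = greeting_ref().clone();
    let b = greeting_ref().clone();
    // Both clones point at the same allocation.
    assert!(Arc::ptr_eq(&a, &b));
}
```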
69 changes: 46 additions & 23 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -482,22 +482,21 @@ mod datafusion {
use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use ::datafusion::physical_optimizer::pruning::PruningStatistics;
use ::datafusion::physical_plan::Accumulator;
use arrow::compute::concat_batches;
use arrow_arith::aggregate::sum;
use arrow_array::{ArrayRef, BooleanArray, Int64Array};
use arrow_schema::DataType as ArrowDataType;
use arrow_select::concat::concat;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
use datafusion_common::Column;
use delta_kernel::engine::arrow_expression::{
evaluate_expression, ArrowExpressionHandler, DefaultExpressionEvaluator,
};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::expressions::Expression;
use delta_kernel::schema::{DataType, PrimitiveType};
use itertools::Itertools;
use delta_kernel::{ExpressionEvaluator, ExpressionHandler};

use super::*;
use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column};
use crate::kernel::ARROW_HANDLER;

#[derive(Debug, Default, Clone)]
enum AccumulatorType {
@@ -721,14 +720,26 @@ mod datafusion {
} else {
Expression::Column(format!("add.stats_parsed.{}.{}", stats_field, column.name))
};
let results: Vec<_> = self
.data
.iter()
.map(|batch| evaluate_expression(&expression, batch, None))
.try_collect()
.ok()?;
let borrowed = results.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
concat(borrowed.as_slice()).ok()
let evaluator = ARROW_HANDLER.get_evaluator(
crate::kernel::models::fields::log_schema_ref().clone(),
expression,
field.data_type().clone(),
);
let mut results = Vec::with_capacity(self.data.len());
for batch in self.data.iter() {
let engine = ArrowEngineData::new(batch.clone());
let result = evaluator.evaluate(&engine).ok()?;
let result = result
.as_any()
.downcast_ref::<ArrowEngineData>()
.ok_or(DeltaTableError::generic(
"failed to downcast evaluator result to ArrowEngineData.",
))
.ok()?;
results.push(result.record_batch().clone());
}
let batch = concat_batches(results[0].schema_ref(), &results).ok()?;
batch.column_by_name("output").map(|c| c.clone())
}
}

@@ -780,16 +791,28 @@ mod datafusion {
///
/// Note: the returned array must contain `num_containers()` rows
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
let expression = Expression::Column("add.stats_parsed.numRecords".to_string());
let results: Vec<_> = self
.data
.iter()
.map(|batch| evaluate_expression(&expression, batch, None))
.try_collect()
.ok()?;
let borrowed = results.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
let array = concat(borrowed.as_slice()).ok()?;
arrow_cast::cast(array.as_ref(), &ArrowDataType::UInt64).ok()
lazy_static::lazy_static! {
static ref ROW_COUNTS_EVAL: Arc<dyn ExpressionEvaluator> = ARROW_HANDLER.get_evaluator(
crate::kernel::models::fields::log_schema_ref().clone(),
Expression::column("add.stats_parsed.numRecords"),
DataType::Primitive(PrimitiveType::Long),
);
}
let mut results = Vec::with_capacity(self.data.len());
for batch in self.data.iter() {
let engine = ArrowEngineData::new(batch.clone());
let result = ROW_COUNTS_EVAL.evaluate(&engine).ok()?;
let result = result
.as_any()
.downcast_ref::<ArrowEngineData>()
.ok_or(DeltaTableError::generic(
"failed to downcast evaluator result to ArrowEngineData.",
))
.ok()?;
results.push(result.record_batch().clone());
}
let batch = concat_batches(results[0].schema_ref(), &results).ok()?;
arrow_cast::cast(batch.column_by_name("output")?, &ArrowDataType::UInt64).ok()
}

// This function is required since DataFusion 35.0, but is implemented as a no-op
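Both hunks above follow the same per-batch flow: wrap each `RecordBatch` in `ArrowEngineData`, run the evaluator, downcast the result back to `ArrowEngineData`, then concatenate the outputs. A hedged sketch of that loop as a free function (`evaluate_all` is an illustrative name; unlike the `results[0]` indexing above, it also returns `None` for empty input):

```rust
use arrow::compute::concat_batches;
use arrow_array::RecordBatch;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::ExpressionEvaluator;

// Runs a pre-built kernel evaluator over every batch and stitches the
// per-batch outputs back into a single RecordBatch (None on any failure).
fn evaluate_all(
    evaluator: &dyn ExpressionEvaluator,
    batches: &[RecordBatch],
) -> Option<RecordBatch> {
    let mut results = Vec::with_capacity(batches.len());
    for batch in batches {
        let engine = ArrowEngineData::new(batch.clone());
        let evaluated = evaluator.evaluate(&engine).ok()?;
        // The arrow engine hands back ArrowEngineData, so the downcast
        // recovers the underlying RecordBatch.
        let data = evaluated.as_any().downcast_ref::<ArrowEngineData>()?;
        results.push(data.record_batch().clone());
    }
    let schema = results.first()?.schema();
    concat_batches(&schema, &results).ok()
}
```

With the cached evaluator above, the call site would be roughly `evaluate_all(ROW_COUNTS_EVAL.as_ref(), &self.data)`.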
4 changes: 0 additions & 4 deletions crates/core/src/kernel/snapshot/replay.rs
@@ -601,10 +601,6 @@ pub(super) mod tests {
use crate::table::config::TableConfig;
use crate::test_utils::{ActionFactory, TestResult, TestSchemas};

lazy_static::lazy_static! {
static ref ARROW_HANDLER: ArrowExpressionHandler = ArrowExpressionHandler {};
}

pub(crate) async fn test_log_replay(context: &IntegrationContext) -> TestResult {
let log_schema = Arc::new(StructType::new(vec![
ActionType::Add.schema_field().clone(),
71 changes: 3 additions & 68 deletions crates/core/src/operations/transaction/state.rs
@@ -1,83 +1,18 @@
use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray};
use arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef,
};
use arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
use datafusion::execution::context::SessionContext;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion_common::scalar::ScalarValue;
use datafusion_common::{Column, ToDFSchema};
use datafusion_expr::Expr;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

use crate::delta_datafusion::{get_null_of_arrow_type, to_correct_scalar_value, DataFusionMixins};
use crate::delta_datafusion::{get_null_of_arrow_type, to_correct_scalar_value};
use crate::errors::DeltaResult;
use crate::kernel::{Add, EagerSnapshot};
use crate::table::state::DeltaTableState;

impl DeltaTableState {
/// Get the physical table schema.
///
/// This will construct a schema derived from the parquet schema of the latest data file,
/// and fields for partition columns from the schema defined in table meta data.
pub async fn physical_arrow_schema(
&self,
object_store: Arc<dyn ObjectStore>,
) -> DeltaResult<ArrowSchemaRef> {
self.snapshot.physical_arrow_schema(object_store).await
}
}

impl EagerSnapshot {
/// Get the physical table schema.
///
/// This will construct a schema derived from the parquet schema of the latest data file,
/// and fields for partition columns from the schema defined in table meta data.
pub async fn physical_arrow_schema(
&self,
object_store: Arc<dyn ObjectStore>,
) -> DeltaResult<ArrowSchemaRef> {
if let Some(add) = self.file_actions()?.max_by_key(|obj| obj.modification_time) {
let file_meta = add.try_into()?;
let file_reader = ParquetObjectReader::new(object_store, file_meta);
let file_schema = ParquetRecordBatchStreamBuilder::new_with_options(
file_reader,
ArrowReaderOptions::new().with_skip_arrow_metadata(true),
)
.await?
.build()?
.schema()
.clone();

let table_schema = Arc::new(ArrowSchema::new(
self.arrow_schema()?
.fields
.clone()
.into_iter()
.map(|field| {
// field is an &Arc<Field>
let owned_field: ArrowField = field.as_ref().clone();
file_schema
.field_with_name(field.name())
// yielded with &Field
.cloned()
.unwrap_or(owned_field)
})
.collect::<Vec<ArrowField>>(),
));

Ok(table_schema)
} else {
self.arrow_schema()
}
}
}

pub struct AddContainer<'a> {
inner: &'a Vec<Add>,
partition_columns: &'a Vec<String>,
@@ -321,7 +256,7 @@ mod tests {
use datafusion_expr::{col, lit};

use super::*;
use crate::delta_datafusion::DataFusionFileMixins;
use crate::delta_datafusion::{DataFusionFileMixins, DataFusionMixins};
use crate::kernel::Action;
use crate::test_utils::{ActionFactory, TestSchemas};

