Upgrade datafusion to 22 which brings arrow upgrades with it (#1249)
Both @roeap and I have upgraded to datafusion 22 and arrow 36 in this
pull request. We're not yet ready to adopt arrow 37, unfortunately.

---------

Co-authored-by: Robert Pack <robstar.pack@gmail.com>
rtyler and roeap authored Apr 14, 2023
1 parent 4f9665d commit 449fe5f
Showing 8 changed files with 63 additions and 37 deletions.
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -18,7 +18,7 @@ doc = false
name = "deltalake._internal"

[dependencies]
arrow-schema = { version = "33", features = ["serde"] }
arrow-schema = { version = "36", features = ["serde"] }
chrono = "0"
env_logger = "0"
futures = "0.3"
19 changes: 11 additions & 8 deletions rust/Cargo.toml
@@ -13,7 +13,7 @@ readme = "README.md"
edition = "2021"

[dependencies]
arrow = { version = "33", optional = true }
arrow = { version = "36.0.0", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
@@ -30,7 +30,7 @@ num-traits = "0.2.15"
object_store = "0.5.6"
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "33", features = [
parquet = { version = "36", features = [
"async",
"object_store",
], optional = true }
@@ -54,12 +54,14 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true
rusoto_glue = { version = "0.47", default-features = false, optional = true }

# Datafusion
datafusion = { version = "19", optional = true }
datafusion-expr = { version = "19", optional = true }
datafusion-common = { version = "19", optional = true }
datafusion-proto = { version = "19", optional = true }
datafusion-sql = { version = "19", optional = true }
sqlparser = { version = "0.30", optional = true }
datafusion = { version = "22", optional = true }
datafusion-expr = { version = "22", optional = true }
datafusion-common = { version = "22", optional = true }
datafusion-proto = { version = "22", optional = true }
datafusion-sql = { version = "22", optional = true }
datafusion-physical-expr = { version = "22", optional = true }

sqlparser = { version = "0.32", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.2.0", optional = true }
@@ -94,6 +96,7 @@ datafusion = [
"datafusion-expr",
"datafusion-common",
"datafusion-proto",
"datafusion-physical-expr",
"datafusion-sql",
"sqlparser",
"arrow",
7 changes: 7 additions & 0 deletions rust/src/checkpoints.rs
@@ -1,7 +1,10 @@
//! Implementation for writing delta checkpoints.
use arrow::datatypes::Schema as ArrowSchema;
+// NOTE: Temporarily allowing these deprecated imports pending the completion of:
+// <https://github.com/apache/arrow-rs/pull/3979>
use arrow::error::ArrowError;
+#[allow(deprecated)]
use arrow::json::reader::{Decoder, DecoderOptions};
use chrono::{DateTime, Datelike, Duration, Utc};
use futures::StreamExt;
@@ -299,6 +302,9 @@ pub async fn cleanup_expired_logs_for(
}
}

+// NOTE: Temporarily allowing these deprecated imports pending the completion of:
+// <https://github.com/apache/arrow-rs/pull/3979>
+#[allow(deprecated)]
fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError> {
let current_metadata = state
.current_metadata()
@@ -382,6 +388,7 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, Che
// Write the Checkpoint parquet file.
let mut bytes = vec![];
let mut writer = ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), None)?;

let options = DecoderOptions::new().with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE);
let decoder = Decoder::new(arrow_schema, options);
while let Some(batch) = decoder.next_batch(&mut jsons)? {
36 changes: 23 additions & 13 deletions rust/src/delta_datafusion.rs
@@ -42,14 +42,16 @@ use datafusion::execution::FunctionRegistry;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
-use datafusion::physical_plan::file_format::{partition_type_wrap, FileScanConfig};
+use datafusion::physical_plan::file_format::{wrap_partition_type_in_dict, FileScanConfig};
use datafusion::physical_plan::{
ColumnStatistics, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_common::scalar::ScalarValue;
-use datafusion_common::{Column, DataFusionError, Result as DataFusionResult};
+use datafusion_common::{Column, DataFusionError, Result as DataFusionResult, ToDFSchema};
use datafusion_expr::logical_plan::CreateExternalTable;
use datafusion_expr::{Expr, Extension, LogicalPlan, TableProviderFilterPushDown};
+use datafusion_physical_expr::execution_props::ExecutionProps;
+use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use object_store::{path::Path, ObjectMeta};
@@ -337,11 +339,7 @@ impl PruningStatistics for DeltaTable {
fn register_store(table: &DeltaTable, env: Arc<RuntimeEnv>) {
let object_store_url = table.storage.object_store_url();
let url: &Url = object_store_url.as_ref();
-env.register_object_store(
-url.scheme(),
-url.host_str().unwrap_or_default(),
-table.object_store(),
-);
+env.register_object_store(url, table.object_store());
}

#[async_trait]
@@ -383,14 +381,15 @@ impl TableProvider for DeltaTable {

register_store(self, session.runtime_env().clone());

+let filter_expr = conjunction(filters.iter().cloned())
+.map(|expr| logical_expr_to_physical_expr(&expr, &schema));
+
// TODO we group files together by their partition values. If the table is partitioned
// and partitions are somewhat evenly distributed, probably not the worst choice ...
// However we may want to do some additional balancing in case we are far off from the above.
let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
-if let Some(Some(predicate)) =
-(!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
-{
-let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
+if let Some(predicate) = &filter_expr {
+let pruning_predicate = PruningPredicate::try_new(predicate.clone(), schema.clone())?;
let files_to_prune = pruning_predicate.prune(&self.state)?;
self.get_state()
.files()
@@ -440,14 +439,16 @@ impl TableProvider for DeltaTable {
.map(|c| {
Ok((
c.to_owned(),
-partition_type_wrap(schema.field_with_name(c)?.data_type().clone()),
+wrap_partition_type_in_dict(
+schema.field_with_name(c)?.data_type().clone(),
+),
))
})
.collect::<Result<Vec<_>, ArrowError>>()?,
output_ordering: None,
infinite_source: false,
},
-filters,
+filter_expr.as_ref(),
)
.await?;

@@ -774,6 +775,15 @@ fn left_larger_than_right(left: ScalarValue, right: ScalarValue) -> Option<bool>
}
}

+pub(crate) fn logical_expr_to_physical_expr(
+expr: &Expr,
+schema: &ArrowSchema,
+) -> Arc<dyn PhysicalExpr> {
+let df_schema = schema.clone().to_dfschema().unwrap();
+let execution_props = ExecutionProps::new();
+create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
+}
+
/// Responsible for checking batches of data conform to table's invariants.
#[derive(Clone)]
pub struct DeltaDataChecker {
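In DataFusion 22, RuntimeEnv::register_object_store keys stores by their URL instead of a separate scheme and host, which is why register_store above collapses to a single call. A minimal sketch of the new call shape, assuming a local-filesystem store and a made-up file:/// URL rather than anything taken from this change:

    use std::sync::Arc;
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
    use object_store::local::LocalFileSystem;
    use url::Url;

    fn register_local_store() -> datafusion::error::Result<Arc<RuntimeEnv>> {
        let env = Arc::new(RuntimeEnv::new(RuntimeConfig::new())?);
        // DataFusion 22 registers the store under the URL itself, replacing
        // the (scheme, host) pair this crate passed on DataFusion 19.
        let url = Url::parse("file:///").expect("static URL parses");
        env.register_object_store(&url, Arc::new(LocalFileSystem::new()));
        Ok(env)
    }

The same URL-keyed registration shows up again in the test change to rust/tests/datafusion_test.rs further down.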
8 changes: 5 additions & 3 deletions rust/src/operations/transaction/state.rs
@@ -18,7 +18,7 @@ use sqlparser::parser::Parser;
use sqlparser::tokenizer::Tokenizer;

use crate::action::Add;
-use crate::delta_datafusion::to_correct_scalar_value;
+use crate::delta_datafusion::{logical_expr_to_physical_expr, to_correct_scalar_value};
use crate::table_state::DeltaTableState;
use crate::DeltaResult;
use crate::{schema, DeltaTableError};
@@ -41,7 +41,8 @@ impl DeltaTableState {
if let Some(Some(predicate)) =
(!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
{
-let pruning_predicate = PruningPredicate::try_new(predicate, self.arrow_schema()?)?;
+let expr = logical_expr_to_physical_expr(&predicate, self.arrow_schema()?.as_ref());
+let pruning_predicate = PruningPredicate::try_new(expr, self.arrow_schema()?)?;
Ok(Either::Left(
self.files()
.iter()
@@ -183,7 +184,8 @@ impl<'a> AddContainer<'a> {
/// so evaluating expressions is inexact. However, excluded files are guaranteed (for a correct log)
/// to not contain matches by the predicate expression.
pub fn predicate_matches(&self, predicate: Expr) -> DeltaResult<impl Iterator<Item = &Add>> {
-let pruning_predicate = PruningPredicate::try_new(predicate, self.schema.clone())?;
+let expr = logical_expr_to_physical_expr(&predicate, &self.schema);
+let pruning_predicate = PruningPredicate::try_new(expr, self.schema.clone())?;
Ok(self
.inner
.iter()
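Each of the pruning call sites above follows the same migration: PruningPredicate::try_new in DataFusion 22 expects a physical expression, so the logical filter is first run through the new crate-internal logical_expr_to_physical_expr helper. A minimal sketch of that flow from inside the crate, using a hypothetical single-column schema and filter that are not part of this change:

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use datafusion::physical_optimizer::pruning::PruningPredicate;
    use datafusion_common::Result as DataFusionResult;
    use datafusion_expr::{col, lit};

    use crate::delta_datafusion::logical_expr_to_physical_expr;

    fn build_pruning_predicate() -> DataFusionResult<PruningPredicate> {
        // Hypothetical one-column schema; real callers use the table's Arrow schema.
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "value",
            DataType::Int64,
            true,
        )]));
        // Logical filter over the hypothetical `value` column.
        let filter = col("value").gt(lit(10i64));
        // Convert to a physical expression, then build the pruning predicate,
        // mirroring the call sites in state.rs above.
        let expr = logical_expr_to_physical_expr(&filter, schema.as_ref());
        PruningPredicate::try_new(expr, schema)
    }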
6 changes: 6 additions & 0 deletions rust/src/writer/utils.rs
@@ -16,6 +16,9 @@ use arrow::datatypes::{
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type,
UInt64Type, UInt8Type,
};
+// NOTE: Temporarily allowing these deprecated imports pending the completion of:
+// <https://github.com/apache/arrow-rs/pull/3979>
+#[allow(deprecated)]
use arrow::json::reader::{Decoder, DecoderOptions};
use arrow::record_batch::*;
use object_store::path::Path;
@@ -104,6 +107,9 @@ pub(crate) fn next_data_path(
Ok(Path::from(format!("{partition_key}/{file_name}")))
}

+// NOTE: Temporarily allowing these deprecated imports pending the completion of:
+// <https://github.com/apache/arrow-rs/pull/3979>
+#[allow(deprecated)]
/// Convert a vector of json values to a RecordBatch
pub fn record_batch_from_message(
arrow_schema: Arc<ArrowSchema>,
14 changes: 7 additions & 7 deletions rust/tests/common/datafusion.rs
@@ -1,16 +1,16 @@
-use datafusion::datasource::datasource::TableProviderFactory;
-use datafusion::execution::context::SessionContext;
+use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionConfig;
use deltalake::delta_datafusion::DeltaTableFactory;
-use std::collections::HashMap;
use std::sync::Arc;

pub fn context_with_delta_table_factory() -> SessionContext {
-let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
-table_factories.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
-let cfg = RuntimeConfig::new().with_table_factories(table_factories);
+let cfg = RuntimeConfig::new();
let env = RuntimeEnv::new(cfg).unwrap();
let ses = SessionConfig::new();
-SessionContext::with_config_rt(ses, Arc::new(env))
+let mut state = SessionState::with_config_rt(ses, Arc::new(env));
+state
+.table_factories_mut()
+.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
+SessionContext::with_state(state)
}
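DataFusion 22 moves table factories from the RuntimeConfig onto the SessionState, which is what the rewritten helper above reflects. A hypothetical use of that helper, registering a Delta table through SQL; the table name and location below are made up for illustration and not part of this change:

    use datafusion::error::Result;

    async fn query_via_delta_factory() -> Result<()> {
        // `STORED AS DELTATABLE` is dispatched to the DeltaTableFactory
        // registered on the session state by the helper above.
        let ctx = context_with_delta_table_factory();
        ctx.sql(
            "CREATE EXTERNAL TABLE demo STORED AS DELTATABLE \
             LOCATION './tests/data/some-delta-table'",
        )
        .await?;
        let batches = ctx.sql("SELECT * FROM demo LIMIT 5").await?.collect().await?;
        println!("read {} record batch(es)", batches.len());
        Ok(())
    }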
8 changes: 3 additions & 5 deletions rust/tests/datafusion_test.rs
@@ -207,11 +207,9 @@ async fn test_datafusion_write_from_serialized_delta_scan() -> Result<()> {
let source_store = DeltaObjectStore::try_new(source_location, HashMap::new()).unwrap();
let object_store_url = source_store.object_store_url();
let source_store_url: &Url = object_store_url.as_ref();
-state.runtime_env().register_object_store(
-source_store_url.scheme(),
-source_store_url.host_str().unwrap_or_default(),
-Arc::from(source_store),
-);
+state
+.runtime_env()
+.register_object_store(source_store_url, Arc::from(source_store));

// Execute write to the target table with the proper state
let target_table = WriteBuilder::new()
