Upgrade datafusion to 22 which brings arrow upgrades with it #1249

Merged · 12 commits · Apr 14, 2023
2 changes: 1 addition & 1 deletion python/Cargo.toml

@@ -18,7 +18,7 @@ doc = false
 name = "deltalake._internal"
 
 [dependencies]
-arrow-schema = { version = "33", features = ["serde"] }
+arrow-schema = { version = "36", features = ["serde"] }
 chrono = "0"
 env_logger = "0"
 futures = "0.3"
19 changes: 11 additions & 8 deletions rust/Cargo.toml

@@ -13,7 +13,7 @@ readme = "README.md"
 edition = "2021"
 
 [dependencies]
-arrow = { version = "33", optional = true }
+arrow = { version = "36.0.0", optional = true }
 async-trait = "0.1"
 bytes = "1"
 chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
@@ -30,7 +30,7 @@ num-traits = "0.2.15"
 object_store = "0.5.6"
 once_cell = "1.16.0"
 parking_lot = "0.12"
-parquet = { version = "33", features = [
+parquet = { version = "36", features = [
     "async",
     "object_store",
 ], optional = true }
@@ -54,12 +54,14 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true
 rusoto_glue = { version = "0.47", default-features = false, optional = true }
 
 # Datafusion
-datafusion = { version = "19", optional = true }
-datafusion-expr = { version = "19", optional = true }
-datafusion-common = { version = "19", optional = true }
-datafusion-proto = { version = "19", optional = true }
-datafusion-sql = { version = "19", optional = true }
-sqlparser = { version = "0.30", optional = true }
+datafusion = { version = "22", optional = true }
+datafusion-expr = { version = "22", optional = true }
+datafusion-common = { version = "22", optional = true }
+datafusion-proto = { version = "22", optional = true }
+datafusion-sql = { version = "22", optional = true }
+datafusion-physical-expr = { version = "22", optional = true }
+
+sqlparser = { version = "0.32", optional = true }
 
 # NOTE dependencies only for integration tests
 fs_extra = { version = "1.2.0", optional = true }
@@ -94,6 +96,7 @@ datafusion = [
     "datafusion-expr",
     "datafusion-common",
     "datafusion-proto",
+    "datafusion-physical-expr",
     "datafusion-sql",
     "sqlparser",
     "arrow",
7 changes: 7 additions & 0 deletions rust/src/checkpoints.rs

@@ -1,7 +1,10 @@
 //! Implementation for writing delta checkpoints.
 
 use arrow::datatypes::Schema as ArrowSchema;
+// NOTE: Temporarily allowing these deprecated imports pending the completion of:
+// <https://github.com/apache/arrow-rs/pull/3979>
 use arrow::error::ArrowError;
+#[allow(deprecated)]
 use arrow::json::reader::{Decoder, DecoderOptions};
 use chrono::{DateTime, Datelike, Duration, Utc};
 use futures::StreamExt;
@@ -299,6 +302,9 @@ pub async fn cleanup_expired_logs_for(
     }
 }
 
+// NOTE: Temporarily allowing these deprecated imports pending the completion of:
+// <https://github.com/apache/arrow-rs/pull/3979>
+#[allow(deprecated)]
 fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError> {
     let current_metadata = state
         .current_metadata()
@@ -382,6 +388,7 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, Che
     // Write the Checkpoint parquet file.
     let mut bytes = vec![];
     let mut writer = ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), None)?;
+
     let options = DecoderOptions::new().with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE);
     let decoder = Decoder::new(arrow_schema, options);
     while let Some(batch) = decoder.next_batch(&mut jsons)? {
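For context on the imports kept behind `#[allow(deprecated)]`: a minimal, self-contained sketch of the old arrow JSON decode path used above, assuming the arrow 36 API. `CHECKPOINT_RECORD_BATCH_SIZE` and the `jsons` iterator from the real code are stood in for by local values:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
// Deprecated in arrow 36; slated for replacement once apache/arrow-rs#3979 lands.
#[allow(deprecated)]
use arrow::json::reader::{Decoder, DecoderOptions};
use serde_json::{json, Value};

#[allow(deprecated)]
fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));
    // Stand-in for CHECKPOINT_RECORD_BATCH_SIZE.
    let options = DecoderOptions::new().with_batch_size(1024);
    let decoder = Decoder::new(schema, options);

    // The decoder drains an iterator of Result<serde_json::Value, ArrowError>.
    let rows: Vec<Result<Value, ArrowError>> = vec![Ok(json!({"id": 1})), Ok(json!({"id": 2}))];
    let mut jsons = rows.into_iter();
    while let Some(batch) = decoder.next_batch(&mut jsons)? {
        println!("decoded {} rows", batch.num_rows());
    }
    Ok(())
}
```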
36 changes: 23 additions & 13 deletions rust/src/delta_datafusion.rs

@@ -42,14 +42,16 @@ use datafusion::execution::FunctionRegistry;
 use datafusion::optimizer::utils::conjunction;
 use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
-use datafusion::physical_plan::file_format::{partition_type_wrap, FileScanConfig};
+use datafusion::physical_plan::file_format::{wrap_partition_type_in_dict, FileScanConfig};
 use datafusion::physical_plan::{
     ColumnStatistics, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 use datafusion_common::scalar::ScalarValue;
-use datafusion_common::{Column, DataFusionError, Result as DataFusionResult};
+use datafusion_common::{Column, DataFusionError, Result as DataFusionResult, ToDFSchema};
 use datafusion_expr::logical_plan::CreateExternalTable;
 use datafusion_expr::{Expr, Extension, LogicalPlan, TableProviderFilterPushDown};
+use datafusion_physical_expr::execution_props::ExecutionProps;
+use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use object_store::{path::Path, ObjectMeta};
@@ -337,11 +339,7 @@ impl PruningStatistics for DeltaTable {
 fn register_store(table: &DeltaTable, env: Arc<RuntimeEnv>) {
     let object_store_url = table.storage.object_store_url();
     let url: &Url = object_store_url.as_ref();
-    env.register_object_store(
-        url.scheme(),
-        url.host_str().unwrap_or_default(),
-        table.object_store(),
-    );
+    env.register_object_store(url, table.object_store());
 }
 
 #[async_trait]
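The `register_store` change above tracks DataFusion 22's new registration API, which keys object stores by URL instead of a scheme/host string pair. A minimal sketch of the new call shape; the in-memory store and URL are illustrative:

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use object_store::memory::InMemory;
use url::Url;

fn main() {
    let env = RuntimeEnv::new(RuntimeConfig::new()).unwrap();
    // DataFusion 22 derives the lookup key from the URL's scheme and authority,
    // replacing the old (scheme, host, store) triple.
    let url = Url::parse("memory://").unwrap();
    env.register_object_store(&url, Arc::new(InMemory::new()));
}
```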
@@ -383,14 +381,15 @@ impl TableProvider for DeltaTable {
 
         register_store(self, session.runtime_env().clone());
 
+        let filter_expr = conjunction(filters.iter().cloned())
+            .map(|expr| logical_expr_to_physical_expr(&expr, &schema));
+
         // TODO we group files together by their partition values. If the table is partitioned
         // and partitions are somewhat evenly distributed, probably not the worst choice ...
         // However we may want to do some additional balancing in case we are far off from the above.
         let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
-        if let Some(Some(predicate)) =
-            (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
-        {
-            let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
+        if let Some(predicate) = &filter_expr {
+            let pruning_predicate = PruningPredicate::try_new(predicate.clone(), schema.clone())?;
             let files_to_prune = pruning_predicate.prune(&self.state)?;
             self.get_state()
                 .files()
@@ -440,14 +439,16 @@ impl TableProvider for DeltaTable {
                 .map(|c| {
                     Ok((
                         c.to_owned(),
-                        partition_type_wrap(schema.field_with_name(c)?.data_type().clone()),
+                        wrap_partition_type_in_dict(
+                            schema.field_with_name(c)?.data_type().clone(),
+                        ),
                     ))
                 })
                 .collect::<Result<Vec<_>, ArrowError>>()?,
                 output_ordering: None,
                 infinite_source: false,
             },
-            filters,
+            filter_expr.as_ref(),
        )
        .await?;
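
The `filter_expr` binding above leans on `conjunction` to fold the pushed-down filters into one AND-ed expression before lowering it to a physical predicate. A small sketch of that folding step, with made-up column names:

```rust
use datafusion::optimizer::utils::conjunction;
use datafusion_expr::{col, lit, Expr};

fn main() {
    let filters = vec![col("year").gt(lit(2020)), col("country").eq(lit("US"))];
    // Some(year > 2020 AND country = 'US')
    let combined: Option<Expr> = conjunction(filters);
    assert!(combined.is_some());
    // An empty filter list folds to None, which skips pruning entirely.
    assert!(conjunction(Vec::<Expr>::new()).is_none());
}
```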

@@ -774,6 +775,15 @@ fn left_larger_than_right(left: ScalarValue, right: ScalarValue) -> Option<bool>
     }
 }
 
+pub(crate) fn logical_expr_to_physical_expr(
+    expr: &Expr,
+    schema: &ArrowSchema,
+) -> Arc<dyn PhysicalExpr> {
+    let df_schema = schema.clone().to_dfschema().unwrap();
+    let execution_props = ExecutionProps::new();
+    create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
+}
+
 /// Responsible for checking batches of data conform to table's invariants.
 #[derive(Clone)]
 pub struct DeltaDataChecker {
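The helper introduced above is the crux of this migration: in DataFusion 22, `PruningPredicate::try_new` takes an `Arc<dyn PhysicalExpr>` instead of a logical `Expr`, so logical filters have to be lowered first. A hypothetical in-crate usage against a made-up one-column schema:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion_common::Result;
use datafusion_expr::{col, lit};

fn build_pruning_predicate() -> Result<PruningPredicate> {
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )]));
    let filter = col("value").gt(lit(10i64));
    // Lower the logical filter before handing it to the pruning machinery.
    let physical = logical_expr_to_physical_expr(&filter, schema.as_ref());
    PruningPredicate::try_new(physical, schema)
}
```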
8 changes: 5 additions & 3 deletions rust/src/operations/transaction/state.rs

@@ -18,7 +18,7 @@ use sqlparser::parser::Parser;
 use sqlparser::tokenizer::Tokenizer;
 
 use crate::action::Add;
-use crate::delta_datafusion::to_correct_scalar_value;
+use crate::delta_datafusion::{logical_expr_to_physical_expr, to_correct_scalar_value};
 use crate::table_state::DeltaTableState;
 use crate::DeltaResult;
 use crate::{schema, DeltaTableError};
@@ -41,7 +41,8 @@ impl DeltaTableState {
         if let Some(Some(predicate)) =
             (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
         {
-            let pruning_predicate = PruningPredicate::try_new(predicate, self.arrow_schema()?)?;
+            let expr = logical_expr_to_physical_expr(&predicate, self.arrow_schema()?.as_ref());
+            let pruning_predicate = PruningPredicate::try_new(expr, self.arrow_schema()?)?;
             Ok(Either::Left(
                 self.files()
                     .iter()
@@ -183,7 +184,8 @@ impl<'a> AddContainer<'a> {
     /// so evaluating expressions is inexact. However, excluded files are guaranteed (for a correct log)
     /// to not contain matches by the predicate expression.
     pub fn predicate_matches(&self, predicate: Expr) -> DeltaResult<impl Iterator<Item = &Add>> {
-        let pruning_predicate = PruningPredicate::try_new(predicate, self.schema.clone())?;
+        let expr = logical_expr_to_physical_expr(&predicate, &self.schema);
+        let pruning_predicate = PruningPredicate::try_new(expr, self.schema.clone())?;
         Ok(self
             .inner
             .iter()
6 changes: 6 additions & 0 deletions rust/src/writer/utils.rs

@@ -16,6 +16,9 @@ use arrow::datatypes::{
     TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type,
     UInt64Type, UInt8Type,
 };
+// NOTE: Temporarily allowing these deprecated imports pending the completion of:
+// <https://github.com/apache/arrow-rs/pull/3979>
+#[allow(deprecated)]
 use arrow::json::reader::{Decoder, DecoderOptions};
 use arrow::record_batch::*;
 use object_store::path::Path;
@@ -104,6 +107,9 @@ pub(crate) fn next_data_path(
     Ok(Path::from(format!("{partition_key}/{file_name}")))
 }
 
+// NOTE: Temporarily allowing these deprecated imports pending the completion of:
+// <https://github.com/apache/arrow-rs/pull/3979>
+#[allow(deprecated)]
 /// Convert a vector of json values to a RecordBatch
 pub fn record_batch_from_message(
     arrow_schema: Arc<ArrowSchema>,
14 changes: 7 additions & 7 deletions rust/tests/common/datafusion.rs

@@ -1,16 +1,16 @@
 use datafusion::datasource::datasource::TableProviderFactory;
-use datafusion::execution::context::SessionContext;
+use datafusion::execution::context::{SessionContext, SessionState};
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::prelude::SessionConfig;
 use deltalake::delta_datafusion::DeltaTableFactory;
 use std::collections::HashMap;
 use std::sync::Arc;
 
 pub fn context_with_delta_table_factory() -> SessionContext {
-    let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
-    table_factories.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
-    let cfg = RuntimeConfig::new().with_table_factories(table_factories);
+    let cfg = RuntimeConfig::new();
     let env = RuntimeEnv::new(cfg).unwrap();
     let ses = SessionConfig::new();
-    SessionContext::with_config_rt(ses, Arc::new(env))
+    let mut state = SessionState::with_config_rt(ses, Arc::new(env));
+    state
+        .table_factories_mut()
+        .insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
+    SessionContext::with_state(state)
 }
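DataFusion 22 moves table factories from `RuntimeConfig` onto `SessionState`, hence the detour through `SessionState::with_config_rt` above. A hedged sketch of exercising the factory; the SQL shape follows DataFusion's `CREATE EXTERNAL TABLE ... STORED AS <factory key>`, and the table location is illustrative:

```rust
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = context_with_delta_table_factory();
    // The DELTATABLE keyword routes through the DeltaTableFactory registered above.
    ctx.sql(
        "CREATE EXTERNAL TABLE demo STORED AS DELTATABLE LOCATION './tests/data/delta-0.8.0'",
    )
    .await?;
    ctx.sql("SELECT * FROM demo").await?.show().await?;
    Ok(())
}
```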
8 changes: 3 additions & 5 deletions rust/tests/datafusion_test.rs

@@ -207,11 +207,9 @@ async fn test_datafusion_write_from_serialized_delta_scan() -> Result<()> {
     let source_store = DeltaObjectStore::try_new(source_location, HashMap::new()).unwrap();
     let object_store_url = source_store.object_store_url();
     let source_store_url: &Url = object_store_url.as_ref();
-    state.runtime_env().register_object_store(
-        source_store_url.scheme(),
-        source_store_url.host_str().unwrap_or_default(),
-        Arc::from(source_store),
-    );
+    state
+        .runtime_env()
+        .register_object_store(source_store_url, Arc::from(source_store));
 
     // Execute write to the target table with the proper state
     let target_table = WriteBuilder::new()