Skip to content

Commit

Permalink
chore: update datafusion to 28 and arrow to 43
Browse files Browse the repository at this point in the history
  • Loading branch information
cmackenzie1 authored and rtyler committed Jul 30, 2023
1 parent 38b9732 commit 4f0e009
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 24 deletions.
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ doc = false
name = "deltalake._internal"

[dependencies]
arrow-schema = { version = "42", features = ["serde"] }
arrow-schema = { version = "43", features = ["serde"] }
chrono = "0"
env_logger = "0"
futures = "0.3"
Expand Down
30 changes: 15 additions & 15 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ readme = "README.md"
edition = "2021"

[dependencies]
arrow = { version = "42", optional = true }
arrow-array = { version = "42", optional = true }
arrow-buffer = { version = "42", optional = true }
arrow-cast = { version = "42", optional = true }
arrow-ord = { version = "42", optional = true }
arrow-row = { version = "42", optional = true }
arrow-schema = { version = "42", optional = true }
arrow-select = { version = "42", optional = true }
arrow = { version = "43", optional = true }
arrow-array = { version = "43", optional = true }
arrow-buffer = { version = "43", optional = true }
arrow-cast = { version = "43", optional = true }
arrow-ord = { version = "43", optional = true }
arrow-row = { version = "43", optional = true }
arrow-schema = { version = "43", optional = true }
arrow-select = { version = "43", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
Expand All @@ -41,7 +41,7 @@ num-traits = "0.2.15"
object_store = "0.6.1"
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "42", features = [
parquet = { version = "43", features = [
"async",
"object_store",
], optional = true }
Expand Down Expand Up @@ -74,12 +74,12 @@ reqwest-retry = { version = "0.2.2", optional = true }

# Datafusion
dashmap = { version = "5", optional = true }
datafusion = { version = "27", optional = true }
datafusion-expr = { version = "27", optional = true }
datafusion-common = { version = "27", optional = true }
datafusion-proto = { version = "27", optional = true }
datafusion-sql = { version = "27", optional = true }
datafusion-physical-expr = { version = "27", optional = true }
datafusion = { version = "28", optional = true }
datafusion-expr = { version = "28", optional = true }
datafusion-common = { version = "28", optional = true }
datafusion-proto = { version = "28", optional = true }
datafusion-sql = { version = "28", optional = true }
datafusion-physical-expr = { version = "28", optional = true }


sqlparser = { version = "0.35", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion rust/src/action/parquet_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ mod tests {
let preader = SerializedFileReader::new(File::open(path).unwrap()).unwrap();

let mut iter = preader.get_row_iter(None).unwrap();
let record = iter.nth(9).unwrap();
let record = iter.nth(9).unwrap().unwrap();
let add_record = record.get_group(1).unwrap();
let add_action = Add::from_parquet_record(add_record).unwrap();

Expand Down
14 changes: 10 additions & 4 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use std::any::Any;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::Debug;
use std::fmt::{self, Debug};
use std::sync::Arc;

use arrow::array::ArrayRef;
Expand All @@ -49,7 +49,8 @@ use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistic
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::{
ColumnStatistics, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream, Statistics,
};
use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
Expand Down Expand Up @@ -522,6 +523,12 @@ pub struct DeltaScan {
pub parquet_scan: Arc<dyn ExecutionPlan>,
}

impl DisplayAs for DeltaScan {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
write!(f, "DeltaScan")
}
}

impl ExecutionPlan for DeltaScan {
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -1031,10 +1038,9 @@ impl TreeNodeVisitor for FindFilesExprProperties {
}
Expr::ScalarVariable(_, _)
| Expr::Literal(_)
| Expr::Alias(_, _)
| Expr::Alias(_)
| Expr::BinaryExpr(_)
| Expr::Like(_)
| Expr::ILike(_)
| Expr::SimilarTo(_)
| Expr::Not(_)
| Expr::IsNotNull(_)
Expand Down
9 changes: 8 additions & 1 deletion rust/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use std::{
collections::{HashMap, HashSet},
fmt,
sync::Arc,
time::{Instant, SystemTime, UNIX_EPOCH},
};
Expand All @@ -32,7 +33,7 @@ use datafusion::{
physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
projection::ProjectionExec,
ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
},
prelude::SessionContext,
};
Expand Down Expand Up @@ -469,6 +470,12 @@ impl UpdateCountExec {
}
}

impl DisplayAs for UpdateCountExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
write!(f, "UpdateCountExec")
}
}

impl ExecutionPlan for UpdateCountExec {
fn as_any(&self) -> &dyn std::any::Any {
self
Expand Down
2 changes: 1 addition & 1 deletion rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl DeltaTableState {
}
for record in preader.get_row_iter(None)? {
self.process_action(
action::Action::from_parquet_record(schema, &record)?,
action::Action::from_parquet_record(schema, &record.unwrap())?,
table_config.require_tombstones,
table_config.require_files,
)?;
Expand Down
2 changes: 1 addition & 1 deletion rust/tests/checkpoint_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ mod checkpoints_with_tombstones {
let row_iter = reader.get_row_iter(None).unwrap();
let mut actions = Vec::new();
for record in row_iter {
actions.push(Action::from_parquet_record(schema, &record).unwrap())
actions.push(Action::from_parquet_record(schema, &record.unwrap()).unwrap())
}
(schema.clone(), actions)
}
Expand Down

0 comments on commit 4f0e009

Please sign in to comment.