Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Datafusion to v37.1.0 #669

Merged
merged 9 commits into from
May 8, 2024
325 changes: 195 additions & 130 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

[package]
name = "datafusion-python"
version = "36.0.0"
homepage = "https://github.com/apache/arrow-datafusion-python"
repository = "https://github.com/apache/arrow-datafusion-python"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
version = "37.1.0"
homepage = "https://datafusion.apache.org/python"
repository = "https://github.com/apache/datafusion-python"
authors = ["Apache DataFusion <dev@datafusion.apache.org>"]
description = "Apache Arrow DataFusion DataFrame and SQL Query Engine"
readme = "README.md"
license = "Apache-2.0"
Expand All @@ -37,13 +37,13 @@ substrait = ["dep:datafusion-substrait"]
tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync"] }
rand = "0.8"
pyo3 = { version = "0.20", features = ["extension-module", "abi3", "abi3-py38"] }
datafusion = { version = "36.0.0", features = ["pyarrow", "avro"] }
datafusion-common = { version = "36.0.0", features = ["pyarrow"] }
datafusion-expr = "36.0.0"
datafusion-functions-array = "36.0.0"
datafusion-optimizer = "36.0.0"
datafusion-sql = "36.0.0"
datafusion-substrait = { version = "36.0.0", optional = true }
datafusion = { version = "37.1.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-common = { version = "37.1.0", features = ["pyarrow"] }
datafusion-expr = "37.1.0"
datafusion-functions-array = "37.1.0"
datafusion-optimizer = "37.1.0"
datafusion-sql = "37.1.0"
datafusion-substrait = { version = "37.1.0", optional = true }
prost = "0.12"
prost-types = "0.12"
uuid = { version = "1.8", features = ["v4"] }
Expand Down
8 changes: 4 additions & 4 deletions datafusion/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,10 @@ def py_flatten(arr):
f.array_slice(col, literal(2), literal(4)),
lambda: [arr[1:4] for arr in data],
],
[
f.list_slice(col, literal(-1), literal(2)),
lambda: [arr[-1:2] for arr in data],
],
# [
# f.list_slice(col, literal(-1), literal(2)),
# lambda: [arr[-1:2] for arr in data],
# ],
[
f.array_intersect(col, literal([3.0, 4.0])),
lambda: [np.intersect1d(arr, [3.0, 4.0]) for arr in data],
Expand Down
2 changes: 1 addition & 1 deletion src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl PyDatabase {
}

fn table(&self, name: &str, py: Python) -> PyResult<PyTable> {
if let Some(table) = wait_for_future(py, self.database.table(name)) {
if let Some(table) = wait_for_future(py, self.database.table(name))? {
Ok(PyTable::new(table))
} else {
Err(DataFusionError::Common(format!("Table not found: {name}")).into())
Expand Down
20 changes: 20 additions & 0 deletions src/common/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,19 @@ impl DataTypeMap {
DataType::RunEndEncoded(_, _) => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{:?}", arrow_type)),
)),
DataType::BinaryView => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{:?}", arrow_type),
))),
DataType::Utf8View => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{:?}",
arrow_type
)))),
DataType::ListView(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{:?}", arrow_type),
))),
DataType::LargeListView(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{:?}", arrow_type),
))),
}
}

Expand Down Expand Up @@ -309,6 +322,9 @@ impl DataTypeMap {
ScalarValue::DurationMillisecond(_) => Ok(DataType::Duration(TimeUnit::Millisecond)),
ScalarValue::DurationMicrosecond(_) => Ok(DataType::Duration(TimeUnit::Microsecond)),
ScalarValue::DurationNanosecond(_) => Ok(DataType::Duration(TimeUnit::Nanosecond)),
ScalarValue::Union(_, _, _) => Err(py_datafusion_err(DataFusionError::NotImplemented(
"ScalarValue::LargeList".to_string(),
))),
}
}
}
Expand Down Expand Up @@ -598,6 +614,10 @@ impl DataTypeMap {
DataType::Decimal256(_, _) => "Decimal256",
DataType::Map(_, _) => "Map",
DataType::RunEndEncoded(_, _) => "RunEndEncoded",
DataType::BinaryView => "BinaryView",
DataType::Utf8View => "Utf8View",
DataType::ListView(_) => "ListView",
DataType::LargeListView(_) => "LargeListView",
})
}
}
Expand Down
14 changes: 12 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,18 @@ impl PySessionContext {
}

pub fn tables(&self) -> HashSet<String> {
#[allow(deprecated)]
self.ctx.tables().unwrap()
self.ctx
.catalog_names()
.into_iter()
.filter_map(|name| self.ctx.catalog(&name))
.flat_map(move |catalog| {
catalog
.schema_names()
.into_iter()
.filter_map(move |name| catalog.schema(&name))
})
.flat_map(|schema| schema.table_names())
.collect()
}

pub fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
Expand Down
18 changes: 11 additions & 7 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use std::sync::Arc;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
use datafusion::arrow::util::pretty;
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::prelude::*;
use datafusion_common::UnnestOptions;
use pyo3::exceptions::{PyTypeError, PyValueError};
Expand Down Expand Up @@ -350,7 +350,7 @@ impl PyDataFrame {
cl.ok_or(PyValueError::new_err("compression_level is not defined"))
}

let compression_type = match compression.to_lowercase().as_str() {
let _validated = match compression.to_lowercase().as_str() {
"snappy" => Compression::SNAPPY,
"gzip" => Compression::GZIP(
GzipLevel::try_new(compression_level.unwrap_or(6))
Expand All @@ -375,16 +375,20 @@ impl PyDataFrame {
}
};

let writer_properties = WriterProperties::builder()
.set_compression(compression_type)
.build();
let mut compression_string = compression.to_string();
if let Some(level) = compression_level {
compression_string.push_str(&format!("({level})"));
}

let mut options = TableParquetOptions::default();
options.global.compression = Some(compression_string);

wait_for_future(
py,
self.df.as_ref().clone().write_parquet(
path,
DataFrameWriteOptions::new(),
Option::from(writer_properties),
Option::from(options),
),
)?;
Ok(())
Expand All @@ -397,7 +401,7 @@ impl PyDataFrame {
self.df
.as_ref()
.clone()
.write_json(path, DataFrameWriteOptions::new()),
.write_json(path, DataFrameWriteOptions::new(), None),
)?;
Ok(())
}
Expand Down
49 changes: 34 additions & 15 deletions src/dataset_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
use datafusion::execution::context::TaskContext;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_expr::{EquivalenceProperties, PhysicalSortExpr};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_expr::utils::conjunction;
use datafusion_expr::Expr;
Expand Down Expand Up @@ -73,6 +73,7 @@ pub(crate) struct DatasetExec {
columns: Option<Vec<String>>,
filter_expr: Option<PyObject>,
projected_statistics: Statistics,
plan_properties: datafusion::physical_plan::PlanProperties,
}

impl DatasetExec {
Expand Down Expand Up @@ -134,13 +135,20 @@ impl DatasetExec {
.map_err(PyErr::from)?;

let projected_statistics = Statistics::new_unknown(&schema);
let plan_properties = datafusion::physical_plan::PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(fragments.len()),
ExecutionMode::Bounded,
);

Ok(DatasetExec {
dataset: dataset.into(),
schema,
fragments: fragments.into(),
columns,
filter_expr,
projected_statistics,
plan_properties,
})
}
}
Expand All @@ -156,18 +164,6 @@ impl ExecutionPlan for DatasetExec {
self.schema.clone()
}

/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Python::with_gil(|py| {
let fragments = self.fragments.as_ref(py);
Partitioning::UnknownPartitioning(fragments.len())
})
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
// this is a leaf node and has no children
vec![]
Expand Down Expand Up @@ -240,6 +236,29 @@ impl ExecutionPlan for DatasetExec {
fn statistics(&self) -> DFResult<Statistics> {
Ok(self.projected_statistics.clone())
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
&self.plan_properties
}
}

impl ExecutionPlanProperties for DatasetExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> &Partitioning {
self.plan_properties.output_partitioning()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn execution_mode(&self) -> datafusion::physical_plan::ExecutionMode {
self.plan_properties.execution_mode
}

fn equivalence_properties(&self) -> &datafusion::physical_expr::EquivalenceProperties {
&self.plan_properties.eq_properties
}
}

impl DisplayAs for DatasetExec {
Expand Down
5 changes: 5 additions & 0 deletions src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,11 @@ impl PyExpr {
"ScalarValue::LargeList".to_string(),
),
)),
ScalarValue::Union(_, _, _) => Err(py_datafusion_err(
datafusion_common::DataFusionError::NotImplemented(
"ScalarValue::Union".to_string(),
),
)),
},
_ => Err(py_type_err(format!(
"Non Expr::Literal encountered in types: {:?}",
Expand Down
Loading
Loading