Update to DataFusion 27
This brings object_store 0.6, which fixes delta-io#1462.

Tokio needs to be updated because DataFusion now uses
JoinSet::spawn_blocking.
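
For reference, a minimal sketch of the tokio API that motivates the bump. This is illustrative code, not part of the commit, and it assumes a tokio version recent enough to expose JoinSet::spawn_blocking (with the rt-multi-thread and macros features enabled):

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set: JoinSet<u64> = JoinSet::new();
    // spawn_blocking runs the closure on tokio's blocking thread pool
    // while still tracking it in the JoinSet alongside async tasks.
    set.spawn_blocking(|| (0..1_000u64).sum());
    while let Some(result) = set.join_next().await {
        println!("blocking task returned {}", result.unwrap());
    }
}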
jhoekx committed Jun 16, 2023
1 parent 0dda99b commit 7f14905
Showing 10 changed files with 70 additions and 42 deletions.
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -35,7 +35,7 @@ num_cpus = "1"
reqwest = { version = "*", features = ["native-tls-vendored"] }

[dependencies.pyo3]
version = "0.18"
version = "0.19"
features = ["extension-module", "abi3", "abi3-py37"]

[dependencies.deltalake]
34 changes: 16 additions & 18 deletions rust/Cargo.toml
@@ -13,14 +13,14 @@ readme = "README.md"
edition = "2021"

[dependencies]
arrow = { version = "40", optional = true }
arrow-array = { version = "40", optional = true }
arrow-buffer = { version = "40", optional = true }
arrow-cast = { version = "40", optional = true }
arrow-ord = { version = "40", optional = true }
arrow-row = { version = "40", optional = true }
arrow-schema = { version = "40", optional = true }
arrow-select = { version = "40", optional = true }
arrow = { version = "41", optional = true }
arrow-array = { version = "41", optional = true }
arrow-buffer = { version = "41", optional = true }
arrow-cast = { version = "41", optional = true }
arrow-ord = { version = "41", optional = true }
arrow-row = { version = "41", optional = true }
arrow-schema = { version = "41", optional = true }
arrow-select = { version = "41", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
@@ -38,10 +38,10 @@ libc = ">=0.2.90, <1"
num-bigint = "0.4"
num_cpus = "1"
num-traits = "0.2.15"
object_store = "0.5.6"
object_store = "0.6.1"
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "40", features = [
parquet = { version = "41", features = [
"async",
"object_store",
], optional = true }
@@ -74,12 +74,12 @@ reqwest-retry = { version = "0.2.2", optional = true }

# Datafusion
dashmap = { version = "5", optional = true }
datafusion = { version = "26", optional = true }
datafusion-expr = { version = "26", optional = true }
datafusion-common = { version = "26", optional = true }
datafusion-proto = { version = "26", optional = true }
datafusion-sql = { version = "26", optional = true }
datafusion-physical-expr = { version = "26", optional = true }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "9b419b19a66bdd35e9e5c0bca259786f8f3c3965", optional = true }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "9b419b19a66bdd35e9e5c0bca259786f8f3c3965", optional = true }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "9b419b19a66bdd35e9e5c0bca259786f8f3c3965", optional = true }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "9b419b19a66bdd35e9e5c0bca259786f8f3c3965", optional = true }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "9b419b19a66bdd35e9e5c0bca259786f8f3c3965", optional = true }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "9b419b19a66bdd35e9e5c0bca259786f8f3c3965", optional = true }


sqlparser = { version = "0.34", optional = true }
@@ -135,7 +135,6 @@ s3-native-tls = [
"rusoto_dynamodb/native-tls",
"dynamodb_lock/native-tls",
"object_store/aws",
"object_store/aws_profile",
]
s3 = [
"rusoto_core/rustls",
@@ -144,7 +143,6 @@ s3 = [
"rusoto_dynamodb/rustls",
"dynamodb_lock/rustls",
"object_store/aws",
"object_store/aws_profile",
]
unity-experimental = ["reqwest", "reqwest-middleware", "reqwest-retry"]

2 changes: 2 additions & 0 deletions rust/src/action/checkpoints.rs
@@ -211,6 +211,7 @@ pub async fn cleanup_expired_logs_for(
location: Path::from(""),
last_modified: DateTime::<Utc>::MIN_UTC,
size: 0,
e_tag: None,
},
);
let file_needs_time_adjustment =
@@ -255,6 +256,7 @@ pub async fn cleanup_expired_logs_for(
location: current_file.1.location.clone(),
last_modified: last_file.1.last_modified.add(Duration::seconds(1)),
size: 0,
e_tag: current_file.1.e_tag,
},
);
maybe_delete_files.push(updated);
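
The e_tag: None entries added above are needed because object_store 0.6 adds an entity-tag field to ObjectMeta. A minimal illustrative construction follows; it assumes the 0.6 field set (location, last_modified, size, e_tag) rather than copying code from this diff:

use chrono::{TimeZone, Utc};
use object_store::{path::Path, ObjectMeta};

fn example_meta() -> ObjectMeta {
    ObjectMeta {
        location: Path::from("_delta_log/00000000000000000000.json"),
        last_modified: Utc.timestamp_millis_opt(0).unwrap(),
        size: 0,
        // New in object_store 0.6: optional HTTP entity tag, None when unknown.
        e_tag: None,
    }
}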
14 changes: 8 additions & 6 deletions rust/src/delta_datafusion.rs
@@ -36,16 +36,16 @@ use arrow_array::StringArray;
use arrow_schema::Field;
use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::datasource::provider::TableProviderFactory;
use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::{
@@ -1377,7 +1377,6 @@ mod tests {
use arrow::array::StructArray;
use arrow::datatypes::{DataType, Field, Schema};
use chrono::{TimeZone, Utc};
use datafusion::from_slice::FromSlice;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
@@ -1555,9 +1554,10 @@
let file = partitioned_file_from_action(&action, &schema);
let ref_file = PartitionedFile {
object_meta: object_store::ObjectMeta {
location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
last_modified: Utc.timestamp_millis_opt(1660497727833).unwrap(),
size: 10644,
e_tag: None,
},
partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
range: None,
@@ -1575,8 +1575,10 @@
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["a", "b", "c", "d"])),
Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from_iter_values([
"a", "b", "c", "d",
])),
Arc::new(arrow::array::Int32Array::from_iter_values([1, 10, 10, 100])),
],
)
.unwrap();
2 changes: 1 addition & 1 deletion rust/src/operations/transaction/state.rs
@@ -4,9 +4,9 @@ use arrow::array::ArrayRef;
use arrow::datatypes::{
DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::file_format::wrap_partition_type_in_dict;
use datafusion_common::config::ConfigOptions;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::{Column, DFSchema, Result as DFResult, TableReference};
29 changes: 17 additions & 12 deletions rust/src/storage/config.rs
@@ -153,10 +153,11 @@ fn try_configure_gcs(
storage_url: &Url,
options: &StorageOptions,
) -> DeltaResult<Arc<DynObjectStore>> {
let store = GoogleCloudStorageBuilder::from_env()
.with_url(storage_url.as_ref())
.try_with_options(&options.as_gcs_options())?
.build()?;
let mut builder = GoogleCloudStorageBuilder::from_env().with_url(storage_url.as_ref());
for (key, value) in options.as_gcs_options() {
builder = builder.with_config(key, value);
}
let store = builder.build()?;
url_prefix_handler(store, storage_url)
}

@@ -176,11 +177,13 @@ fn try_configure_azure(
storage_url: &Url,
options: &StorageOptions,
) -> DeltaResult<Arc<DynObjectStore>> {
let store = MicrosoftAzureBuilder::from_env()
let mut builder = MicrosoftAzureBuilder::from_env()
.with_url(storage_url.as_ref())
.try_with_options(&options.as_azure_options())?
.with_allow_http(options.allow_http())
.build()?;
.with_allow_http(options.allow_http());
for (key, value) in options.as_azure_options() {
builder = builder.with_config(key, value);
}
let store = builder.build()?;
url_prefix_handler(store, storage_url)
}

@@ -200,11 +203,13 @@ fn try_configure_s3(
storage_url: &Url,
options: &StorageOptions,
) -> DeltaResult<Arc<DynObjectStore>> {
let amazon_s3 = AmazonS3Builder::from_env()
let mut builder = AmazonS3Builder::from_env()
.with_url(storage_url.as_ref())
.try_with_options(&options.as_s3_options())?
.with_allow_http(options.allow_http())
.build()?;
.with_allow_http(options.allow_http());
for (key, value) in options.as_s3_options() {
builder = builder.with_config(key, value);
}
let amazon_s3 = builder.build()?;
let store =
S3StorageBackend::try_new(Arc::new(amazon_s3), S3StorageOptions::from_map(&options.0))?;
url_prefix_handler(store, storage_url)
12 changes: 10 additions & 2 deletions rust/src/storage/file.rs
@@ -5,8 +5,8 @@
use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::{
local::LocalFileSystem, path::Path as ObjectStorePath, Error as ObjectStoreError, GetResult,
ListResult, MultipartId, ObjectMeta as ObjStoreObjectMeta, ObjectStore,
local::LocalFileSystem, path::Path as ObjectStorePath, Error as ObjectStoreError, GetOptions,
GetResult, ListResult, MultipartId, ObjectMeta as ObjStoreObjectMeta, ObjectStore,
Result as ObjectStoreResult,
};
use std::ops::Range;
@@ -153,6 +153,14 @@ impl ObjectStore for FileStorageBackend {
self.inner.get(location).await
}

async fn get_opts(
&self,
location: &ObjectStorePath,
options: GetOptions,
) -> ObjectStoreResult<GetResult> {
self.inner.get_opts(location, options).await
}

async fn get_range(
&self,
location: &ObjectStorePath,
8 changes: 8 additions & 0 deletions rust/src/storage/mod.rs
@@ -8,6 +8,7 @@ use std::sync::Arc;
use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt};
use lazy_static::lazy_static;
use object_store::GetOptions;
use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -210,6 +211,13 @@ impl ObjectStore for DeltaObjectStore {
self.storage.get(location).await
}

/// Perform a get request with options
///
/// Note: options.range will be ignored if [`GetResult::File`]
async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
self.storage.get_opts(location, options).await
}

/// Return the bytes that are stored at the specified location
/// in the given byte range
async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
8 changes: 6 additions & 2 deletions rust/src/storage/s3.rs
@@ -7,8 +7,8 @@ use dynamodb_lock::{DynamoError, LockClient, LockItem, DEFAULT_MAX_RETRY_ACQUIRE
use futures::stream::BoxStream;
use object_store::path::Path;
use object_store::{
DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result as ObjectStoreResult,
DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId,
ObjectMeta, ObjectStore, Result as ObjectStoreResult,
};
use rusoto_core::{HttpClient, Region};
use rusoto_credential::AutoRefreshingProvider;
@@ -451,6 +451,10 @@ impl ObjectStore for S3StorageBackend {
self.inner.get(location).await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
self.inner.get_opts(location, options).await
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
self.inner.get_range(location, range).await
}
1 change: 1 addition & 0 deletions rust/src/storage/utils.rs
@@ -94,6 +94,7 @@ impl TryFrom<&Add> for ObjectMeta {
location: Path::parse(value.path.as_str())?,
last_modified,
size: value.size as usize,
e_tag: None,
})
}
}
