Update ObjectStore 0.7.0 and Arrow 46.0.0 #7282

Merged · 14 commits · Aug 25, 2023
12 changes: 6 additions & 6 deletions Cargo.toml
@@ -32,12 +32,12 @@ rust-version = "1.70"
version = "30.0.0"

[workspace.dependencies]
arrow = { version = "45.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
arrow-array = { version = "45.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "45.0.0", default-features = false }
arrow-flight = { version = "45.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "45.0.0", default-features = false }
parquet = { version = "45.0.0", features = ["arrow", "async", "object_store"] }
arrow = { version = "46.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
arrow-array = { version = "46.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "46.0.0", default-features = false }
arrow-flight = { version = "46.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "46.0.0", default-features = false }
parquet = { version = "46.0.0", features = ["arrow", "async", "object_store"] }
sqlparser = { version = "0.37.0", features = ["visitor"] }

[profile.release]
65 changes: 33 additions & 32 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.toml
@@ -29,7 +29,7 @@ rust-version = "1.70"
readme = "README.md"

[dependencies]
arrow = "45.0.0"
arrow = "46.0.0"
async-trait = "0.1.41"
aws-config = "0.55"
aws-credential-types = "0.55"
@@ -38,7 +38,7 @@ datafusion = { path = "../datafusion/core", version = "30.0.0" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.6.1", features = ["aws", "gcp"] }
object_store = { version = "0.7.0", features = ["aws", "gcp"] }
parking_lot = { version = "0.12" }
rustyline = "11.0"
tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -46,7 +46,7 @@ futures = "0.3"
log = "0.4"
mimalloc = { version = "0.1", default-features = false }
num_cpus = "1.13.0"
object_store = { version = "0.6.1", features = ["aws"] }
object_store = { version = "0.7.0", features = ["aws"] }
prost = { version = "0.11", default-features = false }
prost-derive = { version = "0.11", default-features = false }
serde = { version = "1.0.136", features = ["derive"] }
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -49,7 +49,7 @@ chrono = { version = "0.4", default-features = false }
flate2 = { version = "1.0.24", optional = true }
futures = "0.3"
num_cpus = "1.13.0"
object_store = { version = "0.6.1", default-features = false, optional = true }
object_store = { version = "0.7.0", default-features = false, optional = true }
parquet = { workspace = true, optional = true }
pyo3 = { version = "0.19.0", optional = true }
sqlparser = { workspace = true }
21 changes: 9 additions & 12 deletions datafusion/common/src/scalar.rs
@@ -1193,19 +1193,15 @@ impl ScalarValue {
/// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
/// should operate on Arrays directly, using vectorized array kernels
pub fn add<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
-let s = self.to_array_of_size(1);
-let o = other.borrow().to_array_of_size(1);
-let r = add_wrapping(&Scalar::new(s.as_ref()), &Scalar::new(o.as_ref()))?;
+let r = add_wrapping(&self.to_scalar(), &other.borrow().to_scalar())?;
Self::try_from_array(r.as_ref(), 0)
}
/// Checked addition of `ScalarValue`
///
/// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
/// should operate on Arrays directly, using vectorized array kernels
pub fn add_checked<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
-let s = self.to_array_of_size(1);
-let o = other.borrow().to_array_of_size(1);
-let r = add(&Scalar::new(s.as_ref()), &Scalar::new(o.as_ref()))?;
+let r = add(&self.to_scalar(), &other.borrow().to_scalar())?;
Self::try_from_array(r.as_ref(), 0)
}

@@ -1214,9 +1210,7 @@ impl ScalarValue {
/// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
/// should operate on Arrays directly, using vectorized array kernels
pub fn sub<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
-let s = self.to_array_of_size(1);
-let o = other.borrow().to_array_of_size(1);
-let r = sub_wrapping(&Scalar::new(s.as_ref()), &Scalar::new(o.as_ref()))?;
+let r = sub_wrapping(&self.to_scalar(), &other.borrow().to_scalar())?;
Self::try_from_array(r.as_ref(), 0)
}

@@ -1225,9 +1219,7 @@ impl ScalarValue {
/// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code
/// should operate on Arrays directly, using vectorized array kernels
pub fn sub_checked<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
-let s = self.to_array_of_size(1);
-let o = other.borrow().to_array_of_size(1);
-let r = sub(&Scalar::new(s.as_ref()), &Scalar::new(o.as_ref()))?;
+let r = sub(&self.to_scalar(), &other.borrow().to_scalar())?;
Self::try_from_array(r.as_ref(), 0)
}

@@ -1329,6 +1321,11 @@ impl ScalarValue {
self.to_array_of_size(1)
}

+/// Converts a scalar into an arrow [`Scalar`]
+pub fn to_scalar(&self) -> Scalar<ArrayRef> {
+Scalar::new(self.to_array_of_size(1))
+}

/// Converts an iterator of references [`ScalarValue`] into an [`ArrayRef`]
/// corresponding to those values. For example,
///
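For context, a minimal usage sketch of the new `to_scalar` helper with arrow 46's datum-based kernels; the kernel path `arrow::compute::kernels::numeric::add_wrapping` and the example values are assumptions for illustration, not part of this PR:

```rust
use arrow::array::Int32Array;
use arrow::compute::kernels::numeric::add_wrapping;
use datafusion_common::{Result, ScalarValue};

fn add_two() -> Result<()> {
    // `to_scalar` wraps a one-element array in arrow's `Scalar`, which
    // implements `Datum` and therefore broadcasts against a whole array.
    let two = ScalarValue::Int32(Some(2)).to_scalar();
    let array = Int32Array::from(vec![1, 2, 3]);
    let sum = add_wrapping(&array, &two)?; // expected: [3, 4, 5]
    assert_eq!(sum.len(), 3);
    Ok(())
}
```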
7 changes: 4 additions & 3 deletions datafusion/common/src/utils.rs
@@ -20,7 +20,7 @@
use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::{ArrayRef, PrimitiveArray};
use arrow::compute;
-use arrow::compute::{lexicographical_partition_ranges, SortColumn, SortOptions};
+use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
use sqlparser::ast::Ident;
@@ -220,7 +220,7 @@ where
/// Given a list of 0 or more already sorted columns, finds the
/// partition ranges that would partition equally across columns.
///
-/// See [`lexicographical_partition_ranges`] for more details.
+/// See [`partition`] for more details.
pub fn evaluate_partition_ranges(
num_rows: usize,
partition_columns: &[SortColumn],
Expand All @@ -231,7 +231,8 @@ pub fn evaluate_partition_ranges(
end: num_rows,
}]
} else {
-lexicographical_partition_ranges(partition_columns)?.collect()
+let cols: Vec<_> = partition_columns.iter().map(|x| x.values.clone()).collect();
+partition(&cols)?.ranges()
})
}

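The change above swaps the removed `lexicographical_partition_ranges` for arrow 46's `partition` kernel, which takes plain `ArrayRef` columns instead of `SortColumn`s and returns a `Partitions` value. A minimal sketch of the new API (the example data is illustrative):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::partition;
use arrow::error::ArrowError;

fn partition_sorted() -> Result<(), ArrowError> {
    // One column, already sorted; `partition` returns the ranges of
    // consecutive equal rows across all supplied columns.
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2, 2, 2, 3]));
    let ranges = partition(&[col])?.ranges();
    assert_eq!(ranges, vec![0..2, 2..5, 5..6]);
    Ok(())
}
```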
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -79,7 +79,7 @@ lazy_static = { version = "^1.4.0" }
log = "^0.4"
num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
object_store = "0.6.1"
object_store = "0.7.0"
parking_lot = "0.12"
parquet = { workspace = true }
percent-encoding = "2.2.0"
11 changes: 7 additions & 4 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -29,7 +29,7 @@ use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::Statistics;
use datafusion_physical_expr::PhysicalExpr;
-use object_store::{GetResult, ObjectMeta, ObjectStore};
+use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
use std::any::Any;
use std::io::{Read, Seek};
use std::sync::Arc;
@@ -52,9 +52,12 @@ impl FileFormat for ArrowFormat {
) -> Result<SchemaRef> {
let mut schemas = vec![];
for object in objects {
-let schema = match store.get(&object.location).await? {
-GetResult::File(mut file, _) => read_arrow_schema_from_reader(&mut file)?,
-r @ GetResult::Stream(_) => {
+let r = store.as_ref().get(&object.location).await?;
+let schema = match r.payload {
+GetResultPayload::File(mut file, _) => {
+read_arrow_schema_from_reader(&mut file)?
+}
+GetResultPayload::Stream(_) => {
// TODO: Fetching entire file to get schema is potentially wasteful
let data = r.bytes().await?;
let mut cursor = std::io::Cursor::new(&data);
11 changes: 7 additions & 4 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -24,7 +24,7 @@ use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion_physical_expr::PhysicalExpr;
-use object_store::{GetResult, ObjectMeta, ObjectStore};
+use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

use super::FileFormat;
use crate::datasource::avro_to_arrow::read_avro_schema_from_reader;
@@ -52,9 +52,12 @@ impl FileFormat for AvroFormat {
) -> Result<SchemaRef> {
let mut schemas = vec![];
for object in objects {
-let schema = match store.get(&object.location).await? {
-GetResult::File(mut file, _) => read_avro_schema_from_reader(&mut file)?,
-r @ GetResult::Stream(_) => {
+let r = store.as_ref().get(&object.location).await?;
+let schema = match r.payload {
+GetResultPayload::File(mut file, _) => {
+read_avro_schema_from_reader(&mut file)?
+}
+GetResultPayload::Stream(_) => {
// TODO: Fetching entire file to get schema is potentially wasteful
let data = r.bytes().await?;
read_avro_schema_from_reader(&mut data.as_ref())?
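The arrow.rs and avro.rs hunks above adapt to the same object_store 0.7 change: `get` now returns a `GetResult` struct whose `payload` field holds the former enum, renamed `GetResultPayload`, while helpers such as `bytes()` live on the outer struct. A minimal sketch of the new shape, assuming object_store 0.7 (`read_all` is a hypothetical helper, not part of this PR):

```rust
use object_store::{path::Path, GetResultPayload, ObjectStore};

// Hypothetical helper: read an object fully, using the local file handle
// when the store is file-backed and buffering the byte stream otherwise.
async fn read_all(
    store: &dyn ObjectStore,
    location: &Path,
) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
    let r = store.get(location).await?;
    match r.payload {
        GetResultPayload::File(mut file, _) => {
            use std::io::Read;
            let mut buf = Vec::new();
            file.read_to_end(&mut buf)?;
            Ok(buf)
        }
        // This arm binds nothing out of `r.payload`, so `r` is still whole
        // and `bytes()` can consume it.
        GetResultPayload::Stream(_) => Ok(r.bytes().await?.to_vec()),
    }
}
```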