Upgrade to DataFusion 18.0.0-rc1 (#664)
andygrove authored Feb 11, 2023
1 parent e7f8774 commit a3ac857
Showing 15 changed files with 59 additions and 58 deletions.
4 changes: 2 additions & 2 deletions ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.10.0", features = [
     "standalone",
 ] }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "17.0.0"
-datafusion-cli = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-cli = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 dirs = "4.0.0"
 env_logger = "0.10"
 mimalloc = { version = "0.1", default-features = false }
4 changes: 2 additions & 2 deletions ballista/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.63"
 ballista-core = { path = "../core", version = "0.10.0" }
 ballista-executor = { path = "../executor", version = "0.10.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.10.0", optional = true }
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
9 changes: 7 additions & 2 deletions ballista/client/src/context.rs
@@ -21,6 +21,7 @@ use datafusion::arrow::datatypes::SchemaRef;
 use log::info;
 use parking_lot::Mutex;
 use sqlparser::ast::Statement;
+use std::borrow::Cow;
 use std::collections::HashMap;
 use std::sync::Arc;

@@ -355,10 +356,14 @@ impl BallistaContext {
         let state = self.state.lock();
         for (name, prov) in &state.tables {
             // ctx is shared between queries, check table exists or not before register
-            let table_ref = TableReference::Bare { table: name };
+            let table_ref = TableReference::Bare {
+                table: Cow::Borrowed(name),
+            };
             if !ctx.table_exist(table_ref)? {
                 ctx.register_table(
-                    TableReference::Bare { table: name },
+                    TableReference::Bare {
+                        table: Cow::Borrowed(name),
+                    },
                     Arc::clone(prov),
                 )?;
             }
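
Note: DataFusion 18 changed `TableReference` so that its variants hold `Cow<str>` rather than `&str`, which is why the registrations above now wrap the table name in `Cow::Borrowed`. A minimal sketch of the new shape; the import path is an assumption for illustration (context.rs already has `TableReference` in scope through its existing imports):

```rust
use std::borrow::Cow;
// Path assumed; the re-export location may differ between DataFusion versions.
use datafusion::common::TableReference;

fn bare_ref(name: &str) -> TableReference<'_> {
    // Cow::Borrowed avoids cloning the name; Cow::Owned(name.to_string())
    // works when the reference must outlive the borrowed source.
    TableReference::Bare {
        table: Cow::Borrowed(name),
    }
}
```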
6 changes: 3 additions & 3 deletions ballista/core/Cargo.toml
@@ -46,13 +46,13 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.8", default-features = false }

-arrow-flight = { version = "31.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "32.0.0", features = ["flight-sql-experimental"] }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, optional = true }
-datafusion-proto = "17.0.0"
+datafusion-proto = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 futures = "0.3"
 hashbrown = "0.13"

8 changes: 5 additions & 3 deletions ballista/core/src/client.rs
@@ -34,9 +34,10 @@ use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
 use datafusion::arrow::array::ArrayRef;
 use datafusion::arrow::{
     datatypes::{Schema, SchemaRef},
-    error::{ArrowError, Result as ArrowResult},
+    error::ArrowError,
     record_batch::RecordBatch,
 };
+use datafusion::error::DataFusionError;

 use crate::serde::protobuf;
 use crate::utils::create_grpc_client_connection;
@@ -162,7 +163,7 @@ impl FlightDataStream {
 }

 impl Stream for FlightDataStream {
-    type Item = ArrowResult<RecordBatch>;
+    type Item = datafusion::error::Result<RecordBatch>;

     fn poll_next(
         mut self: std::pin::Pin<&mut Self>,
@@ -171,13 +172,14 @@ impl Stream for FlightDataStream {
         self.stream.poll_next_unpin(cx).map(|x| match x {
             Some(flight_data_chunk_result) => {
                 let converted_chunk = flight_data_chunk_result
-                    .map_err(|e| ArrowError::from_external_error(Box::new(e)))
+                    .map_err(|e| ArrowError::from_external_error(Box::new(e)).into())
                    .and_then(|flight_data_chunk| {
                         flight_data_to_arrow_batch(
                             &flight_data_chunk,
                             self.schema.clone(),
                             &self.dictionaries_by_id,
                         )
+                        .map_err(DataFusionError::ArrowError)
                     });
                 Some(converted_chunk)
             }
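
Note: the change above is the pattern repeated throughout this commit. In DataFusion 18, record batch streams yield `datafusion::error::Result<RecordBatch>` rather than the Arrow crate's `Result`, so Arrow errors must be lifted into `DataFusionError`. A minimal sketch, with a hypothetical helper name:

```rust
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};

// Hypothetical helper mirroring the map_err calls added above: lift an
// Arrow result into DataFusion's Result type.
fn lift(batch: std::result::Result<RecordBatch, ArrowError>) -> Result<RecordBatch> {
    // From<ArrowError> is implemented for DataFusionError, so `.into()`
    // (as used in shuffle_reader.rs below) works as well as naming the
    // variant explicitly.
    batch.map_err(DataFusionError::ArrowError)
}
```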
5 changes: 4 additions & 1 deletion ballista/core/src/error.rs
@@ -100,7 +100,10 @@ impl From<parser::ParserError> for BallistaError {

 impl From<DataFusionError> for BallistaError {
     fn from(e: DataFusionError) -> Self {
-        BallistaError::DataFusionError(e)
+        match e {
+            DataFusionError::ArrowError(e) => Self::from(e),
+            _ => BallistaError::DataFusionError(e),
+        }
     }
 }

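
Note: the match keeps Arrow errors on Ballista's dedicated variant instead of double-wrapping them now that DataFusion surfaces them as `DataFusionError::ArrowError`. A sketch of the effect, assuming `BallistaError`'s existing `ArrowError` variant and `From<ArrowError>` impl defined elsewhere in this file:

```rust
use ballista_core::error::BallistaError;
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;

fn demo() -> BallistaError {
    let df_err =
        DataFusionError::ArrowError(ArrowError::ComputeError("overflow".into()));
    // Yields BallistaError::ArrowError(..) rather than the doubly wrapped
    // BallistaError::DataFusionError(DataFusionError::ArrowError(..)).
    BallistaError::from(df_err)
}
```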
4 changes: 2 additions & 2 deletions ballista/core/src/execution_plans/distributed_query.rs
@@ -25,7 +25,7 @@ use crate::serde::protobuf::{
 };
 use crate::utils::create_grpc_client_connection;
 use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
+use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
@@ -227,7 +227,7 @@ async fn execute_query(
     scheduler_url: String,
     session_id: String,
     query: ExecuteQueryParams,
-) -> Result<impl Stream<Item = ArrowResult<RecordBatch>> + Send> {
+) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
     info!("Connecting to Ballista scheduler at {}", scheduler_url);
     // TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
     let connection = create_grpc_client_connection(scheduler_url)
6 changes: 3 additions & 3 deletions ballista/core/src/execution_plans/shuffle_reader.rs
@@ -29,7 +29,7 @@ use crate::client::BallistaClient;
 use crate::serde::scheduler::{PartitionLocation, PartitionStats};

 use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
+use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::ipc::reader::FileReader;
 use datafusion::arrow::record_batch::RecordBatch;

@@ -208,14 +208,14 @@ impl LocalShuffleStream {
 }

 impl Stream for LocalShuffleStream {
-    type Item = ArrowResult<RecordBatch>;
+    type Item = Result<RecordBatch>;

     fn poll_next(
         mut self: Pin<&mut Self>,
         _: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         if let Some(batch) = self.reader.next() {
-            return Poll::Ready(Some(batch));
+            return Poll::Ready(Some(batch.map_err(|e| e.into())));
         }
         Poll::Ready(None)
     }
8 changes: 4 additions & 4 deletions ballista/executor/Cargo.toml
@@ -38,15 +38,15 @@ default = ["mimalloc"]

 [dependencies]
 anyhow = "1"
-arrow = { version = "31.0.0" }
-arrow-flight = { version = "31.0.0" }
+arrow = { version = "32.0.0" }
+arrow-flight = { version = "32.0.0" }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.10.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 futures = "0.3"
 hyper = "0.14.4"
 log = "0.4"
6 changes: 2 additions & 4 deletions ballista/executor/src/collect.rs
@@ -22,9 +22,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::{any::Any, pin::Pin};

-use datafusion::arrow::{
-    datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
-};
+use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
@@ -118,7 +116,7 @@ struct MergedRecordBatchStream {
 }

 impl Stream for MergedRecordBatchStream {
-    type Item = ArrowResult<RecordBatch>;
+    type Item = Result<RecordBatch>;

     fn poll_next(
         mut self: Pin<&mut Self>,
4 changes: 2 additions & 2 deletions ballista/executor/src/executor.rs
@@ -182,13 +182,13 @@ mod test {
     use crate::executor::Executor;
     use crate::metrics::LoggingMetricsCollector;
     use arrow::datatypes::{Schema, SchemaRef};
-    use arrow::error::ArrowError;
     use arrow::record_batch::RecordBatch;
     use ballista_core::execution_plans::ShuffleWriterExec;
     use ballista_core::serde::protobuf::ExecutorRegistration;
     use datafusion::execution::context::TaskContext;

     use ballista_core::serde::scheduler::PartitionId;
+    use datafusion::error::DataFusionError;
     use datafusion::physical_expr::PhysicalSortExpr;
     use datafusion::physical_plan::{
         ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
@@ -213,7 +213,7 @@ mod test {
     }

     impl Stream for NeverendingRecordBatchStream {
-        type Item = Result<RecordBatch, ArrowError>;
+        type Item = Result<RecordBatch, DataFusionError>;

         fn poll_next(
             self: Pin<&mut Self>,
6 changes: 3 additions & 3 deletions ballista/scheduler/Cargo.toml
@@ -43,16 +43,16 @@ sled = ["sled_package", "tokio-stream"]

 [dependencies]
 anyhow = "1"
-arrow-flight = { version = "31.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "32.0.0", features = ["flight-sql-experimental"] }
 async-recursion = "1.0.0"
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.10.0" }
 base64 = { version = "0.13", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 etcd-client = { version = "0.10", optional = true }
 flatbuffers = { version = "22.9.29" }
 futures = "0.3"
41 changes: 17 additions & 24 deletions ballista/scheduler/src/planner.rs
@@ -326,17 +326,15 @@ mod test {
     use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
     use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use datafusion::physical_plan::joins::HashJoinExec;
+    use datafusion::physical_plan::projection::ProjectionExec;
     use datafusion::physical_plan::sorts::sort::SortExec;
-    use datafusion::physical_plan::{
-        coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
-    };
+    use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use datafusion::physical_plan::{displayable, ExecutionPlan};
     use datafusion::prelude::SessionContext;
-    use std::ops::Deref;

     use datafusion_proto::physical_plan::AsExecutionPlan;
     use datafusion_proto::protobuf::LogicalPlanNode;
     use datafusion_proto::protobuf::PhysicalPlanNode;
+    use std::ops::Deref;
     use std::sync::Arc;
     use uuid::Uuid;

@@ -375,19 +373,19 @@
         /* Expected result:

         ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 }], 2))
-          AggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
-            CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false
+          AggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(lineitem.l_extendedprice * Int64(1))]
+            CsvExec: files={2 groups: [[ballista/scheduler/testdata/lineitem/partition1.tbl], [ballista/scheduler/testdata/lineitem/partition0.tbl]]}, has_header=false, limit=None, projection=[l_extendedprice, l_returnflag]

         ShuffleWriterExec: None
-          ProjectionExec: expr=[l_returnflag@0 as l_returnflag, SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price]
-            AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
-              CoalesceBatchesExec: target_batch_size=4096
-                UnresolvedShuffleExec
+          SortExec: [l_returnflag@0 ASC NULLS LAST]
+            ProjectionExec: expr=[l_returnflag@0 as l_returnflag, SUM(lineitem.l_extendedprice * Int64(1))@1 as sum_disc_price]
+              AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(lineitem.l_extendedprice * Int64(1))]
+                CoalesceBatchesExec: target_batch_size=8192
+                  UnresolvedShuffleExec

         ShuffleWriterExec: None
-          SortExec: [l_returnflag@0 ASC]
-            CoalescePartitionsExec
-              UnresolvedShuffleExec
+          SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST]
+            UnresolvedShuffleExec
         */

         assert_eq!(3, stages.len());
@@ -399,7 +397,9 @@

         // verify stage 1
         let stage1 = stages[1].children()[0].clone();
-        let projection = downcast_exec!(stage1, ProjectionExec);
+        let sort = downcast_exec!(stage1, SortExec);
+        let projection = sort.children()[0].clone();
+        let projection = downcast_exec!(projection, ProjectionExec);
         let final_hash = projection.children()[0].clone();
         let final_hash = downcast_exec!(final_hash, AggregateExec);
         assert!(*final_hash.mode() == AggregateMode::FinalPartitioned);
@@ -414,15 +414,8 @@

         // verify stage 2
         let stage2 = stages[2].children()[0].clone();
-        let sort = downcast_exec!(stage2, SortExec);
-        let coalesce_partitions = sort.children()[0].clone();
-        let coalesce_partitions =
-            downcast_exec!(coalesce_partitions, CoalescePartitionsExec);
-        assert_eq!(
-            coalesce_partitions.output_partitioning().partition_count(),
-            1
-        );
-        let unresolved_shuffle = coalesce_partitions.children()[0].clone();
+        let merge = downcast_exec!(stage2, SortPreservingMergeExec);
+        let unresolved_shuffle = merge.children()[0].clone();
         let unresolved_shuffle =
             downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
         assert_eq!(unresolved_shuffle.stage_id, 2);
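
Note: the stage-2 rewrite reflects how DataFusion 18 plans a distributed sort: each partition is sorted locally (the `SortExec` now checked in stage 1) and the sorted streams are combined with `SortPreservingMergeExec`, replacing the earlier `CoalescePartitionsExec` plus global `SortExec` pair. A condensed sketch of the new traversal, as a fragment of the test above (`stages` and `downcast_exec!` in scope):

```rust
// Stage 2 is now a merge over the shuffled, already-sorted partitions; the
// merge preserves its inputs' order, so no second full sort is needed.
let stage2 = stages[2].children()[0].clone();
let merge = downcast_exec!(stage2, SortPreservingMergeExec);
let shuffle = merge.children()[0].clone();
let shuffle = downcast_exec!(shuffle, UnresolvedShuffleExec);
assert_eq!(shuffle.stage_id, 2);
```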
4 changes: 2 additions & 2 deletions benchmarks/Cargo.toml
@@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]

 [dependencies]
 ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
+datafusion-proto = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 env_logger = "0.10"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
2 changes: 1 addition & 1 deletion examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]

 [dependencies]
 ballista = { path = "../ballista/client", version = "0.10.0" }
-datafusion = "17.0.0"
+datafusion = { version = "18.0.0", git = "https://github.com/apache/arrow-datafusion", rev = "18.0.0-rc1" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.11"
