Skip to content

Commit

Permalink
Use arrow2 0.9 and pull master from Feb 2 2022
Browse files · Browse the repository at this point in the history
  • Loading branch information
Igosuki committed Feb 2, 2022
2 parents b95044e + e4a056f commit 469731b
Show file tree
Hide file tree
Showing 103 changed files with 5,006 additions and 2,165 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,5 @@ lto = true
codegen-units = 1

[patch.crates-io]
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", branch = "main" }
#arrow2 = { git = "https://github.com/blaze-init/arrow2.git", branch = "shuffle_ipc" }
#parquet2 = { git = "https://github.com/blaze-init/parquet2.git", branch = "meta_new" }
#arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", branch = "main" }
#parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", branch = "main" }
2 changes: 1 addition & 1 deletion ballista-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client", version = "0.6.0"}
prost = "0.9"
tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
futures = "0.3"
num_cpus = "1.13.0"
1 change: 1 addition & 0 deletions ballista/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ log = "0.4"
tokio = "1.0"
tempfile = "3"
sqlparser = "0.13"
parking_lot = "0.11"

datafusion = { path = "../../../datafusion", version = "6.0.0" }

Expand Down
19 changes: 10 additions & 9 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

//! Distributed execution context.

use parking_lot::Mutex;
use sqlparser::ast::Statement;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use ballista_core::config::BallistaConfig;
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
Expand Down Expand Up @@ -142,7 +143,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
Expand All @@ -162,7 +163,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
Expand All @@ -186,7 +187,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
Expand All @@ -203,7 +204,7 @@ impl BallistaContext {
name: &str,
table: Arc<dyn TableProvider>,
) -> Result<()> {
let mut state = self.state.lock().unwrap();
let mut state = self.state.lock();
state.tables.insert(name.to_owned(), table);
Ok(())
}
Expand Down Expand Up @@ -280,7 +281,7 @@ impl BallistaContext {
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
let state = self.state.lock().unwrap();
let state = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&state.scheduler_host,
state.scheduler_port,
Expand All @@ -291,7 +292,7 @@ impl BallistaContext {
let is_show = self.is_show_statement(sql).await?;
        // SHOW TABLES and SHOW COLUMNS statements cannot run at the scheduler because the tables are stored at the client
if is_show {
let state = self.state.lock().unwrap();
let state = self.state.lock();
ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_information_schema(
state.config.default_with_information_schema(),
Expand All @@ -301,7 +302,7 @@ impl BallistaContext {

// register tables with DataFusion context
{
let state = self.state.lock().unwrap();
let state = self.state.lock();
for (name, prov) in &state.tables {
ctx.register_table(
TableReference::Bare { table: name },
Expand Down Expand Up @@ -483,7 +484,7 @@ mod tests {
.unwrap();

{
let mut guard = context.state.lock().unwrap();
let mut guard = context.state.lock();
let csv_table = guard.tables.get("single_nan");

if let Some(table_provide) = csv_table {
Expand Down
2 changes: 2 additions & 0 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ arrow = { package = "arrow2", version="0.9", features = ["io_ipc", "io_flight"]

datafusion = { path = "../../../datafusion", version = "6.0.0" }

parking_lot = "0.11"

[dev-dependencies]
tempfile = "3"

Expand Down
3 changes: 2 additions & 1 deletion ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,12 @@ enum AggregateFunction {
STDDEV=11;
STDDEV_POP=12;
CORRELATION=13;
APPROX_PERCENTILE_CONT = 14;
}

message AggregateExprNode {
AggregateFunction aggr_function = 1;
LogicalExprNode expr = 2;
repeated LogicalExprNode expr = 2;
}

enum BuiltInWindowFunction {
Expand Down
5 changes: 3 additions & 2 deletions ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

use arrow::io::flight::deserialize_schemas;
use arrow::io::ipc::IpcSchema;
use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;
use std::{collections::HashMap, pin::Pin};
use std::{
convert::{TryFrom, TryInto},
Expand Down Expand Up @@ -164,7 +165,7 @@ impl Stream for FlightDataStream {
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut stream = self.stream.lock().expect("mutex is bad");
let mut stream = self.stream.lock();
stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
Expand Down
3 changes: 2 additions & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.

use parking_lot::Mutex;
use std::fs::File;
use std::iter::{FromIterator, Iterator};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Instant;
use std::{any::Any, pin::Pin};

Expand Down
6 changes: 5 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,11 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {

Ok(Expr::AggregateFunction {
fun,
args: vec![parse_required_expr(&expr.expr)?],
args: expr
.expr
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<_>, _>>()?,
distinct: false, //TODO
})
}
Expand Down
21 changes: 16 additions & 5 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,15 @@ mod roundtrip_tests {
use crate::error::BallistaError;
use arrow::datatypes::IntegerType;
use core::panic;
use datafusion::arrow::datatypes::UnionMode;
use datafusion::field_util::SchemaExt;
use datafusion::logical_plan::Repartition;
use datafusion::{
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode},
datasource::object_store::local::LocalFileSystem,
logical_plan::{
col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder,
Partitioning, ToDFSchema,
Partitioning, Repartition, ToDFSchema,
},
physical_plan::functions::BuiltinScalarFunction::Sqrt,
physical_plan::{aggregates, functions::BuiltinScalarFunction::Sqrt},
prelude::*,
scalar::ScalarValue,
sql::parser::FileType,
Expand Down Expand Up @@ -1009,4 +1007,17 @@ mod roundtrip_tests {

Ok(())
}

#[test]
fn roundtrip_approx_percentile_cont() -> Result<()> {
let test_expr = Expr::AggregateFunction {
fun: aggregates::AggregateFunction::ApproxPercentileCont,
args: vec![col("bananas"), lit(0.42)],
distinct: false,
};

roundtrip_test!(test_expr, protobuf::LogicalExprNode, Expr);

Ok(())
}
}
14 changes: 10 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,9 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
AggregateFunction::ApproxDistinct => {
protobuf::AggregateFunction::ApproxDistinct
}
AggregateFunction::ApproxPercentileCont => {
protobuf::AggregateFunction::ApproxPercentileCont
}
AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg,
AggregateFunction::Min => protobuf::AggregateFunction::Min,
AggregateFunction::Max => protobuf::AggregateFunction::Max,
Expand All @@ -1155,11 +1158,13 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
}
};

let arg = &args[0];
let aggregate_expr = Box::new(protobuf::AggregateExprNode {
let aggregate_expr = protobuf::AggregateExprNode {
aggr_function: aggr_function.into(),
expr: Some(Box::new(arg.try_into()?)),
});
expr: args
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
};
Ok(protobuf::LogicalExprNode {
expr_type: Some(ExprType::AggregateExpr(aggregate_expr)),
})
Expand Down Expand Up @@ -1390,6 +1395,7 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
AggregateFunction::Stddev => Self::Stddev,
AggregateFunction::StddevPop => Self::StddevPop,
AggregateFunction::Correlation => Self::Correlation,
AggregateFunction::ApproxPercentileCont => Self::ApproxPercentileCont,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
protobuf::AggregateFunction::Stddev => AggregateFunction::Stddev,
protobuf::AggregateFunction::StddevPop => AggregateFunction::StddevPop,
protobuf::AggregateFunction::Correlation => AggregateFunction::Correlation,
protobuf::AggregateFunction::ApproxPercentileCont => {
AggregateFunction::ApproxPercentileCont
}
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
Expand All @@ -636,7 +635,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {

let fun_expr = functions::create_physical_fun(
&(&scalar_function).into(),
&ctx_state,
&ctx_state.execution_props,
)?;

Arc::new(ScalarFunctionExpr::new(
Expand Down
3 changes: 2 additions & 1 deletion ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ futures = "0.3"
log = "0.4"
snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true}
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
hyper = "0.14.4"
parking_lot = "0.11"

[dev-dependencies]

Expand Down
1 change: 1 addition & 0 deletions ballista/rust/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tokio-stream = { version = "0.1", features = ["net"], optional = true }
tonic = "0.6"
tower = { version = "0.4" }
warp = "0.3"
parking_lot = "0.11"

[dev-dependencies]
ballista-core = { path = "../core", version = "0.6.0" }
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ arrow = { package = "arrow2", version="0.9", features = ["io_csv", "io_json", "i
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client" }
structopt = { version = "0.3", default-features = false }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
futures = "0.3"
env_logger = "0.9"
mimalloc = { version = "0.1", optional = true, default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bin/nyctaxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn datafusion_sql_benchmarks(
}

async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
let runtime = ctx.state.lock().unwrap().runtime_env.clone();
let runtime = ctx.state.lock().runtime_env.clone();
let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.optimize(&plan)?;
if debug {
Expand Down
10 changes: 6 additions & 4 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use arrow::io::parquet::write::{Compression, Version, WriteOptions};
use ballista::prelude::{
BallistaConfig, BallistaContext, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
};
use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use datafusion::field_util::SchemaExt;
use structopt::StructOpt;

Expand Down Expand Up @@ -264,7 +266,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
.with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size);
let mut ctx = ExecutionContext::with_config(config);
let runtime = ctx.state.lock().unwrap().runtime_env.clone();
let runtime = ctx.state.lock().runtime_env.clone();

// register tables
for table in TABLES {
Expand Down Expand Up @@ -546,7 +548,7 @@ async fn execute_query(
displayable(physical_plan.as_ref()).indent()
);
}
let runtime = ctx.state.lock().unwrap().runtime_env.clone();
let runtime = ctx.state.lock().runtime_env.clone();
let result = collect(physical_plan.clone(), runtime).await?;
if debug {
println!(
Expand Down Expand Up @@ -661,13 +663,13 @@ fn get_table(
.with_delimiter(b',')
.with_has_header(true);

(Arc::new(format), path, ".csv")
(Arc::new(format), path, DEFAULT_CSV_EXTENSION)
}
"parquet" => {
let path = format!("{}/{}", path, table);
let format = ParquetFormat::default().with_enable_pruning(true);

(Arc::new(format), path, ".parquet")
(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
other => {
unimplemented!("Invalid file format '{}'", other);
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.58"
[dependencies]
clap = { version = "3", features = ["derive", "cargo"] }
rustyline = "9.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
datafusion = { path = "../datafusion", version = "6.0.0" }
arrow = { package = "arrow2", version="0.9", features = ["io_print"] }
ballista = { path = "../ballista/rust/client", version = "0.6.0" }
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ arrow = { package = "arrow2", version="0.9", features = ["io_ipc", "io_flight"]
datafusion = { path = "../datafusion" }
prost = "0.9"
tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
futures = "0.3"
num_cpus = "1.13.0"
Loading

0 comments on commit 469731b

Please sign in to comment.