Arrow2 test fix #1733

Merged Feb 8, 2022 · 31 commits (changes shown from 28 commits)

Commits
7b8d72c
feat: add join type for logical plan display (#1674)
xudong963 Jan 27, 2022
18ced8d
(minor) Reduce memory manager and disk manager logs from `info!` to `…
alamb Jan 28, 2022
ed1de63
Move `information_schema` tests out of execution/context.rs to `sql_i…
alamb Jan 28, 2022
ab145c8
Move timestamp related tests out of context.rs and into sql integrati…
alamb Jan 28, 2022
39632dd
Fix parquet projection
Igosuki Jan 29, 2022
a34213e
fix pruning casting
Igosuki Jan 29, 2022
530f4f4
fix test based on debug strings
Igosuki Jan 29, 2022
b95044e
revert read_spill method by getting schema from file
Igosuki Jan 29, 2022
641338f
Add `MemTrackingMetrics` to ease memory tracking for non-limited memo…
yjshen Jan 29, 2022
0d6d1ce
Implement TableProvider for DataFrameImpl (#1699)
cpcloud Jan 30, 2022
75c7578
refine test in repartition.rs & coalesce_batches.rs (#1707)
xudong963 Jan 30, 2022
a7f0156
Fuzz test for spillable sort (#1706)
yjshen Jan 30, 2022
fecce97
Lazy TempDir creation in DiskManager (#1695)
alamb Jan 30, 2022
3494e9c
Incorporate dyn scalar kernels (#1685)
matthewmturner Jan 30, 2022
2512608
add annotation for select_to_plan (#1714)
xudong963 Jan 31, 2022
1caf52a
Support `create_physical_expr` and `ExecutionContextState` or `Defaul…
alamb Jan 31, 2022
f849968
Fix can not load parquet table from spark in datafusion-cli. (#1665)
Ted-Jiang Jan 31, 2022
d01d8d5
add upper bound for pub fn (#1713)
HaoYang670 Jan 31, 2022
7bec762
Create SchemaAdapter trait to map table schema to file schemas (#1709)
thinkharderdev Jan 31, 2022
cfb655d
approx_quantile() aggregation function (#1539)
domodwyer Jan 31, 2022
940d4eb
support bitwise and as an example (#1653)
liukun4515 Jan 31, 2022
b6ace16
fix: substr - correct behaviour with negative start pos (#1660)
ovr Jan 31, 2022
bacf10d
minor: fix cargo run --release error (#1723)
xudong963 Feb 1, 2022
b9a8f15
Convert boolean case expressions to boolean logic (#1719)
tustvold Feb 1, 2022
46879f1
substitute `parking_lot::Mutex` for `std::sync::Mutex` (#1720)
xudong963 Feb 2, 2022
e4a056f
Add Expression Simplification API (#1717)
alamb Feb 2, 2022
469731b
Use arrow2 0.9 and pull master from Feb 2 2022
Igosuki Feb 2, 2022
5ad5f7c
use from_slice(&[T]) instead of from_slice(Vec<T>) to prevent future …
Igosuki Feb 2, 2022
b8f9bc2
fix decimal add because arrow2 doesn't include decimal add in arithme…
Igosuki Feb 3, 2022
80078b5
fix decimal scale for cast test
Igosuki Feb 3, 2022
f2debbb
fix parquet file format adapted projection by providing the proper sc…
Igosuki Feb 3, 2022
5 changes: 2 additions & 3 deletions Cargo.toml
@@ -33,6 +33,5 @@ lto = true
codegen-units = 1

[patch.crates-io]
-arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", branch = "main" }
-#arrow2 = { git = "https://github.com/blaze-init/arrow2.git", branch = "shuffle_ipc" }
-#parquet2 = { git = "https://github.com/blaze-init/parquet2.git", branch = "meta_new" }
+#arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", branch = "main" }
+#parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", branch = "main" }
2 changes: 1 addition & 1 deletion ballista-examples/Cargo.toml
@@ -33,6 +33,6 @@ datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client", version = "0.6.0"}
prost = "0.9"
tonic = "0.6"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
futures = "0.3"
num_cpus = "1.13.0"
1 change: 1 addition & 0 deletions ballista/rust/client/Cargo.toml
@@ -35,6 +35,7 @@ log = "0.4"
tokio = "1.0"
tempfile = "3"
sqlparser = "0.13"
+parking_lot = "0.11"

datafusion = { path = "../../../datafusion", version = "6.0.0" }

19 changes: 10 additions & 9 deletions ballista/rust/client/src/context.rs
@@ -17,11 +17,12 @@

//! Distributed execution context.

+use parking_lot::Mutex;
use sqlparser::ast::Statement;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;

use ballista_core::config::BallistaConfig;
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
@@ -142,7 +143,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
-let guard = self.state.lock().unwrap();
+let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
@@ -162,7 +163,7 @@

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
-let guard = self.state.lock().unwrap();
+let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
@@ -186,7 +187,7 @@

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
-let guard = self.state.lock().unwrap();
+let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
@@ -203,7 +204,7 @@
name: &str,
table: Arc<dyn TableProvider>,
) -> Result<()> {
-let mut state = self.state.lock().unwrap();
+let mut state = self.state.lock();
state.tables.insert(name.to_owned(), table);
Ok(())
}
@@ -280,7 +281,7 @@ impl BallistaContext {
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
-let state = self.state.lock().unwrap();
+let state = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&state.scheduler_host,
state.scheduler_port,
@@ -291,7 +292,7 @@
let is_show = self.is_show_statement(sql).await?;
// the show tables / show columns sql cannot run at the scheduler because the tables are stored at the client
if is_show {
-let state = self.state.lock().unwrap();
+let state = self.state.lock();
ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_information_schema(
state.config.default_with_information_schema(),
@@ -301,7 +302,7 @@

// register tables with DataFusion context
{
-let state = self.state.lock().unwrap();
+let state = self.state.lock();
for (name, prov) in &state.tables {
ctx.register_table(
TableReference::Bare { table: name },
@@ -483,7 +484,7 @@ mod tests {
.unwrap();

{
-let mut guard = context.state.lock().unwrap();
+let mut guard = context.state.lock();
let csv_table = guard.tables.get("single_nan");

if let Some(table_provide) = csv_table {
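The mechanical change running through this file follows from the Mutex swap: parking_lot's lock() does not return a Result, because its mutexes are never poisoned by panics, so every `.lock().unwrap()` collapses to `.lock()`. A minimal standalone sketch of the difference (hypothetical state, not Ballista's actual type):

use parking_lot::Mutex;
use std::sync::Arc;

fn main() {
    let state = Arc::new(Mutex::new(vec!["single_nan".to_string()]));

    // std::sync::Mutex: `state.lock().unwrap()` — lock() returns a Result
    // because a panic while the lock is held poisons the mutex.
    // parking_lot::Mutex: lock() returns the guard directly (no poisoning),
    // so the unwrap() disappears at every call site.
    let mut tables = state.lock();
    tables.push("nyctaxi".to_string());
    assert_eq!(tables.len(), 2);
}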
2 changes: 2 additions & 0 deletions ballista/rust/core/Cargo.toml
@@ -50,6 +50,8 @@ arrow = { package = "arrow2", version="0.9", features = ["io_ipc", "io_flight"]

datafusion = { path = "../../../datafusion", version = "6.0.0" }

+parking_lot = "0.11"

[dev-dependencies]
tempfile = "3"

3 changes: 2 additions & 1 deletion ballista/rust/core/proto/ballista.proto
@@ -176,11 +176,12 @@ enum AggregateFunction {
STDDEV=11;
STDDEV_POP=12;
CORRELATION=13;
+APPROX_PERCENTILE_CONT = 14;
}

message AggregateExprNode {
AggregateFunction aggr_function = 1;
-LogicalExprNode expr = 2;
+repeated LogicalExprNode expr = 2;
}

enum BuiltInWindowFunction {
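Changing `expr` from a singular field to `repeated` is what lets multi-argument aggregates such as APPROX_PERCENTILE_CONT(expr, percentile) survive serialization. A rough sketch of how that surfaces in the generated Rust types (stand-in structs; the real ones come from prost):

// Stand-in for the generated expression message.
#[derive(Debug, Clone)]
struct LogicalExprNode(&'static str);

// Before: a singular `LogicalExprNode expr` field — prost generates an
// Option of a single message, so only one aggregate argument could be carried.
#[allow(dead_code)]
struct AggregateExprNodeOld {
    aggr_function: i32,
    expr: Option<Box<LogicalExprNode>>,
}

// After: `repeated LogicalExprNode expr = 2;` — prost generates a Vec,
// so every argument round-trips.
struct AggregateExprNodeNew {
    aggr_function: i32,
    expr: Vec<LogicalExprNode>,
}

fn main() {
    let node = AggregateExprNodeNew {
        aggr_function: 14, // APPROX_PERCENTILE_CONT
        expr: vec![LogicalExprNode("bananas"), LogicalExprNode("0.42")],
    };
    assert_eq!(node.expr.len(), 2);
}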
5 changes: 3 additions & 2 deletions ballista/rust/core/src/client.rs
@@ -19,7 +19,8 @@

use arrow::io::flight::deserialize_schemas;
use arrow::io::ipc::IpcSchema;
-use std::sync::{Arc, Mutex};
+use parking_lot::Mutex;
+use std::sync::Arc;
use std::{collections::HashMap, pin::Pin};
use std::{
convert::{TryFrom, TryInto},
@@ -164,7 +165,7 @@ impl Stream for FlightDataStream {
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
-let mut stream = self.stream.lock().expect("mutex is bad");
+let mut stream = self.stream.lock();
stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
3 changes: 2 additions & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -20,10 +20,11 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.

+use parking_lot::Mutex;
use std::fs::File;
use std::iter::{FromIterator, Iterator};
use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::time::Instant;
use std::{any::Any, pin::Pin};

6 changes: 5 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -1066,7 +1066,11 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {

Ok(Expr::AggregateFunction {
fun,
-args: vec![parse_required_expr(&expr.expr)?],
+args: expr
+    .expr
+    .iter()
+    .map(|e| e.try_into())
+    .collect::<Result<Vec<_>, _>>()?,
distinct: false, //TODO
})
}
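The deserialization side maps each element of the repeated field through `try_into` and collects into `Result<Vec<_>, _>`, so the first failing conversion aborts the whole decode. A self-contained sketch of that pattern with stand-in types (not the generated Ballista ones):

// Stand-ins for the generated proto type and the logical-plan type.
struct ProtoExpr(i32);

#[derive(Debug, PartialEq)]
struct Expr(i32);

impl TryFrom<&ProtoExpr> for Expr {
    type Error = String;
    fn try_from(e: &ProtoExpr) -> Result<Self, Self::Error> {
        if e.0 >= 0 { Ok(Expr(e.0)) } else { Err("invalid expr".into()) }
    }
}

fn main() {
    let repeated = vec![ProtoExpr(1), ProtoExpr(42)];
    // An Iterator<Item = Result<Expr, E>> collects into Result<Vec<Expr>, E>,
    // short-circuiting on the first Err — the same shape as `args` above.
    let args: Result<Vec<Expr>, String> =
        repeated.iter().map(|e| e.try_into()).collect();
    assert_eq!(args.unwrap(), vec![Expr(1), Expr(42)]);
}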
21 changes: 16 additions & 5 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -25,17 +25,15 @@ mod roundtrip_tests {
use crate::error::BallistaError;
use arrow::datatypes::IntegerType;
use core::panic;
-use datafusion::arrow::datatypes::UnionMode;
use datafusion::field_util::SchemaExt;
-use datafusion::logical_plan::Repartition;
use datafusion::{
-arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
+arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode},
datasource::object_store::local::LocalFileSystem,
logical_plan::{
col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder,
-Partitioning, ToDFSchema,
+Partitioning, Repartition, ToDFSchema,
},
-physical_plan::functions::BuiltinScalarFunction::Sqrt,
+physical_plan::{aggregates, functions::BuiltinScalarFunction::Sqrt},
prelude::*,
scalar::ScalarValue,
sql::parser::FileType,
@@ -1009,4 +1007,17 @@

Ok(())
}

+#[test]
+fn roundtrip_approx_percentile_cont() -> Result<()> {
+    let test_expr = Expr::AggregateFunction {
+        fun: aggregates::AggregateFunction::ApproxPercentileCont,
+        args: vec![col("bananas"), lit(0.42)],
+        distinct: false,
+    };
+
+    roundtrip_test!(test_expr, protobuf::LogicalExprNode, Expr);
+
+    Ok(())
+}
}
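The `roundtrip_test!` macro used above follows the usual serialize-then-deserialize-and-compare shape. A minimal, dependency-free sketch of that pattern (stand-in types and From impls, not the Ballista macro itself):

#[derive(Debug, Clone, PartialEq)]
struct Expr { fun: String, args: Vec<f64> }

#[derive(Debug)]
struct ProtoExpr { fun: String, args: Vec<f64> }

impl From<&Expr> for ProtoExpr {
    fn from(e: &Expr) -> Self { ProtoExpr { fun: e.fun.clone(), args: e.args.clone() } }
}
impl From<&ProtoExpr> for Expr {
    fn from(p: &ProtoExpr) -> Self { Expr { fun: p.fun.clone(), args: p.args.clone() } }
}

fn main() {
    let original = Expr { fun: "approx_percentile_cont".into(), args: vec![0.42] };
    let proto: ProtoExpr = (&original).into(); // Expr -> protobuf
    let back: Expr = (&proto).into();          // protobuf -> Expr
    // The multi-argument field must survive both conversions unchanged.
    assert_eq!(original, back);
}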
14 changes: 10 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1130,6 +1130,9 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
AggregateFunction::ApproxDistinct => {
protobuf::AggregateFunction::ApproxDistinct
}
+AggregateFunction::ApproxPercentileCont => {
+    protobuf::AggregateFunction::ApproxPercentileCont
+}
AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg,
AggregateFunction::Min => protobuf::AggregateFunction::Min,
AggregateFunction::Max => protobuf::AggregateFunction::Max,
@@ -1155,11 +1158,13 @@
}
};

-let arg = &args[0];
-let aggregate_expr = Box::new(protobuf::AggregateExprNode {
+let aggregate_expr = protobuf::AggregateExprNode {
aggr_function: aggr_function.into(),
-expr: Some(Box::new(arg.try_into()?)),
-});
+expr: args
+    .iter()
+    .map(|v| v.try_into())
+    .collect::<Result<Vec<_>, _>>()?,
+};
Ok(protobuf::LogicalExprNode {
expr_type: Some(ExprType::AggregateExpr(aggregate_expr)),
})
@@ -1390,6 +1395,7 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
AggregateFunction::Stddev => Self::Stddev,
AggregateFunction::StddevPop => Self::StddevPop,
AggregateFunction::Correlation => Self::Correlation,
+AggregateFunction::ApproxPercentileCont => Self::ApproxPercentileCont,
}
}
}
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/mod.rs
@@ -130,6 +130,9 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
protobuf::AggregateFunction::Stddev => AggregateFunction::Stddev,
protobuf::AggregateFunction::StddevPop => AggregateFunction::StddevPop,
protobuf::AggregateFunction::Correlation => AggregateFunction::Correlation,
+protobuf::AggregateFunction::ApproxPercentileCont => {
+    AggregateFunction::ApproxPercentileCont
+}
}
}
}
3 changes: 1 addition & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -626,7 +626,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
-var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
@@ -636,7 +635,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {

let fun_expr = functions::create_physical_fun(
&(&scalar_function).into(),
-&ctx_state,
+&ctx_state.execution_props,
)?;

Arc::new(ScalarFunctionExpr::new(
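`create_physical_fun` now takes only the `ExecutionProps` it actually reads instead of the full `ExecutionContextState`, which is easier to satisfy in a deserializer that has no live context. A rough sketch of that narrowing under stand-in types (not DataFusion's real signatures):

// Stand-ins modeled loosely on the types named above.
struct ExecutionProps { query_start_nanos: u128 }
struct ExecutionContextState {
    execution_props: ExecutionProps,
    // ... catalogs, function registries, config in the real type
}

// Narrowed parameter: callers only have to build ExecutionProps,
// not an entire context state, to construct a function expression.
fn create_physical_fun(props: &ExecutionProps) -> u128 {
    props.query_start_nanos
}

fn main() {
    let ctx_state = ExecutionContextState {
        execution_props: ExecutionProps { query_start_nanos: 42 },
    };
    // The call site mirrors the diff: `&ctx_state.execution_props`, not `&ctx_state`.
    assert_eq!(create_physical_fun(&ctx_state.execution_props), 42);
}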
3 changes: 2 additions & 1 deletion ballista/rust/executor/Cargo.toml
@@ -41,11 +41,12 @@ futures = "0.3"
log = "0.4"
snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true}
tempfile = "3"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
hyper = "0.14.4"
+parking_lot = "0.11"

[dev-dependencies]

1 change: 1 addition & 0 deletions ballista/rust/scheduler/Cargo.toml
@@ -53,6 +53,7 @@ tokio-stream = { version = "0.1", features = ["net"], optional = true }
tonic = "0.6"
tower = { version = "0.4" }
warp = "0.3"
+parking_lot = "0.11"

[dev-dependencies]
ballista-core = { path = "../core", version = "0.6.0" }
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
@@ -36,7 +36,7 @@ arrow = { package = "arrow2", version="0.9", features = ["io_csv", "io_json", "i
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client" }
structopt = { version = "0.3", default-features = false }
-tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
+tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
futures = "0.3"
env_logger = "0.9"
mimalloc = { version = "0.1", optional = true, default-features = false }
2 changes: 1 addition & 1 deletion benchmarks/src/bin/nyctaxi.rs
@@ -119,7 +119,7 @@ async fn datafusion_sql_benchmarks(
}

async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
-let runtime = ctx.state.lock().unwrap().runtime_env.clone();
+let runtime = ctx.state.lock().runtime_env.clone();
let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.optimize(&plan)?;
if debug {
10 changes: 6 additions & 4 deletions benchmarks/src/bin/tpch.rs
@@ -54,6 +54,8 @@ use arrow::io::parquet::write::{Compression, Version, WriteOptions};
use ballista::prelude::{
BallistaConfig, BallistaContext, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
};
+use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use datafusion::field_util::SchemaExt;
use structopt::StructOpt;

@@ -264,7 +266,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
.with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size);
let mut ctx = ExecutionContext::with_config(config);
-let runtime = ctx.state.lock().unwrap().runtime_env.clone();
+let runtime = ctx.state.lock().runtime_env.clone();

// register tables
for table in TABLES {
@@ -546,7 +548,7 @@ async fn execute_query(
displayable(physical_plan.as_ref()).indent()
);
}
-let runtime = ctx.state.lock().unwrap().runtime_env.clone();
+let runtime = ctx.state.lock().runtime_env.clone();
let result = collect(physical_plan.clone(), runtime).await?;
if debug {
println!(
@@ -661,13 +663,13 @@ fn get_table(
.with_delimiter(b',')
.with_has_header(true);

(Arc::new(format), path, ".csv")
(Arc::new(format), path, DEFAULT_CSV_EXTENSION)
}
"parquet" => {
let path = format!("{}/{}", path, table);
let format = ParquetFormat::default().with_enable_pruning(true);

(Arc::new(format), path, ".parquet")
(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
other => {
unimplemented!("Invalid file format '{}'", other);
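Swapping the hard-coded ".csv" / ".parquet" literals for the shared DEFAULT_*_EXTENSION constants keeps the benchmark's listing extensions in sync with what the file-format modules use elsewhere. A small sketch of the lookup (constants inlined here as stand-ins for the datafusion ones):

// Stand-ins for datafusion::datasource::file_format::{csv,parquet} constants.
const DEFAULT_CSV_EXTENSION: &str = ".csv";
const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";

fn extension_for(file_format: &str) -> &'static str {
    match file_format {
        // Referencing the shared constants means an upstream change propagates here.
        "csv" => DEFAULT_CSV_EXTENSION,
        "parquet" => DEFAULT_PARQUET_EXTENSION,
        other => unimplemented!("Invalid file format '{}'", other),
    }
}

fn main() {
    assert_eq!(extension_for("csv"), ".csv");
    assert_eq!(extension_for("parquet"), ".parquet");
}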
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -29,7 +29,7 @@ rust-version = "1.58"
[dependencies]
clap = { version = "3", features = ["derive", "cargo"] }
rustyline = "9.0"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
datafusion = { path = "../datafusion", version = "6.0.0" }
arrow = { package = "arrow2", version="0.9", features = ["io_print"] }
ballista = { path = "../ballista/rust/client", version = "0.6.0" }
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -39,6 +39,6 @@ arrow = { package = "arrow2", version="0.9", features = ["io_ipc", "io_flight"]
datafusion = { path = "../datafusion" }
prost = "0.9"
tonic = "0.6"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
futures = "0.3"
num_cpus = "1.13.0"