Arrow2 test fix (#1733)
* feat: add join type for logical plan display (#1674)

* (minor) Reduce memory manager and disk manager logs from `info!` to `debug!` (#1689)

* Move `information_schema` tests out of execution/context.rs to `sql_integration` tests (#1684)

* Move tests from context.rs to information_schema.rs

* Fix up tests to compile

* Move timestamp related tests out of context.rs and into sql integration test (#1696)

* Move some tests out of context.rs and into sql

* Move support test out of context.rs and into sql tests

* Fixup tests and make them compile

* Fix parquet projection

* fix pruning casting

* fix test based on debug strings

* revert read_spill method by getting schema from file

* Add `MemTrackingMetrics` to ease memory tracking for non-limited memory consumers (#1691)

* Memory manager no longer tracks consumers; update `AggregatedMetricsSet`

* Easy memory tracking with metrics

* use tracking metrics in SPMS

* tests

* fix

* doc

* Update datafusion/src/physical_plan/sorts/sort.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* make tracker AtomicUsize

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Implement TableProvider for DataFrameImpl (#1699)

* Add TableProvider impl for DataFrameImpl

* Add physical plan in

* Clean up plan construction and names construction

* Remove duplicate comments

* Remove unused parameter

* Add test

* Remove duplicate limit comment

* Use cloned instead of individual clone

* Reduce the amount of code to get a schema

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Add comments to test

* Fix plan comparison

* Compare only the results of execution

* Remove println

* Refer to df_impl instead of table in test

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Fix the register_table test to use the correct result set for comparison

* Consolidate group/agg exprs

* Format

* Remove outdated comment

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* refine test in repartition.rs & coalesce_batches.rs (#1707)

* Fuzz test for spillable sort (#1706)

* Lazy TempDir creation in DiskManager (#1695)
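
A minimal sketch of the lazy-creation idea, with hypothetical names rather than the actual `DiskManager` API: the temporary spill directory is created only when a query first spills, so queries that never spill never touch the filesystem.

    use std::path::PathBuf;
    use std::sync::Mutex;

    use tempfile::TempDir; // assumes the `tempfile` crate already used in this repo

    struct DiskManager {
        // `None` until the first spill request arrives.
        dir: Mutex<Option<TempDir>>,
    }

    impl DiskManager {
        fn new() -> Self {
            Self { dir: Mutex::new(None) }
        }

        // Hypothetical helper: creates the temp dir on first use only.
        fn create_tmp_file(&self, name: &str) -> std::io::Result<PathBuf> {
            let mut guard = self.dir.lock().unwrap();
            if guard.is_none() {
                *guard = Some(tempfile::tempdir()?);
            }
            Ok(guard.as_ref().unwrap().path().join(name))
        }
    }

    fn main() -> std::io::Result<()> {
        let dm = DiskManager::new();
        // No directory exists on disk until this first call:
        println!("{}", dm.create_tmp_file("sort_spill_0.arrow")?.display());
        Ok(())
    }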

* Incorporate dyn scalar kernels (#1685)

* Rebase

* impl ToNumeric for ScalarValue

* Update macro to be based on

* Add floats

* Cleanup

* Newline

* add annotation for select_to_plan (#1714)

* Support `create_physical_expr` and `ExecutionContextState` or `DefaultPhysicalPlanner` for faster speed (#1700)

* Change physical_expr creation API

* Refactor API usage to avoid creating ExecutionContextState

* Fixup ballista

* clippy!

* Fix cannot load parquet table from Spark in datafusion-cli. (#1665)

* fix cannot load parquet table from Spark

* add "Invalid file" message to the log.

* fix fmt

* add upper bound for pub fn (#1713)

Signed-off-by: remzi <13716567376yh@gmail.com>

* Create SchemaAdapter trait to map table schema to file schemas (#1709)

* Create SchemaAdapter trait to map table schema to file schemas

* Linting fix

* Remove commented code
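
The core idea behind the trait, as a self-contained sketch (the function name and shapes here are illustrative, not the exact API added by #1709): given the table schema and the possibly reordered or partial schema of one file, map each projected table column to its position in that file, where `None` means the column is missing and must be null-filled.

    /// Map each projected table-schema column to its index in a file's schema.
    /// `None` means the file lacks that column and it should be filled with nulls.
    fn map_projection(
        table_schema: &[&str],
        file_schema: &[&str],
        projection: &[usize],
    ) -> Vec<Option<usize>> {
        projection
            .iter()
            .map(|&i| {
                let wanted = table_schema[i];
                file_schema.iter().position(|f| *f == wanted)
            })
            .collect()
    }

    fn main() {
        let table = ["a", "b", "c"];
        let file = ["c", "a"]; // older file: fewer columns, different order
        // Project table columns "a" and "c":
        assert_eq!(map_projection(&table, &file, &[0, 2]), vec![Some(1), Some(0)]);
    }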

* approx_quantile() aggregation function (#1539)

* feat: implement TDigest for approx quantile

Adds a [TDigest] implementation providing approximate quantile
estimations of large inputs using a small amount of (bounded) memory.

A TDigest is most accurate near either "end" of the quantile range (that
is, 0.1, 0.9, 0.95, etc.) due to the use of a scaling function that
increases resolution at the tails. The paper claims single-digit
parts-per-million errors for q ≤ 0.001 or q ≥ 0.999 using 100 centroids,
and in practice I have found accuracy to be more than acceptable for an
approximate function across the entire quantile range.

The implementation is a modified copy of
https://github.com/MnO2/t-digest, itself a Rust port of [Facebook's C++
implementation]. Both Facebook's implementation and MnO2's Rust port
are Apache 2.0 licensed.

[TDigest]: https://arxiv.org/abs/1902.04023
[Facebook's C++ implementation]: https://github.com/facebook/folly/blob/main/folly/stats/TDigest.h
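
The scaling function mentioned above is the paper's k1 function; a small sketch (delta is the compression parameter, e.g. 100 centroids) shows why resolution concentrates at the tails: an equal step in q costs far more "k" near 0 and 1 than in the middle, so tail centroids are forced to stay small.

    use std::f64::consts::PI;

    // k1 from the t-digest paper: k(q) = (delta / 2π) · asin(2q − 1).
    // Its slope grows without bound as q -> 0 or q -> 1.
    fn k1(q: f64, delta: f64) -> f64 {
        (delta / (2.0 * PI)) * (2.0 * q - 1.0).asin()
    }

    fn main() {
        let delta = 100.0;
        for q in [0.01, 0.02, 0.50, 0.51, 0.98, 0.99].iter().copied() {
            // The 0.01 steps at the tails span much larger k-intervals
            // than the same step in the middle of the range.
            println!("q = {:.2}  k = {:+.3}", q, k1(q, delta));
        }
    }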

* feat: approx_quantile aggregation

Adds the ApproxQuantile physical expression, plumbing & test cases.

The function signature is:

	approx_quantile(column, quantile)

where column can be any numeric type (that can be cast to a float64) and
quantile is a float64 literal between 0 and 1.
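
For illustration, a hedged sketch of invoking it through SQL with the DataFusion-6-era API (the table and column names are made up, and the function ends up named approx_percentile_cont by the end of this series):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let mut ctx = ExecutionContext::new();
        // Hypothetical CSV with a numeric `latency_ms` column.
        ctx.register_csv("metrics", "metrics.csv", CsvReadOptions::new())
            .await?;

        // Estimate the 95th percentile; the second argument must be a
        // float64 literal in [0, 1] or planning fails.
        let df = ctx
            .sql("SELECT approx_percentile_cont(latency_ms, 0.95) FROM metrics")
            .await?;
        df.show().await?;
        Ok(())
    }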

* feat: approx_quantile dataframe function

Adds the approx_quantile() dataframe function, and exports it in the
prelude.

* refactor: ballista approx_quantile support

Adds ballista wire encoding for approx_quantile.

Adding support for this required modifying the AggregateExprNode proto
message to support propagating multiple LogicalExprNode aggregate
arguments - all the existing aggregations take a single argument, so
this wasn't needed before.

This commit adds "repeated" to the expr field, which I believe is
backwards compatible as described here:

	https://developers.google.com/protocol-buffers/docs/proto3#updating

Specifically, adding "repeated" to an existing message field:

	"For ... message fields, optional is compatible with repeated"

No existing tests needed fixing, and a new roundtrip test is included
that covers the change to allow multiple expr.

* refactor: use input type as return type

Casts the calculated quantile value to the same type as the input data.

* fixup! refactor: ballista approx_quantile support

* refactor: rebase onto main

* refactor: validate quantile value

Ensures the quantile value is between 0 and 1, emitting a plan error if
not.

* refactor: rename to approx_percentile_cont

* refactor: clippy lints

* support bitwise AND as an example (#1653)

* support bitwise AND as an example

* Use $OP in macro rather than `&`

* fix: change signature to &dyn Array

* fmt

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* fix: substr - correct behaviour with negative start pos (#1660)

* minor: fix cargo run --release error (#1723)

* Convert boolean case expressions to boolean logic (#1719)

* Convert boolean case expressions to boolean logic

* Review feedback
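
A self-contained sketch of the rewrite on a toy expression type (not DataFusion's actual `Expr`; the real rule must also respect SQL's three-valued NULL logic, which this ignores):

    #[derive(Debug, Clone, PartialEq)]
    enum Expr {
        Col(&'static str),
        Lit(bool),
        Not(Box<Expr>),
        // CASE WHEN pred THEN then_ ELSE else_ END
        Case { pred: Box<Expr>, then_: Box<Expr>, else_: Box<Expr> },
    }

    fn simplify(e: Expr) -> Expr {
        match e {
            Expr::Case { pred, then_, else_ } => match (*then_, *else_) {
                // CASE WHEN p THEN true ELSE false END  =>  p
                (Expr::Lit(true), Expr::Lit(false)) => simplify(*pred),
                // CASE WHEN p THEN false ELSE true END  =>  NOT p
                (Expr::Lit(false), Expr::Lit(true)) => {
                    Expr::Not(Box::new(simplify(*pred)))
                }
                (t, f) => Expr::Case {
                    pred: Box::new(simplify(*pred)),
                    then_: Box::new(t),
                    else_: Box::new(f),
                },
            },
            other => other,
        }
    }

    fn main() {
        let e = Expr::Case {
            pred: Box::new(Expr::Col("is_active")),
            then_: Box::new(Expr::Lit(true)),
            else_: Box::new(Expr::Lit(false)),
        };
        assert_eq!(simplify(e), Expr::Col("is_active"));
    }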

* substitute `parking_lot::Mutex` for `std::sync::Mutex` (#1720)

* Substitute parking_lot::Mutex for std::sync::Mutex

* enable parking_lot feature in tokio
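
The mechanical difference, as a small sketch (assuming the parking_lot 0.11 crate this change adds): `std::sync::Mutex::lock` returns a `Result` because a panicking holder poisons the lock, while parking_lot has no poisoning, which is why every `.lock().unwrap()` in the diff below becomes a bare `.lock()`.

    use parking_lot::Mutex;
    use std::sync::Mutex as StdMutex;

    fn main() {
        // std: lock() returns Result<MutexGuard, PoisonError<..>>,
        // so callers must unwrap() (or handle poisoning).
        let std_state = StdMutex::new(0u32);
        *std_state.lock().unwrap() += 1;

        // parking_lot: no poisoning, lock() returns the guard directly,
        // and the mutex is smaller and generally faster under contention.
        let pl_state = Mutex::new(0u32);
        *pl_state.lock() += 1;

        println!("{} {}", std_state.lock().unwrap(), pl_state.lock());
    }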

* Add Expression Simplification API (#1717)

* Add Expression Simplification API

* fmt

* use from_slice(&[T]) instead of from_slice(Vec<T>) to prevent future merge conflicts

* fix decimal add because arrow2 doesn't include decimal add in arithmetics::add
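
Why a hand-rolled kernel suffices, as a sketch (assuming both arrays already share the same precision and scale): decimal values are stored as i128s scaled by 10^scale, so same-scale addition is exact element-wise integer addition on the raw representations.

    // Decimal(p, s) stores value * 10^s as an i128, so adding two arrays
    // with identical scale is plain element-wise integer addition.
    fn add_decimal(lhs: &[i128], rhs: &[i128]) -> Vec<i128> {
        lhs.iter().zip(rhs.iter()).map(|(l, r)| l + r).collect()
    }

    fn main() {
        // scale = 2: 1.25 + 2.50 is stored as 125 + 250 = 375, i.e. 3.75
        assert_eq!(add_decimal(&[125], &[250]), vec![375]);
    }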

* fix decimal scale for cast test

* fix parquet file format adapted projection by providing the proper schema to the RecordBatch

Co-authored-by: xudong.w <wxd963996380@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Yijie Shen <henry.yijieshen@gmail.com>
Co-authored-by: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Co-authored-by: Matthew Turner <matthew.m.turner@outlook.com>
Co-authored-by: Yang <37145547+Ted-Jiang@users.noreply.github.com>
Co-authored-by: Remzi Yang <59198230+HaoYang670@users.noreply.github.com>
Co-authored-by: Dan Harris <1327726+thinkharderdev@users.noreply.github.com>
Co-authored-by: Dom <dom@itsallbroken.com>
Co-authored-by: Kun Liu <liukun@apache.org>
Co-authored-by: Dmitry Patsura <talk@dmtry.me>
Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
13 people authored Feb 8, 2022
1 parent 98f98d1 commit 83f937a
Showing 115 changed files with 5,193 additions and 2,273 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
@@ -33,6 +33,5 @@ lto = true
codegen-units = 1

[patch.crates-io]
-arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", branch = "main" }
-#arrow2 = { git = "https://github.com/blaze-init/arrow2.git", branch = "shuffle_ipc" }
-#parquet2 = { git = "https://github.com/blaze-init/parquet2.git", branch = "meta_new" }
+#arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", branch = "main" }
+#parquet2 = { git = "https://github.com/jorgecarleitao/parquet2.git", branch = "main" }
2 changes: 1 addition & 1 deletion ballista-examples/Cargo.toml
@@ -33,6 +33,6 @@ datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client", version = "0.6.0"}
prost = "0.9"
tonic = "0.6"
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
futures = "0.3"
num_cpus = "1.13.0"
1 change: 1 addition & 0 deletions ballista/rust/client/Cargo.toml
@@ -35,6 +35,7 @@ log = "0.4"
tokio = "1.0"
tempfile = "3"
sqlparser = "0.13"
parking_lot = "0.11"

datafusion = { path = "../../../datafusion", version = "6.0.0" }

19 changes: 10 additions & 9 deletions ballista/rust/client/src/context.rs
@@ -17,11 +17,12 @@

//! Distributed execution context.

+use parking_lot::Mutex;
use sqlparser::ast::Statement;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;

use ballista_core::config::BallistaConfig;
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
@@ -142,7 +143,7 @@ impl BallistaContext {

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
-let guard = self.state.lock().unwrap();
+let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
@@ -162,7 +163,7 @@

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
-let guard = self.state.lock().unwrap();
+let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
@@ -186,7 +187,7 @@

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
-let guard = self.state.lock().unwrap();
+let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
@@ -203,7 +204,7 @@
name: &str,
table: Arc<dyn TableProvider>,
) -> Result<()> {
-let mut state = self.state.lock().unwrap();
+let mut state = self.state.lock();
state.tables.insert(name.to_owned(), table);
Ok(())
}
@@ -280,7 +281,7 @@ impl BallistaContext {
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
-let state = self.state.lock().unwrap();
+let state = self.state.lock();
create_df_ctx_with_ballista_query_planner(
&state.scheduler_host,
state.scheduler_port,
@@ -291,7 +292,7 @@
let is_show = self.is_show_statement(sql).await?;
// the SHOW TABLES / SHOW COLUMNS SQL cannot run at the scheduler because the tables are stored at the client
if is_show {
-let state = self.state.lock().unwrap();
+let state = self.state.lock();
ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_information_schema(
state.config.default_with_information_schema(),
@@ -301,7 +302,7 @@

// register tables with DataFusion context
{
-let state = self.state.lock().unwrap();
+let state = self.state.lock();
for (name, prov) in &state.tables {
ctx.register_table(
TableReference::Bare { table: name },
@@ -483,7 +484,7 @@ mod tests {
.unwrap();

{
-let mut guard = context.state.lock().unwrap();
+let mut guard = context.state.lock();
let csv_table = guard.tables.get("single_nan");

if let Some(table_provide) = csv_table {
2 changes: 2 additions & 0 deletions ballista/rust/core/Cargo.toml
@@ -50,6 +50,8 @@ arrow = { package = "arrow2", version="0.9", features = ["io_ipc", "io_flight"]

datafusion = { path = "../../../datafusion", version = "6.0.0" }

parking_lot = "0.11"

[dev-dependencies]
tempfile = "3"

3 changes: 2 additions & 1 deletion ballista/rust/core/proto/ballista.proto
@@ -176,11 +176,12 @@ enum AggregateFunction {
STDDEV=11;
STDDEV_POP=12;
CORRELATION=13;
+APPROX_PERCENTILE_CONT = 14;
}

message AggregateExprNode {
AggregateFunction aggr_function = 1;
-LogicalExprNode expr = 2;
+repeated LogicalExprNode expr = 2;
}

enum BuiltInWindowFunction {
5 changes: 3 additions & 2 deletions ballista/rust/core/src/client.rs
@@ -19,7 +19,8 @@

use arrow::io::flight::deserialize_schemas;
use arrow::io::ipc::IpcSchema;
-use std::sync::{Arc, Mutex};
+use parking_lot::Mutex;
+use std::sync::Arc;
use std::{collections::HashMap, pin::Pin};
use std::{
convert::{TryFrom, TryInto},
@@ -164,7 +165,7 @@ impl Stream for FlightDataStream {
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
-let mut stream = self.stream.lock().expect("mutex is bad");
+let mut stream = self.stream.lock();
stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
3 changes: 2 additions & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -20,10 +20,11 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.

+use parking_lot::Mutex;
use std::fs::File;
use std::iter::{FromIterator, Iterator};
use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::time::Instant;
use std::{any::Any, pin::Pin};

6 changes: 5 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -1066,7 +1066,11 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {

Ok(Expr::AggregateFunction {
fun,
-args: vec![parse_required_expr(&expr.expr)?],
+args: expr
+    .expr
+    .iter()
+    .map(|e| e.try_into())
+    .collect::<Result<Vec<_>, _>>()?,
distinct: false, //TODO
})
}
21 changes: 16 additions & 5 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -25,17 +25,15 @@ mod roundtrip_tests {
use crate::error::BallistaError;
use arrow::datatypes::IntegerType;
use core::panic;
-use datafusion::arrow::datatypes::UnionMode;
use datafusion::field_util::SchemaExt;
-use datafusion::logical_plan::Repartition;
use datafusion::{
-arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
+arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode},
datasource::object_store::local::LocalFileSystem,
logical_plan::{
col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder,
-Partitioning, ToDFSchema,
+Partitioning, Repartition, ToDFSchema,
},
-physical_plan::functions::BuiltinScalarFunction::Sqrt,
+physical_plan::{aggregates, functions::BuiltinScalarFunction::Sqrt},
prelude::*,
scalar::ScalarValue,
sql::parser::FileType,
@@ -1009,4 +1007,17 @@

Ok(())
}

+    #[test]
+    fn roundtrip_approx_percentile_cont() -> Result<()> {
+        let test_expr = Expr::AggregateFunction {
+            fun: aggregates::AggregateFunction::ApproxPercentileCont,
+            args: vec![col("bananas"), lit(0.42)],
+            distinct: false,
+        };
+
+        roundtrip_test!(test_expr, protobuf::LogicalExprNode, Expr);
+
+        Ok(())
+    }
}
14 changes: 10 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1130,6 +1130,9 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
AggregateFunction::ApproxDistinct => {
protobuf::AggregateFunction::ApproxDistinct
}
+AggregateFunction::ApproxPercentileCont => {
+    protobuf::AggregateFunction::ApproxPercentileCont
+}
AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg,
AggregateFunction::Min => protobuf::AggregateFunction::Min,
AggregateFunction::Max => protobuf::AggregateFunction::Max,
@@ -1155,11 +1158,13 @@
}
};

-let arg = &args[0];
-let aggregate_expr = Box::new(protobuf::AggregateExprNode {
+let aggregate_expr = protobuf::AggregateExprNode {
 aggr_function: aggr_function.into(),
-expr: Some(Box::new(arg.try_into()?)),
-});
+expr: args
+    .iter()
+    .map(|v| v.try_into())
+    .collect::<Result<Vec<_>, _>>()?,
+};
Ok(protobuf::LogicalExprNode {
expr_type: Some(ExprType::AggregateExpr(aggregate_expr)),
})
@@ -1390,6 +1395,7 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
AggregateFunction::Stddev => Self::Stddev,
AggregateFunction::StddevPop => Self::StddevPop,
AggregateFunction::Correlation => Self::Correlation,
+AggregateFunction::ApproxPercentileCont => Self::ApproxPercentileCont,
}
}
}
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/mod.rs
@@ -130,6 +130,9 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
protobuf::AggregateFunction::Stddev => AggregateFunction::Stddev,
protobuf::AggregateFunction::StddevPop => AggregateFunction::StddevPop,
protobuf::AggregateFunction::Correlation => AggregateFunction::Correlation,
+protobuf::AggregateFunction::ApproxPercentileCont => {
+    AggregateFunction::ApproxPercentileCont
+}
}
}
}
3 changes: 1 addition & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -626,7 +626,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
-var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
@@ -636,7 +635,7 @@

let fun_expr = functions::create_physical_fun(
&(&scalar_function).into(),
-&ctx_state,
+&ctx_state.execution_props,
)?;

Arc::new(ScalarFunctionExpr::new(
3 changes: 2 additions & 1 deletion ballista/rust/executor/Cargo.toml
@@ -41,11 +41,12 @@ futures = "0.3"
log = "0.4"
snmalloc-rs = {version = "0.2", features= ["cache-friendly"], optional = true}
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
hyper = "0.14.4"
parking_lot = "0.11"

[dev-dependencies]

1 change: 1 addition & 0 deletions ballista/rust/scheduler/Cargo.toml
@@ -53,6 +53,7 @@ tokio-stream = { version = "0.1", features = ["net"], optional = true }
tonic = "0.6"
tower = { version = "0.4" }
warp = "0.3"
parking_lot = "0.11"

[dev-dependencies]
ballista-core = { path = "../core", version = "0.6.0" }
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
@@ -36,7 +36,7 @@ arrow = { package = "arrow2", version="0.9", features = ["io_csv", "io_json", "i
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client" }
structopt = { version = "0.3", default-features = false }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] }
futures = "0.3"
env_logger = "0.9"
mimalloc = { version = "0.1", optional = true, default-features = false }
2 changes: 1 addition & 1 deletion benchmarks/src/bin/nyctaxi.rs
@@ -119,7 +119,7 @@ async fn datafusion_sql_benchmarks(
}

async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
-let runtime = ctx.state.lock().unwrap().runtime_env.clone();
+let runtime = ctx.state.lock().runtime_env.clone();
let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.optimize(&plan)?;
if debug {
10 changes: 6 additions & 4 deletions benchmarks/src/bin/tpch.rs
@@ -54,6 +54,8 @@ use arrow::io::parquet::write::{Compression, Version, WriteOptions};
use ballista::prelude::{
BallistaConfig, BallistaContext, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
};
+use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use datafusion::field_util::SchemaExt;
use structopt::StructOpt;

@@ -264,7 +266,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
.with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size);
let mut ctx = ExecutionContext::with_config(config);
-let runtime = ctx.state.lock().unwrap().runtime_env.clone();
+let runtime = ctx.state.lock().runtime_env.clone();

// register tables
for table in TABLES {
@@ -546,7 +548,7 @@
displayable(physical_plan.as_ref()).indent()
);
}
-let runtime = ctx.state.lock().unwrap().runtime_env.clone();
+let runtime = ctx.state.lock().runtime_env.clone();
let result = collect(physical_plan.clone(), runtime).await?;
if debug {
println!(
@@ -661,13 +663,13 @@ fn get_table(
.with_delimiter(b',')
.with_has_header(true);

(Arc::new(format), path, ".csv")
(Arc::new(format), path, DEFAULT_CSV_EXTENSION)
}
"parquet" => {
let path = format!("{}/{}", path, table);
let format = ParquetFormat::default().with_enable_pruning(true);

(Arc::new(format), path, ".parquet")
(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
other => {
unimplemented!("Invalid file format '{}'", other);
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -29,7 +29,7 @@ rust-version = "1.58"
[dependencies]
clap = { version = "3", features = ["derive", "cargo"] }
rustyline = "9.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
datafusion = { path = "../datafusion", version = "6.0.0" }
arrow = { package = "arrow2", version="0.9", features = ["io_print"] }
ballista = { path = "../ballista/rust/client", version = "0.6.0" }
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -39,6 +39,6 @@ arrow = { package = "arrow2", version="0.9", features = ["io_ipc", "io_flight"]
datafusion = { path = "../datafusion" }
prost = "0.9"
tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
futures = "0.3"
num_cpus = "1.13.0"