
Add support for correlated subqueries & fix all related TPC-H benchmark issues #2885

Merged: 139 commits, Jul 22, 2022
0432ef2
Failing test case for TPC-H query 20
avantgardnerio Jun 25, 2022
831812b
Fix name
avantgardnerio Jun 25, 2022
fed7706
Broken test for adding intervals to dates
avantgardnerio Jun 25, 2022
ed0236e
Tests pass
avantgardnerio Jun 26, 2022
e4286b8
Fix rebase
avantgardnerio Jun 27, 2022
d86861c
Fix query
avantgardnerio Jun 27, 2022
22e4561
Additional tests
avantgardnerio Jun 27, 2022
e71dd5a
Reduce to minimum failing (and passing) cases
avantgardnerio Jun 27, 2022
5f004e6
Adjust so data _should_ be returned, but see none
avantgardnerio Jun 27, 2022
927decb
Fixed data, decorrelated test passes
avantgardnerio Jun 27, 2022
acf44d3
Check in plans
avantgardnerio Jun 27, 2022
3a59fe5
Put real assertion in place
avantgardnerio Jun 27, 2022
6353642
Add test for already working subquery optimizer
avantgardnerio Jun 27, 2022
4e49f87
Add decorellator
avantgardnerio Jun 27, 2022
caeafb2
Check in broken test
avantgardnerio Jun 27, 2022
129cf4e
Add some passing and failing tests to see scope of problem
avantgardnerio Jun 28, 2022
e6a7369
Have almost all inputs needed for optimization, but need to catch 1 l…
avantgardnerio Jun 28, 2022
5257dc9
Collected all inputs, now we just need to optimize
avantgardnerio Jun 28, 2022
94d7519
Successfully decorrelated query 4
avantgardnerio Jun 28, 2022
4c492d5
refactor
avantgardnerio Jun 29, 2022
869b1f3
Pass test 4
avantgardnerio Jun 29, 2022
9d8ad43
Ready for PR?
avantgardnerio Jun 29, 2022
51da916
Only operate on equality expressions
avantgardnerio Jun 29, 2022
d9c89c9
Lint error
avantgardnerio Jun 29, 2022
eac67cd
Tests still pass because we are losing remaining predicate
avantgardnerio Jun 29, 2022
a9dc5b0
Don't lose remaining expressions
avantgardnerio Jun 29, 2022
e464491
Update test to expect remaining filter clause
avantgardnerio Jun 29, 2022
27880e5
Debugging
avantgardnerio Jun 30, 2022
9d04212
Can run query 4
avantgardnerio Jun 30, 2022
cd6d217
Remove debugging code
avantgardnerio Jun 30, 2022
3f5c8fe
Clippy
avantgardnerio Jun 30, 2022
ae4273d
Refactor where exists, add scalar subquery
avantgardnerio Jul 1, 2022
ca2af5b
Login qty < () and 0.2 times, predicate pushdown is killing our plan
avantgardnerio Jul 1, 2022
3d5dd4d
Query plan looks good
avantgardnerio Jul 1, 2022
b87c1a8
Fudge data to make test output nicer
avantgardnerio Jul 1, 2022
990f8b9
Fix syntax error
avantgardnerio Jul 1, 2022
5b8263b
[WIP] where in
avantgardnerio Jul 2, 2022
ba1afa0
Working recursively, q20 plan looks good, but execution failing
avantgardnerio Jul 2, 2022
83565a8
Fix CSV for execution error, remove silly variables in favor of --noc…
avantgardnerio Jul 3, 2022
cd64684
Silence verbose logs
avantgardnerio Jul 3, 2022
aaa952e
Query 21 test
avantgardnerio Jul 3, 2022
06a1379
[WIP] refactoring, query 4 looking good
avantgardnerio Jul 4, 2022
184f0f8
[WIP] 4 & 17 look good
avantgardnerio Jul 4, 2022
bbbd2a9
22 good?
avantgardnerio Jul 4, 2022
7e8622f
Check in "Test" for query 11
avantgardnerio Jul 4, 2022
d7ef835
query 11 works
avantgardnerio Jul 4, 2022
c7b28ad
Don't throw away plans when multiple subqueries in one filter
avantgardnerio Jul 4, 2022
b951b1a
Manually decorellate query 21
avantgardnerio Jul 4, 2022
3ca913e
[WIP] add data for query 21, anti join failing for some reason
avantgardnerio Jul 5, 2022
400ae9c
Does appear to be problem with anti-join
avantgardnerio Jul 5, 2022
b22c4ac
Minimum failing test
avantgardnerio Jul 5, 2022
5276a6e
Verify anti join fix
avantgardnerio Jul 6, 2022
47d5e42
Repeatable tests
avantgardnerio Jul 6, 2022
653c920
cargo fmt
avantgardnerio Jul 6, 2022
dc8b7ab
Restore some optimizers and update test expectations
avantgardnerio Jul 6, 2022
9f38ec5
Restore some optimizers and update test expectations
avantgardnerio Jul 6, 2022
bc82fa5
Restore some optimizers and update test expectations
avantgardnerio Jul 6, 2022
86a50ea
Restore some optimizers and update test expectations
avantgardnerio Jul 6, 2022
c515248
Cleanup
avantgardnerio Jul 6, 2022
5bc2bc8
Cleanup scalar subquery, de-duplicate some code
avantgardnerio Jul 9, 2022
6136619
Cleanup
avantgardnerio Jul 9, 2022
7babe7c
Refactor
avantgardnerio Jul 9, 2022
c87ac6f
Refactor
avantgardnerio Jul 9, 2022
84d7e7a
Refactor
avantgardnerio Jul 10, 2022
0525ebf
Refactor
avantgardnerio Jul 10, 2022
7f9205d
Handle recursive where in
avantgardnerio Jul 11, 2022
bea3d88
Update assertions
avantgardnerio Jul 11, 2022
2687f79
Support recursion in where exists queries
avantgardnerio Jul 11, 2022
5a1b9c2
Unit tests on where in
avantgardnerio Jul 11, 2022
1819bc2
Add correlated where in test
avantgardnerio Jul 11, 2022
5e1fbb1
Nasty code to make where in work for both correlated and uncorrelated…
avantgardnerio Jul 11, 2022
12175ab
Cleanup
avantgardnerio Jul 11, 2022
4fbec36
Refactoring
avantgardnerio Jul 11, 2022
52f9f4f
Refactoring
avantgardnerio Jul 11, 2022
040a170
Add correlated unit test
avantgardnerio Jul 12, 2022
4c935de
Add correlated where exists unit test
avantgardnerio Jul 12, 2022
7d676d1
[WIP] Failing scalar subquery unit test
avantgardnerio Jul 12, 2022
e813823
Refactor
avantgardnerio Jul 12, 2022
6339140
tuple mixup
avantgardnerio Jul 12, 2022
514a932
Scalar subquery unit test
avantgardnerio Jul 12, 2022
4e60070
ASF header
avantgardnerio Jul 12, 2022
3393bc2
PR feedback
avantgardnerio Jul 12, 2022
d834176
PR feedback
avantgardnerio Jul 12, 2022
0099418
PR feedback
avantgardnerio Jul 12, 2022
198793f
PR feedback
avantgardnerio Jul 12, 2022
0598af5
Fix build again
avantgardnerio Jul 12, 2022
9c84053
Formatting
avantgardnerio Jul 12, 2022
bc252ac
Testing
avantgardnerio Jul 14, 2022
d7658ca
multiple where in
avantgardnerio Jul 14, 2022
5a774e4
Unit tests for where in
avantgardnerio Jul 14, 2022
c500912
where exists tests
avantgardnerio Jul 14, 2022
0374e98
scalar subquery tests
avantgardnerio Jul 14, 2022
8594377
add aggregates to scalar subqueries
avantgardnerio Jul 15, 2022
396543d
Remove tests that only existed to get logical plans as input to unit …
avantgardnerio Jul 15, 2022
0c570d0
Check in assertions for valid tests
avantgardnerio Jul 15, 2022
9a64518
1/33 passing unit tests :/
avantgardnerio Jul 15, 2022
fdf656a
Down to one failing test
avantgardnerio Jul 15, 2022
097c0dd
All the unit tests pass
avantgardnerio Jul 17, 2022
972e309
into methods
avantgardnerio Jul 17, 2022
aee3c69
Where exists unit tests passing
avantgardnerio Jul 17, 2022
a7aa68a
Try from methods
avantgardnerio Jul 17, 2022
dff21a5
Fix tests
avantgardnerio Jul 18, 2022
a18cdc7
Fix tests
avantgardnerio Jul 18, 2022
14eebd5
Refactor
avantgardnerio Jul 18, 2022
8b21621
Fix test
avantgardnerio Jul 18, 2022
1a1ef22
Refactor
avantgardnerio Jul 18, 2022
d02f660
Fix test
avantgardnerio Jul 18, 2022
418e6f7
Fix error message
avantgardnerio Jul 18, 2022
5016c94
Fix tests
avantgardnerio Jul 18, 2022
6ba276b
Fix tests
avantgardnerio Jul 18, 2022
3bc8cae
Refactor
avantgardnerio Jul 18, 2022
e897007
Refactor and fix tests
avantgardnerio Jul 18, 2022
b73a252
Improved recursive subquery test
avantgardnerio Jul 18, 2022
96b934f
Recursive subquery fix
avantgardnerio Jul 18, 2022
f00f784
Update tests
avantgardnerio Jul 18, 2022
b177119
Update tests
avantgardnerio Jul 18, 2022
63204dc
Update tests
avantgardnerio Jul 18, 2022
11f24ac
Doc
avantgardnerio Jul 18, 2022
ddaeafc
Clippy
avantgardnerio Jul 18, 2022
054c07e
Linter & clippy
avantgardnerio Jul 18, 2022
953b85a
Add doc, move test methods into test modules
avantgardnerio Jul 18, 2022
d885631
PR cleanup
avantgardnerio Jul 19, 2022
63bcf4f
Inline test data
avantgardnerio Jul 19, 2022
4773ec8
Remove shared test data
avantgardnerio Jul 19, 2022
220c404
Remove shared test data
avantgardnerio Jul 19, 2022
095a41c
Update tests
avantgardnerio Jul 19, 2022
d01c86e
Fix toml
avantgardnerio Jul 19, 2022
3bd5ab1
Update expectation
avantgardnerio Jul 20, 2022
c2a25da
PR feedback
avantgardnerio Jul 20, 2022
ee91f24
PR feedback
avantgardnerio Jul 20, 2022
fcc7a63
Fix test to reveal logic error
avantgardnerio Jul 20, 2022
3960ca8
Simplify test
avantgardnerio Jul 20, 2022
0c941b7
Fix stuff, break other stuff
avantgardnerio Jul 20, 2022
69f5102
I've writen scala in rust because I'm in a hurry :(
avantgardnerio Jul 20, 2022
e341570
Clean the API up a little
avantgardnerio Jul 20, 2022
8408614
PR feedback
avantgardnerio Jul 21, 2022
99ca496
PR feedback
avantgardnerio Jul 21, 2022
50a952f
PR feedback
avantgardnerio Jul 21, 2022
517d0bd
PR feedback
avantgardnerio Jul 21, 2022
2 changes: 1 addition & 1 deletion benchmarks/queries/q20.sql
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ where
l_partkey = ps_partkey
and l_suppkey = ps_suppkey
and l_shipdate >= date '1994-01-01'
and l_shipdate < 'date 1994-01-01' + interval '1' year
and l_shipdate < date '1994-01-01' + interval '1' year
)
)
and s_nationkey = n_nationkey
27 changes: 27 additions & 0 deletions datafusion/common/src/error.rs
@@ -83,6 +83,30 @@ pub enum DataFusionError {
#[cfg(feature = "jit")]
/// Error occurs during code generation
JITError(ModuleError),
/// Error with additional context
Context(String, Box<DataFusionError>),
}

#[macro_export]
macro_rules! context {
($desc:expr, $err:expr) => {
datafusion_common::DataFusionError::Context(
format!("{} at {}:{}", $desc, file!(), line!()),
Box::new($err),
)
};
}

#[macro_export]
macro_rules! plan_err {
($desc:expr) => {
Err(datafusion_common::DataFusionError::Plan(format!(
"{} at {}:{}",
$desc,
file!(),
line!()
)))
};
}

/// Schema-related errors
@@ -285,6 +309,9 @@ impl Display for DataFusionError {
DataFusionError::ObjectStore(ref desc) => {
write!(f, "Object Store error: {}", desc)
}
DataFusionError::Context(ref desc, ref err) => {
write!(f, "{}\ncaused by\n{}", desc, *err)
}
}
}
}
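The new `Context` variant and the `context!` macro wrap an inner error with the source location (`file!()`/`line!()`) where the wrapping happened, so a failure deep in the optimizer can be traced back up the call chain. A minimal, self-contained sketch of the pattern — the two-variant enum below is a local stand-in for illustration, not the real `DataFusionError`:

```rust
use std::fmt;

// Stand-in for the relevant DataFusionError variants.
#[derive(Debug)]
enum DataFusionError {
    Plan(String),
    // Error with additional context, wrapping the underlying cause.
    Context(String, Box<DataFusionError>),
}

impl fmt::Display for DataFusionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DataFusionError::Plan(desc) => write!(f, "Error during planning: {}", desc),
            // Same "caused by" chaining as the Display impl added in error.rs.
            DataFusionError::Context(desc, err) => write!(f, "{}\ncaused by\n{}", desc, err),
        }
    }
}

// Mirrors the context! macro: annotate an error with the current file/line.
macro_rules! context {
    ($desc:expr, $err:expr) => {
        DataFusionError::Context(
            format!("{} at {}:{}", $desc, file!(), line!()),
            Box::new($err),
        )
    };
}

fn main() {
    let inner = DataFusionError::Plan("could not decorrelate subquery".to_string());
    let wrapped = context!("while optimizing TPC-H q20", inner);
    let msg = format!("{}", wrapped);
    assert!(msg.contains("caused by"));
    println!("{}", msg);
}
```

Errors render as the outer annotation, a "caused by" line, then the inner error, so nested `Context` wrappers read top-down like a stack trace.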
2 changes: 2 additions & 0 deletions datafusion/core/Cargo.toml
@@ -94,6 +94,8 @@ uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
criterion = "0.3"
csv = "1.1.6"
ctor = "0.1.22"
doc-comment = "0.3"
env_logger = "0.9"
fuzz-utils = { path = "fuzz-utils" }
6 changes: 6 additions & 0 deletions datafusion/core/src/execution/context.rs
@@ -102,6 +102,9 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_common::ScalarValue;
use datafusion_expr::TableSource;
use datafusion_optimizer::decorrelate_scalar_subquery::DecorrelateScalarSubquery;
use datafusion_optimizer::decorrelate_where_exists::DecorrelateWhereExists;
use datafusion_optimizer::decorrelate_where_in::DecorrelateWhereIn;
use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
use datafusion_sql::{
parser::DFParser,
@@ -1356,6 +1359,9 @@ impl SessionState {
// Simplify expressions first to maximize the chance
// of applying other optimizations
Arc::new(SimplifyExpressions::new()),
Arc::new(DecorrelateWhereExists::new()),
Arc::new(DecorrelateWhereIn::new()),
Arc::new(DecorrelateScalarSubquery::new()),
Arc::new(SubqueryFilterToJoin::new()),
Arc::new(EliminateFilter::new()),
Arc::new(CommonSubexprEliminate::new()),
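For intuition about what the three new rules registered above accomplish: a correlated `WHERE EXISTS` predicate can be answered either by re-running the subquery once per outer row, or, after decorrelation, by a single semi-join on the correlating key. A toy sketch of that equivalence in plain Rust — this illustrates the semantics only, not the optimizer's actual plan rewriting:

```rust
use std::collections::HashSet;

fn main() {
    // Tiny stand-ins: order keys from an "orders" table and
    // order keys referenced by a "lineitem" table.
    let orders = vec![1_i64, 2, 3];
    let lineitem = vec![1_i64, 3, 3];

    // Correlated form: for each outer row, re-scan the subquery input
    // (what naive EXISTS evaluation does; O(n * m)).
    let correlated: Vec<i64> = orders
        .iter()
        .copied()
        .filter(|o| lineitem.iter().any(|l| l == o))
        .collect();

    // Decorrelated form: build the join-key set once, then semi-join.
    let keys: HashSet<i64> = lineitem.iter().copied().collect();
    let semi_join: Vec<i64> = orders.iter().copied().filter(|o| keys.contains(o)).collect();

    // Both strategies keep exactly the orders with a matching lineitem.
    assert_eq!(correlated, semi_join);
    println!("{:?}", semi_join); // [1, 3]
}
```

The same idea extends to `WHERE IN` (semi-join) and `NOT EXISTS` (anti-join, which several of the commits above debug), and to scalar subqueries via an aggregate plus an equi-join.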
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -35,7 +35,7 @@ use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use futures::stream::{Stream, StreamExt};
use log::debug;
use log::trace;

use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, MetricsSet};
@@ -286,7 +286,7 @@ pub fn concat_batches(
)?;
arrays.push(array);
}
debug!(
trace!(
"Combined {} batches containing {} rows",
batches.len(),
row_count
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/file_format/mod.rs
@@ -26,6 +26,8 @@ mod file_stream;
mod json;
mod parquet;

pub(crate) use self::csv::plan_to_csv;
pub use self::csv::CsvExec;
pub(crate) use self::parquet::plan_to_parquet;
pub use self::parquet::ParquetExec;
use arrow::{
@@ -36,8 +38,6 @@
record_batch::RecordBatch,
};
pub use avro::AvroExec;
pub(crate) use csv::plan_to_csv;
pub use csv::CsvExec;
pub(crate) use json::plan_to_json;
pub use json::NdJsonExec;

111 changes: 110 additions & 1 deletion datafusion/core/tests/sql/mod.rs
@@ -49,6 +49,7 @@ use datafusion_expr::Volatility;
use object_store::path::Path;
use std::fs::File;
use std::io::Write;
use std::ops::Sub;
use std::path::PathBuf;
use tempfile::TempDir;

@@ -108,6 +109,7 @@
mod idenfifers;
pub mod information_schema;
mod partitioned_csv;
mod subqueries;
#[cfg(feature = "unicode_expressions")]
pub mod unicode;

@@ -483,7 +485,43 @@ fn get_tpch_table_schema(table: &str) -> Schema {
Field::new("n_comment", DataType::Utf8, false),
]),

_ => unimplemented!(),
"supplier" => Schema::new(vec![
Field::new("s_suppkey", DataType::Int64, false),
Field::new("s_name", DataType::Utf8, false),
Field::new("s_address", DataType::Utf8, false),
Field::new("s_nationkey", DataType::Int64, false),
Field::new("s_phone", DataType::Utf8, false),
Field::new("s_acctbal", DataType::Float64, false),
Field::new("s_comment", DataType::Utf8, false),
]),

"partsupp" => Schema::new(vec![
Field::new("ps_partkey", DataType::Int64, false),
Field::new("ps_suppkey", DataType::Int64, false),
Field::new("ps_availqty", DataType::Int32, false),
Field::new("ps_supplycost", DataType::Float64, false),
Field::new("ps_comment", DataType::Utf8, false),
]),

"part" => Schema::new(vec![
Field::new("p_partkey", DataType::Int64, false),
Field::new("p_name", DataType::Utf8, false),
Field::new("p_mfgr", DataType::Utf8, false),
Field::new("p_brand", DataType::Utf8, false),
Field::new("p_type", DataType::Utf8, false),
Field::new("p_size", DataType::Int32, false),
Field::new("p_container", DataType::Utf8, false),
Field::new("p_retailprice", DataType::Float64, false),
Field::new("p_comment", DataType::Utf8, false),
]),

"region" => Schema::new(vec![
Field::new("r_regionkey", DataType::Int64, false),
Field::new("r_name", DataType::Utf8, false),
Field::new("r_comment", DataType::Utf8, false),
]),

_ => unimplemented!("Table: {}", table),
}
}

@@ -499,6 +537,77 @@ async fn register_tpch_csv(ctx: &SessionContext, table: &str) -> Result<()> {
Ok(())
}

async fn register_tpch_csv_data(
ctx: &SessionContext,
table_name: &str,
data: &str,
) -> Result<()> {
let schema = Arc::new(get_tpch_table_schema(table_name));
Review comment (Contributor):
What do you think about using SessionContext::register_csv here instead? Is there some reason we need to explicitly parse the CSV file and build in memory tables?
https://docs.rs/datafusion/10.0.0/datafusion/execution/context/struct.SessionContext.html#method.register_csv

Reply (@avantgardnerio, Contributor Author, Jul 20, 2022):
I started with the TPC-H .csvs that were checked in already, and even added some of my own, then started adding data to existing ones, then updated failed tests with new expected results, then realized I'd had the feeling before where I'm going down the road to hell that is shared test data. I see it being particularly bad for aggregates (what's the proper expected result for the sum of all sales with parts from the middle east?)

I started to break out csvs by folder, but that seemed cumbersome, so finally I thought that this function might be a useful tool to keep the data close to the test itself, like this:
https://github.com/spaceandtimelabs/arrow-datafusion/blob/563d87d1c6413e611894619c2bc472b396d75c3d/datafusion/core/tests/sql/subqueries.rs#L171

I don't have strong opinions as to implementation, but I would like to avoid sharing one set of data between all integration tests.


let mut reader = ::csv::ReaderBuilder::new()
.has_headers(false)
.from_reader(data.as_bytes());
let records: Vec<_> = reader.records().map(|it| it.unwrap()).collect();

let mut cols: Vec<Box<dyn ArrayBuilder>> = vec![];
for field in schema.fields().iter() {
match field.data_type() {
DataType::Utf8 => cols.push(Box::new(StringBuilder::new(records.len()))),
DataType::Date32 => cols.push(Box::new(Date32Builder::new(records.len()))),
DataType::Int32 => cols.push(Box::new(Int32Builder::new(records.len()))),
DataType::Int64 => cols.push(Box::new(Int64Builder::new(records.len()))),
DataType::Float64 => cols.push(Box::new(Float64Builder::new(records.len()))),
_ => {
let msg = format!("Not implemented: {}", field.data_type());
Err(DataFusionError::Plan(msg))?
}
}
}

for record in records.iter() {
for (idx, val) in record.iter().enumerate() {
let col = cols.get_mut(idx).unwrap();
let field = schema.field(idx);
match field.data_type() {
DataType::Utf8 => {
let sb = col.as_any_mut().downcast_mut::<StringBuilder>().unwrap();
sb.append_value(val)?;
}
DataType::Date32 => {
let sb = col.as_any_mut().downcast_mut::<Date32Builder>().unwrap();
let dt = NaiveDate::parse_from_str(val.trim(), "%Y-%m-%d").unwrap();
let dt = dt.sub(NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32;
sb.append_value(dt)?;
}
DataType::Int32 => {
let sb = col.as_any_mut().downcast_mut::<Int32Builder>().unwrap();
sb.append_value(val.trim().parse().unwrap())?;
}
DataType::Int64 => {
let sb = col.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
sb.append_value(val.trim().parse().unwrap())?;
}
DataType::Float64 => {
let sb = col.as_any_mut().downcast_mut::<Float64Builder>().unwrap();
sb.append_value(val.trim().parse().unwrap())?;
}
_ => Err(DataFusionError::Plan(format!(
"Not implemented: {}",
field.data_type()
)))?,
}
}
}
let cols: Vec<ArrayRef> = cols.iter_mut().map(|it| it.finish()).collect();

let batch = RecordBatch::try_new(Arc::clone(&schema), cols)?;

let table = Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?);
let _ = ctx.register_table(table_name, table).unwrap();

Ok(())
}
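The `Date32` branch above converts a parsed calendar date into days since the Unix epoch (what Arrow's `Date32` stores), using chrono's `NaiveDate` subtraction. The same conversion can be done with no dependencies via standard civil-date arithmetic — a sketch for illustration, not DataFusion code:

```rust
// Days since 1970-01-01 for a (year, month, day) civil date, i.e. the
// value an Arrow Date32 column holds. Standard days-from-civil algorithm.
fn days_from_civil(y: i64, m: u32, d: u32) -> i64 {
    // Shift the year so the "year" starts in March; leap days then fall
    // at the end of the year and need no special casing.
    let y = if m <= 2 { y - 1 } else { y };
    let era = y.div_euclid(400);                   // 400-year Gregorian cycles
    let yoe = y - era * 400;                       // year of era, [0, 399]
    let mp = i64::from(if m > 2 { m - 3 } else { m + 9 }); // March-based month
    let doy = (153 * mp + 2) / 5 + i64::from(d) - 1;       // day of year, [0, 365]
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;       // day of era
    era * 146097 + doe - 719468 // shift epoch from 0000-03-01 to 1970-01-01
}

fn main() {
    // q20's filter `l_shipdate >= date '1994-01-01'` compares against
    // Date32 value 8766 after this conversion.
    assert_eq!(days_from_civil(1970, 1, 1), 0);
    assert_eq!(days_from_civil(1994, 1, 1), 8766);
    println!("{}", days_from_civil(1994, 1, 1)); // 8766
}
```

In the test helper itself chrono is already a dependency, so the `NaiveDate::parse_from_str` + subtraction route is the simpler choice; this shows only what that subtraction computes.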

async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
let testdata = datafusion::test_util::arrow_test_data();
