Changes from all commits (40 commits)
14d807a  Failing tests (avantgardnerio, Jun 26, 2022)
88f5d7f  Add month/year arithmetic (avantgardnerio, Jun 26, 2022)
d2f43c9  Fix tests? (avantgardnerio, Jun 26, 2022)
e34705e  Fix clippy? (avantgardnerio, Jun 26, 2022)
c37d29e  Update datafusion/common/src/scalar.rs (avantgardnerio, Jun 27, 2022)
874a5ed  Add support for all types, fix math (avantgardnerio, Jun 29, 2022)
ee1c756  Fix doc (avantgardnerio, Jun 29, 2022)
5ea1c28  Fix test that relied on previous flawed implementation (avantgardnerio, Jun 29, 2022)
8348470  Appease clippy (avantgardnerio, Jun 29, 2022)
cd999c7  Failing test case for TPC-H query 20 (avantgardnerio, Jun 25, 2022)
ccdb98f  Fix name (avantgardnerio, Jun 25, 2022)
e7fcb2f  Broken test for adding intervals to dates (avantgardnerio, Jun 25, 2022)
9b51e46  Tests pass (avantgardnerio, Jun 26, 2022)
de8ae11  Fix rebase (avantgardnerio, Jun 27, 2022)
8dd2b16  Fix query (avantgardnerio, Jun 27, 2022)
34b2908  Additional tests (avantgardnerio, Jun 27, 2022)
6a759ce  Reduce to minimum failing (and passing) cases (avantgardnerio, Jun 27, 2022)
37a73c2  Adjust so data _should_ be returned, but see none (avantgardnerio, Jun 27, 2022)
1db5c8d  Fixed data, decorrelated test passes (avantgardnerio, Jun 27, 2022)
f3ee70c  Check in plans (avantgardnerio, Jun 27, 2022)
b08da97  Put real assertion in place (avantgardnerio, Jun 27, 2022)
f22c079  Add test for already working subquery optimizer (avantgardnerio, Jun 27, 2022)
0e0e0c7  Add decorellator (avantgardnerio, Jun 27, 2022)
308b67c  Check in broken test (avantgardnerio, Jun 27, 2022)
0c5ed1a  Add some passing and failing tests to see scope of problem (avantgardnerio, Jun 28, 2022)
d11d7f9  Have almost all inputs needed for optimization, but need to catch 1 l… (avantgardnerio, Jun 28, 2022)
6ab6894  Collected all inputs, now we just need to optimize (avantgardnerio, Jun 28, 2022)
b281c8c  Successfully decorrelated query 4 (avantgardnerio, Jun 28, 2022)
6a08eb1  refactor (avantgardnerio, Jun 29, 2022)
7e02545  Pass test 4 (avantgardnerio, Jun 29, 2022)
ea3f219  Ready for PR? (avantgardnerio, Jun 29, 2022)
50b3549  Only operate on equality expressions (avantgardnerio, Jun 29, 2022)
f90d95a  Lint error (avantgardnerio, Jun 29, 2022)
9377cdf  Tests still pass because we are losing remaining predicate (avantgardnerio, Jun 29, 2022)
23b0ffb  Don't lose remaining expressions (avantgardnerio, Jun 29, 2022)
858b284  Update test to expect remaining filter clause (avantgardnerio, Jun 29, 2022)
00a661b  Debugging (avantgardnerio, Jun 30, 2022)
1708415  Can run query 4 (avantgardnerio, Jun 30, 2022)
60a6e58  Remove debugging code (avantgardnerio, Jun 30, 2022)
b8c0808  Clippy (avantgardnerio, Jun 30, 2022)
15 changes: 10 additions & 5 deletions datafusion/common/src/scalar.rs
@@ -37,6 +37,8 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};

/// Represents a dynamically typed, nullable single value.
/// This is the single-valued counter-part of arrow’s `Array`.
/// https://arrow.apache.org/docs/python/api/datatypes.html
/// https://github.com/apache/arrow/blob/master/format/Schema.fbs#L354-L375
Comment from the PR author:
Sorry, this was built upon #2797. I'll turn this into a draft until that gets merged.

#[derive(Clone)]
pub enum ScalarValue {
/// represents `DataType::Null` (castable to/from any other type)
@@ -75,9 +77,9 @@ pub enum ScalarValue {
LargeBinary(Option<Vec<u8>>),
/// list of nested ScalarValue
List(Option<Vec<ScalarValue>>, Box<DataType>),
/// Date stored as a signed 32bit int
/// Date stored as a signed 32bit int days since UNIX epoch 1970-01-01
Date32(Option<i32>),
/// Date stored as a signed 64bit int
/// Date stored as a signed 64bit int milliseconds since UNIX epoch 1970-01-01
Date64(Option<i64>),
/// Timestamp Second
TimestampSecond(Option<i64>, Option<String>),
@@ -87,11 +89,14 @@ pub enum ScalarValue {
TimestampMicrosecond(Option<i64>, Option<String>),
/// Timestamp Nanoseconds
TimestampNanosecond(Option<i64>, Option<String>),
/// Interval with YearMonth unit
/// Number of elapsed whole months
IntervalYearMonth(Option<i32>),
/// Interval with DayTime unit
/// Number of elapsed days and milliseconds (no leap seconds)
/// stored as 2 contiguous 32-bit signed integers
IntervalDayTime(Option<i64>),
/// Interval with MonthDayNano unit
/// A triple of the number of elapsed months, days, and nanoseconds.
/// Months and days are encoded as 32-bit signed integers.
/// Nanoseconds is encoded as a 64-bit signed integer (no leap seconds).
IntervalMonthDayNano(Option<i128>),
/// struct of nested ScalarValue
Struct(Option<Vec<ScalarValue>>, Box<Vec<Field>>),
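For illustration, a minimal standalone sketch (not part of this diff) of the IntervalDayTime packing documented above: days in the high 32 bits and milliseconds in the low 32 bits of a single i64. The helper names are hypothetical; this packing is also why the simplify_expressions test at the bottom of this diff now writes 123i64 << 32 for an interval of 123 days and expects the literal IntervalDayTime("528280977408").

/// Pack a day/millisecond pair into the single i64 layout used by
/// ScalarValue::IntervalDayTime: days in the high 32 bits, milliseconds in the low 32 bits.
fn pack_interval_day_time(days: i32, millis: i32) -> i64 {
    ((days as i64) << 32) | ((millis as i64) & 0xFFFF_FFFF)
}

/// Split a packed IntervalDayTime value back into (days, milliseconds).
fn unpack_interval_day_time(value: i64) -> (i32, i32) {
    ((value >> 32) as i32, value as i32)
}

fn main() {
    let packed = pack_interval_day_time(123, 0);
    assert_eq!(packed, 528_280_977_408); // same value as 123i64 << 32
    assert_eq!(unpack_interval_day_time(packed), (123, 0));
}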
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/context.rs
@@ -99,6 +99,7 @@ use chrono::{DateTime, Utc};
use datafusion_common::ScalarValue;
use datafusion_expr::TableSource;
use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
use datafusion_optimizer::subquery_decorrelate::SubqueryDecorrelate;
use datafusion_sql::{
parser::DFParser,
planner::{ContextProvider, SqlToRel},
@@ -1239,6 +1240,7 @@ impl SessionState {
// of applying other optimizations
Arc::new(SimplifyExpressions::new()),
Arc::new(SubqueryFilterToJoin::new()),
Arc::new(SubqueryDecorrelate::new()),
Arc::new(EliminateFilter::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
33 changes: 32 additions & 1 deletion datafusion/core/tests/sql/mod.rs
@@ -108,6 +108,7 @@ mod explain;
mod idenfifers;
pub mod information_schema;
mod partitioned_csv;
mod subqueries;
#[cfg(feature = "unicode_expressions")]
pub mod unicode;

@@ -483,7 +484,37 @@ fn get_tpch_table_schema(table: &str) -> Schema {
Field::new("n_comment", DataType::Utf8, false),
]),

_ => unimplemented!(),
"supplier" => Schema::new(vec![
Comment from the PR author:
Add missing TPC-H tables to support testing those queries.

Field::new("s_suppkey", DataType::Int64, false),
Field::new("s_name", DataType::Utf8, false),
Field::new("s_address", DataType::Utf8, false),
Field::new("s_nationkey", DataType::Int64, false),
Field::new("s_phone", DataType::Utf8, false),
Field::new("s_acctbal", DataType::Float64, false),
Field::new("s_comment", DataType::Utf8, false),
]),

"partsupp" => Schema::new(vec![
Field::new("ps_partkey", DataType::Int64, false),
Field::new("ps_suppkey", DataType::Int64, false),
Field::new("ps_availqty", DataType::Int32, false),
Field::new("ps_supplycost", DataType::Float64, false),
Field::new("ps_comment", DataType::Utf8, false),
]),

"part" => Schema::new(vec![
Field::new("p_partkey", DataType::Int64, false),
Field::new("p_name", DataType::Utf8, false),
Field::new("p_mfgr", DataType::Utf8, false),
Field::new("p_brand", DataType::Utf8, false),
Field::new("p_type", DataType::Utf8, false),
Field::new("p_size", DataType::Int32, false),
Field::new("p_container", DataType::Utf8, false),
Field::new("p_retailprice", DataType::Float64, false),
Field::new("p_comment", DataType::Utf8, false),
]),

_ => unimplemented!("Table: {}", table),
}
}

67 changes: 67 additions & 0 deletions datafusion/core/tests/sql/subqueries.rs
@@ -0,0 +1,67 @@
use super::*;
use crate::sql::execute_to_batches;
use datafusion::assert_batches_eq;
use datafusion::prelude::SessionContext;

#[tokio::test]
async fn tpch_q4_correlated() -> Result<()> {
let ctx = SessionContext::new();
register_tpch_csv(&ctx, "orders").await?;
register_tpch_csv(&ctx, "lineitem").await?;

/*
Comment from the PR author:
Annotate plan with variable names from optimizer code for cross-correlation.

#orders.o_orderpriority ASC NULLS LAST
Projection: #orders.o_orderpriority, #COUNT(UInt8(1)) AS order_count
Aggregate: groupBy=[[#orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]]
Filter: EXISTS ( -- plan
Subquery: Projection: * -- proj
Filter: #lineitem.l_orderkey = #orders.o_orderkey -- filter
TableScan: lineitem projection=None -- filter.input
)
TableScan: orders projection=None -- plan.inputs
*/
let sql = r#"
select o_orderpriority, count(*) as order_count
from orders
where exists (
select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate)
group by o_orderpriority
order by o_orderpriority;
"#;

// assert plan
let plan = ctx
.create_logical_plan(sql)
.map_err(|e| format!("{:?} at {}", e, "error"))
.unwrap();
let plan = ctx
.optimize(&plan)
.map_err(|e| format!("{:?} at {}", e, "error"))
.unwrap();
let actual = format!("{}", plan.display_indent());
let expected = r#"Sort: #orders.o_orderpriority ASC NULLS LAST
  Projection: #orders.o_orderpriority, #COUNT(UInt8(1)) AS order_count
    Aggregate: groupBy=[[#orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]]
      Inner Join: #orders.o_orderkey = #lineitem.l_orderkey
        TableScan: orders projection=[o_orderkey, o_orderpriority]
        Projection: #lineitem.l_orderkey
          Aggregate: groupBy=[[#lineitem.l_orderkey]], aggr=[[]]
            Filter: #lineitem.l_commitdate < #lineitem.l_receiptdate
              TableScan: lineitem projection=[l_orderkey, l_commitdate, l_receiptdate], partial_filters=[#lineitem.l_commitdate < #lineitem.l_receiptdate]"#
.to_string();
assert_eq!(actual, expected);

// assert data
let results = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-----------------+-------------+",
"| o_orderpriority | order_count |",
"+-----------------+-------------+",
"| 1-URGENT | 1 |",
"| 5-LOW | 1 |",
"+-----------------+-------------+",
];
assert_batches_eq!(expected, &results);

Ok(())
}
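For context, a hypothetical companion test (not in this PR) that runs the hand-decorrelated form of the same query: the correlated EXISTS predicate becomes an inner join against the distinct set of qualifying l_orderkey values, mirroring the optimized plan asserted above. It assumes the same helpers (register_tpch_csv, execute_to_batches) used by tpch_q4_correlated, and the derived-table join syntax may need minor adjustment.

#[tokio::test]
async fn tpch_q4_manually_decorrelated() -> Result<()> {
    let ctx = SessionContext::new();
    register_tpch_csv(&ctx, "orders").await?;
    register_tpch_csv(&ctx, "lineitem").await?;

    // Hand-written equivalent of the plan the optimizer is expected to produce:
    // the EXISTS subquery is replaced by an inner join against the distinct set
    // of l_orderkey values that satisfy the subquery's local predicate.
    let sql = r#"
        select o_orderpriority, count(*) as order_count
        from orders
        join (
            select distinct l_orderkey
            from lineitem
            where l_commitdate < l_receiptdate
        ) as qualifying on o_orderkey = qualifying.l_orderkey
        group by o_orderpriority
        order by o_orderpriority;
        "#;

    let results = execute_to_batches(&ctx, sql).await;
    // Should return the same two groups as the correlated form above.
    let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 2);

    Ok(())
}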
120 changes: 120 additions & 0 deletions datafusion/core/tests/sql/timestamp.rs
@@ -814,3 +814,123 @@ async fn group_by_timestamp_millis() -> Result<()> {
assert_batches_eq!(expected, &actual);
Ok(())
}

#[tokio::test]
async fn interval_year() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select date '1994-01-01' + interval '1' year as date;";
let results = execute_to_batches(&ctx, sql).await;

let expected = vec![
"+------------+",
"| date |",
"+------------+",
"| 1995-01-01 |",
"+------------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn add_interval_month() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select date '1994-01-31' + interval '1' month as date;";
let results = execute_to_batches(&ctx, sql).await;

let expected = vec![
"+------------+",
"| date |",
"+------------+",
"| 1994-02-28 |",
"+------------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn sub_interval_month() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select date '1994-03-31' - interval '1' month as date;";
let results = execute_to_batches(&ctx, sql).await;

let expected = vec![
"+------------+",
"| date |",
"+------------+",
"| 1994-02-28 |",
"+------------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn sub_month_wrap() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select date '1994-01-15' - interval '1' month as date;";
let results = execute_to_batches(&ctx, sql).await;

let expected = vec![
"+------------+",
"| date |",
"+------------+",
"| 1993-12-15 |",
"+------------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn add_interval_day() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select date '1994-01-15' + interval '1' day as date;";
let results = execute_to_batches(&ctx, sql).await;

let expected = vec![
"+------------+",
"| date |",
"+------------+",
"| 1994-01-16 |",
"+------------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn sub_interval_day() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select date '1994-01-01' - interval '1' day as date;";
let results = execute_to_batches(&ctx, sql).await;

let expected = vec![
"+------------+",
"| date |",
"+------------+",
"| 1993-12-31 |",
"+------------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}
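The month tests above rely on end-of-month clamping: adding one month to 1994-01-31 lands on 1994-02-28 because February 1994 has no 31st, and subtracting a month from 1994-03-31 does the same. Below is a minimal standalone sketch of that clamping rule (not DataFusion's actual implementation), using chrono, which the project already depends on; the add_months name is hypothetical.

use chrono::{Datelike, NaiveDate};

/// Add `months` (possibly negative) to `date`, clamping the day to the last
/// valid day of the target month, e.g. 1994-01-31 + 1 month = 1994-02-28.
fn add_months(date: NaiveDate, months: i32) -> NaiveDate {
    let total = date.year() * 12 + date.month() as i32 - 1 + months;
    let (year, month) = (total.div_euclid(12), total.rem_euclid(12) as u32 + 1);
    // Walk the day back until it is valid for the target month.
    let mut day = date.day();
    loop {
        if let Some(d) = NaiveDate::from_ymd_opt(year, month, day) {
            return d;
        }
        day -= 1;
    }
}

fn main() {
    let d = NaiveDate::from_ymd_opt(1994, 1, 31).unwrap();
    assert_eq!(add_months(d, 1), NaiveDate::from_ymd_opt(1994, 2, 28).unwrap());
    assert_eq!(add_months(d, -1), NaiveDate::from_ymd_opt(1993, 12, 31).unwrap());
}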
2 changes: 2 additions & 0 deletions datafusion/core/tests/tpch-csv/part.csv
@@ -0,0 +1,2 @@
p_partkey,p_name,p_mfgr,p_brand,p_type,p_size,p_container,p_retailprice,p_comment
1,goldenrod lavender spring chocolate lace,Manufacturer#1,Brand#13,PROMO BURNISHED COPPER,7,JUMBO PKG,901.00,ly. slyly ironi
2 changes: 2 additions & 0 deletions datafusion/core/tests/tpch-csv/partsupp.csv
@@ -0,0 +1,2 @@
ps_partkey,ps_suppkey,ps_availqty,ps_supplycost,ps_comment
67310,7311,100,993.49,ven ideas. quickly even packages print. pending multipliers must have to are fluff
Empty file.
2 changes: 2 additions & 0 deletions datafusion/core/tests/tpch-csv/supplier.csv
@@ -0,0 +1,2 @@
s_suppkey,s_name,s_address,s_nationkey,s_phone,s_acctbal,s_comment
1,Supplier#000000001, N kD4on9OM Ipw3,gf0JBoQDd7tgrzrddZ,17,27-918-335-1736,5755.94,each slyly above the careful
1 change: 1 addition & 0 deletions datafusion/optimizer/Cargo.toml
@@ -44,4 +44,5 @@ datafusion-common = { path = "../common", version = "9.0.0" }
datafusion-expr = { path = "../expr", version = "9.0.0" }
datafusion-physical-expr = { path = "../physical-expr", version = "9.0.0" }
hashbrown = { version = "0.12", features = ["raw"] }
itertools = "0.10"
log = "^0.4"
1 change: 1 addition & 0 deletions datafusion/optimizer/src/lib.rs
@@ -27,6 +27,7 @@ pub mod projection_push_down;
pub mod reduce_outer_join;
pub mod simplify_expressions;
pub mod single_distinct_to_groupby;
pub mod subquery_decorrelate;
pub mod subquery_filter_to_join;
pub mod utils;

8 changes: 4 additions & 4 deletions datafusion/optimizer/src/simplify_expressions.rs
@@ -1951,7 +1951,7 @@ mod tests {
let date_plus_interval_expr = to_timestamp_expr(ts_string)
.cast_to(&DataType::Date32, schema)
.unwrap()
+ Expr::Literal(ScalarValue::IntervalDayTime(Some(123)));
+ Expr::Literal(ScalarValue::IntervalDayTime(Some(123i64 << 32)));

let plan = LogicalPlanBuilder::from(table_scan.clone())
.project(vec![date_plus_interval_expr])
@@ -1963,10 +1963,10 @@

// Note that constant folder runs and folds the entire
// expression down to a single constant (true)
let expected = "Projection: Date32(\"18636\") AS CAST(totimestamp(Utf8(\"2020-09-08T12:05:00+00:00\")) AS Date32) + IntervalDayTime(\"123\")\
\n TableScan: test";
let expected = r#"Projection: Date32("18636") AS CAST(totimestamp(Utf8("2020-09-08T12:05:00+00:00")) AS Date32) + IntervalDayTime("528280977408")
TableScan: test"#;
let actual = get_optimized_plan_formatted(&plan, &time);

assert_eq!(expected, actual);
assert_eq!(actual, expected);
}
}