diff --git a/benchmarks/expected-plans/q11.txt b/benchmarks/expected-plans/q11.txt index bb5493828e99..7d8e145487ba 100644 --- a/benchmarks/expected-plans/q11.txt +++ b/benchmarks/expected-plans/q11.txt @@ -1,6 +1,6 @@ Sort: value DESC NULLS FIRST Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value - Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__sq_1.__value AS Decimal128(38, 15)) + Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__scalar_sq_1.__value AS Decimal128(38, 15)) CrossJoin: Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]] Inner Join: supplier.s_nationkey = nation.n_nationkey @@ -9,7 +9,7 @@ Sort: value DESC NULLS FIRST TableScan: supplier projection=[s_suppkey, s_nationkey] Filter: nation.n_name = Utf8("GERMANY") TableScan: nation projection=[n_nationkey, n_name] - SubqueryAlias: __sq_1 + SubqueryAlias: __scalar_sq_1 Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]] Inner Join: supplier.s_nationkey = nation.n_nationkey diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt index 96401dd7bd81..f4e053f8d421 100644 --- a/benchmarks/expected-plans/q15.txt +++ b/benchmarks/expected-plans/q15.txt @@ -1,7 +1,7 @@ EmptyRelation Sort: supplier.s_suppkey ASC NULLS LAST Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue - Inner Join: revenue0.total_revenue = __sq_1.__value + Inner Join: revenue0.total_revenue = __scalar_sq_1.__value Inner Join: supplier.s_suppkey = revenue0.supplier_no TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone] SubqueryAlias: revenue0 @@ -10,7 +10,7 @@ Sort: supplier.s_suppkey ASC NULLS LAST Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] - SubqueryAlias: __sq_1 + SubqueryAlias: __scalar_sq_1 Projection: MAX(revenue0.total_revenue) AS __value Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]] SubqueryAlias: revenue0 diff --git a/benchmarks/expected-plans/q16.txt b/benchmarks/expected-plans/q16.txt index 60ef269334b5..6af486a2a0ab 100644 --- a/benchmarks/expected-plans/q16.txt +++ b/benchmarks/expected-plans/q16.txt @@ -3,12 +3,12 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type AS Projection: group_alias_0 AS part.p_brand, group_alias_1 AS part.p_type, group_alias_2 AS part.p_size, COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey) Aggregate: groupBy=[[group_alias_0, group_alias_1, group_alias_2]], aggr=[[COUNT(alias1)]] Aggregate: groupBy=[[part.p_brand AS group_alias_0, part.p_type AS group_alias_1, part.p_size AS group_alias_2, partsupp.ps_suppkey AS alias1]], aggr=[[]] - LeftAnti Join: partsupp.ps_suppkey = __sq_1.s_suppkey + LeftAnti Join: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey Inner Join: partsupp.ps_partkey = part.p_partkey TableScan: partsupp projection=[ps_partkey, ps_suppkey] Filter: part.p_brand != Utf8("Brand#45") AND part.p_type NOT LIKE Utf8("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)]) TableScan: part projection=[p_partkey, p_brand, p_type, p_size] - SubqueryAlias: __sq_1 + SubqueryAlias: __correlated_sq_1 Projection: supplier.s_suppkey AS s_suppkey Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%") TableScan: supplier projection=[s_suppkey, s_comment] diff --git a/benchmarks/expected-plans/q17.txt b/benchmarks/expected-plans/q17.txt index 17b8e969879a..755311c5ee10 100644 --- a/benchmarks/expected-plans/q17.txt +++ b/benchmarks/expected-plans/q17.txt @@ -1,11 +1,12 @@ -Projection: CAST(SUM(lineitem.l_extendedprice) AS Decimal128(38, 33)) / Decimal128(Some(7000000000000000195487369212723200),38,33) AS avg_yearly +Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]] - Filter: CAST(lineitem.l_quantity AS Decimal128(38, 21)) < __sq_1.__value - Inner Join: part.p_partkey = __sq_1.l_partkey + Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) + Inner Join: part.p_partkey = __scalar_sq_1.l_partkey, lineitem.l_partkey = __scalar_sq_1.l_partkey Inner Join: lineitem.l_partkey = part.p_partkey TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice] Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX") TableScan: part projection=[p_partkey, p_brand, p_container] - Projection: lineitem.l_partkey, Decimal128(Some(200000000000000000000),38,21) * CAST(AVG(lineitem.l_quantity) AS Decimal128(38, 21)) AS __value, alias=__sq_1 - Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]] - TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice] \ No newline at end of file + SubqueryAlias: __scalar_sq_1 + Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value + Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]] + TableScan: lineitem projection=[l_partkey, l_quantity] \ No newline at end of file diff --git a/benchmarks/expected-plans/q18.txt b/benchmarks/expected-plans/q18.txt index 4017722c505a..639598725ce0 100644 --- a/benchmarks/expected-plans/q18.txt +++ b/benchmarks/expected-plans/q18.txt @@ -1,13 +1,13 @@ Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST Projection: customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, SUM(lineitem.l_quantity) Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice]], aggr=[[SUM(lineitem.l_quantity)]] - LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey + LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey Inner Join: orders.o_orderkey = lineitem.l_orderkey Inner Join: customer.c_custkey = orders.o_custkey TableScan: customer projection=[c_custkey, c_name] TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate] TableScan: lineitem projection=[l_orderkey, l_quantity] - SubqueryAlias: __sq_1 + SubqueryAlias: __correlated_sq_1 Projection: lineitem.l_orderkey AS l_orderkey Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2) Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]] diff --git a/benchmarks/expected-plans/q2.txt b/benchmarks/expected-plans/q2.txt index 34fb1e09a2f0..571c320e9e1a 100644 --- a/benchmarks/expected-plans/q2.txt +++ b/benchmarks/expected-plans/q2.txt @@ -1,7 +1,7 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name - Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value + Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.__value Inner Join: nation.n_regionkey = region.r_regionkey Inner Join: supplier.s_nationkey = nation.n_nationkey Inner Join: partsupp.ps_suppkey = supplier.s_suppkey @@ -13,7 +13,7 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplie TableScan: nation projection=[n_nationkey, n_name, n_regionkey] Filter: region.r_name = Utf8("EUROPE") TableScan: region projection=[r_regionkey, r_name] - SubqueryAlias: __sq_1 + SubqueryAlias: __scalar_sq_1 Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]] Inner Join: nation.n_regionkey = region.r_regionkey diff --git a/benchmarks/expected-plans/q20.txt b/benchmarks/expected-plans/q20.txt index b2676f61f8eb..b7ecb9a09199 100644 --- a/benchmarks/expected-plans/q20.txt +++ b/benchmarks/expected-plans/q20.txt @@ -1,21 +1,21 @@ Sort: supplier.s_name ASC NULLS LAST Projection: supplier.s_name, supplier.s_address - LeftSemi Join: supplier.s_suppkey = __sq_1.ps_suppkey + LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey Inner Join: supplier.s_nationkey = nation.n_nationkey TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey] Filter: nation.n_name = Utf8("CANADA") TableScan: nation projection=[n_nationkey, n_name] - SubqueryAlias: __sq_1 + SubqueryAlias: __correlated_sq_1 Projection: partsupp.ps_suppkey AS ps_suppkey - Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value - Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey - LeftSemi Join: partsupp.ps_partkey = __sq_2.p_partkey + Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_1.__value + Inner Join: partsupp.ps_partkey = __scalar_sq_1.l_partkey, partsupp.ps_suppkey = __scalar_sq_1.l_suppkey + LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty] - SubqueryAlias: __sq_2 + SubqueryAlias: __correlated_sq_2 Projection: part.p_partkey AS p_partkey Filter: part.p_name LIKE Utf8("forest%") TableScan: part projection=[p_partkey, p_name] - SubqueryAlias: __sq_3 + SubqueryAlias: __scalar_sq_1 Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]] Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131") diff --git a/benchmarks/expected-plans/q22.txt b/benchmarks/expected-plans/q22.txt index 82060bd59d8f..0fd7a590ac19 100644 --- a/benchmarks/expected-plans/q22.txt +++ b/benchmarks/expected-plans/q22.txt @@ -3,13 +3,13 @@ Sort: custsale.cntrycode ASC NULLS LAST Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]] SubqueryAlias: custsale Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal - Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value + Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_1.__value CrossJoin: LeftAnti Join: customer.c_custkey = orders.o_custkey Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) TableScan: customer projection=[c_custkey, c_phone, c_acctbal] TableScan: orders projection=[o_custkey] - SubqueryAlias: __sq_1 + SubqueryAlias: __scalar_sq_1 Projection: AVG(customer.c_acctbal) AS __value Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]] Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index d74e9b64d185..c64eb78c9df6 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -596,17 +596,7 @@ mod tests { expected_plan(16).await } - /// This query produces different plans depending on operating system. The difference is - /// due to re-writing the following expression: - /// - /// `sum(l_extendedprice) / 7.0 as avg_yearly` - /// - /// Linux: Decimal128(Some(7000000000000000195487369212723200),38,33) - /// Windows: Decimal128(Some(6999999999999999042565864605876224),38,33) - /// - /// See https://github.com/apache/arrow-datafusion/issues/3791 - #[tokio::test] - #[ignore] + #[tokio::test] async fn q17_expected_plan() -> Result<()> { expected_plan(17).await } diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs index 3a4dadd7f2a6..3fff5ba3e80c 100644 --- a/datafusion/core/tests/sql/subqueries.rs +++ b/datafusion/core/tests/sql/subqueries.rs @@ -50,21 +50,21 @@ where c_acctbal < ( let plan = dataframe.into_optimized_plan().unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = "Sort: customer.c_custkey ASC NULLS LAST\ - \n Projection: customer.c_custkey\ - \n Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __sq_1.__value\ - \n Inner Join: customer.c_custkey = __sq_1.o_custkey\ - \n TableScan: customer projection=[c_custkey, c_acctbal]\ - \n SubqueryAlias: __sq_1\ - \n Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value\ - \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]]\ - \n Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __sq_2.__value\ - \n Inner Join: orders.o_orderkey = __sq_2.l_orderkey\ - \n TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]\ - \n SubqueryAlias: __sq_2\ - \n Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value\ - \n Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]]\ - \n TableScan: lineitem projection=[l_orderkey, l_extendedprice]"; + let expected = r#"Sort: customer.c_custkey ASC NULLS LAST + Projection: customer.c_custkey + Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.__value + Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey + TableScan: customer projection=[c_custkey, c_acctbal] + SubqueryAlias: __scalar_sq_1 + Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value + Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] + Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_2.__value + Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey + TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] + SubqueryAlias: __scalar_sq_2 + Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS price AS __value + Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] + TableScan: lineitem projection=[l_orderkey, l_extendedprice]"#; assert_eq!(actual, expected); Ok(()) @@ -94,12 +94,12 @@ where o_orderstatus in ( let dataframe = ctx.sql(sql).await.unwrap(); let plan = dataframe.into_optimized_plan().unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = "Projection: orders.o_orderkey\ - \n LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = __sq_1.l_orderkey\ - \n TableScan: orders projection=[o_orderkey, o_orderstatus]\ - \n SubqueryAlias: __sq_1\ - \n Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey\ - \n TableScan: lineitem projection=[l_orderkey, l_linestatus]"; + let expected = r#"Projection: orders.o_orderkey + LeftSemi Join: orders.o_orderstatus = __correlated_sq_1.l_linestatus, orders.o_orderkey = __correlated_sq_1.l_orderkey + TableScan: orders projection=[o_orderkey, o_orderstatus] + SubqueryAlias: __correlated_sq_1 + Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS l_orderkey + TableScan: lineitem projection=[l_orderkey, l_linestatus]"#; assert_eq!(actual, expected); // assert data @@ -140,32 +140,32 @@ order by s_acctbal desc, n_name, s_name, p_partkey;"#; let dataframe = ctx.sql(sql).await.unwrap(); let plan = dataframe.into_optimized_plan().unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = "Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST\ - \n Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment\ - \n Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name\ - \n Inner Join: part.p_partkey = __sq_1.ps_partkey, partsupp.ps_supplycost = __sq_1.__value\ - \n Inner Join: nation.n_regionkey = region.r_regionkey\ - \n Inner Join: supplier.s_nationkey = nation.n_nationkey\ - \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\ - \n Inner Join: part.p_partkey = partsupp.ps_partkey\ - \n Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8(\"%BRASS\")\ - \n TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8(\"%BRASS\")]\ - \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]\ - \n TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]\ - \n TableScan: nation projection=[n_nationkey, n_name, n_regionkey]\ - \n Filter: region.r_name = Utf8(\"EUROPE\")\ - \n TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8(\"EUROPE\")]\ - \n SubqueryAlias: __sq_1\ - \n Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value\ - \n Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]\ - \n Inner Join: nation.n_regionkey = region.r_regionkey\ - \n Inner Join: supplier.s_nationkey = nation.n_nationkey\ - \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\ - \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]\ - \n TableScan: supplier projection=[s_suppkey, s_nationkey]\ - \n TableScan: nation projection=[n_nationkey, n_regionkey]\ - \n Filter: region.r_name = Utf8(\"EUROPE\")\ - \n TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8(\"EUROPE\")]"; + let expected = r#"Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST + Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment + Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name + Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.__value + Inner Join: nation.n_regionkey = region.r_regionkey + Inner Join: supplier.s_nationkey = nation.n_nationkey + Inner Join: partsupp.ps_suppkey = supplier.s_suppkey + Inner Join: part.p_partkey = partsupp.ps_partkey + Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8("%BRASS") + TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8("%BRASS")] + TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] + TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment] + TableScan: nation projection=[n_nationkey, n_name, n_regionkey] + Filter: region.r_name = Utf8("EUROPE") + TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")] + SubqueryAlias: __scalar_sq_1 + Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value + Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]] + Inner Join: nation.n_regionkey = region.r_regionkey + Inner Join: supplier.s_nationkey = nation.n_nationkey + Inner Join: partsupp.ps_suppkey = supplier.s_suppkey + TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost] + TableScan: supplier projection=[s_suppkey, s_nationkey] + TableScan: nation projection=[n_nationkey, n_regionkey] + Filter: region.r_name = Utf8("EUROPE") + TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]"#; assert_eq!(actual, expected); // assert data @@ -230,7 +230,6 @@ async fn tpch_q4_correlated() -> Result<()> { Ok(()) } -#[ignore] // https://github.com/apache/arrow-datafusion/issues/3437 #[tokio::test] async fn tpch_q17_correlated() -> Result<()> { let parts = r#"63700,goldenrod lavender spring chocolate lace,Manufacturer#1,Brand#23,PROMO BURNISHED COPPER,7,MED BOX,901.00,ly. slyly ironi @@ -255,17 +254,18 @@ async fn tpch_q17_correlated() -> Result<()> { let dataframe = ctx.sql(sql).await.unwrap(); let plan = dataframe.into_optimized_plan().unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Decimal128(38, 33)) / CAST(Float64(7) AS Decimal128(38, 33)) AS avg_yearly + let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]] - Filter: CAST(lineitem.l_quantity AS Decimal128(38, 21)) < __sq_1.__value - Inner Join: part.p_partkey = __sq_1.l_partkey + Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) + Inner Join: part.p_partkey = __scalar_sq_1.l_partkey, lineitem.l_partkey = __scalar_sq_1.l_partkey Inner Join: lineitem.l_partkey = part.p_partkey TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice] Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX") TableScan: part projection=[p_partkey, p_brand, p_container] - Projection: lineitem.l_partkey, CAST(Float64(0.2) AS Decimal128(38, 21)) * CAST(AVG(lineitem.l_quantity) AS Decimal128(38, 21)) AS __value, alias=__sq_1 - Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]] - TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]"# + SubqueryAlias: __scalar_sq_1 + Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value + Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]] + TableScan: lineitem projection=[l_partkey, l_quantity]"# .to_string(); assert_eq!(actual, expected); @@ -275,7 +275,7 @@ async fn tpch_q17_correlated() -> Result<()> { "+--------------------+", "| avg_yearly |", "+--------------------+", - "| 1901.3714285714286 |", + "| 190.13714285714286 |", "+--------------------+", ]; assert_batches_eq!(expected, &results); @@ -309,28 +309,28 @@ order by s_name; let dataframe = ctx.sql(sql).await.unwrap(); let plan = dataframe.into_optimized_plan().unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = "Sort: supplier.s_name ASC NULLS LAST\ - \n Projection: supplier.s_name, supplier.s_address\ - \n LeftSemi Join: supplier.s_suppkey = __sq_1.ps_suppkey\ - \n Inner Join: supplier.s_nationkey = nation.n_nationkey\ - \n TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]\ - \n Filter: nation.n_name = Utf8(\"CANADA\")\ - \n TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"CANADA\")]\ - \n SubqueryAlias: __sq_1\ - \n Projection: partsupp.ps_suppkey AS ps_suppkey\ - \n Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value\ - \n Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey\ - \n LeftSemi Join: partsupp.ps_partkey = __sq_2.p_partkey\ - \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]\ - \n SubqueryAlias: __sq_2\ - \n Projection: part.p_partkey AS p_partkey\ - \n Filter: part.p_name LIKE Utf8(\"forest%\")\ - \n TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8(\"forest%\")]\ - \n SubqueryAlias: __sq_3\ - \n Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value\ - \n Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]\ - \n Filter: lineitem.l_shipdate >= Date32(\"8766\")\ - \n TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32(\"8766\")]"; + let expected = r#"Sort: supplier.s_name ASC NULLS LAST + Projection: supplier.s_name, supplier.s_address + LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey + Inner Join: supplier.s_nationkey = nation.n_nationkey + TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey] + Filter: nation.n_name = Utf8("CANADA") + TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("CANADA")] + SubqueryAlias: __correlated_sq_1 + Projection: partsupp.ps_suppkey AS ps_suppkey + Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_1.__value + Inner Join: partsupp.ps_partkey = __scalar_sq_1.l_partkey, partsupp.ps_suppkey = __scalar_sq_1.l_suppkey + LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey + TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty] + SubqueryAlias: __correlated_sq_2 + Projection: part.p_partkey AS p_partkey + Filter: part.p_name LIKE Utf8("forest%") + TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8("forest%")] + SubqueryAlias: __scalar_sq_1 + Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value + Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]] + Filter: lineitem.l_shipdate >= Date32("8766") + TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32("8766")]"#; assert_eq!(actual, expected); // assert data @@ -364,22 +364,22 @@ order by cntrycode;"#; let dataframe = ctx.sql(sql).await.unwrap(); let plan = dataframe.into_optimized_plan().unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = "Sort: custsale.cntrycode ASC NULLS LAST\ - \n Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal\ - \n Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]\ - \n SubqueryAlias: custsale\ - \n Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal\ - \n Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value\ - \n CrossJoin:\ - \n LeftAnti Join: customer.c_custkey = orders.o_custkey\ - \n Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])\ - \n TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])]\ - \n TableScan: orders projection=[o_custkey]\ - \n SubqueryAlias: __sq_1\ - \n Projection: AVG(customer.c_acctbal) AS __value\ - \n Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]\ - \n Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")])\ - \n TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[customer.c_acctbal > Decimal128(Some(0),15,2) AS customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8(\"13\"), Utf8(\"31\"), Utf8(\"23\"), Utf8(\"29\"), Utf8(\"30\"), Utf8(\"18\"), Utf8(\"17\")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"; + let expected = r#"Sort: custsale.cntrycode ASC NULLS LAST + Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal + Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]] + SubqueryAlias: custsale + Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal + Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_1.__value + CrossJoin: + LeftAnti Join: customer.c_custkey = orders.o_custkey + Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) + TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])] + TableScan: orders projection=[o_custkey] + SubqueryAlias: __scalar_sq_1 + Projection: AVG(customer.c_acctbal) AS __value + Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]] + Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) + TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[customer.c_acctbal > Decimal128(Some(0),15,2) AS customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"#; assert_eq!(expected, actual); // assert data @@ -420,26 +420,26 @@ order by value desc; let dataframe = ctx.sql(sql).await.unwrap(); let plan = dataframe.into_optimized_plan().unwrap(); let actual = format!("{}", plan.display_indent()); - let expected = "Sort: value DESC NULLS FIRST\ - \n Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value\ - \n Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__sq_1.__value AS Decimal128(38, 15))\ - \n CrossJoin:\ - \n Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]\ - \n Inner Join: supplier.s_nationkey = nation.n_nationkey\ - \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\ - \n TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]\ - \n TableScan: supplier projection=[s_suppkey, s_nationkey]\ - \n Filter: nation.n_name = Utf8(\"GERMANY\")\ - \n TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"GERMANY\")]\ - \n SubqueryAlias: __sq_1\ - \n Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value\ - \n Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]\ - \n Inner Join: supplier.s_nationkey = nation.n_nationkey\ - \n Inner Join: partsupp.ps_suppkey = supplier.s_suppkey\ - \n TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]\ - \n TableScan: supplier projection=[s_suppkey, s_nationkey]\ - \n Filter: nation.n_name = Utf8(\"GERMANY\")\ - \n TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8(\"GERMANY\")]"; + let expected = r#"Sort: value DESC NULLS FIRST + Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value + Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__scalar_sq_1.__value AS Decimal128(38, 15)) + CrossJoin: + Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]] + Inner Join: supplier.s_nationkey = nation.n_nationkey + Inner Join: partsupp.ps_suppkey = supplier.s_suppkey + TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost] + TableScan: supplier projection=[s_suppkey, s_nationkey] + Filter: nation.n_name = Utf8("GERMANY") + TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")] + SubqueryAlias: __scalar_sq_1 + Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value + Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]] + Inner Join: supplier.s_nationkey = nation.n_nationkey + Inner Join: partsupp.ps_suppkey = supplier.s_suppkey + TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost] + TableScan: supplier projection=[s_suppkey, s_nationkey] + Filter: nation.n_name = Utf8("GERMANY") + TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]"#; assert_eq!(actual, expected); // assert data diff --git a/datafusion/optimizer/src/alias.rs b/datafusion/optimizer/src/alias.rs new file mode 100644 index 000000000000..70fdeb7ab4e6 --- /dev/null +++ b/datafusion/optimizer/src/alias.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// A utility struct that can be used to generate unique aliases when optimizing queries +pub struct AliasGenerator { + next_id: AtomicUsize, +} + +impl Default for AliasGenerator { + fn default() -> Self { + Self { + next_id: AtomicUsize::new(1), + } + } +} + +impl AliasGenerator { + /// Create a new [`AliasGenerator`] + pub fn new() -> Self { + Self::default() + } + + /// Return a unique alias with the provided prefix + pub fn next(&self, prefix: &str) -> String { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + format!("{}_{}", prefix, id) + } +} diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index c2a80ac2bb11..1aa976ce8ca7 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::alias::AliasGenerator; use crate::optimizer::ApplyOrder; use crate::utils::{ alias_cols, conjunction, exprs_to_join_cols, find_join_exprs, merge_cols, @@ -28,12 +29,14 @@ use log::debug; use std::sync::Arc; #[derive(Default)] -pub struct DecorrelateWhereIn {} +pub struct DecorrelateWhereIn { + alias: AliasGenerator, +} impl DecorrelateWhereIn { #[allow(missing_docs)] pub fn new() -> Self { - Self {} + Self::default() } /// Finds expressions that have a where in subquery (and recurses when found) @@ -96,8 +99,12 @@ impl OptimizerRule for DecorrelateWhereIn { // iterate through all exists clauses in predicate, turning each into a join let mut cur_input = filter.input.as_ref().clone(); for subquery in subqueries { - cur_input = - optimize_where_in(&subquery, &cur_input, &other_exprs, config)?; + cur_input = optimize_where_in( + &subquery, + &cur_input, + &other_exprs, + &self.alias, + )?; } Ok(Some(cur_input)) } @@ -118,8 +125,8 @@ fn optimize_where_in( query_info: &SubqueryInfo, outer_input: &LogicalPlan, outer_other_exprs: &[Expr], - config: &dyn OptimizerConfig, -) -> datafusion_common::Result { + alias: &AliasGenerator, +) -> Result { let proj = Projection::try_from_plan(&query_info.query.subquery) .map_err(|e| context!("a projection is required", e))?; let mut subqry_input = proj.input.clone(); @@ -161,7 +168,7 @@ fn optimize_where_in( merge_cols((&[subquery_col], &subqry_cols), (&[outer_col], &outer_cols)); // build subquery side of join - the thing the subquery was querying - let subqry_alias = format!("__sq_{}", config.next_id()); + let subqry_alias = alias.next("__correlated_sq"); let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone()); if let Some(expr) = conjunction(other_subqry_exprs) { // if the subquery had additional expressions, restore them @@ -256,13 +263,13 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n LeftSemi Join: test.b = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.b = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\ \n Projection: sq_1.c AS c [c:UInt32]\ \n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_2 [c:UInt32]\ + \n SubqueryAlias: __correlated_sq_2 [c:UInt32]\ \n Projection: sq_2.c AS c [c:UInt32]\ \n TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]"; @@ -286,9 +293,9 @@ mod tests { let expected = "Projection: test.b [b:UInt32]\ \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\ \n Projection: sq.c AS c [c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; @@ -340,9 +347,9 @@ mod tests { \n Subquery: [c:UInt32]\ \n Projection: sq1.c [c:UInt32]\ \n TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\ \n Projection: sq2.c AS c [c:UInt32]\ \n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]"; @@ -365,13 +372,13 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n LeftSemi Join: test.b = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.b = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_1 [a:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [a:UInt32]\ \n Projection: sq.a AS a [a:UInt32]\ - \n LeftSemi Join: sq.a = __sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: sq.a = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_2 [c:UInt32]\ + \n SubqueryAlias: __correlated_sq_2 [c:UInt32]\ \n Projection: sq_nested.c AS c [c:UInt32]\ \n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]"; @@ -401,9 +408,9 @@ mod tests { \n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\ \n SubqueryAlias: wrapped [b:UInt32, c:UInt32]\ \n Projection: test.b, test.c [b:UInt32, c:UInt32]\ - \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\ \n Projection: sq_inner.c AS c [c:UInt32]\ \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]"; @@ -436,13 +443,13 @@ mod tests { debug!("plan to optimize:\n{}", plan.display_indent()); let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\ - \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n LeftSemi Join: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\ \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ - \n SubqueryAlias: __sq_2 [o_custkey:Int64]\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ + \n SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; assert_optimized_plan_eq_display_indent( Arc::new(DecorrelateWhereIn::new()), @@ -479,13 +486,13 @@ mod tests { .build()?; let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\ \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ - \n LeftSemi Join: orders.o_orderkey = __sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ + \n LeftSemi Join: orders.o_orderkey = __correlated_sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ - \n SubqueryAlias: __sq_2 [l_orderkey:Int64]\ + \n SubqueryAlias: __correlated_sq_2 [l_orderkey:Int64]\ \n Projection: lineitem.l_orderkey AS l_orderkey [l_orderkey:Int64]\ \n TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"; @@ -517,9 +524,9 @@ mod tests { .build()?; let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\ \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ \n Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; @@ -549,9 +556,9 @@ mod tests { // Query will fail, but we can still transform the plan let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\ \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ \n Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; @@ -580,9 +587,9 @@ mod tests { .build()?; let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\ \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ \n Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; @@ -611,9 +618,9 @@ mod tests { .build()?; let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\ \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; @@ -793,9 +800,9 @@ mod tests { let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ \n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\ - \n LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ + \n LeftSemi Join: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64]\ + \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\ \n Projection: orders.o_custkey AS o_custkey [o_custkey:Int64]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; @@ -858,9 +865,9 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n LeftSemi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = __correlated_sq_1.c, test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_1 [c:UInt32, a:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\ \n Projection: sq.c AS c, sq.a AS a [c:UInt32, a:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; @@ -882,9 +889,9 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftSemi Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\ \n Projection: sq.c AS c [c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; @@ -906,9 +913,9 @@ mod tests { .build()?; let expected = "Projection: test.b [b:UInt32]\ - \n LeftAnti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n LeftAnti Join: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_1 [c:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\ \n Projection: sq.c AS c [c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 27e6dff088f8..cd743fcda73b 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +pub mod alias; pub mod common_subexpr_eliminate; pub mod decorrelate_where_exists; pub mod decorrelate_where_in; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index b27c49d2ffcc..333f76fc5d87 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -41,7 +41,6 @@ use chrono::{DateTime, Utc}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use log::{debug, trace, warn}; -use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Instant; @@ -83,11 +82,6 @@ pub trait OptimizerConfig { /// How many times to attempt to optimize the plan fn max_passes(&self) -> u8; - - /// Return a unique ID - /// - /// This is useful for assigning unique names to aliases - fn next_id(&self) -> usize; } /// A standalone [`OptimizerConfig`] that can be used independently @@ -97,8 +91,6 @@ pub struct OptimizerContext { /// Query execution start time that can be used to rewrite /// expressions such as `now()` to use a literal value instead query_execution_start_time: DateTime, - /// id generator for optimizer passes - next_id: AtomicUsize, /// Option to skip rules that produce errors skip_failing_rules: bool, /// Specify whether to enable the filter_null_keys rule @@ -112,7 +104,6 @@ impl OptimizerContext { pub fn new() -> Self { Self { query_execution_start_time: Utc::now(), - next_id: AtomicUsize::new(1), skip_failing_rules: true, filter_null_keys: true, max_passes: 3, @@ -172,12 +163,6 @@ impl OptimizerConfig for OptimizerContext { fn max_passes(&self) -> u8 { self.max_passes } - - fn next_id(&self) -> usize { - use std::sync::atomic::Ordering; - // Can use relaxed ordering as not used for synchronisation - self.next_id.fetch_add(1, Ordering::Relaxed) - } } /// A rule-based optimizer. diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index c0ea975f34c0..51c4142afa62 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::alias::AliasGenerator; use crate::optimizer::ApplyOrder; use crate::utils::{ conjunction, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction, @@ -30,12 +31,14 @@ use std::sync::Arc; /// Optimizer rule for rewriting subquery filters to joins #[derive(Default)] -pub struct ScalarSubqueryToJoin {} +pub struct ScalarSubqueryToJoin { + alias: Arc, +} impl ScalarSubqueryToJoin { #[allow(missing_docs)] pub fn new() -> Self { - Self {} + Self::default() } /// Finds expressions that have a scalar subquery in them (and recurses when found) @@ -110,7 +113,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { let mut cur_input = filter.input.as_ref().clone(); for subquery in subqueries { if let Some(optimized_subquery) = - optimize_scalar(&subquery, &cur_input, &other_exprs, config)? + optimize_scalar(&subquery, &cur_input, &other_exprs, &self.alias)? { cur_input = optimized_subquery; } else { @@ -173,7 +176,7 @@ fn optimize_scalar( query_info: &SubqueryInfo, filter_input: &LogicalPlan, outer_others: &[Expr], - config: &dyn OptimizerConfig, + alias: &AliasGenerator, ) -> Result> { let subquery = query_info.query.subquery.as_ref(); debug!( @@ -242,7 +245,7 @@ fn optimize_scalar( } // Only operate if one column is present and the other closed upon from outside scope - let subqry_alias = format!("__sq_{}", config.next_id()); + let subqry_alias = alias.next("__scalar_sq"); let group_by: Vec<_> = subqry_cols .iter() .map(|it| Expr::Column(it.clone())) @@ -386,16 +389,16 @@ mod tests { .build()?; let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n Filter: Int32(1) < __sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\ - \n Inner Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\ - \n Filter: Int32(1) < __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ - \n Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n Filter: Int32(1) < __scalar_sq_2.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\ + \n Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N, o_custkey:Int64, __value:Int64;N]\ + \n Filter: Int32(1) < __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\ + \n SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\ \n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\ \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ - \n SubqueryAlias: __sq_2 [o_custkey:Int64, __value:Int64;N]\ + \n SubqueryAlias: __scalar_sq_2 [o_custkey:Int64, __value:Int64;N]\ \n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\ \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; @@ -439,16 +442,16 @@ mod tests { .build()?; let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n Filter: customer.c_acctbal < __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\ - \n Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\ + \n Filter: customer.c_acctbal < __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\ + \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Float64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Float64;N]\ + \n SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Float64;N]\ \n Projection: orders.o_custkey, SUM(orders.o_totalprice) AS __value [o_custkey:Int64, __value:Float64;N]\ \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[SUM(orders.o_totalprice)]] [o_custkey:Int64, SUM(orders.o_totalprice):Float64;N]\ - \n Filter: orders.o_totalprice < __sq_2.__value [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\ - \n Inner Join: orders.o_orderkey = __sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\ + \n Filter: orders.o_totalprice < __scalar_sq_2.__value [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\ + \n Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N, l_orderkey:Int64, __value:Float64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ - \n SubqueryAlias: __sq_2 [l_orderkey:Int64, __value:Float64;N]\ + \n SubqueryAlias: __scalar_sq_2 [l_orderkey:Int64, __value:Float64;N]\ \n Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice) AS __value [l_orderkey:Int64, __value:Float64;N]\ \n Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_extendedprice)]] [l_orderkey:Int64, SUM(lineitem.l_extendedprice):Float64;N]\ \n TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"; @@ -481,9 +484,9 @@ mod tests { .build()?; let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey, customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\ + \n SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\ \n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\ \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\ \n Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ @@ -515,10 +518,10 @@ mod tests { // it will optimize, but fail for the same reason the unoptimized query would let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n Filter: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ \n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [__value:Int64;N]\ + \n SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\ \n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\ \n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\ \n Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ @@ -548,10 +551,10 @@ mod tests { .build()?; let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n Filter: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ \n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [__value:Int64;N]\ + \n SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\ \n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\ \n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\ \n Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ @@ -739,10 +742,10 @@ mod tests { let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ \n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ - \n Filter: customer.c_custkey >= __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ - \n Inner Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n Filter: customer.c_custkey >= __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\ + \n SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\ \n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\ \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; @@ -776,9 +779,9 @@ mod tests { let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ \n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ - \n Inner Join: customer.c_custkey = __sq_1.o_custkey, customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ + \n Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey, customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, o_custkey:Int64, __value:Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [o_custkey:Int64, __value:Int64;N]\ + \n SubqueryAlias: __scalar_sq_1 [o_custkey:Int64, __value:Int64;N]\ \n Projection: orders.o_custkey, MAX(orders.o_custkey) AS __value [o_custkey:Int64, __value:Int64;N]\ \n Aggregate: groupBy=[[orders.o_custkey]], aggr=[[MAX(orders.o_custkey)]] [o_custkey:Int64, MAX(orders.o_custkey):Int64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; @@ -845,10 +848,10 @@ mod tests { .build()?; let expected = "Projection: test.c [c:UInt32]\ - \n Filter: test.c < __sq_1.__value [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\ - \n Inner Join: test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\ + \n Filter: test.c < __scalar_sq_1.__value [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\ + \n Inner Join: test.a = __scalar_sq_1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, __value:UInt32;N]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ - \n SubqueryAlias: __sq_1 [a:UInt32, __value:UInt32;N]\ + \n SubqueryAlias: __scalar_sq_1 [a:UInt32, __value:UInt32;N]\ \n Projection: sq.a, MIN(sq.c) AS __value [a:UInt32, __value:UInt32;N]\ \n Aggregate: groupBy=[[sq.a]], aggr=[[MIN(sq.c)]] [a:UInt32, MIN(sq.c):UInt32;N]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; @@ -877,10 +880,10 @@ mod tests { .build()?; let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n Filter: customer.c_custkey < __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n Filter: customer.c_custkey < __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ \n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [__value:Int64;N]\ + \n SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\ \n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\ \n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; @@ -908,10 +911,10 @@ mod tests { .build()?; let expected = "Projection: customer.c_custkey [c_custkey:Int64]\ - \n Filter: customer.c_custkey = __sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ + \n Filter: customer.c_custkey = __scalar_sq_1.__value [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ \n CrossJoin: [c_custkey:Int64, c_name:Utf8, __value:Int64;N]\ \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\ - \n SubqueryAlias: __sq_1 [__value:Int64;N]\ + \n SubqueryAlias: __scalar_sq_1 [__value:Int64;N]\ \n Projection: MAX(orders.o_custkey) AS __value [__value:Int64;N]\ \n Aggregate: groupBy=[[]], aggr=[[MAX(orders.o_custkey)]] [MAX(orders.o_custkey):Int64;N]\ \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs index 62a8f1ef2cd9..283dd1e72a94 100644 --- a/datafusion/optimizer/tests/integration-test.rs +++ b/datafusion/optimizer/tests/integration-test.rs @@ -64,10 +64,10 @@ fn subquery_filter_with_cast() -> Result<()> { )"; let plan = test_sql(sql)?; let expected = "Projection: test.col_int32\ - \n Filter: CAST(test.col_int32 AS Float64) > __sq_1.__value\ + \n Filter: CAST(test.col_int32 AS Float64) > __scalar_sq_1.__value\ \n CrossJoin:\ \n TableScan: test projection=[col_int32]\ - \n SubqueryAlias: __sq_1\ + \n SubqueryAlias: __scalar_sq_1\ \n Projection: AVG(test.col_int32) AS __value\ \n Aggregate: groupBy=[[]], aggr=[[AVG(test.col_int32)]]\ \n Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\