From c1a951eb8d9a5668f57531d16326c85e7e294e5c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 18:09:56 +0200 Subject: [PATCH 01/13] WIP cartesian join --- rust/datafusion/src/logical_plan/builder.rs | 14 +++++++++ rust/datafusion/src/logical_plan/plan.rs | 27 +++++++++++++++-- .../src/optimizer/constant_folding.rs | 3 +- .../src/optimizer/hash_build_probe_order.rs | 30 +++++++++++++++++++ .../src/optimizer/projection_push_down.rs | 1 + rust/datafusion/src/optimizer/utils.rs | 5 ++++ .../src/physical_plan/hash_utils.rs | 10 +++---- rust/datafusion/src/physical_plan/mod.rs | 1 + rust/datafusion/src/physical_plan/planner.rs | 9 ++++-- rust/datafusion/src/sql/planner.rs | 6 ++-- rust/datafusion/tests/sql.rs | 15 +++++----- 11 files changed, 100 insertions(+), 21 deletions(-) diff --git a/rust/datafusion/src/logical_plan/builder.rs b/rust/datafusion/src/logical_plan/builder.rs index fed82fd23b8..4f25c73e441 100644 --- a/rust/datafusion/src/logical_plan/builder.rs +++ b/rust/datafusion/src/logical_plan/builder.rs @@ -270,6 +270,20 @@ impl LogicalPlanBuilder { })) } } + /// Apply a cartesian join + pub fn cartesian_join(&self, right: &LogicalPlan) -> Result { + let left_fields = self.plan.schema().fields().iter(); + let right_fields = right.schema().fields(); + let fields = left_fields.chain(right_fields).cloned().collect(); + + let schema = DFSchema::new(fields)?; + + Ok(Self::from(&LogicalPlan::CartesianJoin { + left: Arc::new(self.plan.clone()), + right: Arc::new(right.clone()), + schema: DFSchemaRef::new(schema), + })) + } /// Repartition pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result { diff --git a/rust/datafusion/src/logical_plan/plan.rs b/rust/datafusion/src/logical_plan/plan.rs index d1b9b827a5a..9ad3791a6ad 100644 --- a/rust/datafusion/src/logical_plan/plan.rs +++ b/rust/datafusion/src/logical_plan/plan.rs @@ -113,6 +113,15 @@ pub enum LogicalPlan { /// The output schema, containing fields from the left and right inputs schema: DFSchemaRef, }, + /// Join two logical plans on one or more join columns + CartesianJoin { + /// Left input + left: Arc, + /// Right input + right: Arc, + /// The output schema, containing fields from the left and right inputs + schema: DFSchemaRef, + }, /// Repartition the plan based on a partitioning scheme. Repartition { /// The incoming logical plan @@ -203,6 +212,7 @@ impl LogicalPlan { LogicalPlan::Aggregate { schema, .. } => &schema, LogicalPlan::Sort { input, .. } => input.schema(), LogicalPlan::Join { schema, .. } => &schema, + LogicalPlan::CartesianJoin { schema, .. } => &schema, LogicalPlan::Repartition { input, .. } => input.schema(), LogicalPlan::Limit { input, .. } => input.schema(), LogicalPlan::CreateExternalTable { schema, .. } => &schema, @@ -229,6 +239,11 @@ impl LogicalPlan { right, schema, .. + } + | LogicalPlan::CartesianJoin { + left, + right, + schema, } => { let mut schemas = left.all_schemas(); schemas.extend(right.all_schemas()); @@ -290,8 +305,9 @@ impl LogicalPlan { | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Limit { .. } | LogicalPlan::CreateExternalTable { .. } - | LogicalPlan::Explain { .. } => vec![], - LogicalPlan::Union { .. } => { + | LogicalPlan::CartesianJoin { .. } + | LogicalPlan::Explain { .. } + | LogicalPlan::Union { .. } => { vec![] } } @@ -307,6 +323,7 @@ impl LogicalPlan { LogicalPlan::Aggregate { input, .. } => vec![input], LogicalPlan::Sort { input, .. } => vec![input], LogicalPlan::Join { left, right, .. } => vec![left, right], + LogicalPlan::CartesianJoin { left, right, .. } => vec![left, right], LogicalPlan::Limit { input, .. } => vec![input], LogicalPlan::Extension { node } => node.inputs(), LogicalPlan::Union { inputs, .. } => inputs.iter().collect(), @@ -396,7 +413,8 @@ impl LogicalPlan { LogicalPlan::Repartition { input, .. } => input.accept(visitor)?, LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?, LogicalPlan::Sort { input, .. } => input.accept(visitor)?, - LogicalPlan::Join { left, right, .. } => { + LogicalPlan::Join { left, right, .. } + | LogicalPlan::CartesianJoin { left, right, .. } => { left.accept(visitor)? && right.accept(visitor)? } LogicalPlan::Union { inputs, .. } => { @@ -669,6 +687,9 @@ impl LogicalPlan { keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect(); write!(f, "Join: {}", join_expr.join(", ")) } + LogicalPlan::CartesianJoin { .. } => { + write!(f, "CartesianJoin:") + } LogicalPlan::Repartition { partitioning_scheme, .. diff --git a/rust/datafusion/src/optimizer/constant_folding.rs b/rust/datafusion/src/optimizer/constant_folding.rs index 2fa03eb5c70..5c391512a04 100644 --- a/rust/datafusion/src/optimizer/constant_folding.rs +++ b/rust/datafusion/src/optimizer/constant_folding.rs @@ -72,7 +72,8 @@ impl OptimizerRule for ConstantFolding { | LogicalPlan::Explain { .. } | LogicalPlan::Limit { .. } | LogicalPlan::Union { .. } - | LogicalPlan::Join { .. } => { + | LogicalPlan::Join { .. } + | LogicalPlan::CartesianJoin { .. } => { // apply the optimization to all inputs of the plan let inputs = plan.inputs(); let new_inputs = inputs diff --git a/rust/datafusion/src/optimizer/hash_build_probe_order.rs b/rust/datafusion/src/optimizer/hash_build_probe_order.rs index f44050f0b72..8f212e110f3 100644 --- a/rust/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/rust/datafusion/src/optimizer/hash_build_probe_order.rs @@ -67,6 +67,13 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option { // we cannot predict the cardinality of the join output None } + LogicalPlan::CartesianJoin { left, right, .. } => { + get_num_rows(left).and_then(|x| + get_num_rows(right).and_then(|y| + Some(x * y) + ) + ) + } LogicalPlan::Repartition { .. } => { // we cannot predict how rows will be repartitioned None @@ -138,6 +145,29 @@ impl OptimizerRule for HashBuildProbeOrder { }) } } + LogicalPlan::CartesianJoin { + left, + right, + schema, + } => { + let left = self.optimize(left)?; + let right = self.optimize(right)?; + if should_swap_join_order(&left, &right) { + // Swap left and right, change join type and (equi-)join key order + Ok(LogicalPlan::CartesianJoin { + left: Arc::new(right), + right: Arc::new(left), + schema: schema.clone(), + }) + } else { + // Keep join as is + Ok(LogicalPlan::CartesianJoin { + left: Arc::new(left), + right: Arc::new(right), + schema: schema.clone(), + }) + } + } // Rest: recurse into plan, apply optimization where possible LogicalPlan::Projection { .. } | LogicalPlan::Aggregate { .. } diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs index 6b1cdfe18ca..118c142dd9a 100644 --- a/rust/datafusion/src/optimizer/projection_push_down.rs +++ b/rust/datafusion/src/optimizer/projection_push_down.rs @@ -270,6 +270,7 @@ fn optimize_plan( | LogicalPlan::Sort { .. } | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::Union { .. } + | LogicalPlan::CartesianJoin { .. } | LogicalPlan::Extension { .. } => { let expr = plan.expressions(); // collect all required columns by this plan diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index fe1d0238191..eb91f25921c 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -208,6 +208,11 @@ pub fn from_plan( on: on.clone(), schema: schema.clone(), }), + LogicalPlan::CartesianJoin { schema, .. } => Ok(LogicalPlan::CartesianJoin { + left: Arc::new(inputs[0].clone()), + right: Arc::new(inputs[1].clone()), + schema: schema.clone(), + }), LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit { n: *n, input: Arc::new(inputs[0].clone()), diff --git a/rust/datafusion/src/physical_plan/hash_utils.rs b/rust/datafusion/src/physical_plan/hash_utils.rs index b26ff9bb5fc..d04843f3f6c 100644 --- a/rust/datafusion/src/physical_plan/hash_utils.rs +++ b/rust/datafusion/src/physical_plan/hash_utils.rs @@ -52,11 +52,11 @@ fn check_join_set_is_valid( right: &HashSet, on: &JoinOn, ) -> Result<()> { - if on.is_empty() { - return Err(DataFusionError::Plan( - "The 'on' clause of a join cannot be empty".to_string(), - )); - } + // if on.is_empty() { + // return Err(DataFusionError::Plan( + // "The 'on' clause of a join cannot be empty".to_string(), + // )); + // } let on_left = &on.iter().map(|on| on.0.to_string()).collect::>(); let left_missing = on_left.difference(left).collect::>(); diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index 5036dcb921b..e96927b7c2c 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -348,6 +348,7 @@ pub mod functions; pub mod group_scalar; pub mod hash_aggregate; pub mod hash_join; +pub mod cartesian_join; pub mod hash_utils; pub mod limit; pub mod math_expressions; diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index f9279ae48f0..9d0f020c0dc 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -20,8 +20,8 @@ use std::sync::Arc; use super::{ - aggregates, empty::EmptyExec, expressions::binary, functions, - hash_join::PartitionMode, udaf, union::UnionExec, + aggregates, cartesian_join::CartesianJoinExec, empty::EmptyExec, expressions::binary, + functions, hash_join::PartitionMode, udaf, union::UnionExec, }; use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionContextState; @@ -328,6 +328,11 @@ impl DefaultPhysicalPlanner { )?)) } } + LogicalPlan::CartesianJoin { left, right, .. } => { + let left = self.create_initial_plan(left, ctx_state)?; + let right = self.create_initial_plan(right, ctx_state)?; + Ok(Arc::new(CartesianJoinExec::try_new(left, right)?)) + } LogicalPlan::EmptyRelation { produce_one_row, schema, diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index f3cba232a23..d16b8b865a0 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -489,9 +489,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } if join_keys.is_empty() { - return Err(DataFusionError::NotImplemented( - "Cartesian joins are not supported".to_string(), - )); + let builder = LogicalPlanBuilder::from(&left); + + return Ok(builder.cartesian_join(right)?.build()?); } else { let left_keys: Vec<_> = join_keys.iter().map(|(l, _)| *l).collect(); diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index f4d4e65f3a4..41bda622405 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -1290,13 +1290,14 @@ async fn equijoin_implicit_syntax_reversed() -> Result<()> { #[tokio::test] async fn cartesian_join() -> Result<()> { - let ctx = create_join_context("t1_id", "t2_id")?; - let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id"; - let maybe_plan = ctx.create_logical_plan(&sql); - assert_eq!( - "This feature is not implemented: Cartesian joins are not supported", - &format!("{}", maybe_plan.err().unwrap()) - ); + let mut ctx = create_join_context("t1_id", "t2_id")?; + // TODO: without filter + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id"; + //let maybe_plan = ctx.create_logical_plan(&sql); + let actual = execute(&mut ctx, sql).await; + + println!("{:?}", actual); + Ok(()) } From a68e6b2cc33a8379e1eeccfc7fd86f282d6ac235 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 18:46:37 +0200 Subject: [PATCH 02/13] Update test --- .../src/physical_plan/cartesian_join.rs | 286 ++++++++++++++++++ rust/datafusion/tests/sql.rs | 3 +- 2 files changed, 288 insertions(+), 1 deletion(-) create mode 100644 rust/datafusion/src/physical_plan/cartesian_join.rs diff --git a/rust/datafusion/src/physical_plan/cartesian_join.rs b/rust/datafusion/src/physical_plan/cartesian_join.rs new file mode 100644 index 00000000000..0789e56e66e --- /dev/null +++ b/rust/datafusion/src/physical_plan/cartesian_join.rs @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the join plan for executing partitions in parallel and then joining the results +//! into a set of partitions. + +use futures::StreamExt; +use std::{any::Any, sync::Arc}; + +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use futures::{Stream, TryStreamExt}; + +use futures::lock::Mutex; + +use super::{hash_utils::check_join_is_valid, merge::MergeExec}; +use crate::{ + error::{DataFusionError, Result}, + scalar::ScalarValue, +}; +use async_trait::async_trait; +use std::time::Instant; + +use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; +use crate::physical_plan::coalesce_batches::concat_batches; +use log::debug; + +/// Data of the left side +type JoinLeftData = Vec; + +/// executes partitions in parallel and combines them into a set of +/// partitions by combining all values from the left with all values on the right +#[derive(Debug)] +pub struct CartesianJoinExec { + /// left (build) side which gets loaded in memory + left: Arc, + /// right (probe) side which are combined with left side + right: Arc, + /// The schema once the join is applied + schema: SchemaRef, + /// Build-side data + build_side: Arc>>, +} + +impl CartesianJoinExec { + /// Tries to create a new [CartesianJoinExec]. + /// # Error + /// This function errors when it is not possible to join the left and right sides on keys `on`. + pub fn try_new( + left: Arc, + right: Arc, + ) -> Result { + let left_schema = left.schema(); + let right_schema = right.schema(); + check_join_is_valid(&left_schema, &right_schema, &[])?; + + let left_schema = left.schema(); + let left_fields = left_schema.fields().iter(); + let right_schema = left.schema(); + + let right_fields = right_schema.fields().iter(); + + // left then right + let all_columns = left_fields.chain(right_fields).cloned().collect(); + + let schema = Arc::new(Schema::new(all_columns)); + + Ok(CartesianJoinExec { + left, + right, + schema, + build_side: Arc::new(Mutex::new(None)), + }) + } + + /// left (build) side which gets hashed + pub fn left(&self) -> &Arc { + &self.left + } + + /// right (probe) side which are filtered by the hash table + pub fn right(&self) -> &Arc { + &self.right + } +} + +#[async_trait] +impl ExecutionPlan for CartesianJoinExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn children(&self) -> Vec> { + vec![self.left.clone(), self.right.clone()] + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + match children.len() { + 2 => Ok(Arc::new(CartesianJoinExec::try_new( + children[0].clone(), + children[1].clone(), + )?)), + _ => Err(DataFusionError::Internal( + "HashJoinExec wrong number of children".to_string(), + )), + } + } + + fn output_partitioning(&self) -> Partitioning { + self.right.output_partitioning() + } + + async fn execute(&self, partition: usize) -> Result { + // we only want to compute the build side once for PartitionMode::CollectLeft + let left_data = { + let mut build_side = self.build_side.lock().await; + + match build_side.as_ref() { + Some(stream) => stream.clone(), + None => { + let start = Instant::now(); + + // merge all left parts into a single stream + let merge = MergeExec::new(self.left.clone()); + let stream = merge.execute(0).await?; + + // This operation performs 2 steps at once: + // 1. creates a [JoinHashMap] of all batches from the stream + // 2. stores the batches in a vector. + let (batches, num_rows) = stream + .try_fold((Vec::new(), 0 as usize), |mut acc, batch| async { + acc.1 += batch.num_rows(); + acc.0.push(batch); + Ok(acc) + }) + .await?; + + *build_side = Some(batches.clone()); + + debug!( + "Built build-side of cartesian join containing {} rows in {} ms", + num_rows, + start.elapsed().as_millis() + ); + + batches + } + } + }; + + // we have the batches and the hash map with their keys. We can how create a stream + // over the right that uses this information to issue new batches. + + let stream = self.right.execute(partition).await?; + + Ok(Box::pin(CartesianJoinStream { + schema: self.schema.clone(), + left_data, + right: stream, + num_input_batches: 0, + num_input_rows: 0, + num_output_batches: 0, + num_output_rows: 0, + join_time: 0, + })) + } +} + +/// A stream that issues [RecordBatch]es as they arrive from the right of the join. +struct CartesianJoinStream { + /// Input schema + schema: Arc, + /// data from the left side + left_data: JoinLeftData, + /// right + right: SendableRecordBatchStream, + /// number of input batches + num_input_batches: usize, + /// number of input rows + num_input_rows: usize, + /// number of batches produced + num_output_batches: usize, + /// number of rows produced + num_output_rows: usize, + /// total time for joining probe-side batches to the build-side batches + join_time: usize, +} + +impl RecordBatchStream for CartesianJoinStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} +fn build_batch( + batch: &RecordBatch, + left_data: &JoinLeftData, + schema: &Schema, +) -> Result { + let mut batches = Vec::new(); + let mut num_rows = 0; + for left in left_data.iter() { + for x in 0..batch.num_rows() { + // for each value on the left, repeat the value of the right + let arrays = batch + .columns() + .iter() + .map(|arr| { + let scalar = ScalarValue::try_from_array(arr, x)?; + Ok(scalar.to_array_of_size(left.num_rows())) + }) + .collect::>>()?; + + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + arrays + .iter() + .chain(left.columns().iter()) + .cloned() + .collect(), + )?; + batches.push(batch); + num_rows += left.num_rows(); + } + } + Ok(concat_batches(&Arc::new(schema.clone()), &batches, num_rows).unwrap()) +} + +impl Stream for CartesianJoinStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.right + .poll_next_unpin(cx) + .map(|maybe_batch| match maybe_batch { + Some(Ok(batch)) => { + let start = Instant::now(); + let result = build_batch(&batch, &self.left_data, &self.schema); + self.num_input_batches += 1; + self.num_input_rows += batch.num_rows(); + if let Ok(ref batch) = result { + self.join_time += start.elapsed().as_millis() as usize; + self.num_output_batches += 1; + self.num_output_rows += batch.num_rows(); + } + Some(result.map_err(|x| x.into_arrow_external_error())) + } + other => { + debug!( + "Processed {} probe-side input batches containing {} rows and \ + produced {} output batches containing {} rows in {} ms", + self.num_input_batches, + self.num_input_rows, + self.num_output_batches, + self.num_output_rows, + self.join_time + ); + other + } + }) + } +} diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 41bda622405..0cabaa566b8 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -1296,7 +1296,8 @@ async fn cartesian_join() -> Result<()> { //let maybe_plan = ctx.create_logical_plan(&sql); let actual = execute(&mut ctx, sql).await; - println!("{:?}", actual); + assert_eq!(4 * 4, actual.len()); + Ok(()) } From e422974bf85f67b2892d67fbc76f2d9bd1517c5d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 20:09:25 +0200 Subject: [PATCH 03/13] WIP --- .../src/physical_plan/cartesian_join.rs | 11 +++++++---- rust/datafusion/src/sql/planner.rs | 16 ++++++++++------ rust/datafusion/tests/sql.rs | 9 ++++++++- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/rust/datafusion/src/physical_plan/cartesian_join.rs b/rust/datafusion/src/physical_plan/cartesian_join.rs index 0789e56e66e..37a0ae91f30 100644 --- a/rust/datafusion/src/physical_plan/cartesian_join.rs +++ b/rust/datafusion/src/physical_plan/cartesian_join.rs @@ -217,7 +217,7 @@ fn build_batch( batch: &RecordBatch, left_data: &JoinLeftData, schema: &Schema, -) -> Result { +) -> ArrowResult { let mut batches = Vec::new(); let mut num_rows = 0; for left in left_data.iter() { @@ -230,7 +230,8 @@ fn build_batch( let scalar = ScalarValue::try_from_array(arr, x)?; Ok(scalar.to_array_of_size(left.num_rows())) }) - .collect::>>()?; + .collect::>>() + .map_err(|x| x.into_arrow_external_error())?; let batch = RecordBatch::try_new( Arc::new(schema.clone()), @@ -240,11 +241,13 @@ fn build_batch( .cloned() .collect(), )?; + println!("{:?}", batch); + batches.push(batch); num_rows += left.num_rows(); } } - Ok(concat_batches(&Arc::new(schema.clone()), &batches, num_rows).unwrap()) + concat_batches(&Arc::new(schema.clone()), &batches, num_rows) } impl Stream for CartesianJoinStream { @@ -267,7 +270,7 @@ impl Stream for CartesianJoinStream { self.num_output_batches += 1; self.num_output_rows += batch.num_rows(); } - Some(result.map_err(|x| x.into_arrow_external_error())) + Some(result) } other => { debug!( diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index d16b8b865a0..795822d6902 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -489,9 +489,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } if join_keys.is_empty() { - let builder = LogicalPlanBuilder::from(&left); - - return Ok(builder.cartesian_join(right)?.build()?); + left = LogicalPlanBuilder::from(&left) + .cartesian_join(right)? + .build()?; } else { let left_keys: Vec<_> = join_keys.iter().map(|(l, _)| *l).collect(); @@ -517,9 +517,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if plans.len() == 1 { Ok(plans[0].clone()) } else { - Err(DataFusionError::NotImplemented( - "Cartesian joins are not supported".to_string(), - )) + let mut left = plans[0].clone(); + for right in plans.iter().skip(1) { + left = LogicalPlanBuilder::from(&left) + .cartesian_join(right)? + .build()?; + } + Ok(left) } } }; diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 0cabaa566b8..c8df90eae5b 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -1291,7 +1291,14 @@ async fn equijoin_implicit_syntax_reversed() -> Result<()> { #[tokio::test] async fn cartesian_join() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; - // TODO: without filter + + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id"; + //let maybe_plan = ctx.create_logical_plan(&sql); + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + + let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id"; //let maybe_plan = ctx.create_logical_plan(&sql); let actual = execute(&mut ctx, sql).await; From 4287a7fa6b957f1729ef306bf2eabe7533fe3100 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 20:19:59 +0200 Subject: [PATCH 04/13] Fix, cleanup --- rust/datafusion/src/physical_plan/cartesian_join.rs | 10 ++++------ rust/datafusion/tests/sql.rs | 2 -- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/rust/datafusion/src/physical_plan/cartesian_join.rs b/rust/datafusion/src/physical_plan/cartesian_join.rs index 37a0ae91f30..c3d6332df02 100644 --- a/rust/datafusion/src/physical_plan/cartesian_join.rs +++ b/rust/datafusion/src/physical_plan/cartesian_join.rs @@ -71,7 +71,7 @@ impl CartesianJoinExec { let left_schema = left.schema(); let left_fields = left_schema.fields().iter(); - let right_schema = left.schema(); + let right_schema = right.schema(); let right_fields = right_schema.fields().iter(); @@ -235,14 +235,12 @@ fn build_batch( let batch = RecordBatch::try_new( Arc::new(schema.clone()), - arrays - .iter() - .chain(left.columns().iter()) + left.columns().iter() + .chain(arrays.iter()) .cloned() .collect(), )?; - println!("{:?}", batch); - + batches.push(batch); num_rows += left.num_rows(); } diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index c8df90eae5b..f86cd2d1226 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -1293,14 +1293,12 @@ async fn cartesian_join() -> Result<()> { let mut ctx = create_join_context("t1_id", "t2_id")?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id"; - //let maybe_plan = ctx.create_logical_plan(&sql); let actual = execute(&mut ctx, sql).await; assert_eq!(4 * 4, actual.len()); let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id"; - //let maybe_plan = ctx.create_logical_plan(&sql); let actual = execute(&mut ctx, sql).await; assert_eq!(4 * 4, actual.len()); From 0418ae402c79c0c42ad6cb6495e4c9dfe820a444 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 20:32:07 +0200 Subject: [PATCH 05/13] Cleanup, test results --- rust/datafusion/README.md | 4 ++- .../src/physical_plan/cartesian_join.rs | 11 +++----- .../src/physical_plan/hash_utils.rs | 5 ---- rust/datafusion/src/sql/planner.rs | 10 +++++++ rust/datafusion/tests/sql.rs | 27 ++++++++++++++++++- 5 files changed, 42 insertions(+), 15 deletions(-) diff --git a/rust/datafusion/README.md b/rust/datafusion/README.md index e5849b84ca7..70233b25ab9 100644 --- a/rust/datafusion/README.md +++ b/rust/datafusion/README.md @@ -213,7 +213,9 @@ DataFusion also includes a simple command-line interactive SQL utility. See the - [ ] MINUS - [x] Joins - [x] INNER JOIN - - [ ] CROSS JOIN + - [x] LEFT JOIN + - [x] RIGHT JOIN + - [x] CROSS JOIN - [ ] OUTER JOIN - [ ] Window diff --git a/rust/datafusion/src/physical_plan/cartesian_join.rs b/rust/datafusion/src/physical_plan/cartesian_join.rs index c3d6332df02..b51a46d6588 100644 --- a/rust/datafusion/src/physical_plan/cartesian_join.rs +++ b/rust/datafusion/src/physical_plan/cartesian_join.rs @@ -133,7 +133,7 @@ impl ExecutionPlan for CartesianJoinExec { } async fn execute(&self, partition: usize) -> Result { - // we only want to compute the build side once for PartitionMode::CollectLeft + // we only want to compute the build side once let left_data = { let mut build_side = self.build_side.lock().await; @@ -146,9 +146,7 @@ impl ExecutionPlan for CartesianJoinExec { let merge = MergeExec::new(self.left.clone()); let stream = merge.execute(0).await?; - // This operation performs 2 steps at once: - // 1. creates a [JoinHashMap] of all batches from the stream - // 2. stores the batches in a vector. + // Load all batches and count the rows let (batches, num_rows) = stream .try_fold((Vec::new(), 0 as usize), |mut acc, batch| async { acc.1 += batch.num_rows(); @@ -170,9 +168,6 @@ impl ExecutionPlan for CartesianJoinExec { } }; - // we have the batches and the hash map with their keys. We can how create a stream - // over the right that uses this information to issue new batches. - let stream = self.right.execute(partition).await?; Ok(Box::pin(CartesianJoinStream { @@ -240,7 +235,7 @@ fn build_batch( .cloned() .collect(), )?; - + batches.push(batch); num_rows += left.num_rows(); } diff --git a/rust/datafusion/src/physical_plan/hash_utils.rs b/rust/datafusion/src/physical_plan/hash_utils.rs index d04843f3f6c..a38cc092123 100644 --- a/rust/datafusion/src/physical_plan/hash_utils.rs +++ b/rust/datafusion/src/physical_plan/hash_utils.rs @@ -52,11 +52,6 @@ fn check_join_set_is_valid( right: &HashSet, on: &JoinOn, ) -> Result<()> { - // if on.is_empty() { - // return Err(DataFusionError::Plan( - // "The 'on' clause of a join cannot be empty".to_string(), - // )); - // } let on_left = &on.iter().map(|on| on.0.to_string()).collect::>(); let left_missing = on_left.difference(left).collect::>(); diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 795822d6902..4ea1c7e1696 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -355,12 +355,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { JoinOperator::Inner(constraint) => { self.parse_join(left, &right, constraint, JoinType::Inner) } + JoinOperator::CrossJoin => self.parse_cross_join(left, &right), other => Err(DataFusionError::NotImplemented(format!( "Unsupported JOIN operator {:?}", other ))), } } + fn parse_cross_join( + &self, + left: &LogicalPlan, + right: &LogicalPlan, + ) -> Result { + LogicalPlanBuilder::from(&left) + .cartesian_join(&right)? + .build() + } fn parse_join( &self, diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index f86cd2d1226..7961e17f83c 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -1297,12 +1297,37 @@ async fn cartesian_join() -> Result<()> { assert_eq!(4 * 4, actual.len()); - let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE 1=1 ORDER BY t1_id"; let actual = execute(&mut ctx, sql).await; assert_eq!(4 * 4, actual.len()); + let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2"; + let actual = execute(&mut ctx, sql).await; + + assert_eq!(4 * 4, actual.len()); + + assert_eq!( + actual, + [ + ["11", "a", "z"], + ["22", "b", "z"], + ["33", "c", "z"], + ["44", "d", "z"], + ["11", "a", "y"], + ["22", "b", "y"], + ["33", "c", "y"], + ["44", "d", "y"], + ["11", "a", "x"], + ["22", "b", "x"], + ["33", "c", "x"], + ["44", "d", "x"], + ["11", "a", "w"], + ["22", "b", "w"], + ["33", "c", "w"], + ["44", "d", "w"] + ] + ); Ok(()) } From ff9759fb4980081f7020c3b9fe05f93177f7e6be Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 20:36:24 +0200 Subject: [PATCH 06/13] Cleanups --- rust/datafusion/src/physical_plan/cartesian_join.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/physical_plan/cartesian_join.rs b/rust/datafusion/src/physical_plan/cartesian_join.rs index b51a46d6588..ba3a719b13c 100644 --- a/rust/datafusion/src/physical_plan/cartesian_join.rs +++ b/rust/datafusion/src/physical_plan/cartesian_join.rs @@ -88,12 +88,12 @@ impl CartesianJoinExec { }) } - /// left (build) side which gets hashed + /// left (build) side which gets loaded in memory pub fn left(&self) -> &Arc { &self.left } - /// right (probe) side which are filtered by the hash table + /// right side which gets combined with left side pub fn right(&self) -> &Arc { &self.right } @@ -123,7 +123,7 @@ impl ExecutionPlan for CartesianJoinExec { children[1].clone(), )?)), _ => Err(DataFusionError::Internal( - "HashJoinExec wrong number of children".to_string(), + "CartesianJoinExec wrong number of children".to_string(), )), } } @@ -230,7 +230,8 @@ fn build_batch( let batch = RecordBatch::try_new( Arc::new(schema.clone()), - left.columns().iter() + left.columns() + .iter() .chain(arrays.iter()) .cloned() .collect(), From fa6d38fe0e2821d7143bde6eec9199c989c58239 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 20:38:21 +0200 Subject: [PATCH 07/13] Cleanup --- rust/datafusion/src/optimizer/hash_build_probe_order.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/optimizer/hash_build_probe_order.rs b/rust/datafusion/src/optimizer/hash_build_probe_order.rs index 8f212e110f3..f468c65c147 100644 --- a/rust/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/rust/datafusion/src/optimizer/hash_build_probe_order.rs @@ -68,11 +68,8 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option { None } LogicalPlan::CartesianJoin { left, right, .. } => { - get_num_rows(left).and_then(|x| - get_num_rows(right).and_then(|y| - Some(x * y) - ) - ) + // number of rows is equal to num_left * num_right + get_num_rows(left).and_then(|x| get_num_rows(right).and_then(|y| Some(x * y))) } LogicalPlan::Repartition { .. } => { // we cannot predict how rows will be repartitioned @@ -153,7 +150,7 @@ impl OptimizerRule for HashBuildProbeOrder { let left = self.optimize(left)?; let right = self.optimize(right)?; if should_swap_join_order(&left, &right) { - // Swap left and right, change join type and (equi-)join key order + // Swap left and right Ok(LogicalPlan::CartesianJoin { left: Arc::new(right), right: Arc::new(left), From 807d9c8923f2d9efd22cd74cd0b724723f253c8c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 20:50:14 +0200 Subject: [PATCH 08/13] Add to ballista --- rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs index a181f98b6eb..222b76739fe 100644 --- a/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -940,6 +940,7 @@ impl TryInto for &LogicalPlan { } LogicalPlan::Extension { .. } => unimplemented!(), LogicalPlan::Union { .. } => unimplemented!(), + LogicalPlan::CrossJoin { .. } => unimplemented!(), } } } From c0855708d34686d7b04616307b4a7fe201b2a2a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 18 Apr 2021 21:08:59 +0200 Subject: [PATCH 09/13] Fix enum balue --- rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 222b76739fe..948db32beaa 100644 --- a/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/rust/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -940,7 +940,7 @@ impl TryInto for &LogicalPlan { } LogicalPlan::Extension { .. } => unimplemented!(), LogicalPlan::Union { .. } => unimplemented!(), - LogicalPlan::CrossJoin { .. } => unimplemented!(), + LogicalPlan::CartesianJoin { .. } => unimplemented!(), } } } From be550bb3987ea86ce4026eb199c885060c0702a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 18 Apr 2021 22:01:13 +0200 Subject: [PATCH 10/13] Reorder imports --- rust/datafusion/src/physical_plan/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs index e96927b7c2c..bf898517685 100644 --- a/rust/datafusion/src/physical_plan/mod.rs +++ b/rust/datafusion/src/physical_plan/mod.rs @@ -333,6 +333,7 @@ pub trait Accumulator: Send + Sync + Debug { pub mod aggregates; pub mod array_expressions; +pub mod cartesian_join; pub mod coalesce_batches; pub mod common; #[cfg(feature = "crypto_expressions")] @@ -348,7 +349,6 @@ pub mod functions; pub mod group_scalar; pub mod hash_aggregate; pub mod hash_join; -pub mod cartesian_join; pub mod hash_utils; pub mod limit; pub mod math_expressions; From 8aa8d17074bfc56124f592a5ef4437b49b3deaa6 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 22:37:13 +0200 Subject: [PATCH 11/13] Clippy, efficiency --- .../src/optimizer/hash_build_probe_order.rs | 2 +- .../src/physical_plan/cartesian_join.rs | 14 +++++++------- rust/datafusion/tests/sql.rs | 18 +++++++++--------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/rust/datafusion/src/optimizer/hash_build_probe_order.rs b/rust/datafusion/src/optimizer/hash_build_probe_order.rs index f468c65c147..3270c2850a2 100644 --- a/rust/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/rust/datafusion/src/optimizer/hash_build_probe_order.rs @@ -69,7 +69,7 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option { } LogicalPlan::CartesianJoin { left, right, .. } => { // number of rows is equal to num_left * num_right - get_num_rows(left).and_then(|x| get_num_rows(right).and_then(|y| Some(x * y))) + get_num_rows(left).and_then(|x| get_num_rows(right).map(|y| x * y)) } LogicalPlan::Repartition { .. } => { // we cannot predict how rows will be repartitioned diff --git a/rust/datafusion/src/physical_plan/cartesian_join.rs b/rust/datafusion/src/physical_plan/cartesian_join.rs index ba3a719b13c..33ce982b665 100644 --- a/rust/datafusion/src/physical_plan/cartesian_join.rs +++ b/rust/datafusion/src/physical_plan/cartesian_join.rs @@ -148,7 +148,7 @@ impl ExecutionPlan for CartesianJoinExec { // Load all batches and count the rows let (batches, num_rows) = stream - .try_fold((Vec::new(), 0 as usize), |mut acc, batch| async { + .try_fold((Vec::new(), 0usize), |mut acc, batch| async { acc.1 += batch.num_rows(); acc.0.push(batch); Ok(acc) @@ -210,19 +210,19 @@ impl RecordBatchStream for CartesianJoinStream { } fn build_batch( batch: &RecordBatch, - left_data: &JoinLeftData, + left_data: &[RecordBatch], schema: &Schema, ) -> ArrowResult { let mut batches = Vec::new(); let mut num_rows = 0; for left in left_data.iter() { - for x in 0..batch.num_rows() { + for i in 0..left.num_rows() { // for each value on the left, repeat the value of the right - let arrays = batch + let arrays = left .columns() .iter() .map(|arr| { - let scalar = ScalarValue::try_from_array(arr, x)?; + let scalar = ScalarValue::try_from_array(arr, i)?; Ok(scalar.to_array_of_size(left.num_rows())) }) .collect::>>() @@ -230,9 +230,9 @@ fn build_batch( let batch = RecordBatch::try_new( Arc::new(schema.clone()), - left.columns() + arrays .iter() - .chain(arrays.iter()) + .chain(batch.columns().iter()) .cloned() .collect(), )?; diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 7961e17f83c..10ba8cd2bfd 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -1311,20 +1311,20 @@ async fn cartesian_join() -> Result<()> { actual, [ ["11", "a", "z"], - ["22", "b", "z"], - ["33", "c", "z"], - ["44", "d", "z"], ["11", "a", "y"], - ["22", "b", "y"], - ["33", "c", "y"], - ["44", "d", "y"], ["11", "a", "x"], - ["22", "b", "x"], - ["33", "c", "x"], - ["44", "d", "x"], ["11", "a", "w"], + ["22", "b", "z"], + ["22", "b", "y"], + ["22", "b", "x"], ["22", "b", "w"], + ["33", "c", "z"], + ["33", "c", "y"], + ["33", "c", "x"], ["33", "c", "w"], + ["44", "d", "z"], + ["44", "d", "y"], + ["44", "d", "x"], ["44", "d", "w"] ] ); From c82efe75520e827fb94f26409c7928bfe138cc9e Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 22:51:03 +0200 Subject: [PATCH 12/13] Add to tpch run tests --- rust/benchmarks/src/bin/tpch.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 328a68dd6a6..b203ceb3f74 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -1374,6 +1374,11 @@ mod tests { run_query(6).await } + #[tokio::test] + async fn run_q9() -> Result<()> { + run_query(9).await + } + #[tokio::test] async fn run_q10() -> Result<()> { run_query(10).await From f93a7d3d1c6fd2b841b9cba2d68accf205170ad1 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 18 Apr 2021 23:05:49 +0200 Subject: [PATCH 13/13] Fix repeating number of rows --- rust/datafusion/src/physical_plan/cartesian_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/cartesian_join.rs b/rust/datafusion/src/physical_plan/cartesian_join.rs index 33ce982b665..d394b917f40 100644 --- a/rust/datafusion/src/physical_plan/cartesian_join.rs +++ b/rust/datafusion/src/physical_plan/cartesian_join.rs @@ -223,7 +223,7 @@ fn build_batch( .iter() .map(|arr| { let scalar = ScalarValue::try_from_array(arr, i)?; - Ok(scalar.to_array_of_size(left.num_rows())) + Ok(scalar.to_array_of_size(batch.num_rows())) }) .collect::>>() .map_err(|x| x.into_arrow_external_error())?;