diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 15d203832af8..53775e26813e 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -72,7 +72,6 @@ use crate::optimizer::to_approx_perc::ToApproxPerc; use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec; -use crate::physical_optimizer::repartition::Repartition; use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use crate::logical_plan::plan::Explain; @@ -1164,7 +1163,8 @@ impl SessionState { Arc::new(AggregateStatistics::new()), Arc::new(HashBuildProbeOrder::new()), Arc::new(CoalesceBatches::new()), - Arc::new(Repartition::new()), + // FIXME: Repartition is temporarily disabled to avoid issues with subqueries + //Arc::new(Repartition::new()), Arc::new(AddCoalescePartitionsExec::new()), ], query_planner: Arc::new(DefaultQueryPlanner {}), diff --git a/datafusion/core/src/physical_plan/subquery.rs b/datafusion/core/src/physical_plan/subquery.rs index d4bd69948694..86cb3b2dcc52 100644 --- a/datafusion/core/src/physical_plan/subquery.rs +++ b/datafusion/core/src/physical_plan/subquery.rs @@ -28,6 +28,7 @@ use std::task::{Context, Poll}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; +use arrow::array::new_null_array; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; @@ -151,43 +152,71 @@ impl ExecutionPlan for SubqueryExec { let context = context.clone(); let size_hint = stream.size_hint(); let schema = self.schema.clone(); - let res_stream = stream.then(move |batch| { - let cursor = cursor.clone(); - let context = context.clone(); - let subqueries = subqueries.clone(); - let schema = schema.clone(); - async move { - let batch = batch?; - let b = Arc::new(batch.clone()); - cursor.set_batch(b)?; - let mut subquery_arrays = vec![Vec::new(); subqueries.len()]; - for i in 0..batch.num_rows() { - cursor.set_position(i)?; - for (subquery_i, subquery) in subqueries.iter().enumerate() { - if subquery.output_partitioning().partition_count() != 1 { - return Err(ArrowError::ComputeError(format!("Sub query should have only one partition but got {}", subquery.output_partitioning().partition_count()))) - } - let mut stream = subquery.execute(0, context.clone()).await?; - let res = stream.next().await; - if let Some(subquery_batch) = res { - let subquery_batch = subquery_batch?; - if subquery_batch.column(0).len() != 1 { - return Err(ArrowError::ComputeError("Sub query should return exactly one row".to_string())) + let res_stream = + stream.then(move |batch| { + let cursor = cursor.clone(); + let context = context.clone(); + let subqueries = subqueries.clone(); + let schema = schema.clone(); + async move { + let batch = batch?; + let b = Arc::new(batch.clone()); + cursor.set_batch(b)?; + let mut subquery_arrays = vec![Vec::new(); subqueries.len()]; + for i in 0..batch.num_rows() { + cursor.set_position(i)?; + for (subquery_i, subquery) in subqueries.iter().enumerate() { + let null_array = || { + let schema = subquery.schema(); + let fields = schema.fields(); + if fields.len() != 1 { + return Err(ArrowError::ComputeError(format!( + "Sub query should have only one column but got {}", + fields.len() + ))); + } + + let data_type = fields.get(0).unwrap().data_type(); + Ok(new_null_array(data_type, 1)) + }; + + if subquery.output_partitioning().partition_count() != 1 { + return Err(ArrowError::ComputeError(format!( + "Sub query should have only one partition but got {}", + subquery.output_partitioning().partition_count() + ))); + } + let mut stream = subquery.execute(0, context.clone()).await?; + let res = stream.next().await; + if let Some(subquery_batch) = res { + let subquery_batch = subquery_batch?; + match subquery_batch.column(0).len() { + 0 => subquery_arrays[subquery_i].push(null_array()?), + 1 => subquery_arrays[subquery_i] + .push(subquery_batch.column(0).clone()), + _ => return Err(ArrowError::ComputeError( + "Sub query should return no more than one row" + .to_string(), + )), + }; } else { - subquery_arrays[subquery_i].push(subquery_batch.column(0).clone()); + subquery_arrays[subquery_i].push(null_array()?); } - } else { - return Err(ArrowError::ComputeError("Sub query returned empty result set but exactly one row is expected".to_string())) } } + let mut new_columns = batch.columns().to_vec(); + for subquery_array in subquery_arrays { + new_columns.push(concat( + subquery_array + .iter() + .map(|a| a.as_ref()) + .collect::>() + .as_slice(), + )?); + } + RecordBatch::try_new(schema.clone(), new_columns) } - let mut new_columns = batch.columns().to_vec(); - for subquery_array in subquery_arrays { - new_columns.push(concat(subquery_array.iter().map(|a| a.as_ref()).collect::>().as_slice())?); - } - RecordBatch::try_new(schema.clone(), new_columns) - } - }); + }); Ok(Box::pin(SubQueryStream { schema: self.schema.clone(), stream: Box::pin(res_stream),