@@ -28,6 +28,7 @@ use std::task::{Context, Poll};
2828
2929use crate :: error:: { DataFusionError , Result } ;
3030use crate :: physical_plan:: { DisplayFormatType , ExecutionPlan , Partitioning } ;
31+ use arrow:: array:: new_null_array;
3132use arrow:: datatypes:: { Schema , SchemaRef } ;
3233use arrow:: error:: { ArrowError , Result as ArrowResult } ;
3334use arrow:: record_batch:: RecordBatch ;
@@ -151,43 +152,71 @@ impl ExecutionPlan for SubqueryExec {
151152 let context = context. clone ( ) ;
152153 let size_hint = stream. size_hint ( ) ;
153154 let schema = self . schema . clone ( ) ;
154- let res_stream = stream. then ( move |batch| {
155- let cursor = cursor. clone ( ) ;
156- let context = context. clone ( ) ;
157- let subqueries = subqueries. clone ( ) ;
158- let schema = schema. clone ( ) ;
159- async move {
160- let batch = batch?;
161- let b = Arc :: new ( batch. clone ( ) ) ;
162- cursor. set_batch ( b) ?;
163- let mut subquery_arrays = vec ! [ Vec :: new( ) ; subqueries. len( ) ] ;
164- for i in 0 ..batch. num_rows ( ) {
165- cursor. set_position ( i) ?;
166- for ( subquery_i, subquery) in subqueries. iter ( ) . enumerate ( ) {
167- if subquery. output_partitioning ( ) . partition_count ( ) != 1 {
168- return Err ( ArrowError :: ComputeError ( format ! ( "Sub query should have only one partition but got {}" , subquery. output_partitioning( ) . partition_count( ) ) ) )
169- }
170- let mut stream = subquery. execute ( 0 , context. clone ( ) ) . await ?;
171- let res = stream. next ( ) . await ;
172- if let Some ( subquery_batch) = res {
173- let subquery_batch = subquery_batch?;
174- if subquery_batch. column ( 0 ) . len ( ) != 1 {
175- return Err ( ArrowError :: ComputeError ( "Sub query should return exactly one row" . to_string ( ) ) )
155+ let res_stream =
156+ stream. then ( move |batch| {
157+ let cursor = cursor. clone ( ) ;
158+ let context = context. clone ( ) ;
159+ let subqueries = subqueries. clone ( ) ;
160+ let schema = schema. clone ( ) ;
161+ async move {
162+ let batch = batch?;
163+ let b = Arc :: new ( batch. clone ( ) ) ;
164+ cursor. set_batch ( b) ?;
165+ let mut subquery_arrays = vec ! [ Vec :: new( ) ; subqueries. len( ) ] ;
166+ for i in 0 ..batch. num_rows ( ) {
167+ cursor. set_position ( i) ?;
168+ for ( subquery_i, subquery) in subqueries. iter ( ) . enumerate ( ) {
169+ let null_array = || {
170+ let schema = subquery. schema ( ) ;
171+ let fields = schema. fields ( ) ;
172+ if fields. len ( ) != 1 {
173+ return Err ( ArrowError :: ComputeError ( format ! (
174+ "Sub query should have only one column but got {}" ,
175+ fields. len( )
176+ ) ) ) ;
177+ }
178+
179+ let data_type = fields. get ( 0 ) . unwrap ( ) . data_type ( ) ;
180+ Ok ( new_null_array ( data_type, 1 ) )
181+ } ;
182+
183+ if subquery. output_partitioning ( ) . partition_count ( ) != 1 {
184+ return Err ( ArrowError :: ComputeError ( format ! (
185+ "Sub query should have only one partition but got {}" ,
186+ subquery. output_partitioning( ) . partition_count( )
187+ ) ) ) ;
188+ }
189+ let mut stream = subquery. execute ( 0 , context. clone ( ) ) . await ?;
190+ let res = stream. next ( ) . await ;
191+ if let Some ( subquery_batch) = res {
192+ let subquery_batch = subquery_batch?;
193+ match subquery_batch. column ( 0 ) . len ( ) {
194+ 0 => subquery_arrays[ subquery_i] . push ( null_array ( ) ?) ,
195+ 1 => subquery_arrays[ subquery_i]
196+ . push ( subquery_batch. column ( 0 ) . clone ( ) ) ,
197+ _ => return Err ( ArrowError :: ComputeError (
198+ "Sub query should return no more than one row"
199+ . to_string ( ) ,
200+ ) ) ,
201+ } ;
176202 } else {
177- subquery_arrays[ subquery_i] . push ( subquery_batch . column ( 0 ) . clone ( ) ) ;
203+ subquery_arrays[ subquery_i] . push ( null_array ( ) ? ) ;
178204 }
179- } else {
180- return Err ( ArrowError :: ComputeError ( "Sub query returned empty result set but exactly one row is expected" . to_string ( ) ) )
181205 }
182206 }
207+ let mut new_columns = batch. columns ( ) . to_vec ( ) ;
208+ for subquery_array in subquery_arrays {
209+ new_columns. push ( concat (
210+ subquery_array
211+ . iter ( )
212+ . map ( |a| a. as_ref ( ) )
213+ . collect :: < Vec < _ > > ( )
214+ . as_slice ( ) ,
215+ ) ?) ;
216+ }
217+ RecordBatch :: try_new ( schema. clone ( ) , new_columns)
183218 }
184- let mut new_columns = batch. columns ( ) . to_vec ( ) ;
185- for subquery_array in subquery_arrays {
186- new_columns. push ( concat ( subquery_array. iter ( ) . map ( |a| a. as_ref ( ) ) . collect :: < Vec < _ > > ( ) . as_slice ( ) ) ?) ;
187- }
188- RecordBatch :: try_new ( schema. clone ( ) , new_columns)
189- }
190- } ) ;
219+ } ) ;
191220 Ok ( Box :: pin ( SubQueryStream {
192221 schema : self . schema . clone ( ) ,
193222 stream : Box :: pin ( res_stream) ,
0 commit comments