@@ -45,7 +45,8 @@ use crate::{
4545
4646use arrow:: datatypes:: { DataType , Field , Schema , SchemaRef } ;
4747use datafusion_common:: tree_node:: {
48- Transformed , TransformedResult , TreeNode , TreeNodeRecursion , TreeNodeVisitor ,
48+ Transformed , TransformedIterator , TransformedResult , TreeNode , TreeNodeRecursion ,
49+ TreeNodeVisitor ,
4950} ;
5051use datafusion_common:: {
5152 aggregate_functional_dependencies, internal_err, plan_err, Column , Constraints ,
@@ -1144,7 +1145,10 @@ static PLACEHOLDER: OnceLock<Arc<LogicalPlan>> = OnceLock::new();
11441145/// unwrap the `Arc` which avoids `clone`ing in most cases.
11451146///
11461147/// On error, node be left with a placeholder logical plan
1147- fn rewrite_arc < F > ( node : & mut Arc < LogicalPlan > , mut f : F ) -> Result < Transformed < ( ) > >
1148+ fn rewrite_arc < F > (
1149+ node : & mut Arc < LogicalPlan > ,
1150+ mut f : F ,
1151+ ) -> Result < Transformed < & mut Arc < LogicalPlan > > >
11481152where
11491153 F : FnMut ( LogicalPlan ) -> Result < Transformed < LogicalPlan > > ,
11501154{
@@ -1163,7 +1167,7 @@ where
11631167 std:: mem:: swap ( node, & mut new_node) ;
11641168
11651169 // try to update existing node, if it isn't shared with others
1166- let mut new_node = Arc :: try_unwrap ( new_node)
1170+ let new_node = Arc :: try_unwrap ( new_node)
11671171 // if None is returned, there is another reference to this
11681172 // LogicalPlan, so we must clone instead
11691173 . unwrap_or_else ( |node| node. as_ref ( ) . clone ( ) ) ;
@@ -1175,37 +1179,20 @@ where
11751179 let mut new_node = Arc :: new ( result. data ) ;
11761180 std:: mem:: swap ( node, & mut new_node) ;
11771181
1178- // return the `() ` back
1179- Ok ( Transformed :: new ( ( ) , result. transformed , result. tnr ) )
1182+ // return the `node ` back
1183+ Ok ( Transformed :: new ( node , result. transformed , result. tnr ) )
11801184}
11811185
1182- /*
1183- /// Rewrties all inputs for an Extension node "in place"
1184- /// (it currently has to copy values because there are no APIs for in place modification)
1185- ///
1186- /// Should be removed when we have an API for in place modifications of the
1187- /// extension to avoid these copies
1188- fn rewrite_extension_inputs<F>(
1189- node: &mut Arc<dyn UserDefinedLogicalNode>,
1190- mut f: F,
1191- ) -> Result<Transformed<()>>
1186+ /// Rewrite the arc and discard the contents of Transformed
1187+ fn rewrite_arc_no_data < F > ( node : & mut Arc < LogicalPlan > , f : F ) -> Result < Transformed < ( ) > >
11921188where
1193- F: FnMut(&mut LogicalPlan) -> Result<Transformed<() >>,
1189+ F : FnMut ( LogicalPlan ) -> Result < Transformed < LogicalPlan > > ,
11941190{
1195- let mut inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
1196-
1197- let result = inputs
1198- .iter_mut()
1199- .try_fold(Transformed::no(()), |acc, input| acc.and_then(|| f(input)))?;
1200- let exprs = node.expressions();
1201- let mut new_node = node.from_template(&exprs, &inputs);
1202- std::mem::swap(node, &mut new_node);
1203- Ok(result)
1191+ rewrite_arc ( node, f) . map ( |res| res. discard_data ( ) )
12041192}
1205- */
12061193
12071194impl LogicalPlan {
1208- /// applies `f` to each input of this plan node, rewriting them in place.
1195+ /// applies `f` to each input of this plan node, rewriting them * in place.*
12091196 ///
12101197 /// # Notes
12111198 /// Inputs include both direct children as well as any embedded subquery
@@ -1219,59 +1206,84 @@ impl LogicalPlan {
12191206 {
12201207 let children_result = match self {
12211208 LogicalPlan :: Projection ( Projection { input, .. } ) => {
1222- rewrite_arc ( input, & mut f)
1209+ rewrite_arc_no_data ( input, & mut f)
1210+ }
1211+ LogicalPlan :: Filter ( Filter { input, .. } ) => {
1212+ rewrite_arc_no_data ( input, & mut f)
12231213 }
1224- LogicalPlan :: Filter ( Filter { input, .. } ) => rewrite_arc ( input, & mut f) ,
12251214 LogicalPlan :: Repartition ( Repartition { input, .. } ) => {
1226- rewrite_arc ( input, & mut f)
1215+ rewrite_arc_no_data ( input, & mut f)
1216+ }
1217+ LogicalPlan :: Window ( Window { input, .. } ) => {
1218+ rewrite_arc_no_data ( input, & mut f)
12271219 }
1228- LogicalPlan :: Window ( Window { input, .. } ) => rewrite_arc ( input, & mut f) ,
1229- LogicalPlan :: Aggregate ( Aggregate { input, .. } ) => rewrite_arc ( input, & mut f) ,
1230- LogicalPlan :: Sort ( Sort { input, .. } ) => rewrite_arc ( input, & mut f) ,
1220+ LogicalPlan :: Aggregate ( Aggregate { input, .. } ) => {
1221+ rewrite_arc_no_data ( input, & mut f)
1222+ }
1223+ LogicalPlan :: Sort ( Sort { input, .. } ) => rewrite_arc_no_data ( input, & mut f) ,
12311224 LogicalPlan :: Join ( Join { left, right, .. } ) => {
1232- rewrite_arc ( left, & mut f) ?. and_then ( || rewrite_arc ( right, & mut f) )
1225+ let results = [ left, right]
1226+ . into_iter ( )
1227+ . map_until_stop_and_collect ( |input| rewrite_arc ( input, & mut f) ) ?;
1228+ Ok ( results. discard_data ( ) )
12331229 }
12341230 LogicalPlan :: CrossJoin ( CrossJoin { left, right, .. } ) => {
1235- rewrite_arc ( left, & mut f) ?. and_then ( || rewrite_arc ( right, & mut f) )
1231+ let results = [ left, right]
1232+ . into_iter ( )
1233+ . map_until_stop_and_collect ( |input| rewrite_arc ( input, & mut f) ) ?;
1234+ Ok ( results. discard_data ( ) )
12361235 }
1237- LogicalPlan :: Limit ( Limit { input, .. } ) => rewrite_arc ( input, & mut f) ,
1236+ LogicalPlan :: Limit ( Limit { input, .. } ) => rewrite_arc_no_data ( input, & mut f) ,
12381237 LogicalPlan :: Subquery ( Subquery { subquery, .. } ) => {
1239- rewrite_arc ( subquery, & mut f)
1238+ rewrite_arc_no_data ( subquery, & mut f)
12401239 }
12411240 LogicalPlan :: SubqueryAlias ( SubqueryAlias { input, .. } ) => {
1242- rewrite_arc ( input, & mut f)
1241+ rewrite_arc_no_data ( input, & mut f)
12431242 }
12441243 LogicalPlan :: Extension ( extension) => {
12451244 todo ! ( ) ;
12461245 //rewrite_extension_inputs(&mut extension.node, &mut f)
12471246 }
1248- LogicalPlan :: Union ( Union { inputs, .. } ) => inputs
1249- . iter_mut ( )
1250- . try_fold ( Transformed :: no ( ( ) ) , |acc, input| {
1251- acc. and_then ( || rewrite_arc ( input, & mut f) )
1252- } ) ,
1247+ LogicalPlan :: Union ( Union { inputs, .. } ) => {
1248+ let results = inputs
1249+ . iter_mut ( )
1250+ . map_until_stop_and_collect ( |input| rewrite_arc ( input, & mut f) ) ?;
1251+ Ok ( results. discard_data ( ) )
1252+ }
12531253 LogicalPlan :: Distinct (
12541254 Distinct :: All ( input) | Distinct :: On ( DistinctOn { input, .. } ) ,
1255- ) => rewrite_arc ( input, & mut f) ,
1256- LogicalPlan :: Explain ( explain) => rewrite_arc ( & mut explain. plan , & mut f) ,
1257- LogicalPlan :: Analyze ( analyze) => rewrite_arc ( & mut analyze. input , & mut f) ,
1258- LogicalPlan :: Dml ( write) => rewrite_arc ( & mut write. input , & mut f) ,
1259- LogicalPlan :: Copy ( copy) => rewrite_arc ( & mut copy. input , & mut f) ,
1255+ ) => rewrite_arc_no_data ( input, & mut f) ,
1256+ LogicalPlan :: Explain ( explain) => {
1257+ rewrite_arc_no_data ( & mut explain. plan , & mut f)
1258+ }
1259+ LogicalPlan :: Analyze ( analyze) => {
1260+ rewrite_arc_no_data ( & mut analyze. input , & mut f)
1261+ }
1262+ LogicalPlan :: Dml ( write) => rewrite_arc_no_data ( & mut write. input , & mut f) ,
1263+ LogicalPlan :: Copy ( copy) => rewrite_arc_no_data ( & mut copy. input , & mut f) ,
12601264 LogicalPlan :: Ddl ( ddl) => {
12611265 if let Some ( input) = ddl. input_mut ( ) {
1262- rewrite_arc ( input, & mut f)
1266+ rewrite_arc_no_data ( input, & mut f)
12631267 } else {
12641268 Ok ( Transformed :: no ( ( ) ) )
12651269 }
12661270 }
1267- LogicalPlan :: Unnest ( Unnest { input, .. } ) => rewrite_arc ( input, & mut f) ,
1268- LogicalPlan :: Prepare ( Prepare { input, .. } ) => rewrite_arc ( input, & mut f) ,
1271+ LogicalPlan :: Unnest ( Unnest { input, .. } ) => {
1272+ rewrite_arc_no_data ( input, & mut f)
1273+ }
1274+ LogicalPlan :: Prepare ( Prepare { input, .. } ) => {
1275+ rewrite_arc_no_data ( input, & mut f)
1276+ }
12691277 LogicalPlan :: RecursiveQuery ( RecursiveQuery {
12701278 static_term,
12711279 recursive_term,
12721280 ..
1273- } ) => rewrite_arc ( static_term, & mut f) ?
1274- . and_then ( || rewrite_arc ( recursive_term, & mut f) ) ,
1281+ } ) => {
1282+ let results = [ static_term, recursive_term]
1283+ . into_iter ( )
1284+ . map_until_stop_and_collect ( |input| rewrite_arc ( input, & mut f) ) ?;
1285+ Ok ( results. discard_data ( ) )
1286+ }
12751287 // plans without inputs
12761288 LogicalPlan :: TableScan { .. }
12771289 | LogicalPlan :: Statement { .. }
@@ -1282,7 +1294,8 @@ impl LogicalPlan {
12821294
12831295 // after visiting the actual children we we need to visit any subqueries
12841296 // that are inside the expressions
1285- children_result. and_then ( || self . rewrite_subqueries ( & mut f) )
1297+ // children_result.and_then(|| self.rewrite_subqueries(&mut f))
1298+ Ok ( children_result)
12861299 }
12871300
12881301 /// Return a `LogicalPlan` with all placeholders (e.g $1 $2,
0 commit comments