Skip to content

Commit dcdbe88

Browse files
committed
user defined nodes
1 parent 8ac5a34 commit dcdbe88

File tree

1 file changed

+33
-2
lines changed
  • datafusion/expr/src/logical_plan

1 file changed

+33
-2
lines changed

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,6 +1191,38 @@ where
11911191
rewrite_arc(node, f).map(|res| res.discard_data())
11921192
}
11931193

1194+
/// Rewrites all inputs for an Extension node "in place"
1195+
/// (it currently has to copy values because there are no APIs for in place modification)
1196+
///
1197+
/// Should be removed when we have an API for in place modifications of the
1198+
/// extension to avoid these copies
1199+
fn rewrite_extension_inputs<F>(
1200+
node: &mut Arc<dyn UserDefinedLogicalNode>,
1201+
mut f: F,
1202+
) -> Result<Transformed<()>>
1203+
where
1204+
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
1205+
{
1206+
let Transformed {
1207+
data: new_inputs,
1208+
transformed,
1209+
tnr,
1210+
} = node
1211+
.inputs()
1212+
.into_iter()
1213+
.cloned()
1214+
.map_until_stop_and_collect(|input| f(input))?;
1215+
1216+
let exprs = node.expressions();
1217+
let mut new_node = node.from_template(&exprs, &new_inputs);
1218+
std::mem::swap(node, &mut new_node);
1219+
Ok(Transformed {
1220+
data: (),
1221+
transformed,
1222+
tnr,
1223+
})
1224+
}
1225+
11941226
impl LogicalPlan {
11951227
/// applies `f` to each input of this plan node, rewriting them *in place.*
11961228
///
@@ -1241,8 +1273,7 @@ impl LogicalPlan {
12411273
rewrite_arc_no_data(input, &mut f)
12421274
}
12431275
LogicalPlan::Extension(extension) => {
1244-
todo!();
1245-
//rewrite_extension_inputs(&mut extension.node, &mut f)
1276+
rewrite_extension_inputs(&mut extension.node, &mut f)
12461277
}
12471278
LogicalPlan::Union(Union { inputs, .. }) => {
12481279
let results = inputs

0 commit comments

Comments
 (0)