Skip to content

Commit ddcde9e

Browse files
committed
rewrite to be more functional
1 parent 1dcd9f5 commit ddcde9e

File tree

3 files changed

+108
-89
lines changed

3 files changed

+108
-89
lines changed

datafusion/expr/src/logical_plan/ddl.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -110,24 +110,6 @@ impl DdlStatement {
110110
}
111111
}
112112

113-
/// Return a mutable reference to the input `LogicalPlan`, if any
114-
pub fn input_mut(&mut self) -> Option<&mut Arc<LogicalPlan>> {
115-
match self {
116-
DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
117-
Some(input)
118-
}
119-
DdlStatement::CreateExternalTable(_) => None,
120-
DdlStatement::CreateView(CreateView { input, .. }) => Some(input),
121-
DdlStatement::CreateCatalogSchema(_) => None,
122-
DdlStatement::CreateCatalog(_) => None,
123-
DdlStatement::DropTable(_) => None,
124-
DdlStatement::DropView(_) => None,
125-
DdlStatement::DropCatalogSchema(_) => None,
126-
DdlStatement::CreateFunction(_) => None,
127-
DdlStatement::DropFunction(_) => None,
128-
}
129-
}
130-
131113
/// Return a `format`able structure with the a human readable
132114
/// description of this LogicalPlan node per node, not including
133115
/// children.

datafusion/expr/src/logical_plan/rewrite.rs

Lines changed: 106 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -19,39 +19,51 @@
1919
2020
use crate::dml::CopyTo;
2121
use crate::{
22-
Aggregate, Analyze, CrossJoin, Distinct, DistinctOn, DmlStatement, EmptyRelation,
23-
Explain, Filter, Join, Limit, LogicalPlan, Prepare, Projection, RecursiveQuery,
24-
Repartition, Sort, Subquery, SubqueryAlias, Union, Unnest, UserDefinedLogicalNode,
25-
Window,
22+
Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin, DdlStatement, Distinct,
23+
DistinctOn, DmlStatement, Explain, Extension, Filter, Join, Limit, LogicalPlan,
24+
Prepare, Projection, RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias,
25+
Union, Unnest, Window,
2626
};
27-
use datafusion_common::tree_node::{Transformed, TreeNodeIterator};
28-
use datafusion_common::{map_until_stop_and_collect, DFSchema, DFSchemaRef, Result};
29-
use std::sync::{Arc, OnceLock};
27+
use datafusion_common::tree_node::{Transformed, TreeNodeIterator, TreeNodeRecursion};
28+
use datafusion_common::{map_until_stop_and_collect, Result};
29+
use std::sync::Arc;
3030

31-
/// A temporary node that is left in place while rewriting the children of a
32-
/// [`LogicalPlan`]. This is necessary to ensure that the `LogicalPlan` is
33-
/// always in a valid state (from the Rust perspective)
34-
static PLACEHOLDER: OnceLock<Arc<LogicalPlan>> = OnceLock::new();
31+
/// Converts a `Arc<LogicalPlan>` without copying, if possible. Copies the plan
32+
/// if there is a shared reference
33+
fn unwrap_arc(plan: Arc<LogicalPlan>) -> LogicalPlan {
34+
Arc::try_unwrap(plan)
35+
// if None is returned, there is another reference to this
36+
// LogicalPlan, so we can not own it, and must clone instead
37+
.unwrap_or_else(|node| node.as_ref().clone())
38+
}
3539

36-
/// its inputs, so this code would not be needed. However, for now we try and
37-
/// unwrap the `Arc` which avoids `clone`ing in most cases.
38-
///
39-
/// On error, node be left with a placeholder logical plan
40+
/// Applies `f` to rewrite a `Arc<LogicalPlan>` without copying, if possible
4041
fn rewrite_arc<F>(
41-
node: Arc<LogicalPlan>,
42+
plan: Arc<LogicalPlan>,
4243
mut f: F,
4344
) -> Result<Transformed<Arc<LogicalPlan>>>
4445
where
4546
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
4647
{
47-
// try to update existing node, if it isn't shared with others
48-
let new_node = Arc::try_unwrap(node)
49-
// if None is returned, there is another reference to this
50-
// LogicalPlan, so we must clone instead
51-
.unwrap_or_else(|node| node.as_ref().clone());
48+
f(unwrap_arc(plan))?.map_data(|new_plan| Ok(Arc::new(new_plan)))
49+
}
5250

53-
// apply the actual transform
54-
f(new_node).unwrap().map_data(Arc::new)
51+
/// rewrite a `Vec` of `Arc<LogicalPlan>` without copying, if possible
52+
fn rewrite_arcs<F>(
53+
input_plans: Vec<Arc<LogicalPlan>>,
54+
mut f: F,
55+
) -> Result<Transformed<Vec<Arc<LogicalPlan>>>>
56+
where
57+
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
58+
{
59+
Ok(input_plans
60+
.into_iter()
61+
.map(unwrap_arc)
62+
.map_until_stop_and_collect(&mut f)?
63+
.update_data(|inputs| {
64+
// have a Vec<LogicalPlan>, now need to wrap in `Arc`s again
65+
inputs.into_iter().map(Arc::new).collect::<Vec<_>>()
66+
}))
5567
}
5668

5769
/// Rewrites all inputs for an Extension node "in place"
@@ -60,30 +72,24 @@ where
6072
/// Should be removed when we have an API for in place modifications of the
6173
/// extension to avoid these copies
6274
fn rewrite_extension_inputs<F>(
63-
node: &mut Arc<dyn UserDefinedLogicalNode>,
75+
extension: Extension,
6476
f: F,
65-
) -> datafusion_common::Result<Transformed<()>>
77+
) -> Result<Transformed<Extension>>
6678
where
6779
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
6880
{
69-
let Transformed {
70-
data: new_inputs,
71-
transformed,
72-
tnr,
73-
} = node
74-
.inputs()
81+
let Extension { node } = extension;
82+
83+
node.inputs()
7584
.into_iter()
7685
.cloned()
77-
.map_until_stop_and_collect(f)?;
78-
79-
let exprs = node.expressions();
80-
let mut new_node = node.from_template(&exprs, &new_inputs);
81-
std::mem::swap(node, &mut new_node);
82-
Ok(Transformed {
83-
data: (),
84-
transformed,
85-
tnr,
86-
})
86+
.map_until_stop_and_collect(f)?
87+
.map_data(|new_inputs| {
88+
let exprs = node.expressions();
89+
Ok(Extension {
90+
node: node.from_template(&exprs, &new_inputs),
91+
})
92+
})
8793
}
8894

8995
impl LogicalPlan {
@@ -192,12 +198,12 @@ impl LogicalPlan {
192198
}),
193199
LogicalPlan::Limit(Limit { skip, fetch, input }) => {
194200
rewrite_arc(input, &mut f)?
195-
.map_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input }))
201+
.update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input }))
196202
}
197203
LogicalPlan::Subquery(Subquery {
198204
subquery,
199205
outer_ref_columns,
200-
}) => rewrite_arc(subquery, &mut f)?.map_data(|input| {
206+
}) => rewrite_arc(subquery, &mut f)?.update_data(|subquery| {
201207
LogicalPlan::Subquery(Subquery {
202208
subquery,
203209
outer_ref_columns,
@@ -214,16 +220,14 @@ impl LogicalPlan {
214220
schema,
215221
})
216222
}),
217-
LogicalPlan::Extension(extension) => {
218-
rewrite_extension_inputs(&mut extension.node, &mut f)
219-
}
220-
LogicalPlan::Union(Union { inputs, schema }) => inputs
221-
.into_iter()
222-
.map_until_stop_and_collect(&mut f)?
223+
LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
224+
.update_data(LogicalPlan::Extension),
225+
LogicalPlan::Union(Union { inputs, schema }) => rewrite_arcs(inputs, &mut f)?
223226
.update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
224227
LogicalPlan::Distinct(distinct) => match distinct {
225-
Distinct::All(input) => rewrite_arc(input, &mut f)?
226-
.update_data(|input| Distinct::All(input))?,
228+
Distinct::All(input) => {
229+
rewrite_arc(input, &mut f)?.update_data(Distinct::All)
230+
}
227231
Distinct::On(DistinctOn {
228232
on_expr,
229233
select_expr,
@@ -238,16 +242,16 @@ impl LogicalPlan {
238242
input,
239243
schema,
240244
})
241-
})?,
245+
}),
242246
}
243-
.map_data(|distinct| LogicalPlan::Distinct(distinct)),
247+
.update_data(LogicalPlan::Distinct),
244248
LogicalPlan::Explain(Explain {
245249
verbose,
246250
plan,
247251
stringified_plans,
248252
schema,
249253
logical_optimization_succeeded,
250-
}) => rewrite_arc(plan, &mut f)?.map_data(|plan| {
254+
}) => rewrite_arc(plan, &mut f)?.update_data(|plan| {
251255
LogicalPlan::Explain(Explain {
252256
verbose,
253257
plan,
@@ -260,7 +264,7 @@ impl LogicalPlan {
260264
verbose,
261265
input,
262266
schema,
263-
}) => rewrite_arc(input, &mut f)?.map_data(|input| {
267+
}) => rewrite_arc(input, &mut f)?.update_data(|input| {
264268
LogicalPlan::Analyze(Analyze {
265269
verbose,
266270
input,
@@ -272,7 +276,7 @@ impl LogicalPlan {
272276
table_schema,
273277
op,
274278
input,
275-
}) => rewrite_arc(&input, &mut f)?.map_data(|input| {
279+
}) => rewrite_arc(input, &mut f)?.update_data(|input| {
276280
LogicalPlan::Dml(DmlStatement {
277281
table_name,
278282
table_schema,
@@ -286,7 +290,7 @@ impl LogicalPlan {
286290
partition_by,
287291
format_options,
288292
options,
289-
}) => rewrite_arc(input, &mut f)?.map_data(|input| {
293+
}) => rewrite_arc(input, &mut f)?.update_data(|input| {
290294
LogicalPlan::Copy(CopyTo {
291295
input,
292296
output_url,
@@ -296,18 +300,55 @@ impl LogicalPlan {
296300
})
297301
}),
298302
LogicalPlan::Ddl(ddl) => {
299-
if let Some(input) = ddl.input_mut() {
300-
rewrite_arc(input, &mut f)
301-
} else {
302-
Ok(Transformed::no(()))
303+
match ddl {
304+
DdlStatement::CreateMemoryTable(CreateMemoryTable {
305+
name,
306+
constraints,
307+
input,
308+
if_not_exists,
309+
or_replace,
310+
column_defaults,
311+
}) => rewrite_arc(input, f)?.update_data(|input| {
312+
DdlStatement::CreateMemoryTable(CreateMemoryTable {
313+
name,
314+
constraints,
315+
input,
316+
if_not_exists,
317+
or_replace,
318+
column_defaults,
319+
})
320+
}),
321+
DdlStatement::CreateView(CreateView {
322+
name,
323+
input,
324+
or_replace,
325+
definition,
326+
}) => rewrite_arc(input, f)?.update_data(|input| {
327+
DdlStatement::CreateView(CreateView {
328+
name,
329+
input,
330+
or_replace,
331+
definition,
332+
})
333+
}),
334+
// no inputs in these statements
335+
DdlStatement::CreateExternalTable(_)
336+
| DdlStatement::CreateCatalogSchema(_)
337+
| DdlStatement::CreateCatalog(_)
338+
| DdlStatement::DropTable(_)
339+
| DdlStatement::DropView(_)
340+
| DdlStatement::DropCatalogSchema(_)
341+
| DdlStatement::CreateFunction(_)
342+
| DdlStatement::DropFunction(_) => Transformed::no(ddl),
303343
}
344+
.update_data(LogicalPlan::Ddl)
304345
}
305346
LogicalPlan::Unnest(Unnest {
306347
input,
307348
column,
308349
schema,
309350
options,
310-
}) => rewrite_arc(input, &mut f)?.map_data(|input| {
351+
}) => rewrite_arc(input, &mut f)?.update_data(|input| {
311352
LogicalPlan::Unnest(Unnest {
312353
input,
313354
column,
@@ -319,7 +360,7 @@ impl LogicalPlan {
319360
name,
320361
data_types,
321362
input,
322-
}) => rewrite_arc(input, &mut f)?.map_data(|input| {
363+
}) => rewrite_arc(input, &mut f)?.update_data(|input| {
323364
LogicalPlan::Prepare(Prepare {
324365
name,
325366
data_types,
@@ -336,7 +377,7 @@ impl LogicalPlan {
336377
recursive_term,
337378
rewrite_arc(recursive_term, &mut f)
338379
)?
339-
.map_data(|(static_term, recursive_term)| {
380+
.update_data(|(static_term, recursive_term)| {
340381
LogicalPlan::RecursiveQuery(RecursiveQuery {
341382
name,
342383
static_term,
@@ -349,7 +390,7 @@ impl LogicalPlan {
349390
| LogicalPlan::Statement { .. }
350391
| LogicalPlan::EmptyRelation { .. }
351392
| LogicalPlan::Values { .. }
352-
| LogicalPlan::DescribeTable(_) => Ok(Transformed::no(self)),
393+
| LogicalPlan::DescribeTable(_) => Transformed::no(self),
353394
})
354395
}
355396
}

datafusion/expr/src/tree_node/plan.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,10 @@ impl TreeNode for LogicalPlan {
3232
self.inputs().into_iter().apply_until_stop(f)
3333
}
3434

35-
fn map_children<F>(mut self, f: F) -> Result<Transformed<Self>>
35+
fn map_children<F>(self, f: F) -> Result<Transformed<Self>>
3636
where
3737
F: FnMut(Self) -> Result<Transformed<Self>>,
3838
{
39-
// Apply the rewrite *in place* for each child to avoid cloning
40-
let result = self.map_inputs(f)?;
41-
42-
// return ourself
43-
Ok(result.update_data(|_| self))
39+
self.map_inputs(f)
4440
}
4541
}

0 commit comments

Comments
 (0)