Skip to content

Commit 0f79c37

Browse files
committed
[HSTACK] - inlinetablescan top-down instead of bottom-up
1 parent 6bb9672 commit 0f79c37

File tree

1 file changed

+92
-9
lines changed

1 file changed

+92
-9
lines changed

datafusion/optimizer/src/analyzer/inline_table_scan.rs

Lines changed: 92 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl InlineTableScan {
3838

3939
impl AnalyzerRule for InlineTableScan {
4040
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
41-
plan.transform_up(analyze_internal).data()
41+
plan.transform_down_with_subqueries(analyze_internal).data()
4242
}
4343

4444
fn name(&self) -> &str {
@@ -47,11 +47,6 @@ impl AnalyzerRule for InlineTableScan {
4747
}
4848

4949
fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
50-
// rewrite any subqueries in the plan first
51-
let transformed_plan =
52-
plan.map_subqueries(|plan| plan.transform_up(analyze_internal))?;
53-
54-
let transformed_plan = transformed_plan.transform_data(|plan| {
5550
match plan {
5651
// Match only on scans without filter / projection / fetch
5752
// Views and DataFrames won't have those added
@@ -61,6 +56,7 @@ fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
6156
let sub_plan = sub_plan.into_owned();
6257
let projection_exprs =
6358
generate_projection_expr(&table_scan.projection, &sub_plan)?;
59+
6460
LogicalPlanBuilder::from(sub_plan)
6561
.project(projection_exprs)?
6662
// Ensures that the reference to the inlined table remains the
@@ -75,9 +71,6 @@ fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
7571
}
7672
_ => Ok(Transformed::no(plan)),
7773
}
78-
})?;
79-
80-
Ok(transformed_plan)
8174
}
8275

8376
fn generate_projection_expr(
@@ -202,4 +195,94 @@ mod tests {
202195

203196
assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
204197
}
198+
199+
#[derive(Debug)]
200+
// stand-in for DataFrameTableProvider which we can't access here
201+
struct WrappingSource {
202+
plan: LogicalPlan,
203+
}
204+
205+
impl TableSource for WrappingSource {
206+
fn as_any(&self) -> &dyn std::any::Any {
207+
self
208+
}
209+
210+
fn schema(&self) -> arrow::datatypes::SchemaRef {
211+
let schema: Schema = self.plan.schema().as_ref().into();
212+
Arc::new(schema)
213+
}
214+
215+
fn supports_filters_pushdown(
216+
&self,
217+
filters: &[&Expr],
218+
) -> datafusion_common::Result<Vec<datafusion_expr::TableProviderFilterPushDown>>
219+
{
220+
// A filter is added on the DataFrame when given
221+
Ok(vec![
222+
datafusion_expr::TableProviderFilterPushDown::Exact;
223+
filters.len()
224+
])
225+
}
226+
227+
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
228+
Some(Cow::Borrowed(&self.plan))
229+
}
230+
}
231+
232+
#[test]
233+
fn inline_table_scan_wrapped_once() -> datafusion_common::Result<()> {
234+
let custom_source = CustomSource::new();
235+
let wrapped_source = WrappingSource {
236+
plan: LogicalPlanBuilder::scan("wrapped", Arc::new(custom_source), None)?
237+
.build()?,
238+
};
239+
240+
let scan =
241+
LogicalPlanBuilder::scan("x".to_string(), Arc::new(wrapped_source), None)?;
242+
let plan = scan.build()?;
243+
let expected = "SubqueryAlias: x\
244+
\n Projection: *\
245+
\n SubqueryAlias: wrapped\
246+
\n Projection: *\
247+
\n TableScan: y";
248+
249+
assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
250+
}
251+
252+
#[test]
253+
fn inline_table_scan_wrapped_twice_with_projection() -> datafusion_common::Result<()> {
254+
let custom_source = CustomSource::new();
255+
let wrapped_source_once = WrappingSource {
256+
plan: LogicalPlanBuilder::scan(
257+
"wrapped_once",
258+
Arc::new(custom_source),
259+
Some(vec![0]),
260+
)?
261+
.build()?,
262+
};
263+
let wrapped_source_twice = WrappingSource {
264+
plan: LogicalPlanBuilder::scan(
265+
"wrapped_twice",
266+
Arc::new(wrapped_source_once),
267+
None,
268+
)?
269+
.build()?,
270+
};
271+
272+
let scan = LogicalPlanBuilder::scan(
273+
"x".to_string(),
274+
Arc::new(wrapped_source_twice),
275+
None,
276+
)?;
277+
let plan = scan.build()?;
278+
let expected = "SubqueryAlias: x\
279+
\n Projection: *\
280+
\n SubqueryAlias: wrapped_twice\
281+
\n Projection: *\
282+
\n SubqueryAlias: wrapped_once\
283+
\n Projection: y.a\
284+
\n TableScan: y";
285+
286+
assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
287+
}
205288
}

0 commit comments

Comments
 (0)