Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate SessionContext physical plan methods (#4617) #4751

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,30 +325,31 @@ async fn execute_query(
enable_scheduler: bool,
) -> Result<Vec<RecordBatch>> {
let plan = ctx.sql(sql).await?;
let plan = plan.into_unoptimized_plan();
let (state, plan) = plan.into_parts();

if debug {
println!("=== Logical plan ===\n{:?}\n", plan);
}

let plan = ctx.optimize(&plan)?;
let plan = state.optimize(&plan)?;
if debug {
println!("=== Optimized logical plan ===\n{:?}\n", plan);
}
let physical_plan = ctx.create_physical_plan(&plan).await?;
let physical_plan = state.create_physical_plan(&plan).await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
displayable(physical_plan.as_ref()).indent()
);
}
let task_ctx = ctx.task_ctx();
let result = if enable_scheduler {
let scheduler = Scheduler::new(num_cpus::get());
let results = scheduler.schedule(physical_plan.clone(), task_ctx).unwrap();
let results = scheduler
.schedule(physical_plan.clone(), state.task_ctx())
.unwrap();
results.stream().try_collect().await?
} else {
collect(physical_plan.clone(), task_ctx).await?
collect(physical_plan.clone(), state.task_ctx()).await?
};
if debug {
println!(
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,11 @@ impl DataFrame {
&self.plan
}

/// Consumes this [`DataFrame`] and returns the [`SessionState`] and [`LogicalPlan`]
/// that comprise it.
///
/// The tuple is ordered as in the signature: the state first, then the plan.
pub fn into_parts(self) -> (SessionState, LogicalPlan) {
(self.session_state, self.plan)
}

/// Return the logical plan represented by this DataFrame without running the optimizers
///
/// Note: This method should not be used outside testing, as it loses the snapshot
Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,11 +991,17 @@ impl SessionContext {
}

/// Optimizes the logical plan by applying optimizer rules.
///
/// Delegates to `optimize` on the value obtained from `self.state.read()` and
/// returns its result unchanged: the optimized [`LogicalPlan`] on success, or
/// the error produced by that call.
///
/// This method is deprecated; the deprecation note names the suggested replacement.
#[deprecated(
note = "Use SessionState::optimize to ensure a consistent state for planning and execution"
)]
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
// NOTE(review): presumably `state` is behind a read/write lock and `read()`
// yields a guard that lives only for this expression — confirm against the
// field's declaration.
self.state.read().optimize(plan)
}

/// Creates a physical plan from a logical plan.
#[deprecated(
note = "Use SessionState::create_physical_plan or DataFrame::create_physical_plan to ensure a consistent state for planning and execution"
)]
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
Expand Down
15 changes: 6 additions & 9 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,12 @@ async fn custom_source_dataframe() -> Result<()> {
let ctx = SessionContext::new();

let table = ctx.read_table(Arc::new(CustomTableProvider))?;
let logical_plan = LogicalPlanBuilder::from(table.into_optimized_plan()?)
let (state, plan) = table.into_parts();
let logical_plan = LogicalPlanBuilder::from(plan)
.project(vec![col("c2")])?
.build()?;

let optimized_plan = ctx.optimize(&logical_plan)?;
let optimized_plan = state.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
Expand All @@ -235,13 +236,12 @@ async fn custom_source_dataframe() -> Result<()> {
);
assert_eq!(format!("{:?}", optimized_plan), expected);

let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
let physical_plan = state.create_physical_plan(&optimized_plan).await?;

assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());

let task_ctx = ctx.task_ctx();
let batches = collect(physical_plan, task_ctx).await?;
let batches = collect(physical_plan, state.task_ctx()).await?;
let origin_rec_batch = TEST_CUSTOM_RECORD_BATCH!()?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
Expand All @@ -261,10 +261,7 @@ async fn optimizers_catch_all_statistics() {
.await
.unwrap();

let physical_plan = ctx
.create_physical_plan(&df.into_optimized_plan().unwrap())
.await
.unwrap();
let physical_plan = df.create_physical_plan().await.unwrap();

// when the optimization kicks in, the source is replaced by an EmptyExec
assert!(
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ impl ContextWithParquet {
.expect("getting input");
let pretty_input = pretty_format_batches(&input).unwrap().to_string();

let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
let state = self.ctx.state();
let logical_plan = state.optimize(&logical_plan).expect("optimizing plan");

let physical_plan = self
.ctx
let physical_plan = state
.create_physical_plan(&logical_plan)
.await
.expect("creating physical plan");

let task_ctx = self.ctx.task_ctx();
let task_ctx = state.task_ctx();
let results = datafusion::physical_plan::collect(physical_plan.clone(), task_ctx)
.await
.expect("Running");
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,7 @@ async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()>
#[tokio::test]
async fn aggregate_with_alias() -> Result<()> {
let ctx = SessionContext::new();
let state = ctx.state();

let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, false),
Expand All @@ -1120,9 +1121,8 @@ async fn aggregate_with_alias() -> Result<()> {
.project(vec![col("c1"), sum(col("c2")).alias("total_salary")])?
.build()?;

let plan = ctx.optimize(&plan)?;

let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
let plan = state.optimize(&plan)?;
let physical_plan = state.create_physical_plan(&Arc::new(plan)).await?;
assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
assert_eq!(
"total_salary",
Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/tests/sql/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ fn optimize_explain() {
panic!("plan was not an explain: {:?}", plan);
}

let ctx = SessionContext::new();
let state = ctx.state();

// now optimize the plan and expect to see more plans
let optimized_plan = SessionContext::new().optimize(&plan).unwrap();
let optimized_plan = state.optimize(&plan).unwrap();
if let LogicalPlan::Explain(e) = &optimized_plan {
// should have more than one plan
assert!(
Expand Down
15 changes: 7 additions & 8 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ async fn csv_explain_plans() {
);

// Optimized logical plan
//
let state = ctx.state();
let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
let plan = ctx.optimize(plan).expect(&msg);
let plan = state.optimize(plan).expect(&msg);
let optimized_logical_schema = plan.schema();
// Both schemas have to be the same
assert_eq!(logical_schema, optimized_logical_schema.as_ref());
Expand Down Expand Up @@ -334,12 +334,11 @@ async fn csv_explain_plans() {
// Physical plan
// Create plan
let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
let plan = state.create_physical_plan(&plan).await.expect(&msg);
//
// Execute plan
let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
let task_ctx = ctx.task_ctx();
let results = collect(plan, task_ctx).await.expect(&msg);
let results = collect(plan, state.task_ctx()).await.expect(&msg);
let actual = result_vec(&results);
// flatten to a single string
let actual = actual.into_iter().map(|r| r.join("\t")).collect::<String>();
Expand Down Expand Up @@ -481,9 +480,9 @@ async fn csv_explain_verbose_plans() {
);

// Optimized logical plan
//
let msg = format!("Optimizing logical plan for '{}': {:?}", sql, dataframe);
let plan = dataframe.into_optimized_plan().expect(&msg);
let (state, plan) = dataframe.into_parts();
let plan = state.optimize(&plan).expect(&msg);
let optimized_logical_schema = plan.schema();
// Both schemas have to be the same
assert_eq!(&logical_schema, optimized_logical_schema.as_ref());
Expand Down Expand Up @@ -558,7 +557,7 @@ async fn csv_explain_verbose_plans() {
// Physical plan
// Create plan
let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
let plan = state.create_physical_plan(&plan).await.expect(&msg);
//
// Execute plan
let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1022,13 +1022,11 @@ async fn try_execute_to_batches(
) -> Result<Vec<RecordBatch>> {
let dataframe = ctx.sql(sql).await?;
let logical_schema = dataframe.schema().clone();
let (state, plan) = dataframe.into_parts();

let optimized = ctx.optimize(dataframe.logical_plan())?;
let optimized_logical_schema = optimized.schema();
let results = dataframe.collect().await?;

assert_eq!(&logical_schema, optimized_logical_schema.as_ref());
Ok(results)
let optimized = state.optimize(&plan)?;
assert_eq!(&logical_schema, optimized.schema().as_ref());
DataFrame::new(state, optimized).collect().await
}

/// Execute query and return results as a Vec of RecordBatches
Expand Down
25 changes: 12 additions & 13 deletions datafusion/core/tests/sql/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ async fn projection_on_table_scan() -> Result<()> {
.project(vec![col("c2")])?
.build()?;

let optimized_plan = ctx.optimize(&logical_plan)?;
let state = ctx.state();
let optimized_plan = state.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
Expand All @@ -192,12 +193,11 @@ async fn projection_on_table_scan() -> Result<()> {
\n TableScan: test projection=[c2]";
assert_eq!(format!("{:?}", optimized_plan), expected);

let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
let physical_plan = state.create_physical_plan(&optimized_plan).await?;

assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
let task_ctx = ctx.task_ctx();
let batches = collect(physical_plan, task_ctx).await?;
let batches = collect(physical_plan, state.task_ctx()).await?;
assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());

Ok(())
Expand All @@ -215,8 +215,8 @@ async fn preserve_nullability_on_projection() -> Result<()> {
.project(vec![col("c1")])?
.build()?;

let plan = ctx.optimize(&plan)?;
let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
let dataframe = DataFrame::new(ctx.state(), plan);
let physical_plan = dataframe.create_physical_plan().await?;
assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
Ok(())
}
Expand Down Expand Up @@ -247,9 +247,8 @@ async fn project_cast_dictionary() {
.unwrap();

let logical_plan = builder.project(vec![expr]).unwrap().build().unwrap();

let physical_plan = ctx.create_physical_plan(&logical_plan).await.unwrap();
let actual = collect(physical_plan, ctx.task_ctx()).await.unwrap();
let df = DataFrame::new(ctx.state(), logical_plan);
let actual = df.collect().await.unwrap();

let expected = vec![
"+----------------------------------------------------------------------------------+",
Expand Down Expand Up @@ -289,7 +288,8 @@ async fn projection_on_memory_scan() -> Result<()> {
assert_fields_eq(&plan, vec!["b"]);

let ctx = SessionContext::new();
let optimized_plan = ctx.optimize(&plan)?;
let state = ctx.state();
let optimized_plan = state.optimize(&plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
Expand All @@ -312,13 +312,12 @@ async fn projection_on_memory_scan() -> Result<()> {
);
assert_eq!(format!("{:?}", optimized_plan), expected);

let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
let physical_plan = state.create_physical_plan(&optimized_plan).await?;

assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("b", physical_plan.schema().field(0).name().as_str());

let task_ctx = ctx.task_ctx();
let batches = collect(physical_plan, task_ctx).await?;
let batches = collect(physical_plan, state.task_ctx()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
assert_eq!(4, batches[0].num_rows());
Expand Down
5 changes: 1 addition & 4 deletions datafusion/core/tests/sql/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,7 @@ async fn scalar_udf() -> Result<()> {
"Projection: t.a, t.b, my_add(t.a, t.b)\n TableScan: t projection=[a, b]"
);

let plan = ctx.optimize(&plan)?;
let plan = ctx.create_physical_plan(&plan).await?;
let task_ctx = ctx.task_ctx();
let result = collect(plan, task_ctx).await?;
let result = DataFrame::new(ctx.state(), plan).collect().await?;

let expected = vec![
"+-----+-----+-----------------+",
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/tests/tpcds_planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,9 +1067,10 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> {

for sql in &sql {
let df = ctx.sql(sql).await?;
let plan = df.into_optimized_plan()?;
let (state, plan) = df.into_parts();
let plan = state.optimize(&plan)?;
if create_physical {
let _ = ctx.create_physical_plan(&plan).await?;
let _ = state.create_physical_plan(&plan).await?;
}
}

Expand Down