Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update physical_plan tests to not use SessionContext #7243

Merged
merged 3 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 9 additions & 19 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,6 @@ fn evaluate_group_by(
#[cfg(test)]
mod tests {
use super::*;
use crate::execution::context::SessionConfig;
use crate::physical_plan::aggregates::GroupByOrderMode::{
FullyOrdered, PartiallyOrdered,
};
Expand All @@ -1231,7 +1230,6 @@ mod tests {
DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use crate::prelude::SessionContext;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of this PR is to remove these dependencies (the `SessionContext` and `SessionConfig` imports) from the physical_plan tests

use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::test::{assert_is_pending, csv_exec_sorted};
use crate::{assert_batches_eq, assert_batches_sorted_eq, physical_plan::common};
Expand Down Expand Up @@ -1449,8 +1447,7 @@ mod tests {
DataType::Int64,
))];

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is 95% of this PR: create the TaskContext directly rather than via a SessionContext


let partial_aggregate = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
Expand Down Expand Up @@ -1556,8 +1553,7 @@ mod tests {
DataType::Float64,
))];

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());

let partial_aggregate = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
Expand Down Expand Up @@ -1779,14 +1775,11 @@ mod tests {
Arc::new(TestYieldingExec { yield_first: true });
let input_schema = input.schema();

let session_ctx = SessionContext::with_config_rt(
SessionConfig::default(),
Arc::new(
RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0))
.unwrap(),
),
let runtime = Arc::new(
RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(),
);
let task_ctx = session_ctx.task_ctx();
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

let groups_none = PhysicalGroupBy::default();
let groups_some = PhysicalGroupBy {
Expand Down Expand Up @@ -1864,8 +1857,7 @@ mod tests {

#[tokio::test]
async fn test_drop_cancel_without_groups() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

Expand Down Expand Up @@ -1901,8 +1893,7 @@ mod tests {

#[tokio::test]
async fn test_drop_cancel_with_groups() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Float32, true),
Field::new("b", DataType::Float32, true),
Expand Down Expand Up @@ -1970,8 +1961,7 @@ mod tests {
use_coalesce_batches: bool,
is_first_acc: bool,
) -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());

let (schema, data) = some_data_v2();
let partition1 = data[0].clone();
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use futures::FutureExt;

use crate::prelude::SessionContext;
use crate::{
physical_plan::collect,
test::{
Expand All @@ -242,8 +241,7 @@ mod tests {

#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

Expand Down
39 changes: 1 addition & 38 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,46 +305,10 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
use crate::config::ConfigOptions;
use crate::datasource::MemTable;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
use crate::prelude::SessionContext;
use crate::test::create_vec_batches;
use arrow::datatypes::{DataType, Field, Schema};

#[tokio::test]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests check that a config option makes it through SQL planning into the resulting physical plan, so I moved them to options.slt

async fn test_custom_batch_size() -> Result<()> {
let mut config = ConfigOptions::new();
config.execution.batch_size = 1234;

let ctx = SessionContext::with_config(config.into());
let plan = create_physical_plan(ctx).await?;
let coalesce = plan.as_any().downcast_ref::<CoalesceBatchesExec>().unwrap();
assert_eq!(1234, coalesce.target_batch_size);
Ok(())
}

#[tokio::test]
async fn test_disable_coalesce() -> Result<()> {
let mut config = ConfigOptions::new();
config.execution.coalesce_batches = false;

let ctx = SessionContext::with_config(config.into());
let plan = create_physical_plan(ctx).await?;
let _filter = plan.as_any().downcast_ref::<FilterExec>().unwrap();
Ok(())
}

async fn create_physical_plan(ctx: SessionContext) -> Result<Arc<dyn ExecutionPlan>> {
let schema = test_schema();
let partition = create_vec_batches(&schema, 10);
let table = MemTable::try_new(schema, vec![partition])?;
ctx.register_table("a", Arc::new(table))?;
let dataframe = ctx.sql("SELECT * FROM a WHERE c0 < 1").await?;
dataframe.create_physical_plan().await
}

#[tokio::test(flavor = "multi_thread")]
async fn test_concat_batches() -> Result<()> {
let schema = test_schema();
Expand Down Expand Up @@ -385,10 +349,9 @@ mod tests {
// execute and collect results
let output_partition_count = exec.output_partitioning().partition_count();
let mut output_partitions = Vec::with_capacity(output_partition_count);
let session_ctx = SessionContext::new();
for i in 0..output_partition_count {
// execute this *output* partition and collect all batches
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let mut stream = exec.execute(i, task_ctx.clone())?;
let mut batches = vec![];
while let Some(result) = stream.next().await {
Expand Down
10 changes: 3 additions & 7 deletions datafusion/core/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,14 @@ mod tests {

use super::*;
use crate::physical_plan::{collect, common};
use crate::prelude::SessionContext;
use crate::test::exec::{
assert_strong_count_converges_to_zero, BlockingExec, PanicExec,
};
use crate::test::{self, assert_is_pending};

#[tokio::test]
async fn merge() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());

let num_partitions = 4;
let csv = test::scan_partitioned_csv(num_partitions)?;
Expand All @@ -212,8 +210,7 @@ mod tests {

#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

Expand All @@ -235,8 +232,7 @@ mod tests {
#[tokio::test]
#[should_panic(expected = "PanickingStream did panic")]
async fn test_panic() {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

Expand Down
13 changes: 4 additions & 9 deletions datafusion/core/src/physical_plan/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,11 @@ impl ExecutionPlan for EmptyExec {
mod tests {
use super::*;
use crate::physical_plan::with_new_children_if_necessary;
use crate::prelude::SessionContext;
use crate::{physical_plan::common, test_util};

#[tokio::test]
async fn empty() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();

let empty = EmptyExec::new(false, schema.clone());
Expand Down Expand Up @@ -217,8 +215,7 @@ mod tests {

#[tokio::test]
async fn invalid_execute() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(false, schema);

Expand All @@ -230,8 +227,7 @@ mod tests {

#[tokio::test]
async fn produce_one_row() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(true, schema);

Expand All @@ -246,8 +242,7 @@ mod tests {

#[tokio::test]
async fn produce_one_row_multiple_partition() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let partitions = 3;
let empty = EmptyExec::new(true, schema).with_partitions(partitions);
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/physical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,6 @@ mod tests {
use crate::physical_plan::expressions::*;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{collect, with_new_children_if_necessary};
use crate::prelude::SessionContext;
use crate::test;
use crate::test::exec::StatisticsExec;
use crate::test_util;
Expand All @@ -395,8 +394,7 @@ mod tests {

#[tokio::test]
async fn simple_predicate() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();

let partitions = 4;
Expand Down
9 changes: 3 additions & 6 deletions datafusion/core/src/physical_plan/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,6 @@ mod tests {
use crate::assert_batches_sorted_eq;
use crate::common::assert_contains;
use crate::physical_plan::common;
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{build_table_scan_i32, columns};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

Expand Down Expand Up @@ -617,8 +616,7 @@ mod tests {

#[tokio::test]
async fn test_join() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());

let left = build_table_scan_i32(
("a1", &vec![1, 2, 3]),
Expand Down Expand Up @@ -656,9 +654,8 @@ mod tests {
async fn test_overallocation() -> Result<()> {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let session_ctx =
SessionContext::with_config_rt(SessionConfig::default(), runtime);
let task_ctx = session_ctx.task_ctx();
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

let left = build_table_scan_i32(
("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
Expand Down
Loading