diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 473d4eb131f1..be3b8684992e 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -1215,7 +1215,6 @@ fn evaluate_group_by( #[cfg(test)] mod tests { use super::*; - use crate::execution::context::SessionConfig; use crate::physical_plan::aggregates::GroupByOrderMode::{ FullyOrdered, PartiallyOrdered, }; @@ -1231,7 +1230,6 @@ mod tests { DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }; - use crate::prelude::SessionContext; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test::{assert_is_pending, csv_exec_sorted}; use crate::{assert_batches_eq, assert_batches_sorted_eq, physical_plan::common}; @@ -1449,8 +1447,7 @@ mod tests { DataType::Int64, ))]; - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let partial_aggregate = Arc::new(AggregateExec::try_new( AggregateMode::Partial, @@ -1556,8 +1553,7 @@ mod tests { DataType::Float64, ))]; - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let partial_aggregate = Arc::new(AggregateExec::try_new( AggregateMode::Partial, @@ -1779,14 +1775,11 @@ mod tests { Arc::new(TestYieldingExec { yield_first: true }); let input_schema = input.schema(); - let session_ctx = SessionContext::with_config_rt( - SessionConfig::default(), - Arc::new( - RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)) - .unwrap(), - ), + let runtime = Arc::new( + RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(), ); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); let groups_none = PhysicalGroupBy::default(); let groups_some = PhysicalGroupBy { @@ -1864,8 +1857,7 @@ mod tests { #[tokio::test] async fn test_drop_cancel_without_groups() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); @@ -1901,8 +1893,7 @@ mod tests { #[tokio::test] async fn test_drop_cancel_with_groups() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Float32, true), Field::new("b", DataType::Float32, true), @@ -1970,8 +1961,7 @@ mod tests { use_coalesce_batches: bool, is_first_acc: bool, ) -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let (schema, data) = some_data_v2(); let partition1 = data[0].clone(); diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index bba1c37e6d61..d8c87d2f513f 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -229,7 +229,6 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use futures::FutureExt; - use crate::prelude::SessionContext; use crate::{ physical_plan::collect, test::{ @@ -242,8 +241,7 @@ mod tests { #[tokio::test] async fn 
test_drop_cancel() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 994f75ce4cfe..71ad39658f91 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -305,46 +305,10 @@ pub fn concat_batches( #[cfg(test)] mod tests { use super::*; - use crate::config::ConfigOptions; - use crate::datasource::MemTable; - use crate::physical_plan::filter::FilterExec; use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec}; - use crate::prelude::SessionContext; use crate::test::create_vec_batches; use arrow::datatypes::{DataType, Field, Schema}; - #[tokio::test] - async fn test_custom_batch_size() -> Result<()> { - let mut config = ConfigOptions::new(); - config.execution.batch_size = 1234; - - let ctx = SessionContext::with_config(config.into()); - let plan = create_physical_plan(ctx).await?; - let coalesce = plan.as_any().downcast_ref::<CoalesceBatchesExec>().unwrap(); - assert_eq!(1234, coalesce.target_batch_size); - Ok(()) - } - - #[tokio::test] - async fn test_disable_coalesce() -> Result<()> { - let mut config = ConfigOptions::new(); - config.execution.coalesce_batches = false; - - let ctx = SessionContext::with_config(config.into()); - let plan = create_physical_plan(ctx).await?; - let _filter = plan.as_any().downcast_ref::<FilterExec>().unwrap(); - Ok(()) - } - - async fn create_physical_plan(ctx: SessionContext) -> Result<Arc<dyn ExecutionPlan>> { - let schema = test_schema(); - let partition = create_vec_batches(&schema, 10); - let table = MemTable::try_new(schema, vec![partition])?; - ctx.register_table("a", Arc::new(table))?; - let dataframe = ctx.sql("SELECT * FROM a WHERE c0 < 1").await?; - dataframe.create_physical_plan().await - } - #[tokio::test(flavor = "multi_thread")] async fn test_concat_batches() -> Result<()> { let schema = test_schema(); @@ -385,10 +349,9 @@ mod tests { // execute and collect results let output_partition_count = exec.output_partitioning().partition_count(); let mut output_partitions = Vec::with_capacity(output_partition_count); - let session_ctx = SessionContext::new(); for i in 0..output_partition_count { // execute this *output* partition and collect all batches - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let mut stream = exec.execute(i, task_ctx.clone())?; let mut batches = vec![]; while let Some(result) = stream.next().await { diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index bc48b5f5e15b..14e8aada6ca7 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -176,7 +176,6 @@ mod tests { use super::*; use crate::physical_plan::{collect, common}; - use crate::prelude::SessionContext; use crate::test::exec::{ assert_strong_count_converges_to_zero, BlockingExec, PanicExec, }; @@ -184,8 +183,7 @@ mod tests { #[tokio::test] async fn merge() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let num_partitions = 4; let csv = test::scan_partitioned_csv(num_partitions)?; @@ -212,8 +210,7 @@ mod tests { #[tokio::test]
async fn test_drop_cancel() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); @@ -235,8 +232,7 @@ mod tests { #[tokio::test] #[should_panic(expected = "PanickingStream did panic")] async fn test_panic() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs index 17bfa0af3a87..7da4e80c1217 100644 --- a/datafusion/core/src/physical_plan/empty.rs +++ b/datafusion/core/src/physical_plan/empty.rs @@ -174,13 +174,11 @@ impl ExecutionPlan for EmptyExec { mod tests { use super::*; use crate::physical_plan::with_new_children_if_necessary; - use crate::prelude::SessionContext; use crate::{physical_plan::common, test_util}; #[tokio::test] async fn empty() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = test_util::aggr_test_schema(); let empty = EmptyExec::new(false, schema.clone()); @@ -217,8 +215,7 @@ mod tests { #[tokio::test] async fn invalid_execute() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = test_util::aggr_test_schema(); let empty = EmptyExec::new(false, schema); @@ -230,8 +227,7 @@ mod tests { #[tokio::test] async fn produce_one_row() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = test_util::aggr_test_schema(); let empty = EmptyExec::new(true, schema); @@ -246,8 +242,7 @@ mod tests { #[tokio::test] async fn produce_one_row_multiple_partition() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = test_util::aggr_test_schema(); let partitions = 3; let empty = EmptyExec::new(true, schema).with_partitions(partitions); diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index e8c181a34ba2..f9fc4fb4b61c 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -381,7 +381,6 @@ mod tests { use crate::physical_plan::expressions::*; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::{collect, with_new_children_if_necessary}; - use crate::prelude::SessionContext; use crate::test; use crate::test::exec::StatisticsExec; use crate::test_util; @@ -395,8 +394,7 @@ mod tests { #[tokio::test] async fn simple_predicate() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = test_util::aggr_test_schema(); let partitions = 4; diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index eaee9892d0e2..6d74a069b6e8 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -460,7 +460,6 @@ mod tests { use crate::assert_batches_sorted_eq; use 
crate::common::assert_contains; use crate::physical_plan::common; - use crate::prelude::{SessionConfig, SessionContext}; use crate::test::{build_table_scan_i32, columns}; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; @@ -617,8 +616,7 @@ mod tests { #[tokio::test] async fn test_join() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table_scan_i32( ("a1", &vec![1, 2, 3]), @@ -656,9 +654,8 @@ mod tests { async fn test_overallocation() -> Result<()> { let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); - let session_ctx = - SessionContext::with_config_rt(SessionConfig::default(), runtime); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); let left = build_table_scan_i32( ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 2108893ccbb0..3522656142f3 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -1402,7 +1402,6 @@ mod tests { use crate::execution::context::SessionConfig; use crate::physical_expr::expressions::BinaryExpr; - use crate::prelude::SessionContext; use crate::{ assert_batches_sorted_eq, common::assert_contains, @@ -1540,8 +1539,7 @@ mod tests { #[tokio::test] async fn join_inner_one() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 5]), // this has a repetition @@ -1586,8 +1584,7 @@ mod tests { #[tokio::test] async fn partitioned_join_inner_one() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 5]), // this has a repetition @@ -1631,8 +1628,7 @@ mod tests { #[tokio::test] async fn join_inner_one_no_shared_column_names() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 5]), // this has a repetition @@ -1670,8 +1666,7 @@ mod tests { #[tokio::test] async fn join_inner_two() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 2]), ("b2", &vec![1, 2, 2]), @@ -1718,8 +1713,7 @@ mod tests { /// Test where the left has 2 parts, the right with 1 part => 1 part #[tokio::test] async fn join_inner_one_two_parts_left() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let batch1 = build_table_i32( ("a1", &vec![1, 2]), ("b2", &vec![1, 2]), @@ -1773,8 +1767,7 @@ mod tests { /// Test where the left has 1 part, the right has 2 parts => 2 parts #[tokio::test] async fn join_inner_one_two_parts_right() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = 
build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 5]), // this has a repetition @@ -1849,8 +1842,7 @@ mod tests { #[tokio::test] async fn join_left_multi_batch() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -1891,8 +1883,7 @@ mod tests { #[tokio::test] async fn join_full_multi_batch() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -1936,8 +1927,7 @@ mod tests { #[tokio::test] async fn join_left_empty_right() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), @@ -1973,8 +1963,7 @@ mod tests { #[tokio::test] async fn join_full_empty_right() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), @@ -2010,8 +1999,7 @@ mod tests { #[tokio::test] async fn join_left_one() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -2054,8 +2042,7 @@ mod tests { #[tokio::test] async fn partitioned_join_left_one() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -2118,8 +2105,7 @@ mod tests { #[tokio::test] async fn join_left_semi() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); // left_table left semi join right_table on left_table.b1 = right_table.b2 @@ -2153,8 +2139,7 @@ mod tests { #[tokio::test] async fn join_left_semi_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); @@ -2240,8 +2225,7 @@ mod tests { #[tokio::test] async fn join_right_semi() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); @@ -2275,8 +2259,7 @@ mod tests { #[tokio::test] async fn join_right_semi_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); @@ -2361,8 +2344,7 @@ mod tests { #[tokio::test] async fn join_left_anti() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); 
let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); // left_table left anti join right_table on left_table.b1 = right_table.b2 @@ -2395,8 +2377,7 @@ mod tests { #[tokio::test] async fn join_left_anti_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2!=8 @@ -2489,8 +2470,7 @@ mod tests { #[tokio::test] async fn join_right_anti() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); let on = vec![( @@ -2521,8 +2501,7 @@ mod tests { #[tokio::test] async fn join_right_anti_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); // left_table right anti join right_table on left_table.b1 = right_table.b2 and left_table.a1!=13 @@ -2618,8 +2597,7 @@ mod tests { #[tokio::test] async fn join_right_one() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), @@ -2657,8 +2635,7 @@ mod tests { #[tokio::test] async fn partitioned_join_right_one() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), @@ -2697,8 +2674,7 @@ mod tests { #[tokio::test] async fn join_full_one() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -2800,8 +2776,7 @@ mod tests { #[tokio::test] async fn join_with_duplicated_column_names() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a", &vec![1, 2, 3]), ("b", &vec![4, 5, 7]), @@ -2865,8 +2840,7 @@ mod tests { #[tokio::test] async fn join_inner_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a", &vec![0, 1, 2, 2]), ("b", &vec![4, 5, 7, 8]), @@ -2906,8 +2880,7 @@ mod tests { #[tokio::test] async fn join_left_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a", &vec![0, 1, 2, 2]), ("b", &vec![4, 5, 7, 8]), @@ -2950,8 +2923,7 @@ mod tests { #[tokio::test] async fn join_right_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a", &vec![0, 1, 2, 2]), ("b", &vec![4, 5, 7, 8]), @@ -2993,8 +2965,7 @@ mod tests { 
#[tokio::test] async fn join_full_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_table( ("a", &vec![0, 1, 2, 2]), ("b", &vec![4, 5, 7, 8]), @@ -3062,8 +3033,7 @@ mod tests { let join = join(left, right, on, &JoinType::Inner, false)?; - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; @@ -3122,8 +3092,7 @@ mod tests { false, ) .unwrap(); - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let stream = join.execute(0, task_ctx).unwrap(); @@ -3170,9 +3139,8 @@ mod tests { for join_type in join_types { let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); - let session_ctx = - SessionContext::with_config_rt(SessionConfig::default(), runtime); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); let join = join(left.clone(), right.clone(), on.clone(), &join_type, false)?; @@ -3241,8 +3209,10 @@ mod tests { let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); let session_config = SessionConfig::default().with_batch_size(50); - let session_ctx = SessionContext::with_config_rt(session_config, runtime); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = TaskContext::default() + .with_session_config(session_config) + .with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); let join = HashJoinExec::try_new( left.clone(), diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs index cad3b4743bc9..60fdf452cfc1 100644 --- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs +++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs @@ -744,10 +744,7 @@ mod tests { use crate::{ assert_batches_sorted_eq, common::assert_contains, - execution::{ - context::SessionConfig, - runtime_env::{RuntimeConfig, RuntimeEnv}, - }, + execution::runtime_env::{RuntimeConfig, RuntimeEnv}, physical_plan::{ common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec, }, @@ -757,7 +754,6 @@ mod tests { use datafusion_expr::Operator; use crate::physical_plan::joins::utils::JoinSide; - use crate::prelude::SessionContext; use datafusion_common::ScalarValue; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::PhysicalExpr; @@ -884,8 +880,7 @@ mod tests { #[tokio::test] async fn join_inner_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_left_table(); let right = build_right_table(); let filter = prepare_join_filter(); @@ -913,8 +908,7 @@ mod tests { #[tokio::test] async fn join_left_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_left_table(); let right = build_right_table(); @@ -945,8 +939,7 @@ mod tests { #[tokio::test] async fn join_right_with_filter() -> Result<()> { - let session_ctx = 
SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_left_table(); let right = build_right_table(); @@ -977,8 +970,7 @@ mod tests { #[tokio::test] async fn join_full_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_left_table(); let right = build_right_table(); @@ -1011,8 +1003,7 @@ mod tests { #[tokio::test] async fn join_left_semi_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_left_table(); let right = build_right_table(); @@ -1041,8 +1032,7 @@ mod tests { #[tokio::test] async fn join_left_anti_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_left_table(); let right = build_right_table(); @@ -1072,8 +1062,7 @@ mod tests { #[tokio::test] async fn join_right_semi_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_left_table(); let right = build_right_table(); @@ -1102,8 +1091,7 @@ mod tests { #[tokio::test] async fn join_right_anti_with_filter() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let left = build_left_table(); let right = build_right_table(); @@ -1159,9 +1147,8 @@ mod tests { for join_type in join_types { let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); - let session_ctx = - SessionContext::with_config_rt(SessionConfig::default(), runtime); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); let err = multi_partitioned_join_collect( left.clone(), diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index b3721eb4d616..902b02285735 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -1389,6 +1389,8 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; + use datafusion_execution::config::SessionConfig; + use datafusion_execution::TaskContext; use crate::common::assert_contains; use crate::physical_plan::expressions::Column; @@ -1396,7 +1398,6 @@ mod tests { use crate::physical_plan::joins::SortMergeJoinExec; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::{common, ExecutionPlan}; - use crate::prelude::{SessionConfig, SessionContext}; use crate::test::{build_table_i32, columns}; use crate::{assert_batches_eq, assert_batches_sorted_eq}; use datafusion_common::JoinType; @@ -1537,8 +1538,7 @@ mod tests { sort_options: Vec<SortOptions>, null_equals_null: bool, ) -> Result<(Vec<String>, Vec<RecordBatch>)> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let join = join_with_options( left, right, @@ -1560,9 +1560,9 @@ mod tests { on: JoinOn, join_type: JoinType, ) -> Result<(Vec<String>, Vec<RecordBatch>)> { - let
session_ctx = - SessionContext::with_config(SessionConfig::new().with_batch_size(2)); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = TaskContext::default() + .with_session_config(SessionConfig::new().with_batch_size(2)); + let task_ctx = Arc::new(task_ctx); let join = join(left, right, on, join_type)?; let columns = columns(&join.schema()); @@ -2321,8 +2321,12 @@ mod tests { let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); let session_config = SessionConfig::default().with_batch_size(50); - let session_ctx = SessionContext::with_config_rt(session_config, runtime); - let task_ctx = session_ctx.task_ctx(); + + let task_ctx = TaskContext::default() + .with_session_config(session_config) + .with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); + let join = join_with_options( left.clone(), right.clone(), @@ -2397,8 +2401,10 @@ mod tests { let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); let session_config = SessionConfig::default().with_batch_size(50); - let session_ctx = SessionContext::with_config_rt(session_config, runtime); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = TaskContext::default() + .with_session_config(session_config) + .with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); let join = join_with_options( left.clone(), right.clone(), diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index dc8bcc2edb26..efe7ce503b44 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -1428,21 +1428,17 @@ impl SymmetricHashJoinStream { #[cfg(test)] mod tests { - use std::fs::File; - + use super::*; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit}; + use datafusion_execution::config::SessionConfig; use rstest::*; - use tempfile::TempDir; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{binary, col, Column}; use datafusion_physical_expr::intervals::test_utils::gen_conjunctive_numerical_expr; - use crate::physical_plan::displayable; use crate::physical_plan::joins::hash_join_utils::tests::complicated_filter; - use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext}; - use crate::test_util::register_unbounded_file_with_ordering; use crate::physical_plan::joins::test_utils::{ build_sides_record_batches, compare_batches, create_memory_table, @@ -1451,9 +1447,6 @@ mod tests { partitioned_sym_join_with_filter, }; use datafusion_common::ScalarValue; - use std::iter::Iterator; - - use super::*; const TABLE_SIZE: i32 = 100; @@ -1506,8 +1499,7 @@ mod tests { cardinality: (i32, i32), ) -> Result<()> { // a + b > c + 10 AND a + b < c + 100 - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let left_schema = &left_batch.schema(); @@ -1587,8 +1579,7 @@ mod tests { cardinality: (i32, i32), #[values(0, 1, 2, 3, 4, 5, 6, 7)] case_expr: usize, ) -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let 
left_schema = &left_batch.schema(); @@ -1662,8 +1653,7 @@ mod tests { cardinality: (i32, i32), #[values(0, 1, 2, 3, 4, 5, 6)] case_expr: usize, ) -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let left_schema = &left_batch.schema(); @@ -1715,8 +1705,7 @@ mod tests { )] join_type: JoinType, ) -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, (11, 21))?; let left_schema = &left_batch.schema(); let right_schema = &right_batch.schema(); @@ -1753,8 +1742,7 @@ mod tests { cardinality: (i32, i32), #[values(0, 1, 2, 3, 4, 5, 6)] case_expr: usize, ) -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let left_schema = &left_batch.schema(); @@ -1811,173 +1799,14 @@ mod tests { Ok(()) } - #[tokio::test] - async fn join_change_in_planner() -> Result<()> { - let config = SessionConfig::new().with_target_partitions(8); - let ctx = SessionContext::with_config(config); - let tmp_dir = TempDir::new().unwrap(); - let left_file_path = tmp_dir.path().join("left.csv"); - File::create(left_file_path.clone()).unwrap(); - // Create schema - let schema = Arc::new(Schema::new(vec![ - Field::new("a1", DataType::UInt32, false), - Field::new("a2", DataType::UInt32, false), - ])); - // Specify the ordering: - let file_sort_order = vec![[datafusion_expr::col("a1")] - .into_iter() - .map(|e| { - let ascending = true; - let nulls_first = false; - e.sort(ascending, nulls_first) - }) - .collect::<Vec<_>>()]; - register_unbounded_file_with_ordering( - &ctx, - schema.clone(), - &left_file_path, - "left", - file_sort_order.clone(), - true, - ) - .await?; - let right_file_path = tmp_dir.path().join("right.csv"); - File::create(right_file_path.clone()).unwrap(); - register_unbounded_file_with_ordering( - &ctx, - schema, - &right_file_path, - "right", - file_sort_order, - true, - ) - .await?; - let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10"; - let dataframe = ctx.sql(sql).await?; - let physical_plan = dataframe.create_physical_plan().await?; - let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); - let expected = { - [ - "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1", - // " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1", - // " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false" - ] - }; - let mut actual: Vec<&str> = formatted.trim().lines().collect(); - // Remove CSV lines - actual.remove(3); - actual.remove(5); - - assert_eq!( - expected, - actual[..], - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - );
- Ok(()) - } - - #[tokio::test] - async fn join_change_in_planner_without_sort() -> Result<()> { - let config = SessionConfig::new().with_target_partitions(8); - let ctx = SessionContext::with_config(config); - let tmp_dir = TempDir::new()?; - let left_file_path = tmp_dir.path().join("left.csv"); - File::create(left_file_path.clone())?; - let schema = Arc::new(Schema::new(vec![ - Field::new("a1", DataType::UInt32, false), - Field::new("a2", DataType::UInt32, false), - ])); - ctx.register_csv( - "left", - left_file_path.as_os_str().to_str().unwrap(), - CsvReadOptions::new().schema(&schema).mark_infinite(true), - ) - .await?; - let right_file_path = tmp_dir.path().join("right.csv"); - File::create(right_file_path.clone())?; - ctx.register_csv( - "right", - right_file_path.as_os_str().to_str().unwrap(), - CsvReadOptions::new().schema(&schema).mark_infinite(true), - ) - .await?; - let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10"; - let dataframe = ctx.sql(sql).await?; - let physical_plan = dataframe.create_physical_plan().await?; - let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); - let expected = { - [ - "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1", - // " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1", - // " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false" - ] - }; - let mut actual: Vec<&str> = formatted.trim().lines().collect(); - // Remove CSV lines - actual.remove(3); - actual.remove(5); - - assert_eq!( - expected, - actual[..], - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - Ok(()) - } - - #[tokio::test] - async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> { - let config = SessionConfig::new() - .with_target_partitions(8) - .with_allow_symmetric_joins_without_pruning(false); - let ctx = SessionContext::with_config(config); - let tmp_dir = TempDir::new()?; - let left_file_path = tmp_dir.path().join("left.csv"); - File::create(left_file_path.clone())?; - let schema = Arc::new(Schema::new(vec![ - Field::new("a1", DataType::UInt32, false), - Field::new("a2", DataType::UInt32, false), - ])); - ctx.register_csv( - "left", - left_file_path.as_os_str().to_str().unwrap(), - CsvReadOptions::new().schema(&schema).mark_infinite(true), - ) - .await?; - let right_file_path = tmp_dir.path().join("right.csv"); - File::create(right_file_path.clone())?; - ctx.register_csv( - "right", - right_file_path.as_os_str().to_str().unwrap(), - CsvReadOptions::new().schema(&schema).mark_infinite(true), - ) - .await?; - let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?; - match df.create_physical_plan().await { - Ok(_) => panic!("Expecting error."), - Err(e) => { - assert_eq!(e.to_string(), "PipelineChecker\ncaused by\nError during planning: Join operation cannot operate on a non-prunable stream without enabling the 'allow_symmetric_joins_without_pruning' configuration flag") - 
} - } - Ok(()) - } - #[tokio::test(flavor = "multi_thread")] async fn build_null_columns_first() -> Result<()> { let join_type = JoinType::Full; let cardinality = (10, 11); let case_expr = 1; - let config = SessionConfig::new().with_repartition_joins(false); - let session_ctx = SessionContext::with_config(config); - let task_ctx = session_ctx.task_ctx(); + let session_config = SessionConfig::new().with_repartition_joins(false); + let task_ctx = TaskContext::default().with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let left_schema = &left_batch.schema(); @@ -2038,9 +1867,9 @@ mod tests { let join_type = JoinType::Full; let cardinality = (10, 11); let case_expr = 1; - let config = SessionConfig::new().with_repartition_joins(false); - let session_ctx = SessionContext::with_config(config); - let task_ctx = session_ctx.task_ctx(); + let session_config = SessionConfig::new().with_repartition_joins(false); + let task_ctx = TaskContext::default().with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let left_schema = &left_batch.schema(); @@ -2102,9 +1931,9 @@ mod tests { let join_type = JoinType::Full; let cardinality = (10, 11); let case_expr = 1; - let config = SessionConfig::new().with_repartition_joins(false); - let session_ctx = SessionContext::with_config(config); - let task_ctx = session_ctx.task_ctx(); + let session_config = SessionConfig::new().with_repartition_joins(false); + let task_ctx = TaskContext::default().with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let left_schema = &left_batch.schema(); @@ -2167,9 +1996,9 @@ mod tests { let join_type = JoinType::Full; // a + b > c + 10 AND a + b < c + 100 - let config = SessionConfig::new().with_repartition_joins(false); - let session_ctx = SessionContext::with_config(config); - let task_ctx = session_ctx.task_ctx(); + let session_config = SessionConfig::new().with_repartition_joins(false); + let task_ctx = TaskContext::default().with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let left_schema = &left_batch.schema(); @@ -2241,9 +2070,9 @@ mod tests { let join_type = case.0; let should_be_empty = case.1; let random_state = RandomState::with_seeds(0, 0, 0, 0); - let config = SessionConfig::new().with_repartition_joins(false); - let session_ctx = SessionContext::with_config(config); - let task_ctx = session_ctx.task_ctx(); + let session_config = SessionConfig::new().with_repartition_joins(false); + let task_ctx = TaskContext::default().with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); // Ensure there will be matching rows let (left_batch, right_batch) = build_sides_record_batches(20, (1, 1))?; let left_schema = left_batch.schema(); @@ -2369,9 +2198,9 @@ mod tests { cardinality: (i32, i32), #[values(0, 1)] case_expr: usize, ) -> Result<()> { - let config = SessionConfig::new().with_repartition_joins(false); - let session_ctx = SessionContext::with_config(config); - let task_ctx = session_ctx.task_ctx(); + let session_config = SessionConfig::new().with_repartition_joins(false); + let task_ctx = TaskContext::default().with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); 
let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let left_schema = &left_batch.schema(); @@ -2453,9 +2282,9 @@ mod tests { )] cardinality: (i32, i32), ) -> Result<()> { - let config = SessionConfig::new().with_repartition_joins(false); - let session_ctx = SessionContext::with_config(config); - let task_ctx = session_ctx.task_ctx(); + let session_config = SessionConfig::new().with_repartition_joins(false); + let task_ctx = TaskContext::default().with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let left_schema = &left_batch.schema(); @@ -2532,9 +2361,9 @@ mod tests { cardinality: (i32, i32), #[values(0, 1, 2, 3, 4, 5, 6, 7)] case_expr: usize, ) -> Result<()> { - let config = SessionConfig::new().with_repartition_joins(false); - let session_ctx = SessionContext::with_config(config); - let task_ctx = session_ctx.task_ctx(); + let session_config = SessionConfig::new().with_repartition_joins(false); + let task_ctx = TaskContext::default().with_session_config(session_config); + let task_ctx = Arc::new(task_ctx); let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, cardinality)?; let left_schema = &left_batch.schema(); diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 93f6cd7c2ca9..c5d5fbcfd195 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -532,13 +532,11 @@ mod tests { use super::*; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::common; - use crate::prelude::SessionContext; use crate::test; #[tokio::test] async fn limit() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let num_partitions = 4; let csv = test::scan_partitioned_csv(num_partitions)?; @@ -654,8 +652,7 @@ mod tests { // test cases for "skip" async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let num_partitions = 4; let csv = test::scan_partitioned_csv(num_partitions)?; diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 5c4b66114328..ca4a3e54f5cf 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -427,7 +427,6 @@ mod tests { use super::*; use crate::physical_plan::common::collect; use crate::physical_plan::expressions::{self, col}; - use crate::prelude::SessionContext; use crate::test::{self}; use crate::test_util; use datafusion_common::ScalarValue; @@ -448,8 +447,7 @@ mod tests { #[tokio::test] async fn project_first_column() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = test_util::aggr_test_schema(); let partitions = 4; @@ -519,8 +517,7 @@ mod tests { #[tokio::test] async fn project_no_column() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let csv = test::scan_partitioned_csv(1)?; let expected = collect(csv.execute(0, task_ctx.clone())?).await.unwrap(); diff --git
a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 3f83e186ea49..78d78df0af8e 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -883,8 +883,6 @@ impl RecordBatchStream for PerPartitionStream { #[cfg(test)] mod tests { use super::*; - use crate::execution::context::SessionConfig; - use crate::prelude::SessionContext; use crate::test::create_vec_batches; use crate::{ assert_batches_sorted_eq, @@ -998,8 +996,7 @@ mod tests { input_partitions: Vec<Vec<RecordBatch>>, partitioning: Partitioning, ) -> Result<Vec<Vec<RecordBatch>>> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); // create physical plan let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?; @@ -1046,8 +1043,7 @@ mod tests { #[tokio::test] async fn unsupported_partitioning() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); // have to send at least one batch through to provoke error let batch = RecordBatch::try_from_iter(vec![( "my_awesome_field", @@ -1081,8 +1077,7 @@ mod tests { // This generates an error on a call to execute. The error // should be returned and no results produced. - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let input = ErrorExec::new(); let partitioning = Partitioning::RoundRobinBatch(1); let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); @@ -1104,8 +1099,7 @@ mod tests { #[tokio::test] async fn repartition_with_error_in_stream() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let batch = RecordBatch::try_from_iter(vec![( "my_awesome_field", Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef, )]) @@ -1138,8 +1132,7 @@ mod tests { #[tokio::test] async fn repartition_with_delayed_stream() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let batch1 = RecordBatch::try_from_iter(vec![( "my_awesome_field", Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef, )]) @@ -1184,8 +1177,7 @@ mod tests { #[tokio::test] async fn robin_repartition_with_dropping_output_stream() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let partitioning = Partitioning::RoundRobinBatch(2); // The barrier exec waits to be pinged // (requires the input to wait at least once) @@ -1228,8 +1220,7 @@ ... // with different compilers, we will compare the same execution with // and without dropping the output stream.
async fn hash_repartition_with_dropping_output_stream() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let partitioning = Partitioning::Hash( vec![Arc::new(crate::physical_plan::expressions::Column::new( "my_awesome_field", @@ -1324,8 +1315,7 @@ mod tests { #[tokio::test] async fn test_drop_cancel() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); @@ -1348,8 +1338,7 @@ mod tests { #[tokio::test] async fn hash_repartition_avoid_empty_batch() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let batch = RecordBatch::try_from_iter(vec![( "a", Arc::new(StringArray::from(vec!["foo"])) as ArrayRef, )]) @@ -1385,14 +1374,12 @@ mod tests { let partitioning = Partitioning::RoundRobinBatch(4); // set up context - let session_ctx = SessionContext::with_config_rt( - SessionConfig::default(), - Arc::new( - RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)) - .unwrap(), - ), + let runtime = Arc::new( + RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(), ); - let task_ctx = session_ctx.task_ctx(); + + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); // create physical plan let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?; diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 1813fd6a1508..3bedbd17e81a 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -884,12 +884,10 @@ impl ExecutionPlan for SortExec { #[cfg(test)] mod tests { use super::*; - use crate::execution::context::SessionConfig; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::collect; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; - use crate::prelude::SessionContext; use crate::test; use crate::test::assert_is_pending; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use arrow::array::*; use arrow::compute::SortOptions; use arrow::datatypes::*; use datafusion_common::cast::{as_primitive_array, as_string_array}; + use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeConfig; use futures::FutureExt; use std::collections::HashMap; #[tokio::test] async fn test_in_mem_sort() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let partitions = 4; let csv = test::scan_partitioned_csv(partitions)?; let schema = csv.schema(); @@ -930,7 +928,7 @@ mod tests { Arc::new(CoalescePartitionsExec::new(csv)), )); - let result = collect(sort_exec, task_ctx).await?; + let result = collect(sort_exec, task_ctx.clone()).await?; assert_eq!(result.len(), 1); @@ -949,7 +947,7 @@ mod tests { assert_eq!(c7.value(c7.len() - 1), 254,); assert_eq!( - session_ctx.runtime_env().memory_pool.reserved(), + task_ctx.runtime_env().memory_pool.reserved(), 0, "The sort should have returned all memory used back to the memory manager" ); @@ -968,7 +966,11 @@ mod tests { let rt_config = RuntimeConfig::new()
.with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0); let runtime = Arc::new(RuntimeEnv::new(rt_config)?); - let session_ctx = SessionContext::with_config_rt(session_config, runtime); + let task_ctx = Arc::new( + TaskContext::default() + .with_session_config(session_config) + .with_runtime(runtime), + ); let partitions = 4; let csv = test::scan_partitioned_csv(partitions)?; @@ -995,8 +997,7 @@ mod tests { Arc::new(CoalescePartitionsExec::new(csv)), )); - let task_ctx = session_ctx.task_ctx(); - let result = collect(sort_exec.clone(), task_ctx).await?; + let result = collect(sort_exec.clone(), task_ctx.clone()).await?; assert_eq!(result.len(), 1); @@ -1023,7 +1024,7 @@ mod tests { assert_eq!(c7.value(c7.len() - 1), 254,); assert_eq!( - session_ctx.runtime_env().memory_pool.reserved(), + task_ctx.runtime_env().memory_pool.reserved(), 0, "The sort should have returned all memory used back to the memory manager" ); @@ -1059,7 +1060,11 @@ mod tests { 1.0, ); let runtime = Arc::new(RuntimeEnv::new(rt_config)?); - let session_ctx = SessionContext::with_config_rt(session_config, runtime); + let task_ctx = Arc::new( + TaskContext::default() + .with_runtime(runtime) + .with_session_config(session_config), + ); let csv = test::scan_partitioned_csv(partitions)?; let schema = csv.schema(); @@ -1088,7 +1093,6 @@ mod tests { .with_fetch(fetch), ); - let task_ctx = session_ctx.task_ctx(); let result = collect(sort_exec.clone(), task_ctx).await?; assert_eq!(result.len(), 1); @@ -1101,8 +1105,7 @@ mod tests { #[tokio::test] async fn test_sort_metadata() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let field_metadata: HashMap<String, String> = vec![("foo".to_string(), "bar".to_string())] .into_iter() .collect(); @@ -1151,8 +1154,7 @@ mod tests { #[tokio::test] async fn test_lex_sort_by_float() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Float32, true), Field::new("b", DataType::Float64, true), @@ -1257,8 +1259,7 @@ mod tests { #[tokio::test] async fn test_drop_cancel() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); @@ -1272,7 +1273,7 @@ mod tests { blocking_exec, )); - let fut = collect(sort_exec, task_ctx); + let fut = collect(sort_exec, task_ctx.clone()); let mut fut = fut.boxed(); assert_is_pending(&mut fut); @@ -1280,7 +1281,7 @@ mod tests { assert_strong_count_converges_to_zero(refs).await; assert_eq!( - session_ctx.runtime_env().memory_pool.reserved(), + task_ctx.runtime_env().memory_pool.reserved(), 0, "The sort should have returned all memory used back to the memory manager" ); diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 6b978b5ee753..bc0ac678f03c 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -273,6 +273,7 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; + use datafusion_execution::config::SessionConfig; use futures::{FutureExt, StreamExt}; use
crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -282,7 +283,6 @@ mod tests { use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{collect, common}; - use crate::prelude::{SessionConfig, SessionContext}; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test::{self, assert_is_pending}; use crate::{assert_batches_eq, test_util}; @@ -292,8 +292,7 @@ mod tests { #[tokio::test] async fn test_merge_interleave() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ Some("a"), @@ -341,8 +340,7 @@ mod tests { #[tokio::test] async fn test_merge_some_overlap() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ Some("a"), @@ -390,8 +388,7 @@ mod tests { #[tokio::test] async fn test_merge_no_overlap() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ Some("a"), @@ -439,8 +436,7 @@ mod tests { #[tokio::test] async fn test_merge_three_partitions() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3])); let b: ArrayRef = Arc::new(StringArray::from_iter(vec![ Some("a"), @@ -561,8 +557,7 @@ mod tests { #[tokio::test] async fn test_partition_sort() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let partitions = 4; let csv = test::scan_partitioned_csv(partitions).unwrap(); let schema = csv.schema(); @@ -644,8 +639,7 @@ mod tests { #[tokio::test] async fn test_partition_sort_streaming_input() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); let schema = test_util::aggr_test_schema(); let sort = vec![ // uint8 @@ -705,17 +699,18 @@ mod tests { }, ]; - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + // Test streaming with default batch size + let task_ctx = Arc::new(TaskContext::default()); let input = sorted_partitioned_input(sort.clone(), &[10, 5, 13], task_ctx.clone()).await; let basic = basic_sort(input.clone(), sort.clone(), task_ctx).await; - let session_ctx_bs_23 = - SessionContext::with_config(SessionConfig::new().with_batch_size(23)); + // batch size of 23 + let task_ctx = TaskContext::default() + .with_session_config(SessionConfig::new().with_batch_size(23)); + let task_ctx = Arc::new(task_ctx); let merge = Arc::new(SortPreservingMergeExec::new(sort, input)); - let task_ctx = session_ctx_bs_23.task_ctx(); let merged = collect(merge, task_ctx).await.unwrap(); assert_eq!(merged.len(), 14); @@ -735,8 +730,7 @@ mod tests { #[tokio::test] async fn test_nulls() { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = Arc::new(TaskContext::default()); 
        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             None,
@@ -817,8 +811,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_async() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
         let sort = vec![PhysicalSortExpr {
             expr: col("c12", &schema).unwrap(),
@@ -885,8 +878,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_merge_metrics() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"), Some("c")]));
         let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
@@ -942,8 +934,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
@@ -969,8 +960,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_stable_sort() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         // Create record batches like:
         // batch_number |value
diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs
index 2b916b7ee263..2e9d40cfe571 100644
--- a/datafusion/core/src/physical_plan/stream.rs
+++ b/datafusion/core/src/physical_plan/stream.rs
@@ -370,11 +370,8 @@ mod test {
     use super::*;
     use arrow_schema::{DataType, Field, Schema};
 
-    use crate::{
-        execution::context::SessionContext,
-        test::exec::{
-            assert_strong_count_converges_to_zero, BlockingExec, MockExec, PanicExec,
-        },
+    use crate::test::exec::{
+        assert_strong_count_converges_to_zero, BlockingExec, MockExec, PanicExec,
     };
 
     fn schema() -> SchemaRef {
@@ -413,8 +410,7 @@ mod test {
 
     #[tokio::test]
     async fn record_batch_receiver_stream_drop_cancel() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = schema();
 
         // Make an input that never proceeds
@@ -439,8 +435,7 @@ mod test {
     /// `RecordBatchReceiverStream` stops early and does not drive
     /// other streams to completion.
    async fn record_batch_receiver_stream_error_does_not_drive_completion() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = schema();
 
         // make an input that will error twice
@@ -471,8 +466,7 @@ mod test {
     ///
     /// panic's if more than max_batches is seen,
     async fn consume(input: PanicExec, max_batches: usize) {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let input = Arc::new(input);
         let num_partitions = input.output_partitioning().partition_count();
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index e1f8072085f3..e29c96da092e 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -608,14 +608,12 @@ mod tests {
     use super::*;
     use crate::test;
 
-    use crate::prelude::SessionContext;
     use crate::{physical_plan::collect, scalar::ScalarValue};
     use arrow::record_batch::RecordBatch;
 
     #[tokio::test]
     async fn test_union_partitions() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         // Create csv's with different partitioning
         let csv = test::scan_partitioned_csv(4)?;
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index 7d9d70f4771e..cba7df772efe 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -362,7 +362,6 @@ mod tests {
     use crate::physical_plan::aggregates::AggregateFunction;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::{collect, ExecutionPlan};
-    use crate::prelude::SessionContext;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending, csv_exec_sorted};
     use arrow::array::*;
@@ -370,6 +369,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, SchemaRef};
     use arrow::record_batch::RecordBatch;
     use datafusion_common::cast::as_primitive_array;
+    use datafusion_execution::TaskContext;
     use datafusion_expr::{create_udaf, Accumulator, Volatility};
     use futures::FutureExt;
 
@@ -546,8 +546,7 @@ mod tests {
             Arc::new(vec![DataType::Int64]),
         );
 
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let (input, schema) = create_test_schema(1)?;
 
         let window_exec = Arc::new(WindowAggExec::try_new(
@@ -579,8 +578,7 @@ mod tests {
 
     #[tokio::test]
     async fn window_function() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let (input, schema) = create_test_schema(1)?;
 
         let window_exec = Arc::new(WindowAggExec::try_new(
@@ -643,8 +641,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 116a3c1a7978..e555a28f41fc 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::test_util::register_unbounded_file_with_ordering;
+
 use super::*;
 
 #[tokio::test]
@@ -75,3 +77,162 @@ async fn null_aware_left_anti_join() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn join_change_in_planner() -> Result<()> {
+    let config = SessionConfig::new().with_target_partitions(8);
+    let ctx = SessionContext::with_config(config);
+    let tmp_dir = TempDir::new().unwrap();
+    let left_file_path = tmp_dir.path().join("left.csv");
+    File::create(left_file_path.clone()).unwrap();
+    // Create schema
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a1", DataType::UInt32, false),
+        Field::new("a2", DataType::UInt32, false),
+    ]));
+    // Specify the ordering:
+    let file_sort_order = vec![[datafusion_expr::col("a1")]
+        .into_iter()
+        .map(|e| {
+            let ascending = true;
+            let nulls_first = false;
+            e.sort(ascending, nulls_first)
+        })
+        .collect::<Vec<_>>()];
+    register_unbounded_file_with_ordering(
+        &ctx,
+        schema.clone(),
+        &left_file_path,
+        "left",
+        file_sort_order.clone(),
+        true,
+    )
+    .await?;
+    let right_file_path = tmp_dir.path().join("right.csv");
+    File::create(right_file_path.clone()).unwrap();
+    register_unbounded_file_with_ordering(
+        &ctx,
+        schema,
+        &right_file_path,
+        "right",
+        file_sort_order,
+        true,
+    )
+    .await?;
+    let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+    let dataframe = ctx.sql(sql).await?;
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
+    let expected = {
+        [
+            "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
+            // "      CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
+            // "      CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
+        ]
+    };
+    let mut actual: Vec<&str> = formatted.trim().lines().collect();
+    // Remove CSV lines
+    actual.remove(3);
+    actual.remove(5);
+
+    assert_eq!(
+        expected,
+        actual[..],
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn join_change_in_planner_without_sort() -> Result<()> {
+    let config = SessionConfig::new().with_target_partitions(8);
+    let ctx = SessionContext::with_config(config);
+    let tmp_dir = TempDir::new()?;
+    let left_file_path = tmp_dir.path().join("left.csv");
+    File::create(left_file_path.clone())?;
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a1", DataType::UInt32, false),
+        Field::new("a2", DataType::UInt32, false),
+    ]));
+    ctx.register_csv(
+        "left",
+        left_file_path.as_os_str().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema).mark_infinite(true),
+    )
+    .await?;
+    let right_file_path = tmp_dir.path().join("right.csv");
+    File::create(right_file_path.clone())?;
+    ctx.register_csv(
+        "right",
+        right_file_path.as_os_str().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema).mark_infinite(true),
+    )
+    .await?;
+    let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+    let dataframe = ctx.sql(sql).await?;
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
+    let expected = {
+        [
+            "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
+            // "      CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
+            // "      CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
+        ]
+    };
+    let mut actual: Vec<&str> = formatted.trim().lines().collect();
+    // Remove CSV lines
+    actual.remove(3);
+    actual.remove(5);
+
+    assert_eq!(
+        expected,
+        actual[..],
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_target_partitions(8)
+        .with_allow_symmetric_joins_without_pruning(false);
+    let ctx = SessionContext::with_config(config);
+    let tmp_dir = TempDir::new()?;
+    let left_file_path = tmp_dir.path().join("left.csv");
+    File::create(left_file_path.clone())?;
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a1", DataType::UInt32, false),
+        Field::new("a2", DataType::UInt32, false),
+    ]));
+    ctx.register_csv(
+        "left",
+        left_file_path.as_os_str().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema).mark_infinite(true),
+    )
+    .await?;
+    let right_file_path = tmp_dir.path().join("right.csv");
+    File::create(right_file_path.clone())?;
+    ctx.register_csv(
+        "right",
+        right_file_path.as_os_str().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema).mark_infinite(true),
+    )
+    .await?;
+    let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
+    match df.create_physical_plan().await {
+        Ok(_) => panic!("Expecting error."),
+        Err(e) => {
+            assert_eq!(e.to_string(), "PipelineChecker\ncaused by\nError during planning: Join operation cannot operate on a non-prunable stream without enabling the 'allow_symmetric_joins_without_pruning' configuration flag")
+        }
+    }
+    Ok(())
+}
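One step in join_change_in_planner worth spelling out is the file_sort_order construction, which turns a column expression into a sort expression via Expr::sort(ascending, nulls_first). The same value written without the iterator chain (a sketch; the type annotation is my inference from the surrounding code, not part of the diff):

use datafusion_expr::{col, Expr};

fn a1_ascending_order() -> Vec<Vec<Expr>> {
    // Equivalent to the map/collect chain in the test:
    // ascending on "a1", nulls last.
    vec![vec![col("a1").sort(true, false)]]
}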
diff --git a/datafusion/core/tests/sqllogictests/test_files/options.slt b/datafusion/core/tests/sqllogictests/test_files/options.slt
new file mode 100644
index 000000000000..1f4cc9ab0cde
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/options.slt
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#######
+## Tests for config options
+#######
+
+
+statement ok
+create table a(c0 int) as values (1), (2);
+
+# Expect coalesce and default batch size
+query TT
+explain SELECT * FROM a WHERE c0 < 1;
+----
+logical_plan
+Filter: a.c0 < Int32(1)
+--TableScan: a projection=[c0]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: c0@0 < 1
+----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+##
+# test_disable_coalesce
+##
+
+statement ok
+set datafusion.execution.coalesce_batches = false
+
+# expect no coalesce
+query TT
+explain SELECT * FROM a WHERE c0 < 1;
+----
+logical_plan
+Filter: a.c0 < Int32(1)
+--TableScan: a projection=[c0]
+physical_plan
+FilterExec: c0@0 < 1
+--MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+statement ok
+set datafusion.execution.coalesce_batches = true
+
+
+##
+# test_custom_batch_size
+##
+
+statement ok
+set datafusion.execution.batch_size = 1234;
+
+# expect batch size to be 1234
+query TT
+explain SELECT * FROM a WHERE c0 < 1;
+----
+logical_plan
+Filter: a.c0 < Int32(1)
+--TableScan: a projection=[c0]
+physical_plan
+CoalesceBatchesExec: target_batch_size=1234
+--FilterExec: c0@0 < 1
+----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+
+statement ok
+set datafusion.execution.batch_size = 8192;
+
+statement ok
+drop table a
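The new options.slt file drives these options through SQL SET statements instead of the removed Rust unit tests. The same settings remain reachable programmatically; a hedged sketch (with_batch_size appears in this diff, while the string-keyed set_bool setter is my assumption about the SessionConfig API, not something this diff adds):

use datafusion_execution::config::SessionConfig;

fn custom_exec_config() -> SessionConfig {
    // Mirrors `set datafusion.execution.batch_size = 1234` and
    // `set datafusion.execution.coalesce_batches = false` from options.slt.
    SessionConfig::new()
        .with_batch_size(1234)
        .set_bool("datafusion.execution.coalesce_batches", false)
}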
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 62e2fa8da36b..72d804d7bb9a 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -27,16 +27,20 @@ use datafusion_common::{
 datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 
 use crate::{
-    config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry,
-    runtime_env::RuntimeEnv,
+    config::SessionConfig,
+    memory_pool::MemoryPool,
+    registry::FunctionRegistry,
+    runtime_env::{RuntimeConfig, RuntimeEnv},
 };
 
 /// Task Execution Context
 ///
-/// A [`TaskContext`] has represents the state available during a single query's
-/// execution.
+/// A [`TaskContext`] contains the state available during a single
+/// query's execution. Please see [`SessionContext`] for a user-level
+/// multi-query API.
 ///
-/// # Task Context
+/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
+#[derive(Debug)]
 pub struct TaskContext {
     /// Session Id
     session_id: String,
@@ -54,6 +58,24 @@ pub struct TaskContext {
     runtime: Arc<RuntimeEnv>,
 }
 
+impl Default for TaskContext {
+    fn default() -> Self {
+        let runtime = RuntimeEnv::new(RuntimeConfig::new())
+            .expect("default runtime created successfully");
+
+        // Create a default task context, mostly useful for testing
+        Self {
+            session_id: "DEFAULT".to_string(),
+            task_id: None,
+            session_config: SessionConfig::new(),
+            scalar_functions: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            window_functions: HashMap::new(),
+            runtime: Arc::new(runtime),
+        }
+    }
+}
+
 impl TaskContext {
     /// Create a new [`TaskContext`] instance.
     ///
@@ -137,6 +159,18 @@ impl TaskContext {
     pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
         self.runtime.clone()
     }
+
+    /// Update the [`SessionConfig`]
+    pub fn with_session_config(mut self, session_config: SessionConfig) -> Self {
+        self.session_config = session_config;
+        self
+    }
+
+    /// Update the [`RuntimeEnv`]
+    pub fn with_runtime(mut self, runtime: Arc<RuntimeEnv>) -> Self {
+        self.runtime = runtime;
+        self
+    }
 }
 
 impl FunctionRegistry for TaskContext {