From 2da33f4ee7729916d23b048eb3caec1fb70f693a Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Wed, 13 Mar 2024 22:38:43 +0200 Subject: [PATCH 1/5] support input reordering for NestedLoopJoinExec --- .../src/physical_optimizer/join_selection.rs | 209 +++++++- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 89 +++- .../src/joins/nested_loop_join.rs | 489 +++++++----------- datafusion/sqllogictest/test_files/cte.slt | 5 +- .../sqllogictest/test_files/group_by.slt | 2 +- datafusion/sqllogictest/test_files/joins.slt | 28 +- .../sqllogictest/test_files/tpch/q11.slt.part | 54 +- .../sqllogictest/test_files/tpch/q22.slt.part | 37 +- 8 files changed, 542 insertions(+), 371 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 72174b0e6e2f..9d6c576f8904 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -30,8 +30,8 @@ use crate::error::Result; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use crate::physical_plan::joins::{ - CrossJoinExec, HashJoinExec, PartitionMode, StreamJoinPartitionMode, - SymmetricHashJoinExec, + CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, + StreamJoinPartitionMode, SymmetricHashJoinExec, }; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; @@ -199,6 +199,38 @@ fn swap_hash_join( } } +/// Swaps inputs of `NestedLoopJoinExec` and wraps it into `ProjectionExec` is required +fn swap_nl_join(join: &NestedLoopJoinExec) -> Result> { + let new_filter = swap_join_filter(join.filter()); + let new_join_type = &swap_join_type(*join.join_type()); + + let new_join = NestedLoopJoinExec::try_new( + Arc::clone(join.right()), + Arc::clone(join.left()), + new_filter, + new_join_type, + )?; + + // For Semi/Anti joins, swap result will produce same output schema, + // no need to wrap them into additional projection + let plan: Arc = if matches!( + join.join_type(), + JoinType::LeftSemi + | JoinType::RightSemi + | JoinType::LeftAnti + | JoinType::RightAnti + ) { + Arc::new(new_join) + } else { + let projection = + swap_reverting_projection(&join.left().schema(), &join.right().schema()); + + Arc::new(ProjectionExec::try_new(projection, Arc::new(new_join))?) + }; + + Ok(plan) +} + /// When the order of the join is changed by the optimizer, the columns in /// the output should not be impacted. This function creates the expressions /// that will allow to swap back the values from the original left as the first @@ -461,6 +493,14 @@ fn statistical_join_selection_subrule( } else { None } + } else if let Some(nl_join) = plan.as_any().downcast_ref::() { + let left = nl_join.left(); + let right = nl_join.right(); + if should_swap_join_order(&**left, &**right)? { + swap_nl_join(nl_join).map(Some)? 
+ } else { + None + } } else { None }; @@ -697,9 +737,12 @@ mod tests_statistical { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{stats::Precision, JoinType, ScalarValue}; - use datafusion_physical_expr::expressions::Column; + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::{BinaryExpr, Column}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; + use rstest::rstest; + /// Return statistcs for empty table fn empty_statistics() -> Statistics { Statistics { @@ -785,6 +828,35 @@ mod tests_statistical { }] } + /// Create join filter for NLJoinExec with expression `big_col > small_col` + /// where both columns are 0-indexed and come from left and right inputs respectively + fn nl_join_filter() -> Option { + let column_indices = vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ]; + let intermediate_schema = Schema::new(vec![ + Field::new("big_col", DataType::Int32, false), + Field::new("small_col", DataType::Int32, false), + ]); + let expression = Arc::new(BinaryExpr::new( + Arc::new(Column::new_with_schema("big_col", &intermediate_schema).unwrap()), + Operator::Gt, + Arc::new(Column::new_with_schema("big_col", &intermediate_schema).unwrap()), + )) as _; + Some(JoinFilter::new( + expression, + column_indices, + intermediate_schema, + )) + } + /// Returns three plans with statistics of (min, max, distinct_count) /// * big 100K rows @ (0, 50k, 50k) /// * medium 10K rows @ (1k, 5k, 1k) @@ -1151,6 +1223,137 @@ mod tests_statistical { crosscheck_plans(join).unwrap(); } + #[rstest( + join_type, + case::inner(JoinType::Inner), + case::left(JoinType::Left), + case::right(JoinType::Right), + case::full(JoinType::Full) + )] + #[tokio::test] + async fn test_nl_join_with_swap(join_type: JoinType) { + let (big, small) = create_big_and_small(); + + let join = Arc::new( + NestedLoopJoinExec::try_new( + Arc::clone(&big), + Arc::clone(&small), + nl_join_filter(), + &join_type, + ) + .unwrap(), + ); + + let optimized_join = JoinSelection::new() + .optimize(join.clone(), &ConfigOptions::new()) + .unwrap(); + + let swapping_projection = optimized_join + .as_any() + .downcast_ref::() + .expect("A proj is required to swap columns back to their original order"); + + assert_eq!(swapping_projection.expr().len(), 2); + let (col, name) = &swapping_projection.expr()[0]; + assert_eq!(name, "big_col"); + assert_col_expr(col, "big_col", 1); + let (col, name) = &swapping_projection.expr()[1]; + assert_eq!(name, "small_col"); + assert_col_expr(col, "small_col", 0); + + let swapped_join = swapping_projection + .input() + .as_any() + .downcast_ref::() + .expect("The type of the plan should not be changed"); + + // Assert join side of big_col swapped in filter expression + let swapped_filter = swapped_join.filter().unwrap(); + let swapped_big_col_idx = swapped_filter.schema().index_of("big_col").unwrap(); + let swapped_big_col_side = swapped_filter + .column_indices() + .get(swapped_big_col_idx) + .unwrap() + .side; + assert_eq!( + swapped_big_col_side, + JoinSide::Right, + "Filter column side should be swapped" + ); + + assert_eq!( + swapped_join.left().statistics().unwrap().total_byte_size, + Precision::Inexact(8192) + ); + assert_eq!( + swapped_join.right().statistics().unwrap().total_byte_size, + Precision::Inexact(2097152) + ); + crosscheck_plans(join.clone()).unwrap(); + } + + #[rstest( + join_type, + case::left_semi(JoinType::LeftSemi), + case::left_anti(JoinType::LeftAnti), + 
case::right_semi(JoinType::RightSemi), + case::right_anti(JoinType::RightAnti) + )] + #[tokio::test] + async fn test_nl_join_with_swap_no_proj(join_type: JoinType) { + let (big, small) = create_big_and_small(); + + let join = Arc::new( + NestedLoopJoinExec::try_new( + Arc::clone(&big), + Arc::clone(&small), + nl_join_filter(), + &join_type, + ) + .unwrap(), + ); + + let optimized_join = JoinSelection::new() + .optimize(join.clone(), &ConfigOptions::new()) + .unwrap(); + + let swapped_join = optimized_join + .as_any() + .downcast_ref::() + .expect("The type of the plan should not be changed"); + + // Assert before/after schemas are equal + assert_eq!( + join.schema(), + swapped_join.schema(), + "Join schema should not be modified while optimization" + ); + + // Assert join side of big_col swapped in filter expression + let swapped_filter = swapped_join.filter().unwrap(); + let swapped_big_col_idx = swapped_filter.schema().index_of("big_col").unwrap(); + let swapped_big_col_side = swapped_filter + .column_indices() + .get(swapped_big_col_idx) + .unwrap() + .side; + assert_eq!( + swapped_big_col_side, + JoinSide::Right, + "Filter column side should be swapped" + ); + + assert_eq!( + swapped_join.left().statistics().unwrap().total_byte_size, + Precision::Inexact(8192) + ); + assert_eq!( + swapped_join.right().statistics().unwrap().total_byte_size, + Precision::Inexact(2097152) + ); + crosscheck_plans(join.clone()).unwrap(); + } + #[tokio::test] async fn test_swap_reverting_projection() { let left_schema = Schema::new(vec![ diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index e25f04dc4beb..fbfa0ffc19b4 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -21,13 +21,19 @@ use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; +use arrow_schema::Schema; use rand::Rng; +use datafusion::common::JoinSide; +use datafusion::logical_expr::{JoinType, Operator}; +use datafusion::physical_expr::expressions::BinaryExpr; use datafusion::physical_plan::collect; use datafusion::physical_plan::expressions::Column; -use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; +use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; +use datafusion::physical_plan::joins::{ + HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, +}; use datafusion::physical_plan::memory::MemoryExec; -use datafusion_expr::JoinType; use datafusion::prelude::{SessionConfig, SessionContext}; use test_utils::stagger_batch_with_seed; @@ -73,7 +79,7 @@ async fn test_full_join_1k() { } #[tokio::test] -async fn test_semi_join_1k() { +async fn test_semi_join_10k() { run_join_test( make_staggered_batches(10000), make_staggered_batches(10000), @@ -83,7 +89,7 @@ async fn test_semi_join_1k() { } #[tokio::test] -async fn test_anti_join_1k() { +async fn test_anti_join_10k() { run_join_test( make_staggered_batches(10000), make_staggered_batches(10000), @@ -118,6 +124,46 @@ async fn run_join_test( ), ]; + // Nested loop join uses filter for joining records + let column_indices = vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 1, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ColumnIndex { + index: 1, + side: JoinSide::Right, + }, + ]; + let intermediate_schema = Schema::new(vec![ + 
schema1.field_with_name("a").unwrap().to_owned(), + schema1.field_with_name("b").unwrap().to_owned(), + schema2.field_with_name("a").unwrap().to_owned(), + schema2.field_with_name("b").unwrap().to_owned(), + ]); + + let equal_a = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Column::new("a", 2)), + )) as _; + let equal_b = Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Eq, + Arc::new(Column::new("b", 3)), + )) as _; + let expression = Arc::new(BinaryExpr::new(equal_a, Operator::And, equal_b)) as _; + + let on_filter = JoinFilter::new(expression, column_indices, intermediate_schema); + // sort-merge join let left = Arc::new( MemoryExec::try_new(&[input1.clone()], schema1.clone(), None).unwrap(), @@ -161,9 +207,23 @@ async fn run_join_test( ); let hj_collected = collect(hj, task_ctx.clone()).await.unwrap(); + // nested loop join + let left = Arc::new( + MemoryExec::try_new(&[input1.clone()], schema1.clone(), None).unwrap(), + ); + let right = Arc::new( + MemoryExec::try_new(&[input2.clone()], schema2.clone(), None).unwrap(), + ); + let nlj = Arc::new( + NestedLoopJoinExec::try_new(left, right, Some(on_filter), &join_type) + .unwrap(), + ); + let nlj_collected = collect(nlj, task_ctx.clone()).await.unwrap(); + // compare let smj_formatted = pretty_format_batches(&smj_collected).unwrap().to_string(); let hj_formatted = pretty_format_batches(&hj_collected).unwrap().to_string(); + let nlj_formatted = pretty_format_batches(&nlj_collected).unwrap().to_string(); let mut smj_formatted_sorted: Vec<&str> = smj_formatted.trim().lines().collect(); smj_formatted_sorted.sort_unstable(); @@ -171,12 +231,31 @@ async fn run_join_test( let mut hj_formatted_sorted: Vec<&str> = hj_formatted.trim().lines().collect(); hj_formatted_sorted.sort_unstable(); + let mut nlj_formatted_sorted: Vec<&str> = nlj_formatted.trim().lines().collect(); + nlj_formatted_sorted.sort_unstable(); + for (i, (smj_line, hj_line)) in smj_formatted_sorted .iter() .zip(&hj_formatted_sorted) .enumerate() { - assert_eq!((i, smj_line), (i, hj_line)); + assert_eq!( + (i, smj_line), + (i, hj_line), + "SortMergeJoinExec and HashJoinExec produced different results" + ); + } + + for (i, (nlj_line, hj_line)) in nlj_formatted_sorted + .iter() + .zip(&hj_formatted_sorted) + .enumerate() + { + assert_eq!( + (i, nlj_line), + (i, hj_line), + "NestedLoopJoinExec and HashJoinExec produced different results" + ); } } } diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index c6d891dd13c1..2c16fff52750 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -21,21 +21,22 @@ use std::any::Any; use std::fmt::Formatter; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::Poll; use crate::coalesce_batches::concat_batches; +use crate::coalesce_partitions::CoalescePartitionsExec; use crate::joins::utils::{ - append_right_indices, apply_join_filter_to_indices, build_batch_from_indices, - build_join_schema, check_join_is_valid, estimate_join_statistics, get_anti_indices, - get_final_indices_from_bit_map, get_semi_indices, - partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex, JoinFilter, - OnceAsync, OnceFut, + adjust_indices_by_join_type, adjust_right_output_partitioning, + apply_join_filter_to_indices, build_batch_from_indices, build_join_schema, + check_join_is_valid, estimate_join_statistics, 
get_final_indices_from_bit_map, + BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut, }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ execution_mode_from_children, DisplayAs, DisplayFormatType, Distribution, - ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + ExecutionMode, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, }; @@ -52,28 +53,90 @@ use datafusion_expr::JoinType; use datafusion_physical_expr::equivalence::join_equivalence_properties; use futures::{ready, Stream, StreamExt, TryStreamExt}; +use parking_lot::Mutex; + +use super::utils::need_produce_result_in_final; + +/// Shared bitmap for visited left-side indices +type SharedBitmapBuilder = Mutex; +/// Left (build-side) data +struct JoinLeftData { + /// Build-side data collected to single batch + batch: RecordBatch, + /// Shared bitmap builder for visited left indices + bitmap: SharedBitmapBuilder, + /// Counter of running probe-threads, potentially able to update `bitmap` + running_threads_counter: AtomicUsize, + /// Memory reservation for tracking batch and bitmap + /// Cleared on `JoinLeftData` drop + #[allow(dead_code)] + reservation: MemoryReservation, +} -/// Data of the inner table side -type JoinLeftData = (RecordBatch, MemoryReservation); +impl JoinLeftData { + fn new( + batch: RecordBatch, + bitmap: SharedBitmapBuilder, + running_threads_counter: AtomicUsize, + reservation: MemoryReservation, + ) -> Self { + Self { + batch, + bitmap, + running_threads_counter, + reservation, + } + } + + fn batch(&self) -> &RecordBatch { + &self.batch + } -/// NestedLoopJoinExec executes partitions in parallel. -/// One input will be collected to a single partition, call it inner-table. -/// The other side of the input is treated as outer-table, and the output Partitioning is from it. -/// Giving an output partition number x, the execution will be: + fn bitmap(&self) -> &SharedBitmapBuilder { + &self.bitmap + } + + /// Decrements counter of running threads, and returns `true` + /// if caller is the last running thread + fn report_probe_completed(&self) -> bool { + self.running_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1 + } +} + +/// NestedLoopJoinExec is build-probe join operator, whose main task is to +/// perform joins without any equijoin conditions in `ON` clause. +/// +/// Execution consists of following phases: /// -/// ```text -/// for outer-table-batch in outer-table-partition-x -/// check-join(outer-table-batch, inner-table-data) -/// ``` +/// #### 1. Build phase +/// Collecting build-side data in memory, by polling all available data from build-side input. +/// Due to the absence of equijoin conditions, it's not possible to partition build-side data +/// across multiple threads of the operator, so build-side is always collected in a single +/// batch shared across all threads. +/// The operator always considers LEFT input as build-side input, so it's crucial to adjust +/// smaller input to be the LEFT one. Normally this selection is handled by physical optimizer. /// -/// One of the inputs will become inner table, and it is decided by the join type. -/// Following is the relation table: +/// #### 2. 
Probe phase +/// Sequentially polling batches from the probe-side input and processing them according to the +/// following logic: +/// - apply join filter (`ON` clause) to Carthesian product of probe batch adn build side data +/// -- filter evaluation is executed once per build-side data row +/// - update shared bitmap of joined ("visited") build-side row indices, if required -- allows +/// to produce unmatched build-side data in case of e.g. LEFT/FULL JOIN after probing phase +/// completed +/// - perform join index alignment is required -- depending on `JoinType` +/// - produce output join batch /// -/// | JoinType | Distribution (left, right) | Inner-table | -/// |--------------------------------|--------------------------------------------|-------------| -/// | Inner/Left/LeftSemi/LeftAnti | (UnspecifiedDistribution, SinglePartition) | right | -/// | Right/RightSemi/RightAnti/Full | (SinglePartition, UnspecifiedDistribution) | left | -/// | Full | (SinglePartition, SinglePartition) | left | +/// Probing phase is executed in parallel, according to probe-side input partitioning -- one +/// thread per partition. After probe input is exhausted, each thread **ATTEMPTS** to produce +/// unmatched build-side data. +/// +/// #### 3. Producing unmatched build-side data +/// Producing unmatched build-side data as an output batch, after probe input is exhausted. +/// This step is also executed in parallel (once per probe input partition), and to avoid +/// duplicate output of unmatched data (due to shared nature build-side data), each thread +/// "reports" about probe phase completion (which means that "visited" bitmap won't be +/// updated anymore), and only the last thread, reporting about completion, will return output. /// #[derive(Debug)] pub struct NestedLoopJoinExec { @@ -112,6 +175,7 @@ impl NestedLoopJoinExec { build_join_schema(&left_schema, &right_schema, join_type); let schema = Arc::new(schema); let cache = Self::compute_properties(&left, &right, schema.clone(), *join_type); + Ok(NestedLoopJoinExec { left, right, @@ -165,15 +229,19 @@ impl NestedLoopJoinExec { ); // Get output partitioning, - let output_partitioning = if join_type == JoinType::Full { - left.output_partitioning().clone() - } else { - partitioned_join_output_partitioning( - join_type, - left.output_partitioning(), + let output_partitioning = match join_type { + JoinType::Inner | JoinType::Right => adjust_right_output_partitioning( right.output_partitioning(), - left.schema().fields.len(), - ) + left.schema().fields().len(), + ), + JoinType::RightSemi | JoinType::RightAnti => { + right.output_partitioning().clone() + } + JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti | JoinType::Full => { + Partitioning::UnknownPartitioning( + right.output_partitioning().partition_count(), + ) + } }; // Determine execution mode: @@ -214,7 +282,10 @@ impl ExecutionPlan for NestedLoopJoinExec { } fn required_input_distribution(&self) -> Vec { - distribution_from_join_type(&self.join_type) + vec![ + Distribution::SinglePartition, + Distribution::UnspecifiedDistribution, + ] } fn children(&self) -> Vec> { @@ -245,38 +316,17 @@ impl ExecutionPlan for NestedLoopJoinExec { MemoryConsumer::new(format!("NestedLoopJoinLoad[{partition}]")) .register(context.memory_pool()); - // Initialization of stream-level reservation - let reservation = - MemoryConsumer::new(format!("NestedLoopJoinStream[{partition}]")) - .register(context.memory_pool()); - - let (outer_table, inner_table) = if left_is_build_side(self.join_type) { - // left must be 
single partition - let inner_table = self.inner_table.once(|| { - load_specified_partition_of_input( - 0, - self.left.clone(), - context.clone(), - join_metrics.clone(), - load_reservation, - ) - }); - let outer_table = self.right.execute(partition, context)?; - (outer_table, inner_table) - } else { - // right must be single partition - let inner_table = self.inner_table.once(|| { - load_specified_partition_of_input( - 0, - self.right.clone(), - context.clone(), - join_metrics.clone(), - load_reservation, - ) - }); - let outer_table = self.left.execute(partition, context)?; - (outer_table, inner_table) - }; + let inner_table = self.inner_table.once(|| { + collect_left_input( + self.left.clone(), + context.clone(), + join_metrics.clone(), + load_reservation, + need_produce_result_in_final(self.join_type), + self.right().output_partitioning().partition_count(), + ) + }); + let outer_table = self.right.execute(partition, context)?; Ok(Box::pin(NestedLoopJoinStream { schema: self.schema.clone(), @@ -285,10 +335,8 @@ impl ExecutionPlan for NestedLoopJoinExec { outer_table, inner_table, is_exhausted: false, - visited_left_side: None, column_indices: self.column_indices.clone(), join_metrics, - reservation, })) } @@ -307,43 +355,25 @@ impl ExecutionPlan for NestedLoopJoinExec { } } -// For the nested loop join, different join type need the different distribution for -// left and right node. -fn distribution_from_join_type(join_type: &JoinType) -> Vec { - match join_type { - JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => { - // need the left data, and the right should be one partition - vec![ - Distribution::UnspecifiedDistribution, - Distribution::SinglePartition, - ] - } - JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { - // need the right data, and the left should be one partition - vec![ - Distribution::SinglePartition, - Distribution::UnspecifiedDistribution, - ] - } - JoinType::Full => { - // need the left and right data, and the left and right should be one partition - vec![Distribution::SinglePartition, Distribution::SinglePartition] - } - } -} - -/// Asynchronously collect the specified partition data of the input -async fn load_specified_partition_of_input( - partition: usize, +/// Asynchronously collect input into a single batch, and creates `JoinLeftData` from it +async fn collect_left_input( input: Arc, context: Arc, join_metrics: BuildProbeJoinMetrics, reservation: MemoryReservation, + with_visited_left_side: bool, + probing_threads: usize, ) -> Result { - let stream = input.execute(partition, context)?; + let schema = input.schema(); + let merge = if input.output_partitioning().partition_count() != 1 { + Arc::new(CoalescePartitionsExec::new(input)) + } else { + input + }; + let stream = merge.execute(0, context)?; // Load all batches and count the rows - let (batches, num_rows, _, reservation) = stream + let (batches, num_rows, metrics, mut reservation) = stream .try_fold( (Vec::new(), 0usize, join_metrics, reservation), |mut acc, batch| async { @@ -363,19 +393,31 @@ async fn load_specified_partition_of_input( ) .await?; - let merged_batch = concat_batches(&input.schema(), &batches, num_rows)?; + let merged_batch = concat_batches(&schema, &batches, num_rows)?; - Ok((merged_batch, reservation)) -} + // Reserve memory for visited_left_side bitmap if required by join type + let visited_left_side = if with_visited_left_side { + // TODO: Replace `ceil` wrapper with stable `div_cell` after + // https://github.com/rust-lang/rust/issues/88581 
+ let buffer_size = bit_util::ceil(merged_batch.num_rows(), 8); + reservation.try_grow(buffer_size)?; + metrics.build_mem_used.add(buffer_size); -// BuildLeft means the left relation is the single patrition side. -// For full join, both side are single partition, so it is BuildLeft and BuildRight, treat it as BuildLeft. -pub fn left_is_build_side(join_type: JoinType) -> bool { - matches!( - join_type, - JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full - ) + let mut buffer = BooleanBufferBuilder::new(merged_batch.num_rows()); + buffer.append_n(merged_batch.num_rows(), false); + buffer + } else { + BooleanBufferBuilder::new(0) + }; + + Ok(JoinLeftData::new( + merged_batch, + Mutex::new(visited_left_side), + AtomicUsize::new(probing_threads), + reservation, + )) } + /// A stream that issues [RecordBatch]es as they arrive from the right of the join. struct NestedLoopJoinStream { /// Input schema @@ -390,16 +432,12 @@ struct NestedLoopJoinStream { inner_table: OnceFut, /// There is nothing to process anymore and left side is processed in case of full join is_exhausted: bool, - /// Keeps track of the left side rows whether they are visited - visited_left_side: Option, /// Information of index and left / right placement of columns column_indices: Vec, // TODO: support null aware equal // null_equals_null: bool /// Join execution metrics join_metrics: BuildProbeJoinMetrics, - /// Memory reservation for visited_left_side - reservation: MemoryReservation, } fn build_join_indices( @@ -430,39 +468,20 @@ fn build_join_indices( } impl NestedLoopJoinStream { - /// For Right/RightSemi/RightAnti/Full joins, left is the single partition side. - fn poll_next_impl_for_build_left( + fn poll_next_impl( &mut self, cx: &mut std::task::Context<'_>, ) -> Poll>> { // all left row let build_timer = self.join_metrics.build_time.timer(); - let (left_data, _) = match ready!(self.inner_table.get(cx)) { + let left_data = match ready!(self.inner_table.get_shared(cx)) { Ok(data) => data, Err(e) => return Poll::Ready(Some(Err(e))), }; build_timer.done(); - if self.visited_left_side.is_none() && self.join_type == JoinType::Full { - // TODO: Replace `ceil` wrapper with stable `div_cell` after - // https://github.com/rust-lang/rust/issues/88581 - let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8); - self.reservation.try_grow(visited_bitmap_size)?; - self.join_metrics.build_mem_used.add(visited_bitmap_size); - } - - // add a bitmap for full join. 
- let visited_left_side = self.visited_left_side.get_or_insert_with(|| { - let left_num_rows = left_data.num_rows(); - // only full join need bitmap - if self.join_type == JoinType::Full { - let mut buffer = BooleanBufferBuilder::new(left_num_rows); - buffer.append_n(left_num_rows, false); - buffer - } else { - BooleanBufferBuilder::new(0) - } - }); + // Get or initialize visited_left_side bitmap if required by join type + let visited_left_side = left_data.bitmap(); self.outer_table .poll_next_unpin(cx) @@ -474,7 +493,7 @@ impl NestedLoopJoinStream { let timer = self.join_metrics.join_time.timer(); let result = join_left_and_right_batch( - left_data, + left_data.batch(), &right_batch, self.join_type, self.filter.as_ref(), @@ -494,21 +513,32 @@ impl NestedLoopJoinStream { } Some(err) => Some(err), None => { - if self.join_type == JoinType::Full && !self.is_exhausted { + if need_produce_result_in_final(self.join_type) && !self.is_exhausted + { + // At this stage `visited_left_side` won't be updated, so it's + // safe to report about probe completion. + // + // Setting `is_exhausted` / returning None will prevent from + // multiple calls of `report_probe_completed()` + if !left_data.report_probe_completed() { + self.is_exhausted = true; + return None; + }; + // Only setting up timer, input is exhausted let timer = self.join_metrics.join_time.timer(); - // use the global left bitmap to produce the left indices and right indices - let (left_side, right_side) = get_final_indices_from_bit_map( - visited_left_side, - self.join_type, - ); + let (left_side, right_side) = + get_final_indices_from_shared_bitmap( + visited_left_side, + self.join_type, + ); let empty_right_batch = RecordBatch::new_empty(self.outer_table.schema()); // use the left and right indices to produce the batch result let result = build_batch_from_indices( &self.schema, - left_data, + left_data.batch(), &empty_right_batch, &left_side, &right_side, @@ -532,55 +562,6 @@ impl NestedLoopJoinStream { } }) } - - /// For Inner/Left/LeftSemi/LeftAnti joins, right is the single partition side. - fn poll_next_impl_for_build_right( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> Poll>> { - // all right row - let build_timer = self.join_metrics.build_time.timer(); - let (right_data, _) = match ready!(self.inner_table.get(cx)) { - Ok(data) => data, - Err(e) => return Poll::Ready(Some(Err(e))), - }; - build_timer.done(); - - // for build right, bitmap is not needed. 
- let mut empty_visited_left_side = BooleanBufferBuilder::new(0); - self.outer_table - .poll_next_unpin(cx) - .map(|maybe_batch| match maybe_batch { - Some(Ok(left_batch)) => { - // Setting up timer & updating input metrics - self.join_metrics.input_batches.add(1); - self.join_metrics.input_rows.add(left_batch.num_rows()); - let timer = self.join_metrics.join_time.timer(); - - // Actual join execution - let result = join_left_and_right_batch( - &left_batch, - right_data, - self.join_type, - self.filter.as_ref(), - &self.column_indices, - &self.schema, - &mut empty_visited_left_side, - ); - - // Recording time & updating output metrics - if let Ok(batch) = &result { - timer.done(); - self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(batch.num_rows()); - } - - Some(result) - } - Some(err) => Some(err), - None => None, - }) - } } fn join_left_and_right_batch( @@ -590,7 +571,7 @@ fn join_left_and_right_batch( filter: Option<&JoinFilter>, column_indices: &[ColumnIndex], schema: &Schema, - visited_left_side: &mut BooleanBufferBuilder, + visited_left_side: &SharedBitmapBuilder, ) -> Result { let indices_result = (0..left_batch.num_rows()) .map(|left_row_index| { @@ -621,17 +602,17 @@ fn join_left_and_right_batch( Ok((left_side, right_side)) => { // set the left bitmap // and only full join need the left bitmap - if join_type == JoinType::Full { + if need_produce_result_in_final(join_type) { + let mut bitmap = visited_left_side.lock(); left_side.iter().flatten().for_each(|x| { - visited_left_side.set_bit(x as usize, true); + bitmap.set_bit(x as usize, true); }); } // adjust the two side indices base on the join type let (left_side, right_side) = adjust_indices_by_join_type( left_side, right_side, - left_batch.num_rows(), - right_batch.num_rows(), + 0..right_batch.num_rows(), join_type, ); @@ -649,86 +630,12 @@ fn join_left_and_right_batch( } } -fn adjust_indices_by_join_type( - left_indices: UInt64Array, - right_indices: UInt32Array, - count_left_batch: usize, - count_right_batch: usize, +fn get_final_indices_from_shared_bitmap( + shared_bitmap: &SharedBitmapBuilder, join_type: JoinType, ) -> (UInt64Array, UInt32Array) { - match join_type { - JoinType::Inner => (left_indices, right_indices), - JoinType::Left => { - // matched - // unmatched left row will be produced in this batch - let left_unmatched_indices = - get_anti_indices(0..count_left_batch, &left_indices); - // combine the matched and unmatched left result together - append_left_indices(left_indices, right_indices, left_unmatched_indices) - } - JoinType::LeftSemi => { - // need to remove the duplicated record in the left side - let left_indices = get_semi_indices(0..count_left_batch, &left_indices); - // the right_indices will not be used later for the `left semi` join - (left_indices, right_indices) - } - JoinType::LeftAnti => { - // need to remove the duplicated record in the left side - // get the anti index for the left side - let left_indices = get_anti_indices(0..count_left_batch, &left_indices); - // the right_indices will not be used later for the `left anti` join - (left_indices, right_indices) - } - // right/right-semi/right-anti => right = outer_table, left = inner_table - JoinType::Right | JoinType::Full => { - // matched - // unmatched right row will be produced in this batch - let right_unmatched_indices = - get_anti_indices(0..count_right_batch, &right_indices); - // combine the matched and unmatched right result together - append_right_indices(left_indices, right_indices, right_unmatched_indices) 
- } - JoinType::RightSemi => { - // need to remove the duplicated record in the right side - let right_indices = get_semi_indices(0..count_right_batch, &right_indices); - // the left_indices will not be used later for the `right semi` join - (left_indices, right_indices) - } - JoinType::RightAnti => { - // need to remove the duplicated record in the right side - // get the anti index for the right side - let right_indices = get_anti_indices(0..count_right_batch, &right_indices); - // the left_indices will not be used later for the `right anti` join - (left_indices, right_indices) - } - } -} - -/// Appends the `left_unmatched_indices` to the `left_indices`, -/// and fills Null to tail of `right_indices` to -/// keep the length of `left_indices` and `right_indices` consistent. -fn append_left_indices( - left_indices: UInt64Array, - right_indices: UInt32Array, - left_unmatched_indices: UInt64Array, -) -> (UInt64Array, UInt32Array) { - if left_unmatched_indices.is_empty() { - (left_indices, right_indices) - } else { - let unmatched_size = left_unmatched_indices.len(); - // the new left indices: left_indices + null array - // the new right indices: right_indices + right_unmatched_indices - let new_left_indices = left_indices - .iter() - .chain(left_unmatched_indices.iter()) - .collect::(); - let new_right_indices = right_indices - .iter() - .chain(std::iter::repeat(None).take(unmatched_size)) - .collect::(); - - (new_left_indices, new_right_indices) - } + let bitmap = shared_bitmap.lock(); + get_final_indices_from_bit_map(&bitmap, join_type) } impl Stream for NestedLoopJoinStream { @@ -738,11 +645,7 @@ impl Stream for NestedLoopJoinStream { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - if left_is_build_side(self.join_type) { - self.poll_next_impl_for_build_left(cx) - } else { - self.poll_next_impl_for_build_right(cx) - } + self.poll_next_impl(cx) } } @@ -847,35 +750,19 @@ mod tests { context: Arc, ) -> Result<(Vec, Vec)> { let partition_count = 4; - let mut output_partition = 1; - let distribution = distribution_from_join_type(join_type); - // left - let left = if matches!(distribution[0], Distribution::SinglePartition) { - left - } else { - output_partition = partition_count; - Arc::new(RepartitionExec::try_new( - left, - Partitioning::RoundRobinBatch(partition_count), - )?) - } as Arc; - - let right = if matches!(distribution[1], Distribution::SinglePartition) { - right - } else { - output_partition = partition_count; - Arc::new(RepartitionExec::try_new( - right, - Partitioning::RoundRobinBatch(partition_count), - )?) - } as Arc; + + // Redistributing right input + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::RoundRobinBatch(partition_count), + )?) 
as Arc; // Use the required distribution for nested loop join to test partition data let nested_loop_join = NestedLoopJoinExec::try_new(left, right, join_filter, join_type)?; let columns = columns(&nested_loop_join.schema()); let mut batches = vec![]; - for i in 0..output_partition { + for i in 0..partition_count { let stream = nested_loop_join.execute(i, context.clone())?; let more_batches = common::collect(stream).await?; batches.extend( diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 6b9db5589391..67d2beb622d3 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -349,6 +349,7 @@ SELECT * FROM my_cte t1, my_cte 6 5 6 6 +# TODO: (probably) infinite recursion when recursive CTE is on build-side of join (works for CrossJoinExec / NLJoinExec) # CTE within recursive CTE works and does not result in 'index out of bounds: the len is 0 but the index is 0' query I WITH RECURSIVE "recursive_cte" AS ( @@ -364,8 +365,8 @@ WITH RECURSIVE "recursive_cte" AS ( SELECT 2 as "val" FROM - "recursive_cte" - FULL JOIN "sub_cte" ON 1 = 1 + "sub_cte" + FULL JOIN "recursive_cte" ON 1 = 1 WHERE "recursive_cte"."val" < 2 ) diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 3d9f8ff3ad2c..080f7c209634 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -3429,9 +3429,9 @@ SortPreservingMergeExec: [sn@0 ASC NULLS LAST] ------------AggregateExec: mode=Partial, gby=[sn@1 as sn, amount@2 as amount], aggr=[SUM(l.amount)] --------------ProjectionExec: expr=[amount@1 as amount, sn@2 as sn, amount@3 as amount] ----------------NestedLoopJoinExec: join_type=Inner, filter=sn@0 >= sn@1 -------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] ------------------CoalescePartitionsExec --------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] +------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] query IRR SELECT r.sn, SUM(l.amount), r.amount diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 344b65a7c2aa..3b36f40c5266 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -2010,7 +2010,8 @@ set datafusion.explain.logical_plan_only = false; statement ok set datafusion.execution.target_partitions = 4; -# Right as inner table nested loop join +# Planning inner nested loop join +# inputs are swapped due to inexact statistics + join reordering caused additional projection query TT EXPLAIN @@ -2027,17 +2028,18 @@ Inner Join: Filter: join_t1.t1_id > join_t2.t2_id ----Filter: join_t2.t2_int > UInt32(1) ------TableScan: join_t2 projection=[t2_id, t2_int] physical_plan -NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1 ---CoalesceBatchesExec: target_batch_size=2 -----FilterExec: t1_id@0 > 10 -------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------MemoryExec: partitions=1, partition_sizes=[1] ---CoalescePartitionsExec -----ProjectionExec: expr=[t2_id@0 as t2_id] -------CoalesceBatchesExec: target_batch_size=2 ---------FilterExec: t2_int@1 > 1 -----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -------------MemoryExec: partitions=1, partition_sizes=[1] +ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id] 
+--NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1 +----CoalescePartitionsExec +------ProjectionExec: expr=[t2_id@0 as t2_id] +--------CoalesceBatchesExec: target_batch_size=2 +----------FilterExec: t2_int@1 > 1 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] +----CoalesceBatchesExec: target_batch_size=2 +------FilterExec: t1_id@0 > 10 +--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] query II SELECT join_t1.t1_id, join_t2.t2_id @@ -3473,9 +3475,9 @@ Inner Join: Filter: r.a < l.a ----TableScan: annotated_data projection=[a0, a, b, c, d] physical_plan NestedLoopJoinExec: join_type=Inner, filter=a@1 < a@0 +--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true --RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true ---CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true # Currently datafusion cannot pushdown filter conditions with scalar UDF into # cross join. diff --git a/datafusion/sqllogictest/test_files/tpch/q11.slt.part b/datafusion/sqllogictest/test_files/tpch/q11.slt.part index 9e3d0e6d37ae..28059a2ea7a4 100644 --- a/datafusion/sqllogictest/test_files/tpch/q11.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/q11.slt.part @@ -75,10 +75,10 @@ Limit: skip=0, fetch=10 ----------------------TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")] physical_plan GlobalLimitExec: skip=0, fetch=10 ---SortPreservingMergeExec: [value@1 DESC], fetch=10 -----SortExec: TopK(fetch=10), expr=[value@1 DESC] -------ProjectionExec: expr=[ps_partkey@0 as ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value] ---------NestedLoopJoinExec: join_type=Inner, filter=CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Decimal128(38, 15)) > SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)@1 +--SortExec: TopK(fetch=10), expr=[value@1 DESC] +----ProjectionExec: expr=[ps_partkey@0 as ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value] +------NestedLoopJoinExec: join_type=Inner, filter=CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Decimal128(38, 15)) > SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)@1 +--------CoalescePartitionsExec ----------AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as ps_partkey], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)] ------------CoalesceBatchesExec: target_batch_size=8192 --------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4 @@ -103,32 +103,30 @@ GlobalLimitExec: skip=0, fetch=10 ------------------------------FilterExec: n_name@1 = GERMANY --------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ----------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], has_header=false -----------ProjectionExec: expr=[CAST(CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Float64) * 0.0001 AS Decimal128(38, 15)) as SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)] -------------AggregateExec: mode=Final, gby=[], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)] ---------------CoalescePartitionsExec -----------------AggregateExec: mode=Partial, gby=[], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)] -------------------CoalesceBatchesExec: target_batch_size=8192 ---------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@2, n_nationkey@0)], projection=[ps_availqty@0, ps_supplycost@1] -----------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------RepartitionExec: partitioning=Hash([s_nationkey@2], 4), input_partitions=4 ---------------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_suppkey@0, s_suppkey@0)], projection=[ps_availqty@1, ps_supplycost@2, s_nationkey@4] -------------------------------CoalesceBatchesExec: target_batch_size=8192 ---------------------------------RepartitionExec: partitioning=Hash([ps_suppkey@0], 4), input_partitions=4 -----------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_suppkey, ps_availqty, ps_supplycost], has_header=false -------------------------------CoalesceBatchesExec: target_batch_size=8192 ---------------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=4 -----------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_nationkey], has_header=false -----------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------RepartitionExec: partitioning=Hash([n_nationkey@0], 4), input_partitions=4 ---------------------------ProjectionExec: expr=[n_nationkey@0 as n_nationkey] +--------ProjectionExec: expr=[CAST(CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Float64) * 0.0001 AS Decimal128(38, 15)) as SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)] +----------AggregateExec: mode=Final, gby=[], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)] +------------CoalescePartitionsExec +--------------AggregateExec: mode=Partial, gby=[], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)] +----------------CoalesceBatchesExec: target_batch_size=8192 +------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@2, n_nationkey@0)], projection=[ps_availqty@0, ps_supplycost@1] +--------------------CoalesceBatchesExec: target_batch_size=8192 +----------------------RepartitionExec: partitioning=Hash([s_nationkey@2], 4), input_partitions=4 +------------------------CoalesceBatchesExec: target_batch_size=8192 
+--------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_suppkey@0, s_suppkey@0)], projection=[ps_availqty@1, ps_supplycost@2, s_nationkey@4] ----------------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------------FilterExec: n_name@1 = GERMANY +------------------------------RepartitionExec: partitioning=Hash([ps_suppkey@0], 4), input_partitions=4 +--------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_suppkey, ps_availqty, ps_supplycost], has_header=false +----------------------------CoalesceBatchesExec: target_batch_size=8192 +------------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=4 --------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -----------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], has_header=false - - +----------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_nationkey], has_header=false +--------------------CoalesceBatchesExec: target_batch_size=8192 +----------------------RepartitionExec: partitioning=Hash([n_nationkey@0], 4), input_partitions=4 +------------------------ProjectionExec: expr=[n_nationkey@0 as n_nationkey] +--------------------------CoalesceBatchesExec: target_batch_size=8192 +----------------------------FilterExec: n_name@1 = GERMANY +------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], has_header=false query IR select diff --git a/datafusion/sqllogictest/test_files/tpch/q22.slt.part b/datafusion/sqllogictest/test_files/tpch/q22.slt.part index c4556fee3fae..6f9f6524a451 100644 --- a/datafusion/sqllogictest/test_files/tpch/q22.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/q22.slt.part @@ -82,27 +82,28 @@ SortPreservingMergeExec: [cntrycode@0 ASC NULLS LAST] ----------RepartitionExec: partitioning=Hash([cntrycode@0], 4), input_partitions=4 ------------AggregateExec: mode=Partial, gby=[cntrycode@0 as cntrycode], aggr=[COUNT(*), SUM(custsale.c_acctbal)] --------------ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode, c_acctbal@1 as c_acctbal] -----------------NestedLoopJoinExec: join_type=Inner, filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > AVG(customer.c_acctbal)@1 -------------------CoalesceBatchesExec: target_batch_size=8192 ---------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2] +----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------------NestedLoopJoinExec: join_type=Inner, filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > AVG(customer.c_acctbal)@1 +--------------------CoalescePartitionsExec ----------------------CoalesceBatchesExec: 
target_batch_size=8192 -------------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4 +------------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2] --------------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------------FilterExec: Use substr(c_phone@1, 1, 2) IN (SET) ([Literal { value: Utf8("13") }, Literal { value: Utf8("31") }, Literal { value: Utf8("23") }, Literal { value: Utf8("29") }, Literal { value: Utf8("30") }, Literal { value: Utf8("18") }, Literal { value: Utf8("17") }]) -------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_custkey, c_phone, c_acctbal], has_header=false -----------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------RepartitionExec: partitioning=Hash([o_custkey@0], 4), input_partitions=4 ---------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_custkey], has_header=false -------------------AggregateExec: mode=Final, gby=[], aggr=[AVG(customer.c_acctbal)] ---------------------CoalescePartitionsExec -----------------------AggregateExec: mode=Partial, gby=[], aggr=[AVG(customer.c_acctbal)] -------------------------ProjectionExec: expr=[c_acctbal@1 as c_acctbal] +----------------------------RepartitionExec: partitioning=Hash([c_custkey@0], 4), input_partitions=4 +------------------------------CoalesceBatchesExec: target_batch_size=8192 +--------------------------------FilterExec: Use substr(c_phone@1, 1, 2) IN (SET) ([Literal { value: Utf8("13") }, Literal { value: Utf8("31") }, Literal { value: Utf8("23") }, Literal { value: Utf8("29") }, Literal { value: Utf8("30") }, Literal { value: Utf8("18") }, Literal { value: Utf8("17") }]) +----------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_custkey, c_phone, c_acctbal], has_header=false --------------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND Use substr(c_phone@0, 1, 2) IN (SET) ([Literal { value: Utf8("13") }, Literal { value: Utf8("31") }, Literal { value: Utf8("23") }, Literal { value: Utf8("29") }, Literal { value: Utf8("30") }, Literal { value: Utf8("18") }, Literal { value: Utf8("17") }]) -------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_phone, c_acctbal], has_header=false - +----------------------------RepartitionExec: partitioning=Hash([o_custkey@0], 4), input_partitions=4 +------------------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_custkey], has_header=false +--------------------AggregateExec: mode=Final, gby=[], aggr=[AVG(customer.c_acctbal)] +----------------------CoalescePartitionsExec +------------------------AggregateExec: mode=Partial, gby=[], aggr=[AVG(customer.c_acctbal)] +--------------------------ProjectionExec: expr=[c_acctbal@1 as c_acctbal] +----------------------------CoalesceBatchesExec: target_batch_size=8192 +------------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND Use substr(c_phone@0, 1, 2) IN (SET) ([Literal { value: Utf8("13") }, Literal { value: Utf8("31") }, Literal { value: Utf8("23") }, Literal { value: Utf8("29") }, Literal { value: Utf8("30") }, Literal { value: Utf8("18") }, Literal { value: Utf8("17") }]) +--------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, projection=[c_phone, c_acctbal], has_header=false query TIR select From 3914e1c82d0ba426db155d6bc4eb5b792a5ae7df Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Wed, 17 Apr 2024 21:49:13 +0300 Subject: [PATCH 2/5] renamed variables and struct fields --- .../physical-plan/src/joins/nested_loop_join.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 796b8602b22f..7391d60f1c2b 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -66,7 +66,7 @@ struct JoinLeftData { /// Shared bitmap builder for visited left indices bitmap: SharedBitmapBuilder, /// Counter of running probe-threads, potentially able to update `bitmap` - running_threads_counter: AtomicUsize, + probe_threads_counter: AtomicUsize, /// Memory reservation for tracking batch and bitmap /// Cleared on `JoinLeftData` drop #[allow(dead_code)] @@ -77,13 +77,13 @@ impl JoinLeftData { fn new( batch: RecordBatch, bitmap: SharedBitmapBuilder, - running_threads_counter: AtomicUsize, + probe_threads_counter: AtomicUsize, reservation: MemoryReservation, ) -> Self { Self { batch, bitmap, - running_threads_counter, + probe_threads_counter, reservation, } } @@ -99,7 +99,7 @@ impl JoinLeftData { /// Decrements counter of running threads, and returns `true` /// if caller is the last running thread fn report_probe_completed(&self) -> bool { - self.running_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1 + self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1 } } @@ -366,7 +366,7 @@ async fn collect_left_input( join_metrics: BuildProbeJoinMetrics, reservation: MemoryReservation, with_visited_left_side: bool, - probing_threads: usize, + probe_threads_count: usize, ) -> Result { let schema = input.schema(); let merge = if input.output_partitioning().partition_count() != 1 { @@ -417,7 +417,7 @@ async fn collect_left_input( Ok(JoinLeftData::new( merged_batch, Mutex::new(visited_left_side), - AtomicUsize::new(probing_threads), + AtomicUsize::new(probe_threads_count), reservation, 
)) } From 45f170d02dbb0cddaf9cabb0976dcc415358d086 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Mon, 22 Apr 2024 20:25:34 +0300 Subject: [PATCH 3/5] fixed nl join filter expression in tests --- datafusion/core/src/physical_optimizer/join_selection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 9d6c576f8904..95bfa3a18fc3 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -848,7 +848,7 @@ mod tests_statistical { let expression = Arc::new(BinaryExpr::new( Arc::new(Column::new_with_schema("big_col", &intermediate_schema).unwrap()), Operator::Gt, - Arc::new(Column::new_with_schema("big_col", &intermediate_schema).unwrap()), + Arc::new(Column::new_with_schema("small_col", &intermediate_schema).unwrap()), )) as _; Some(JoinFilter::new( expression, From 90a899ddbb7791fddd7d332c437c8294608de2d2 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov <13005055+korowa@users.noreply.github.com> Date: Mon, 22 Apr 2024 20:31:40 +0300 Subject: [PATCH 4/5] Update datafusion/physical-plan/src/joins/nested_loop_join.rs Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/joins/nested_loop_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 7391d60f1c2b..9d6a1c8101e8 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -119,7 +119,7 @@ impl JoinLeftData { /// #### 2. Probe phase /// Sequentially polling batches from the probe-side input and processing them according to the /// following logic: -/// - apply join filter (`ON` clause) to Carthesian product of probe batch adn build side data +/// - apply join filter (`ON` clause) to Cartesian product of probe batch adn build side data /// -- filter evaluation is executed once per build-side data row /// - update shared bitmap of joined ("visited") build-side row indices, if required -- allows /// to produce unmatched build-side data in case of e.g. LEFT/FULL JOIN after probing phase From cb31fa80f705ffbdfbcb400c6a27dc5849d686c4 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Mon, 22 Apr 2024 20:33:13 +0300 Subject: [PATCH 5/5] typo fixed --- datafusion/physical-plan/src/joins/nested_loop_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 9d6a1c8101e8..5fccd63029a1 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -119,7 +119,7 @@ impl JoinLeftData { /// #### 2. Probe phase /// Sequentially polling batches from the probe-side input and processing them according to the /// following logic: -/// - apply join filter (`ON` clause) to Cartesian product of probe batch adn build side data +/// - apply join filter (`ON` clause) to Cartesian product of probe batch and build side data /// -- filter evaluation is executed once per build-side data row /// - update shared bitmap of joined ("visited") build-side row indices, if required -- allows /// to produce unmatched build-side data in case of e.g. LEFT/FULL JOIN after probing phase
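
The unmatched-row handling described in the `NestedLoopJoinExec` doc comment above hinges on two pieces of shared state: a mutex-protected bitmap of visited build-side rows and an atomic counter of probe threads, decremented via `report_probe_completed()`. The sketch below is a standalone illustration of that coordination pattern using plain `std` types — `BuildSide`, the `Vec<bool>` bitmap, and the row/thread counts are stand-ins, not DataFusion code. Only the thread whose decrement observes the counter at 1 emits the unmatched build rows, so LEFT/FULL output is produced exactly once even though every probe partition runs the same epilogue.

```rust
// Standalone sketch of the "last probe thread emits unmatched build rows" pattern.
// Plain std types stand in for BooleanBufferBuilder / MemoryReservation; the
// Relaxed ordering mirrors report_probe_completed() in the patch.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

struct BuildSide {
    // Bitmap of build-side rows that matched at least one probe row
    visited: Mutex<Vec<bool>>,
    // Probe threads that may still update `visited`
    probe_threads: AtomicUsize,
}

impl BuildSide {
    // Returns true only for the last thread to finish probing
    fn report_probe_completed(&self) -> bool {
        self.probe_threads.fetch_sub(1, Ordering::Relaxed) == 1
    }
}

fn main() {
    let build_rows = 8;
    let threads = 4;
    let shared = Arc::new(BuildSide {
        visited: Mutex::new(vec![false; build_rows]),
        probe_threads: AtomicUsize::new(threads),
    });

    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // Each "probe partition" marks some build rows as matched.
                shared.visited.lock().unwrap()[t] = true;
                // After its input is exhausted, only the last reporter emits
                // the unmatched build-side rows (LEFT/FULL join semantics).
                if shared.report_probe_completed() {
                    let visited = shared.visited.lock().unwrap();
                    let unmatched: Vec<usize> =
                        (0..build_rows).filter(|&i| !visited[i]).collect();
                    println!("unmatched build rows: {unmatched:?}");
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
}
```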
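
Similarly, `swap_nl_join` in the first patch relies on `swap_reverting_projection` to restore column order after the inputs are exchanged: the swapped join emits right-side columns first, so the wrapping `ProjectionExec` must map each original output column to its new position. Below is a minimal sketch of that index arithmetic only — the function name is illustrative and this is not the DataFusion implementation; the first assertion matches the `big_col`/`small_col` positions checked in `test_nl_join_with_swap`.

```rust
// Given the field counts of the original left and right inputs, compute where
// each original output column lives in the swapped join's output, i.e. the
// index mapping a "swap reverting" projection has to apply.
fn swap_reverting_indices(left_fields: usize, right_fields: usize) -> Vec<usize> {
    // Swapped join emits [right columns..., left columns...];
    // the projection puts the original [left..., right...] order back.
    let left_part = (0..left_fields).map(|i| right_fields + i);
    let right_part = 0..right_fields;
    left_part.chain(right_part).collect()
}

fn main() {
    // Original plan: left = [big_col], right = [small_col].
    // Swapped join output order: [small_col, big_col], so the reverting
    // projection picks indices [1, 0] to yield [big_col, small_col] again.
    assert_eq!(swap_reverting_indices(1, 1), vec![1, 0]);
    assert_eq!(swap_reverting_indices(2, 3), vec![3, 4, 0, 1, 2]);
    println!("ok");
}
```

For semi/anti joins the projection is skipped, since the swapped join already produces the original schema — exactly the branch `swap_nl_join` takes for `LeftSemi`/`RightSemi`/`LeftAnti`/`RightAnti`.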