Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass booleans by value instead of by reference #5487

Merged
merged 4 commits into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/dist_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fn adjust_input_keys_ordering(
filter.clone(),
join_type,
PartitionMode::Partitioned,
null_equals_null,
*null_equals_null,
)?) as Arc<dyn ExecutionPlan>)
};
Ok(Some(reorder_partitioned_join_keys(
Expand Down Expand Up @@ -606,7 +606,7 @@ fn reorder_join_keys_to_inputs(
filter.clone(),
join_type,
PartitionMode::Partitioned,
null_equals_null,
*null_equals_null,
)?))
} else {
Ok(plan)
Expand Down Expand Up @@ -1086,7 +1086,7 @@ mod tests {
None,
join_type,
PartitionMode::Partitioned,
&false,
false,
)
.unwrap(),
)
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ mod tests {
None,
&JoinType::Left,
PartitionMode::CollectLeft,
&false,
false,
)
.unwrap();

Expand Down Expand Up @@ -536,7 +536,7 @@ mod tests {
None,
&JoinType::Left,
PartitionMode::CollectLeft,
&false,
false,
)
.unwrap();

Expand Down Expand Up @@ -587,7 +587,7 @@ mod tests {
None,
&join_type,
PartitionMode::Partitioned,
&false,
false,
)
.unwrap();

Expand Down Expand Up @@ -652,7 +652,7 @@ mod tests {
None,
&JoinType::Inner,
PartitionMode::CollectLeft,
&false,
false,
)
.unwrap();
let child_schema = child_join.schema();
Expand All @@ -668,7 +668,7 @@ mod tests {
None,
&JoinType::Left,
PartitionMode::CollectLeft,
&false,
false,
)
.unwrap();

Expand Down Expand Up @@ -705,7 +705,7 @@ mod tests {
None,
&JoinType::Inner,
PartitionMode::CollectLeft,
&false,
false,
)
.unwrap();

Expand Down Expand Up @@ -930,7 +930,7 @@ mod tests {
None,
&JoinType::Inner,
PartitionMode::Auto,
&false,
false,
)
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pipeline_fixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ mod hash_join_tests {
None,
&t.initial_join_type,
t.initial_mode,
&false,
false,
)?;

let initial_hash_join_state = PipelineStatePropagator {
Expand Down
26 changes: 13 additions & 13 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl HashJoinExec {
filter: Option<JoinFilter>,
join_type: &JoinType,
partition_mode: PartitionMode,
null_equals_null: &bool,
null_equals_null: bool,
) -> Result<Self> {
let left_schema = left.schema();
let right_schema = right.schema();
Expand Down Expand Up @@ -230,7 +230,7 @@ impl HashJoinExec {
mode: partition_mode,
metrics: ExecutionPlanMetricsSet::new(),
column_indices,
null_equals_null: *null_equals_null,
null_equals_null,
})
}

Expand Down Expand Up @@ -265,8 +265,8 @@ impl HashJoinExec {
}

/// Get null_equals_null
pub fn null_equals_null(&self) -> &bool {
&self.null_equals_null
pub fn null_equals_null(&self) -> bool {
self.null_equals_null
}
}

Expand Down Expand Up @@ -402,7 +402,7 @@ impl ExecutionPlan for HashJoinExec {
self.filter.clone(),
&self.join_type,
self.mode,
&self.null_equals_null,
self.null_equals_null,
)?))
}

Expand Down Expand Up @@ -691,7 +691,7 @@ pub fn build_join_indices(
on_probe: &[Column],
filter: Option<&JoinFilter>,
random_state: &RandomState,
null_equals_null: &bool,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
offset: Option<usize>,
build_side: JoinSide,
Expand Down Expand Up @@ -762,7 +762,7 @@ pub fn build_equal_condition_join_indices(
build_on: &[Column],
probe_on: &[Column],
random_state: &RandomState,
null_equals_null: &bool,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
offset: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
Expand Down Expand Up @@ -804,7 +804,7 @@ pub fn build_equal_condition_join_indices(
row,
&build_join_values,
&keys_values,
*null_equals_null,
null_equals_null,
)? {
build_indices.append(offset_build_index as u64);
probe_indices.append(row as u32);
Expand Down Expand Up @@ -1207,7 +1207,7 @@ impl HashJoinStream {
&self.on_right,
self.filter.as_ref(),
&self.random_state,
&self.null_equals_null,
self.null_equals_null,
&mut hashes_buffer,
None,
JoinSide::Left,
Expand Down Expand Up @@ -1356,7 +1356,7 @@ mod tests {
None,
join_type,
PartitionMode::CollectLeft,
&null_equals_null,
null_equals_null,
)
}

Expand All @@ -1375,7 +1375,7 @@ mod tests {
Some(filter),
join_type,
PartitionMode::CollectLeft,
&null_equals_null,
null_equals_null,
)
}

Expand Down Expand Up @@ -1429,7 +1429,7 @@ mod tests {
None,
join_type,
PartitionMode::Partitioned,
&null_equals_null,
null_equals_null,
)?;

let columns = columns(&join.schema());
Expand Down Expand Up @@ -2680,7 +2680,7 @@ mod tests {
&[Column::new("a", 0)],
&[Column::new("a", 0)],
&random_state,
&false,
false,
&mut vec![0; right.num_rows()],
None,
)?;
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl SymmetricHashJoinExec {
on: JoinOn,
filter: JoinFilter,
join_type: &JoinType,
null_equals_null: &bool,
null_equals_null: bool,
) -> Result<Self> {
let left_schema = left.schema();
let right_schema = right.schema();
Expand Down Expand Up @@ -349,7 +349,7 @@ impl SymmetricHashJoinExec {
random_state,
metrics: ExecutionPlanMetricsSet::new(),
column_indices,
null_equals_null: *null_equals_null,
null_equals_null,
})
}

Expand Down Expand Up @@ -379,8 +379,8 @@ impl SymmetricHashJoinExec {
}

/// Get null_equals_null
pub fn null_equals_null(&self) -> &bool {
&self.null_equals_null
pub fn null_equals_null(&self) -> bool {
self.null_equals_null
}
}

Expand Down Expand Up @@ -472,7 +472,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
self.on.clone(),
self.filter.clone(),
&self.join_type,
&self.null_equals_null,
self.null_equals_null,
)?))
}

Expand Down Expand Up @@ -1054,7 +1054,7 @@ impl OneSideHashJoiner {
probe_offset: usize,
column_indices: &[ColumnIndex],
random_state: &RandomState,
null_equals_null: &bool,
null_equals_null: bool,
) -> Result<Option<RecordBatch>> {
if self.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
return Ok(Some(RecordBatch::new_empty(schema.clone())));
Expand Down Expand Up @@ -1371,7 +1371,7 @@ impl SymmetricHashJoinStream {
probe_hash_joiner.offset,
&self.column_indices,
&self.random_state,
&self.null_equals_null,
self.null_equals_null,
)?;
// Increment the offset for the probe hash joiner:
probe_hash_joiner.offset += probe_batch.num_rows();
Expand Down Expand Up @@ -1504,7 +1504,7 @@ mod tests {
on,
filter,
join_type,
&null_equals_null,
null_equals_null,
)?;

let mut batches = vec![];
Expand Down Expand Up @@ -1551,7 +1551,7 @@ mod tests {
Some(filter),
join_type,
PartitionMode::Partitioned,
&null_equals_null,
null_equals_null,
)?;

let mut batches = vec![];
Expand Down Expand Up @@ -2465,7 +2465,7 @@ mod tests {
right_side_joiner.offset,
&join_column_indices,
&random_state,
&false,
false,
)?;
assert_eq!(left_side_joiner.visited_rows.is_empty(), should_be_empty);
Ok(())
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,8 @@ impl DefaultPhysicalPlanner {
schema: join_schema,
..
}) => {
let null_equals_null = *null_equals_null;

// If join has expression equijoin keys, add physical projecton.
let has_expr_join_key = keys.iter().any(|(l, r)| {
!(matches!(l, Expr::Column(_))
Expand Down Expand Up @@ -1030,7 +1032,7 @@ impl DefaultPhysicalPlanner {
join_on,
*join_type,
vec![SortOptions::default(); join_on_len],
*null_equals_null,
null_equals_null,
)?))
}
} else if session_state.config().target_partitions() > 1
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn run_join_test(
None,
&join_type,
PartitionMode::Partitioned,
&false,
false,
)
.unwrap(),
);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
filter,
&join_type.into(),
partition_mode,
&hashjoin.null_equals_null,
hashjoin.null_equals_null,
)?))
}
PhysicalPlanType::Union(union) => {
Expand Down Expand Up @@ -827,7 +827,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
on,
join_type: join_type.into(),
partition_mode: partition_mode.into(),
null_equals_null: *exec.null_equals_null(),
null_equals_null: exec.null_equals_null(),
filter,
},
))),
Expand Down Expand Up @@ -1367,7 +1367,7 @@ mod roundtrip_tests {
None,
join_type,
*partition_mode,
&false,
false,
)?))?;
}
}
Expand Down