Skip to content

Commit

Permalink
Pass booleans by value instead of by reference (#5487)
Browse files Browse the repository at this point in the history
* Pass booleans by value instead of by reference

* fix build error

* Fix tests

* clippy
  • Loading branch information
maxburke committed Mar 8, 2023
1 parent deeaa56 commit 9e32de3
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 39 deletions.
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/dist_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fn adjust_input_keys_ordering(
filter.clone(),
join_type,
PartitionMode::Partitioned,
null_equals_null,
*null_equals_null,
)?) as Arc<dyn ExecutionPlan>)
};
Ok(Some(reorder_partitioned_join_keys(
Expand Down Expand Up @@ -606,7 +606,7 @@ fn reorder_join_keys_to_inputs(
filter.clone(),
join_type,
PartitionMode::Partitioned,
null_equals_null,
*null_equals_null,
)?))
} else {
Ok(plan)
Expand Down Expand Up @@ -1086,7 +1086,7 @@ mod tests {
None,
join_type,
PartitionMode::Partitioned,
&false,
false,
)
.unwrap(),
)
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ mod tests {
None,
&JoinType::Left,
PartitionMode::CollectLeft,
&false,
false,
)
.unwrap();

Expand Down Expand Up @@ -536,7 +536,7 @@ mod tests {
None,
&JoinType::Left,
PartitionMode::CollectLeft,
&false,
false,
)
.unwrap();

Expand Down Expand Up @@ -587,7 +587,7 @@ mod tests {
None,
&join_type,
PartitionMode::Partitioned,
&false,
false,
)
.unwrap();

Expand Down Expand Up @@ -652,7 +652,7 @@ mod tests {
None,
&JoinType::Inner,
PartitionMode::CollectLeft,
&false,
false,
)
.unwrap();
let child_schema = child_join.schema();
Expand All @@ -668,7 +668,7 @@ mod tests {
None,
&JoinType::Left,
PartitionMode::CollectLeft,
&false,
false,
)
.unwrap();

Expand Down Expand Up @@ -705,7 +705,7 @@ mod tests {
None,
&JoinType::Inner,
PartitionMode::CollectLeft,
&false,
false,
)
.unwrap();

Expand Down Expand Up @@ -930,7 +930,7 @@ mod tests {
None,
&JoinType::Inner,
PartitionMode::Auto,
&false,
false,
)
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pipeline_fixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ mod hash_join_tests {
None,
&t.initial_join_type,
t.initial_mode,
&false,
false,
)?;

let initial_hash_join_state = PipelineStatePropagator {
Expand Down
26 changes: 13 additions & 13 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl HashJoinExec {
filter: Option<JoinFilter>,
join_type: &JoinType,
partition_mode: PartitionMode,
null_equals_null: &bool,
null_equals_null: bool,
) -> Result<Self> {
let left_schema = left.schema();
let right_schema = right.schema();
Expand Down Expand Up @@ -230,7 +230,7 @@ impl HashJoinExec {
mode: partition_mode,
metrics: ExecutionPlanMetricsSet::new(),
column_indices,
null_equals_null: *null_equals_null,
null_equals_null,
})
}

Expand Down Expand Up @@ -265,8 +265,8 @@ impl HashJoinExec {
}

/// Get null_equals_null
pub fn null_equals_null(&self) -> &bool {
&self.null_equals_null
pub fn null_equals_null(&self) -> bool {
self.null_equals_null
}
}

Expand Down Expand Up @@ -402,7 +402,7 @@ impl ExecutionPlan for HashJoinExec {
self.filter.clone(),
&self.join_type,
self.mode,
&self.null_equals_null,
self.null_equals_null,
)?))
}

Expand Down Expand Up @@ -691,7 +691,7 @@ pub fn build_join_indices(
on_probe: &[Column],
filter: Option<&JoinFilter>,
random_state: &RandomState,
null_equals_null: &bool,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
offset: Option<usize>,
build_side: JoinSide,
Expand Down Expand Up @@ -762,7 +762,7 @@ pub fn build_equal_condition_join_indices(
build_on: &[Column],
probe_on: &[Column],
random_state: &RandomState,
null_equals_null: &bool,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
offset: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
Expand Down Expand Up @@ -804,7 +804,7 @@ pub fn build_equal_condition_join_indices(
row,
&build_join_values,
&keys_values,
*null_equals_null,
null_equals_null,
)? {
build_indices.append(offset_build_index as u64);
probe_indices.append(row as u32);
Expand Down Expand Up @@ -1207,7 +1207,7 @@ impl HashJoinStream {
&self.on_right,
self.filter.as_ref(),
&self.random_state,
&self.null_equals_null,
self.null_equals_null,
&mut hashes_buffer,
None,
JoinSide::Left,
Expand Down Expand Up @@ -1356,7 +1356,7 @@ mod tests {
None,
join_type,
PartitionMode::CollectLeft,
&null_equals_null,
null_equals_null,
)
}

Expand All @@ -1375,7 +1375,7 @@ mod tests {
Some(filter),
join_type,
PartitionMode::CollectLeft,
&null_equals_null,
null_equals_null,
)
}

Expand Down Expand Up @@ -1429,7 +1429,7 @@ mod tests {
None,
join_type,
PartitionMode::Partitioned,
&null_equals_null,
null_equals_null,
)?;

let columns = columns(&join.schema());
Expand Down Expand Up @@ -2680,7 +2680,7 @@ mod tests {
&[Column::new("a", 0)],
&[Column::new("a", 0)],
&random_state,
&false,
false,
&mut vec![0; right.num_rows()],
None,
)?;
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl SymmetricHashJoinExec {
on: JoinOn,
filter: JoinFilter,
join_type: &JoinType,
null_equals_null: &bool,
null_equals_null: bool,
) -> Result<Self> {
let left_schema = left.schema();
let right_schema = right.schema();
Expand Down Expand Up @@ -349,7 +349,7 @@ impl SymmetricHashJoinExec {
random_state,
metrics: ExecutionPlanMetricsSet::new(),
column_indices,
null_equals_null: *null_equals_null,
null_equals_null,
})
}

Expand Down Expand Up @@ -379,8 +379,8 @@ impl SymmetricHashJoinExec {
}

/// Get null_equals_null
pub fn null_equals_null(&self) -> &bool {
&self.null_equals_null
pub fn null_equals_null(&self) -> bool {
self.null_equals_null
}
}

Expand Down Expand Up @@ -472,7 +472,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
self.on.clone(),
self.filter.clone(),
&self.join_type,
&self.null_equals_null,
self.null_equals_null,
)?))
}

Expand Down Expand Up @@ -1054,7 +1054,7 @@ impl OneSideHashJoiner {
probe_offset: usize,
column_indices: &[ColumnIndex],
random_state: &RandomState,
null_equals_null: &bool,
null_equals_null: bool,
) -> Result<Option<RecordBatch>> {
if self.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
return Ok(Some(RecordBatch::new_empty(schema.clone())));
Expand Down Expand Up @@ -1371,7 +1371,7 @@ impl SymmetricHashJoinStream {
probe_hash_joiner.offset,
&self.column_indices,
&self.random_state,
&self.null_equals_null,
self.null_equals_null,
)?;
// Increment the offset for the probe hash joiner:
probe_hash_joiner.offset += probe_batch.num_rows();
Expand Down Expand Up @@ -1504,7 +1504,7 @@ mod tests {
on,
filter,
join_type,
&null_equals_null,
null_equals_null,
)?;

let mut batches = vec![];
Expand Down Expand Up @@ -1551,7 +1551,7 @@ mod tests {
Some(filter),
join_type,
PartitionMode::Partitioned,
&null_equals_null,
null_equals_null,
)?;

let mut batches = vec![];
Expand Down Expand Up @@ -2465,7 +2465,7 @@ mod tests {
right_side_joiner.offset,
&join_column_indices,
&random_state,
&false,
false,
)?;
assert_eq!(left_side_joiner.visited_rows.is_empty(), should_be_empty);
Ok(())
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,8 @@ impl DefaultPhysicalPlanner {
schema: join_schema,
..
}) => {
let null_equals_null = *null_equals_null;

// If join has expression equijoin keys, add physical projecton.
let has_expr_join_key = keys.iter().any(|(l, r)| {
!(matches!(l, Expr::Column(_))
Expand Down Expand Up @@ -1030,7 +1032,7 @@ impl DefaultPhysicalPlanner {
join_on,
*join_type,
vec![SortOptions::default(); join_on_len],
*null_equals_null,
null_equals_null,
)?))
}
} else if session_state.config().target_partitions() > 1
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn run_join_test(
None,
&join_type,
PartitionMode::Partitioned,
&false,
false,
)
.unwrap(),
);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
filter,
&join_type.into(),
partition_mode,
&hashjoin.null_equals_null,
hashjoin.null_equals_null,
)?))
}
PhysicalPlanType::Union(union) => {
Expand Down Expand Up @@ -827,7 +827,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
on,
join_type: join_type.into(),
partition_mode: partition_mode.into(),
null_equals_null: *exec.null_equals_null(),
null_equals_null: exec.null_equals_null(),
filter,
},
))),
Expand Down Expand Up @@ -1367,7 +1367,7 @@ mod roundtrip_tests {
None,
join_type,
*partition_mode,
&false,
false,
)?))?;
}
}
Expand Down

0 comments on commit 9e32de3

Please sign in to comment.