Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(expr): make evaluation async #8229

Merged
merged 10 commits into from
Mar 15, 2023
Merged
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl FilterExecutor {
#[for_await]
for data_chunk in self.child.execute() {
let data_chunk = data_chunk?.compact();
let vis_array = self.expr.eval(&data_chunk)?;
let vis_array = self.expr.eval(&data_chunk).await?;

if let Bool(vis) = vis_array.as_ref() {
// TODO: should we yield masked data chunk directly?
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {

// TODO: currently not a vectorized implementation
for state in states {
state.update_single(&chunk, row_id)?
state.update_single(&chunk, row_id).await?
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ impl HopWindowExecutor {
let len = data_chunk.cardinality();
for i in 0..units {
let window_start_col = if output_indices.contains(&window_start_col_index) {
Some(self.window_start_exprs[i].eval(&data_chunk)?)
Some(self.window_start_exprs[i].eval(&data_chunk).await?)
} else {
None
};
let window_end_col = if output_indices.contains(&window_end_col_index) {
Some(self.window_end_exprs[i].eval(&data_chunk)?)
Some(self.window_end_exprs[i].eval(&data_chunk).await?)
} else {
None
};
Expand Down
73 changes: 50 additions & 23 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for chunk in Self::do_inner_join(params) {
let mut chunk = chunk?;
chunk.set_visibility(cond.eval(&chunk)?.as_bool().iter().collect());
chunk.set_visibility(cond.eval(&chunk).await?.as_bool().iter().collect());
yield chunk
}
}
Expand Down Expand Up @@ -473,7 +473,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}
} else {
Expand All @@ -494,7 +495,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}

Expand Down Expand Up @@ -593,7 +595,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}
}
Expand All @@ -606,7 +609,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}

Expand Down Expand Up @@ -657,7 +661,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}
} else if let Some(spilled) = Self::append_one_probe_row(
Expand All @@ -675,7 +680,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
if let Some(spilled) = remaining_chunk_builder.consume_all() {
yield spilled
Expand Down Expand Up @@ -777,7 +783,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}
}
Expand All @@ -787,7 +794,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
#[for_await]
for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
Expand Down Expand Up @@ -884,7 +892,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}
}
Expand All @@ -894,7 +903,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
#[for_await]
for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
Expand Down Expand Up @@ -1028,7 +1038,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
cond.as_ref(),
&mut left_non_equi_state,
&mut right_non_equi_state,
)?
)
.await?
}
}
} else {
Expand All @@ -1050,7 +1061,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
cond.as_ref(),
&mut left_non_equi_state,
&mut right_non_equi_state,
)?
)
.await?
}
#[for_await]
for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
Expand Down Expand Up @@ -1199,7 +1211,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
///
/// For more information about how `process_*_join_non_equi_condition` work, see their unit
/// tests.
fn process_left_outer_join_non_equi_condition(
async fn process_left_outer_join_non_equi_condition(
chunk: DataChunk,
cond: &dyn Expression,
LeftNonEquiJoinState {
Expand All @@ -1209,7 +1221,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
found_matched,
}: &mut LeftNonEquiJoinState,
) -> Result<DataChunk> {
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
Ok(DataChunkMutator(chunk)
.nullify_build_side_for_non_equi_condition(&filter, *probe_column_count)
.remove_duplicate_rows_for_left_outer_join(
Expand All @@ -1223,7 +1235,7 @@ impl<K: HashKey> HashJoinExecutor<K> {

/// Filters for candidate rows which satisfy `non_equi` predicate.
/// Removes duplicate rows.
fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
async fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
chunk: DataChunk,
cond: &dyn Expression,
LeftNonEquiJoinState {
Expand All @@ -1233,7 +1245,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
..
}: &mut LeftNonEquiJoinState,
) -> Result<DataChunk> {
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
Ok(DataChunkMutator(chunk)
.remove_duplicate_rows_for_left_semi_anti_join::<ANTI_JOIN>(
&filter,
Expand All @@ -1244,29 +1256,29 @@ impl<K: HashKey> HashJoinExecutor<K> {
.take())
}

fn process_right_outer_join_non_equi_condition(
async fn process_right_outer_join_non_equi_condition(
chunk: DataChunk,
cond: &dyn Expression,
RightNonEquiJoinState {
build_row_ids,
build_row_matched,
}: &mut RightNonEquiJoinState,
) -> Result<DataChunk> {
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
Ok(DataChunkMutator(chunk)
.remove_duplicate_rows_for_right_outer_join(&filter, build_row_ids, build_row_matched)
.take())
}

fn process_right_semi_anti_join_non_equi_condition(
async fn process_right_semi_anti_join_non_equi_condition(
chunk: DataChunk,
cond: &dyn Expression,
RightNonEquiJoinState {
build_row_ids,
build_row_matched,
}: &mut RightNonEquiJoinState,
) -> Result<()> {
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
DataChunkMutator(chunk).remove_duplicate_rows_for_right_semi_anti_join(
&filter,
build_row_ids,
Expand All @@ -1275,13 +1287,13 @@ impl<K: HashKey> HashJoinExecutor<K> {
Ok(())
}

fn process_full_outer_join_non_equi_condition(
async fn process_full_outer_join_non_equi_condition(
chunk: DataChunk,
cond: &dyn Expression,
left_non_equi_state: &mut LeftNonEquiJoinState,
right_non_equi_state: &mut RightNonEquiJoinState,
) -> Result<DataChunk> {
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
Ok(DataChunkMutator(chunk)
.nullify_build_side_for_non_equi_condition(
&filter,
Expand Down Expand Up @@ -2609,6 +2621,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2638,6 +2651,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2667,6 +2681,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2706,6 +2721,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand All @@ -2732,6 +2748,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand All @@ -2758,6 +2775,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2799,6 +2817,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2827,6 +2846,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2855,6 +2875,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2918,6 +2939,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2958,6 +2980,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -3010,6 +3033,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.is_ok()
);
assert_eq!(state.build_row_ids, Vec::new());
Expand Down Expand Up @@ -3044,6 +3068,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.is_ok()
);
assert_eq!(state.build_row_ids, Vec::new());
Expand Down Expand Up @@ -3105,6 +3130,7 @@ mod tests {
&mut left_state,
&mut right_state,
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -3152,6 +3178,7 @@ mod tests {
&mut left_state,
&mut right_state,
)
.await
.unwrap()
.compact(),
&expect
Expand Down
Loading