Skip to content

Commit

Permalink
GH-35838: [C++] Fix asof join backpresure (#35878)
Browse files Browse the repository at this point in the history
### Rationale for this change

To fix a bug in asof join backpresure handling.

### What changes are included in this PR?

This is reverting a bug introduced in GH-34391 on this line that breaks asof join backpresure
https://github.com/apache/arrow/pull/34392/files#diff-5493b6ae7ea2a4d5cfb581034c076e9c4be7608382168de6d1301ef67b6c01eeR1410

### Are these changes tested?
No. However code change simply reverts to the state before the bug was introduced in GH-34391 and therefore should be pretty safe (we have tested that the code before GH-34391 is working). In the meantime @ rtpsw is working on adding tests around this but I would like to merge this to unblock internal  issues around this.

### Are there any user-facing changes?

* Closes: #35838

Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Li Jin <ice.xelloss@gmail.com>
  • Loading branch information
icexelloss authored Jun 1, 2023
1 parent 61692b6 commit 3fe4a31
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ class InputState {

static Result<std::unique_ptr<InputState>> Make(
size_t index, TolType tolerance, bool must_hash, bool may_rehash,
KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
std::atomic<int32_t>& backpressure_counter,
const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
const std::vector<col_index_t>& key_col_index) {
Expand All @@ -679,7 +679,7 @@ class InputState {
BackpressureHandler::Make(low_threshold, high_threshold,
std::move(backpressure_control)));
return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
key_hasher, node, std::move(handler), schema,
key_hasher, output, std::move(handler), schema,
time_col_index, key_col_index);
}

Expand Down Expand Up @@ -1407,7 +1407,7 @@ class AsofJoinNode : public ExecNode {
ARROW_ASSIGN_OR_RAISE(
auto input_state,
InputState::Make(i, tolerance_, must_hash_, may_rehash_, key_hashers_[i].get(),
this, inputs[i], backpressure_counter_,
inputs[i], this, backpressure_counter_,
inputs[i]->output_schema(), indices_of_on_key_[i],
indices_of_by_key_[i]));
state_.push_back(std::move(input_state));
Expand Down

0 comments on commit 3fe4a31

Please sign in to comment.