Skip to content

Commit

Permalink
apacheGH-35838: Fix asof join backpresure
Browse files Browse the repository at this point in the history
  • Loading branch information
icexelloss committed Jun 1, 2023
1 parent 3299d12 commit 108af70
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ class KeyHasher {

class BackpressureController : public BackpressureControl {
public:
BackpressureController(ExecNode* node, ExecNode* output,
BackpressureController(ExecNode* node, AsofJoinNode* output,
std::atomic<int32_t>& backpressure_counter)
: node_(node), output_(output), backpressure_counter_(backpressure_counter) {}

Expand All @@ -547,7 +547,7 @@ class BackpressureController : public BackpressureControl {

private:
ExecNode* node_;
ExecNode* output_;
AsofJoinNode* output_;
std::atomic<int32_t>& backpressure_counter_;
};

Expand Down Expand Up @@ -668,7 +668,7 @@ class InputState {

static Result<std::unique_ptr<InputState>> Make(
size_t index, TolType tolerance, bool must_hash, bool may_rehash,
KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
std::atomic<int32_t>& backpressure_counter,
const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
const std::vector<col_index_t>& key_col_index) {
Expand Down Expand Up @@ -1407,7 +1407,7 @@ class AsofJoinNode : public ExecNode {
ARROW_ASSIGN_OR_RAISE(
auto input_state,
InputState::Make(i, tolerance_, must_hash_, may_rehash_, key_hashers_[i].get(),
this, inputs[i], backpressure_counter_,
inputs[i], this, backpressure_counter_,
inputs[i]->output_schema(), indices_of_on_key_[i],
indices_of_by_key_[i]));
state_.push_back(std::move(input_state));
Expand Down

0 comments on commit 108af70

Please sign in to comment.