From 108af70c8d3c1fedc8b6773a036a904d36ea252a Mon Sep 17 00:00:00 2001 From: Li Jin Date: Thu, 1 Jun 2023 14:48:01 -0400 Subject: [PATCH] GH-35838: Fix asof join backpresure --- cpp/src/arrow/acero/asof_join_node.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 088f2e832dc79..3173bad79df3e 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -538,7 +538,7 @@ class KeyHasher { class BackpressureController : public BackpressureControl { public: - BackpressureController(ExecNode* node, ExecNode* output, + BackpressureController(ExecNode* node, AsofJoinNode* output, std::atomic& backpressure_counter) : node_(node), output_(output), backpressure_counter_(backpressure_counter) {} @@ -547,7 +547,7 @@ class BackpressureController : public BackpressureControl { private: ExecNode* node_; - ExecNode* output_; + AsofJoinNode* output_; std::atomic& backpressure_counter_; }; @@ -668,7 +668,7 @@ class InputState { static Result> Make( size_t index, TolType tolerance, bool must_hash, bool may_rehash, - KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output, + KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output, std::atomic& backpressure_counter, const std::shared_ptr& schema, const col_index_t time_col_index, const std::vector& key_col_index) { @@ -1407,7 +1407,7 @@ class AsofJoinNode : public ExecNode { ARROW_ASSIGN_OR_RAISE( auto input_state, InputState::Make(i, tolerance_, must_hash_, may_rehash_, key_hashers_[i].get(), - this, inputs[i], backpressure_counter_, + inputs[i], this, backpressure_counter_, inputs[i]->output_schema(), indices_of_on_key_[i], indices_of_by_key_[i])); state_.push_back(std::move(input_state));