From 108af70c8d3c1fedc8b6773a036a904d36ea252a Mon Sep 17 00:00:00 2001 From: Li Jin Date: Thu, 1 Jun 2023 14:48:01 -0400 Subject: [PATCH 1/3] GH-35838: Fix asof join backpresure --- cpp/src/arrow/acero/asof_join_node.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 088f2e832dc79..3173bad79df3e 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -538,7 +538,7 @@ class KeyHasher { class BackpressureController : public BackpressureControl { public: - BackpressureController(ExecNode* node, ExecNode* output, + BackpressureController(ExecNode* node, AsofJoinNode* output, std::atomic& backpressure_counter) : node_(node), output_(output), backpressure_counter_(backpressure_counter) {} @@ -547,7 +547,7 @@ class BackpressureController : public BackpressureControl { private: ExecNode* node_; - ExecNode* output_; + AsofJoinNode* output_; std::atomic& backpressure_counter_; }; @@ -668,7 +668,7 @@ class InputState { static Result> Make( size_t index, TolType tolerance, bool must_hash, bool may_rehash, - KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output, + KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output, std::atomic& backpressure_counter, const std::shared_ptr& schema, const col_index_t time_col_index, const std::vector& key_col_index) { @@ -1407,7 +1407,7 @@ class AsofJoinNode : public ExecNode { ARROW_ASSIGN_OR_RAISE( auto input_state, InputState::Make(i, tolerance_, must_hash_, may_rehash_, key_hashers_[i].get(), - this, inputs[i], backpressure_counter_, + inputs[i], this, backpressure_counter_, inputs[i]->output_schema(), indices_of_on_key_[i], indices_of_by_key_[i])); state_.push_back(std::move(input_state)); From c38ab4104e73fdc636d18e9b7df4c36d7154e2b3 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Thu, 1 Jun 2023 15:06:04 -0400 Subject: [PATCH 2/3] Revert changes to BackpressureController due to forward declaration issue --- cpp/src/arrow/acero/asof_join_node.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 3173bad79df3e..ca874983373a7 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -538,7 +538,7 @@ class KeyHasher { class BackpressureController : public BackpressureControl { public: - BackpressureController(ExecNode* node, AsofJoinNode* output, + BackpressureController(ExecNode* node, ExecNode* output, std::atomic& backpressure_counter) : node_(node), output_(output), backpressure_counter_(backpressure_counter) {} @@ -547,7 +547,7 @@ class BackpressureController : public BackpressureControl { private: ExecNode* node_; - AsofJoinNode* output_; + ExecNode* output_; std::atomic& backpressure_counter_; }; From 7a7bee0bdb96cd32c272354fc2b986d9d6686c9f Mon Sep 17 00:00:00 2001 From: Li Jin Date: Thu, 1 Jun 2023 15:15:44 -0400 Subject: [PATCH 3/3] Fix bug --- cpp/src/arrow/acero/asof_join_node.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index ca874983373a7..b92339b951bcf 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -679,7 +679,7 @@ class InputState { BackpressureHandler::Make(low_threshold, high_threshold, std::move(backpressure_control))); return std::make_unique(index, tolerance, must_hash, may_rehash, - key_hasher, node, std::move(handler), schema, + key_hasher, output, std::move(handler), schema, time_col_index, key_col_index); }