Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
af3e758
Add BackpressureCombiner
Aug 21, 2025
8b53421
Init atomic
Aug 21, 2025
7bf6b30
Move BackpressureController from Asofasof_join_node and sorted_merge_…
Aug 21, 2025
11d7170
Add new files to meson
Aug 21, 2025
935aaee
Add Wrapper and fix empty weak case
Aug 21, 2025
9789c0e
Do not export template?
Aug 21, 2025
5ce4c6e
Simplify backpressure combiner logic
Aug 22, 2025
ef519da
Update doc
Aug 22, 2025
0689cbc
Include suggestion
gitmodimo Sep 10, 2025
ba14f4c
Include suggestion
gitmodimo Sep 10, 2025
5f2f9ac
Add stop in backpressure combiner
Oct 2, 2025
50d3a02
Merge branch 'BackpressureCombiner' of https://github.com/gitmodimo/a…
Oct 2, 2025
212c331
Merge branch 'main' into BackpressureCombiner
Oct 2, 2025
e5e4e30
Initial implementation od acero pipes
Feb 7, 2025
5aa15a2
Export PipeSource
Feb 10, 2025
4650d9a
Revert "Export PipeSource"
Feb 10, 2025
26f9706
Forward ordering information
Feb 10, 2025
2820d3e
init pointer to fix unconncted pipe detection
Feb 10, 2025
cddf2ca
annotate ovvrided method
Feb 10, 2025
09c4436
fix implicit ordering check
Feb 10, 2025
4f88809
Add method to check for pipe sources present
Feb 10, 2025
e79f7c5
Check schema for matched pieps
Feb 10, 2025
c33dff8
Fix test
Feb 11, 2025
14aebce
Add test multiple schemas
Feb 11, 2025
ef9ae65
Check for duplicate pipe sinks
Feb 11, 2025
aaba1e7
Cleanup and doc
Feb 11, 2025
977355a
sink_node can use sync source
Feb 11, 2025
aa67658
Fix implicit infer from ordered
Feb 14, 2025
9721f27
Handle StopProcessing over pipe
Feb 17, 2025
7648f87
Expose stop strategy and add pause strategy
Feb 17, 2025
92e4409
Fix uninitialized atomics
Apr 11, 2025
8e37b69
Respect StopProducing for pipe_sources. Unpause on stop.
Apr 14, 2025
1935d79
Propagate ordering through pipe_tee
Apr 15, 2025
eb7d305
Rebase fix
May 13, 2025
aac71da
Remove unneded include
May 13, 2025
1a7d605
Fix meson build
Jul 4, 2025
15930a4
Remove redundant move
Jul 4, 2025
36000df
Use arrow mutex implementation
Jul 4, 2025
9a7812f
Update backpressure counter
Jul 11, 2025
dfcd2c7
fix namespace
Aug 25, 2025
d751490
Use BackpressureCombiner
Oct 2, 2025
f1a7f37
Move unpause when stop logic to PipeSource
Oct 2, 2025
fabcacc
Add missing declaration
Oct 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/src/arrow/acero/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ set(ARROW_ACERO_REQUIRED_DEPENDENCIES Arrow ArrowCompute)

set(ARROW_ACERO_SRCS
accumulation_queue.cc
backpressure.cc
scalar_aggregate_node.cc
groupby_aggregate_node.cc
aggregate_internal.cc
Expand Down Expand Up @@ -60,6 +61,7 @@ set(ARROW_ACERO_SRCS
time_series_util.cc
tpch_node.cc
union_node.cc
pipe_node.cc
util.cc)

append_runtime_avx2_src(ARROW_ACERO_SRCS bloom_filter_avx2.cc)
Expand Down Expand Up @@ -173,6 +175,7 @@ function(add_arrow_acero_test REL_TEST_NAME)
${ARG_UNPARSED_ARGUMENTS})
endfunction()

add_arrow_acero_test(backpressure_test SOURCES backpressure_test.cc)
add_arrow_acero_test(plan_test SOURCES plan_test.cc test_nodes_test.cc)
add_arrow_acero_test(source_node_test SOURCES source_node_test.cc)
add_arrow_acero_test(fetch_node_test SOURCES fetch_node_test.cc)
Expand All @@ -183,6 +186,7 @@ add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc)

add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc)
add_arrow_acero_test(sorted_merge_node_test SOURCES sorted_merge_node_test.cc)
add_arrow_acero_test(pipe_node_test SOURCES pipe_node_test.cc)

add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)
Expand Down
16 changes: 1 addition & 15 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#ifndef NDEBUG
# include "arrow/compute/function_internal.h"
#endif
#include "arrow/acero/backpressure.h"
#include "arrow/acero/time_series_util.h"
#include "arrow/compute/key_hash_internal.h"
#include "arrow/compute/light_array_internal.h"
Expand Down Expand Up @@ -459,21 +460,6 @@ class KeyHasher {
arrow::util::TempVectorStack stack_;
};

class BackpressureController : public BackpressureControl {
public:
BackpressureController(ExecNode* node, ExecNode* output,
std::atomic<int32_t>& backpressure_counter)
: node_(node), output_(output), backpressure_counter_(backpressure_counter) {}

void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }

private:
ExecNode* node_;
ExecNode* output_;
std::atomic<int32_t>& backpressure_counter_;
};

class InputState : public util::SerialSequencingQueue::Processor {
// InputState corresponds to an input
// Input record batches are queued up in InputState until processed and
Expand Down
108 changes: 108 additions & 0 deletions cpp/src/arrow/acero/backpressure.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/acero/backpressure.h"
#include "arrow/acero/exec_plan.h"

namespace arrow::acero {

BackpressureController::BackpressureController(ExecNode* node, ExecNode* output,
std::atomic<int32_t>& backpressure_counter)
: node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
void BackpressureController::Pause() {
node_->PauseProducing(output_, ++backpressure_counter_);
}
void BackpressureController::Resume() {
node_->ResumeProducing(output_, ++backpressure_counter_);
}

BackpressureCombiner::BackpressureCombiner(
std::unique_ptr<BackpressureControl> backpressure_control, bool pause_on_any)
: pause_on_any_(pause_on_any),
backpressure_control_(std::move(backpressure_control)) {}

// Called from Source nodes
void BackpressureCombiner::Pause(Source* output) {
std::lock_guard<std::mutex> lg(mutex_);
if (!paused_[output]) {
paused_[output] = true;
paused_count_++;
UpdatePauseStateUnlocked();
}
}

// Called from Source nodes
void BackpressureCombiner::Resume(Source* output) {
std::lock_guard<std::mutex> lg(mutex_);
if (paused_.find(output) == paused_.end()) {
paused_[output] = false;
UpdatePauseStateUnlocked();
} else if (paused_[output]) {
paused_[output] = false;
paused_count_--;
UpdatePauseStateUnlocked();
}
}

void BackpressureCombiner::Stop() {
std::lock_guard<std::mutex> lg(mutex_);
stopped = true;
backpressure_control_->Resume();
paused = false;
}

void BackpressureCombiner::UpdatePauseStateUnlocked() {
if (stopped) return;
bool should_be_paused = (paused_count_ > 0);
if (!pause_on_any_) {
should_be_paused = should_be_paused && (paused_count_ == paused_.size());
}
if (should_be_paused) {
if (!paused) {
backpressure_control_->Pause();
paused = true;
}
} else {
if (paused) {
backpressure_control_->Resume();
paused = false;
}
}
}

BackpressureCombiner::Source::Source(BackpressureCombiner* ctrl) {
if (ctrl) {
AddController(ctrl);
}
}

void BackpressureCombiner::Source::AddController(BackpressureCombiner* ctrl) {
ctrl->Resume(this); // populate map in controller
connections_.push_back(ctrl);
}
void BackpressureCombiner::Source::Pause() {
for (auto& conn_ : connections_) {
conn_->Pause(this);
}
}
void BackpressureCombiner::Source::Resume() {
for (auto& conn_ : connections_) {
conn_->Resume(this);
}
}

} // namespace arrow::acero
99 changes: 99 additions & 0 deletions cpp/src/arrow/acero/backpressure.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <mutex>

#include "arrow/acero/options.h"

namespace arrow::acero {

// Generic backpressure controller for ExecNode
class ARROW_ACERO_EXPORT BackpressureController : public BackpressureControl {
public:
BackpressureController(ExecNode* node, ExecNode* output,
std::atomic<int32_t>& backpressure_counter);

void Pause() override;
void Resume() override;

private:
ExecNode* node_;
ExecNode* output_;
std::atomic<int32_t>& backpressure_counter_;
};

template <typename T>
class BackpressureControlWrapper : public BackpressureControl {
public:
explicit BackpressureControlWrapper(T* obj) : obj_(obj) {}

void Pause() override { obj_->Pause(); }
void Resume() override { obj_->Resume(); }

private:
T* obj_;
};

// Provides infrastructure of combining multiple backpressure sources and propagate the
// result into BackpressureControl There are two logic scheme of backpressure:
// 1. Default pause_on_any=true - pause on any source is propagated - OR logic
// 2. pause_on_any=false - pause is propagated only when all sources are paused - AND
// logic
class ARROW_ACERO_EXPORT BackpressureCombiner {
public:
explicit BackpressureCombiner(std::unique_ptr<BackpressureControl> backpressure_control,
bool pause_on_any = true);

// Instances of Source can be used as usual BackpresureControl.
// That means that BackpressureCombiner::Source can use another BackpressureCombiner
// as backpressure_control
// This enabled building more complex backpressure logic using AND/OR operations.
// Source can also be connected with more BackpressureCombiners to facilitate
// propagation of backpressure to multiple inputs.
class ARROW_ACERO_EXPORT Source : public BackpressureControl {
public:
// strong - strong_connection=true
// weak - strong_connection=false
explicit Source(BackpressureCombiner* ctrl = nullptr);
void AddController(BackpressureCombiner* ctrl);
void Pause() override;
void Resume() override;

private:
std::vector<BackpressureCombiner*> connections_;
};

void Stop();

private:
friend class Source;
void Pause(Source* output);
void Resume(Source* output);

void UpdatePauseStateUnlocked();
bool pause_on_any_;
std::unique_ptr<BackpressureControl> backpressure_control_;
std::mutex mutex_;
std::unordered_map<Source*, bool> paused_;
size_t paused_count_{0};
bool paused{false};
bool stopped{false};
};

} // namespace arrow::acero
128 changes: 128 additions & 0 deletions cpp/src/arrow/acero/backpressure_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include "arrow/acero/backpressure.h"

namespace arrow {
namespace acero {

class MonitorBackpressureControl : public acero::BackpressureControl {
public:
explicit MonitorBackpressureControl(std::atomic<bool>& paused) : paused(paused) {}
virtual void Pause() { paused = true; }
virtual void Resume() { paused = false; }
std::atomic<bool>& paused;
};

TEST(BackpressureCombiner, Basic) {
std::atomic<bool> paused{false};
BackpressureCombiner or_combiner(std::make_unique<MonitorBackpressureControl>(paused));
BackpressureCombiner::Source strong_source1(&or_combiner);
BackpressureCombiner::Source strong_source2;
strong_source2.AddController(&or_combiner);

BackpressureCombiner and_combiner(
std::make_unique<BackpressureCombiner::Source>(&or_combiner),
/*pause_on_any=*/false);
BackpressureCombiner::Source weak_source1(&and_combiner);
BackpressureCombiner::Source weak_source2;
weak_source2.AddController(&and_combiner);

// Any strong causes pause
ASSERT_FALSE(paused);
strong_source1.Pause();
ASSERT_TRUE(paused);
strong_source2.Pause();
ASSERT_TRUE(paused);
strong_source1.Resume();
ASSERT_TRUE(paused);
strong_source2.Resume();
ASSERT_FALSE(paused);

// All weak cause pause
ASSERT_FALSE(paused);
weak_source1.Pause();
ASSERT_FALSE(paused);
weak_source2.Pause();
ASSERT_TRUE(paused);
weak_source1.Resume();
ASSERT_FALSE(paused);
weak_source2.Resume();
ASSERT_FALSE(paused);

// mixed use
strong_source1.Pause();
ASSERT_TRUE(paused);

ASSERT_TRUE(paused);
weak_source1.Pause();
ASSERT_TRUE(paused);
weak_source2.Pause();

strong_source1.Resume();
ASSERT_TRUE(paused);

weak_source1.Resume();
ASSERT_FALSE(paused);
weak_source2.Resume();
ASSERT_FALSE(paused);
}

TEST(BackpressureCombiner, OnlyStrong) {
std::atomic<bool> paused{false};
BackpressureCombiner combiner(std::make_unique<MonitorBackpressureControl>(paused));
BackpressureCombiner::Source strong_source1(&combiner);
BackpressureCombiner::Source strong_source2;
strong_source2.AddController(&combiner);

// Any strong causes pause
ASSERT_FALSE(paused);
strong_source1.Pause();
ASSERT_TRUE(paused);
strong_source2.Pause();
ASSERT_TRUE(paused);
strong_source1.Resume();
ASSERT_TRUE(paused);
strong_source2.Resume();
ASSERT_FALSE(paused);
}

TEST(BackpressureCombiner, OnlyWeak) {
std::atomic<bool> paused{false};
BackpressureCombiner combiner(std::make_unique<MonitorBackpressureControl>(paused),
false);

BackpressureCombiner::Source weak_source1(&combiner);
BackpressureCombiner::Source weak_source2;
weak_source2.AddController(&combiner);

// All weak cause pause
ASSERT_FALSE(paused);
weak_source1.Pause();
ASSERT_FALSE(paused);
weak_source2.Pause();
ASSERT_TRUE(paused);
weak_source1.Resume();
ASSERT_FALSE(paused);
weak_source2.Resume();
ASSERT_FALSE(paused);
}

} // namespace acero
} // namespace arrow
Loading
Loading