@@ -113,20 +113,23 @@ class PipeSinkBackpressureControl : public BackpressureControl {
113113
114114class PipeSinkNode : public ExecNode {
115115 public:
116- PipeSinkNode (ExecPlan* plan, std::vector<ExecNode*> inputs, std::string pipe_name)
116+ PipeSinkNode (ExecPlan* plan, std::vector<ExecNode*> inputs, std::string pipe_name,
117+ bool pause_on_any, bool stop_on_any)
117118 : ExecNode(plan, inputs, /* input_labels=*/ {pipe_name}, {}) {
118119 pipe_ = std::make_shared<Pipe>(
119120 plan, std::move (pipe_name),
120121 std::make_unique<PipeSinkBackpressureControl>(inputs[0 ], this ),
121- [this ]() { return StopProducing (); }, inputs[0 ]->ordering ());
122+ [this ]() { return StopProducing (); }, inputs[0 ]->ordering (), pause_on_any,
123+ stop_on_any);
122124 }
123125
124126 static Result<ExecNode*> Make (ExecPlan* plan, std::vector<ExecNode*> inputs,
125127 const ExecNodeOptions& options) {
126128 RETURN_NOT_OK (ValidateExecNodeInputs (plan, inputs, 1 , " PipeSinkNode" ));
127129 const auto & pipe_tee_options = checked_cast<const PipeSinkNodeOptions&>(options);
128- return plan->EmplaceNode <PipeSinkNode>(plan, std::move (inputs),
129- pipe_tee_options.pipe_name );
130+ return plan->EmplaceNode <PipeSinkNode>(
131+ plan, std::move (inputs), pipe_tee_options.pipe_name ,
132+ pipe_tee_options.pause_on_any , pipe_tee_options.stop_on_any );
130133 }
131134 static const char kKindName [];
132135 const char * kind_name () const override { return kKindName ; }
@@ -172,8 +175,9 @@ const char PipeSinkNode::kKindName[] = "PipeSinkNode";
172175
173176class PipeTeeNode : public PipeSource , public PipeSinkNode {
174177 public:
175- PipeTeeNode (ExecPlan* plan, std::vector<ExecNode*> inputs, std::string pipe_name)
176- : PipeSinkNode(plan, inputs, pipe_name) {
178+ PipeTeeNode (ExecPlan* plan, std::vector<ExecNode*> inputs, std::string pipe_name,
179+ bool pause_on_any, bool stop_on_any)
180+ : PipeSinkNode(plan, inputs, pipe_name, pause_on_any, stop_on_any) {
177181 output_schema_ = inputs[0 ]->output_schema ();
178182 auto st = PipeSinkNode::pipe_->addSyncSource (this );
179183 if (ARROW_PREDICT_FALSE (!st.ok ())) { // this should never happen
@@ -185,8 +189,9 @@ class PipeTeeNode : public PipeSource, public PipeSinkNode {
185189 const ExecNodeOptions& options) {
186190 RETURN_NOT_OK (ValidateExecNodeInputs (plan, inputs, 1 , " PipeTeeNode" ));
187191 const auto & pipe_tee_options = checked_cast<const PipeSinkNodeOptions&>(options);
188- return plan->EmplaceNode <PipeTeeNode>(plan, std::move (inputs),
189- pipe_tee_options.pipe_name );
192+ return plan->EmplaceNode <PipeTeeNode>(
193+ plan, std::move (inputs), pipe_tee_options.pipe_name ,
194+ pipe_tee_options.pause_on_any , pipe_tee_options.stop_on_any );
190195 }
191196 static const char kKindName [];
192197 const char * kind_name () const override { return kKindName ; }
@@ -255,12 +260,14 @@ Status PipeSource::Validate(const Ordering& ordering) {
255260
256261Pipe::Pipe (ExecPlan* plan, std::string pipe_name,
257262 std::unique_ptr<BackpressureControl> ctrl,
258- std::function<Status()> stopProducing, Ordering ordering, bool stop_on_any)
263+ std::function<Status()> stopProducing, Ordering ordering, bool pause_on_any,
264+ bool stop_on_any)
259265 : plan_(plan),
260266 ordering_(ordering),
261267 pipe_name_(pipe_name),
262268 ctrl_(std::move(ctrl)),
263269 stopProducing_(stopProducing),
270+ pause_on_any_(pause_on_any),
264271 stop_on_any_(stop_on_any) {}
265272
266273const Ordering& Pipe::ordering () const { return ordering_; }
@@ -269,8 +276,15 @@ void Pipe::Pause(PipeSource* output, int counter) {
269276 std::lock_guard<std::mutex> lg (mutex_);
270277 if (!paused_[output]) {
271278 paused_[output] = true ;
272- if (0 == paused_count_++) {
273- ctrl_->Pause ();
279+ size_t paused_count = ++paused_count_;
280+ if (pause_on_any_) {
281+ if (paused_count == 1 ) {
282+ ctrl_->Pause ();
283+ }
284+ } else {
285+ if (paused_count == CountSources ()) {
286+ ctrl_->Pause ();
287+ }
274288 }
275289 }
276290}
@@ -279,8 +293,16 @@ void Pipe::Resume(PipeSource* output, int counter) {
279293 std::lock_guard<std::mutex> lg (mutex_);
280294 if (paused_[output]) {
281295 paused_[output] = false ;
282- if (0 == --paused_count_) {
283- ctrl_->Resume ();
296+
297+ size_t paused_count = --paused_count_;
298+ if (pause_on_any_) {
299+ if (paused_count == 0 ) {
300+ ctrl_->Resume ();
301+ }
302+ } else {
303+ if (paused_count == CountSources () - 1 ) {
304+ ctrl_->Resume ();
305+ }
284306 }
285307 }
286308}
0 commit comments