@@ -237,11 +237,24 @@ PipeSource::PipeSource() {}
237237Status PipeSource::Initialize (Pipe* pipe) {
238238 if (pipe_) return Status::Invalid (" Pipe:" + pipe->PipeName () + " has multiple sinks" );
239239 pipe_ = pipe;
240+ backpressure_source_.AddController (pipe);
240241 return Status::OK ();
241242}
242243
243- void PipeSource::Pause (int32_t counter) { pipe_->Pause (this , counter); }
244- void PipeSource::Resume (int32_t counter) { pipe_->Resume (this , counter); }
244+ void PipeSource::Pause (int32_t counter) {
245+ auto lock = mutex_.Lock ();
246+ if (backpressure_counter < counter) {
247+ backpressure_counter = counter;
248+ backpressure_source_.Pause ();
249+ }
250+ }
251+ void PipeSource::Resume (int32_t counter) {
252+ auto lock = mutex_.Lock ();
253+ if (backpressure_counter < counter) {
254+ backpressure_counter = counter;
255+ backpressure_source_.Resume ();
256+ }
257+ }
245258Status PipeSource::StopProducing () {
246259 if (pipe_) return pipe_->StopProducing (this );
247260 // stopped before initialization
@@ -265,67 +278,20 @@ Pipe::Pipe(ExecPlan* plan, std::string pipe_name,
265278 std::unique_ptr<BackpressureControl> ctrl,
266279 std::function<Status()> stopProducing, Ordering ordering, bool pause_on_any,
267280 bool stop_on_any)
268- : plan_(plan),
281+ : BackpressureCombiner(std::move(ctrl), pause_on_any),
282+ plan_(plan),
269283 ordering_(ordering),
270284 pipe_name_(pipe_name),
271- ctrl_(std::move(ctrl)),
272285 stopProducing_(stopProducing),
273- pause_on_any_(pause_on_any),
274286 stop_on_any_(stop_on_any) {}
275287
276288const Ordering& Pipe::ordering () const { return ordering_; }
277289
278- void Pipe::Pause (PipeSource* output, int counter) {
279- auto lock = mutex_.Lock ();
280- auto & state = state_[output];
281- if (state.backpressure_counter < counter) {
282- state.backpressure_counter = counter;
283- if (!state.paused && !state.stopped ) {
284- state.paused = true ;
285- size_t paused_count = ++paused_count_;
286- if (pause_on_any_) {
287- if (paused_count == 1 ) {
288- ctrl_->Pause ();
289- }
290- } else {
291- if (paused_count == CountSources () - stopped_count_) {
292- ctrl_->Pause ();
293- }
294- }
295- }
296- }
297- }
298-
299- void Pipe::Resume (PipeSource* output, int counter) {
300- auto lock = mutex_.Lock ();
301- auto & state = state_[output];
302- if (state.backpressure_counter < counter) {
303- state.backpressure_counter = counter;
304- DoResume (state);
305- }
306- }
307-
308- void Pipe::DoResume (SourceState& state) {
309- if (state.paused && !state.stopped ) {
310- state.paused = false ;
311- size_t paused_count = --paused_count_;
312- if (pause_on_any_) {
313- if (paused_count == 0 ) {
314- ctrl_->Resume ();
315- }
316- } else {
317- if (paused_count == CountSources () - stopped_count_ - 1 ) {
318- ctrl_->Resume ();
319- }
320- }
321- }
322- }
323-
324290Status Pipe::StopProducing (PipeSource* output) {
325291 auto lock = mutex_.Lock ();
326292 auto & state = state_[output];
327293 DCHECK (!state.stopped );
328- DoResume (state );
294+ BackpressureCombiner::Stop ( );
329295 state.stopped = true ;
330296 size_t stopped_count = ++stopped_count_;
331297 if (stop_on_any_) {
0 commit comments