diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 73ea226256..5cb5ebd7ed 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -17407,6 +17407,10 @@ public final Flowable unsubscribeOn(Scheduler scheduler) { * propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window will only contain one element. The behavior is + * a tradeoff between no-dataloss and ensuring upstream cancellation can happen. *

*
Backpressure:
*
The operator honors backpressure of its inner and outer subscribers, however, the inner Publisher uses an @@ -17436,6 +17440,11 @@ public final Flowable> window(long count) { * and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff between no-dataloss and ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator honors backpressure of its inner and outer subscribers, however, the inner Publisher uses an @@ -17468,6 +17477,11 @@ public final Flowable> window(long count, long skip) { * and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff between no-dataloss and ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator honors backpressure of its inner and outer subscribers, however, the inner Publisher uses an @@ -17506,6 +17520,11 @@ public final Flowable> window(long count, long skip, int bufferSize) * current window and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner. @@ -17541,6 +17560,11 @@ public final Flowable> window(long timespan, long timeskip, TimeUnit * current window and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner. @@ -17578,6 +17602,11 @@ public final Flowable> window(long timespan, long timeskip, TimeUnit * current window and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner. @@ -17622,6 +17651,11 @@ public final Flowable> window(long timespan, long timeskip, TimeUnit * Publisher emits the current window and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner. @@ -17656,6 +17690,11 @@ public final Flowable> window(long timespan, TimeUnit unit) { * emits the current window and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner. @@ -17694,6 +17733,11 @@ public final Flowable> window(long timespan, TimeUnit unit, * emits the current window and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner. @@ -17733,6 +17777,11 @@ public final Flowable> window(long timespan, TimeUnit unit, * Publisher emits the current window and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner. @@ -17771,6 +17820,11 @@ public final Flowable> window(long timespan, TimeUnit unit, * current window and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner. @@ -17811,6 +17865,11 @@ public final Flowable> window(long timespan, TimeUnit unit, * current window and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner. @@ -17853,6 +17912,11 @@ public final Flowable> window(long timespan, TimeUnit unit, * current window and propagates the notification from the source Publisher. *

* + *

+ * Note that ignoring windows or subscribing later (i.e., on another thread) will result in + * so-called window abandonment where a window may not contain any elements. In this case, subsequent + * elements will be dropped until the condition for the next window boundary is satisfied. The behavior is + * a tradeoff for ensuring upstream cancellation can happen under some race conditions. *

*
Backpressure:
*
The operator consumes the source {@code Publisher} in an unbounded manner. diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindow.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindow.java index e3a104662b..f66d38169e 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindow.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindow.java @@ -23,7 +23,7 @@ import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; import io.reactivex.rxjava3.internal.util.BackpressureHelper; import io.reactivex.rxjava3.plugins.RxJavaPlugins; -import io.reactivex.rxjava3.processors.UnicastProcessor; +import io.reactivex.rxjava3.processors.*; public final class FlowableWindow extends AbstractFlowableWithUpstream> { final long size; @@ -92,13 +92,15 @@ public void onNext(T t) { long i = index; UnicastProcessor w = window; + WindowSubscribeIntercept intercept = null; if (i == 0) { getAndIncrement(); w = UnicastProcessor.create(bufferSize, this); window = w; - downstream.onNext(w); + intercept = new WindowSubscribeIntercept(w); + downstream.onNext(intercept); } i++; @@ -112,6 +114,10 @@ public void onNext(T t) { } else { index = i; } + + if (intercept != null && intercept.tryAbandon()) { + intercept.window.onComplete(); + } } @Override @@ -205,6 +211,7 @@ public void onSubscribe(Subscription s) { public void onNext(T t) { long i = index; + WindowSubscribeIntercept intercept = null; UnicastProcessor w = window; if (i == 0) { getAndIncrement(); @@ -212,7 +219,8 @@ public void onNext(T t) { w = UnicastProcessor.create(bufferSize, this); window = w; - downstream.onNext(w); + intercept = new WindowSubscribeIntercept(w); + downstream.onNext(intercept); } i++; @@ -231,6 +239,10 @@ public void onNext(T t) { } else { index = i; } + + if (intercept != null && intercept.tryAbandon()) { + intercept.window.onComplete(); + } } @Override @@ -352,16 +364,14 @@ public void onNext(T t) { long i = index; + UnicastProcessor newWindow = null; if (i == 0) { if (!cancelled) { getAndIncrement(); - UnicastProcessor w = UnicastProcessor.create(bufferSize, this); - - windows.offer(w); + newWindow = UnicastProcessor.create(bufferSize, this); - queue.offer(w); - drain(); + windows.offer(newWindow); } } @@ -371,6 +381,11 @@ public void onNext(T t) { w.onNext(t); } + if (newWindow != null) { + queue.offer(newWindow); + drain(); + } + long p = produced + 1; if (p == size) { produced = p - skip; @@ -431,39 +446,59 @@ void drain() { final SpscLinkedArrayQueue> q = queue; int missed = 1; + outer: for (;;) { - long r = requested.get(); - long e = 0; + if (cancelled) { + UnicastProcessor up = null; + while ((up = q.poll()) != null) { + up.onComplete(); + } + } else { + long r = requested.get(); + long e = 0; - while (e != r) { - boolean d = done; + while (e != r) { + boolean d = done; - UnicastProcessor t = q.poll(); + UnicastProcessor t = q.poll(); - boolean empty = t == null; + boolean empty = t == null; - if (checkTerminated(d, empty, a, q)) { - return; - } + if (cancelled) { + continue outer; + } - if (empty) { - break; - } + if (checkTerminated(d, empty, a, q)) { + return; + } - a.onNext(t); + if (empty) { + break; + } - e++; - } + WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(t); + a.onNext(intercept); - if (e == r) { - if (checkTerminated(done, q.isEmpty(), a, q)) { - return; + if (intercept.tryAbandon()) { + t.onComplete(); + } + e++; + } + + if (e == r) { + if (cancelled) { + continue outer; + } + + if (checkTerminated(done, q.isEmpty(), a, q)) { + return; + } } - } - if (e != 0L && r != Long.MAX_VALUE) { - requested.addAndGet(-e); + if (e != 0L && r != Long.MAX_VALUE) { + requested.addAndGet(-e); + } } missed = wip.addAndGet(-missed); @@ -474,11 +509,6 @@ void drain() { } boolean checkTerminated(boolean d, boolean empty, Subscriber a, SpscLinkedArrayQueue q) { - if (cancelled) { - q.clear(); - return true; - } - if (d) { Throwable e = error; @@ -520,6 +550,7 @@ public void cancel() { if (once.compareAndSet(false, true)) { run(); } + drain(); } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java index 397af690a2..c340c0e0db 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java @@ -15,19 +15,18 @@ import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.core.Scheduler.Worker; -import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.MissingBackpressureException; -import io.reactivex.rxjava3.internal.disposables.*; +import io.reactivex.rxjava3.internal.disposables.SequentialDisposable; import io.reactivex.rxjava3.internal.fuseable.SimplePlainQueue; import io.reactivex.rxjava3.internal.queue.MpscLinkedQueue; -import io.reactivex.rxjava3.internal.subscribers.QueueDrainSubscriber; import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; -import io.reactivex.rxjava3.internal.util.NotificationLite; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; import io.reactivex.rxjava3.processors.UnicastProcessor; import io.reactivex.rxjava3.subscribers.SerializedSubscriber; @@ -74,227 +73,286 @@ protected void subscribeActual(Subscriber> s) { timespan, timeskip, unit, scheduler.createWorker(), bufferSize)); } - static final class WindowExactUnboundedSubscriber - extends QueueDrainSubscriber> - implements FlowableSubscriber, Subscription, Runnable { + abstract static class AbstractWindowSubscriber + extends AtomicInteger + implements FlowableSubscriber, Subscription { + private static final long serialVersionUID = 5724293814035355511L; + + final Subscriber> downstream; + + final SimplePlainQueue queue; + final long timespan; final TimeUnit unit; - final Scheduler scheduler; final int bufferSize; - Subscription upstream; + final AtomicLong requested; + long emitted; - UnicastProcessor window; + volatile boolean done; + Throwable error; - final SequentialDisposable timer = new SequentialDisposable(); + Subscription upstream; - static final Object NEXT = new Object(); + final AtomicBoolean downstreamCancelled; - volatile boolean terminated; + volatile boolean upstreamCancelled; - WindowExactUnboundedSubscriber(Subscriber> actual, long timespan, TimeUnit unit, - Scheduler scheduler, int bufferSize) { - super(actual, new MpscLinkedQueue()); + final AtomicInteger windowCount; + + AbstractWindowSubscriber(Subscriber> actual, long timespan, TimeUnit unit, int bufferSize) { + this.downstream = actual; + this.queue = new MpscLinkedQueue(); this.timespan = timespan; this.unit = unit; - this.scheduler = scheduler; this.bufferSize = bufferSize; + this.requested = new AtomicLong(); + this.downstreamCancelled = new AtomicBoolean(); + this.windowCount = new AtomicInteger(1); } @Override - public void onSubscribe(Subscription s) { + public final void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.upstream, s)) { this.upstream = s; - window = UnicastProcessor.create(bufferSize); - - Subscriber> a = downstream; - a.onSubscribe(this); - - long r = requested(); - if (r != 0L) { - a.onNext(window); - if (r != Long.MAX_VALUE) { - produced(1); - } - } else { - cancelled = true; - s.cancel(); - a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests.")); - return; - } + downstream.onSubscribe(this); - if (!cancelled) { - if (timer.replace(scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit))) { - s.request(Long.MAX_VALUE); - } - } + createFirstWindow(); } } + abstract void createFirstWindow(); + @Override - public void onNext(T t) { - if (terminated) { - return; - } - if (fastEnter()) { - window.onNext(t); - if (leave(-1) == 0) { - return; - } - } else { - queue.offer(NotificationLite.next(t)); - if (!enter()) { - return; - } - } - drainLoop(); + public final void onNext(T t) { + queue.offer(t); + drain(); } @Override - public void onError(Throwable t) { + public final void onError(Throwable t) { error = t; done = true; - if (enter()) { - drainLoop(); - } - - downstream.onError(t); + drain(); } @Override - public void onComplete() { + public final void onComplete() { done = true; - if (enter()) { - drainLoop(); - } + drain(); + } - downstream.onComplete(); + @Override + public final void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + } } @Override - public void request(long n) { - requested(n); + public final void cancel() { + if (downstreamCancelled.compareAndSet(false, true)) { + windowDone(); + } + } + + final void windowDone() { + if (windowCount.decrementAndGet() == 0) { + cleanupResources(); + upstream.cancel(); + upstreamCancelled = true; + drain(); + } + } + + abstract void cleanupResources(); + + abstract void drain(); + } + + static final class WindowExactUnboundedSubscriber + extends AbstractWindowSubscriber + implements Runnable { + + private static final long serialVersionUID = 1155822639622580836L; + + final Scheduler scheduler; + + UnicastProcessor window; + + final SequentialDisposable timer; + + static final Object NEXT_WINDOW = new Object(); + + final Runnable windowRunnable; + + WindowExactUnboundedSubscriber(Subscriber> actual, long timespan, TimeUnit unit, + Scheduler scheduler, int bufferSize) { + super(actual, timespan, unit, bufferSize); + this.scheduler = scheduler; + this.timer = new SequentialDisposable(); + this.windowRunnable = new WindowRunnable(); } @Override - public void cancel() { - cancelled = true; + void createFirstWindow() { + if (!downstreamCancelled.get()) { + if (requested.get() != 0L) { + windowCount.getAndIncrement(); + window = UnicastProcessor.create(bufferSize, windowRunnable); + + emitted = 1; + + WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + downstream.onNext(intercept); + + timer.replace(scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit)); + + if (intercept.tryAbandon()) { + window.onComplete(); + } + + upstream.request(Long.MAX_VALUE); + } else { + upstream.cancel(); + downstream.onError(new MissingBackpressureException(missingBackpressureMessage(emitted))); + + cleanupResources(); + upstreamCancelled = true; + } + } } @Override public void run() { - if (cancelled) { - terminated = true; - } - queue.offer(NEXT); - if (enter()) { - drainLoop(); - } + queue.offer(NEXT_WINDOW); + drain(); } - void drainLoop() { + @Override + void drain() { + if (getAndIncrement() != 0) { + return; + } - final SimplePlainQueue q = queue; - final Subscriber> a = downstream; - UnicastProcessor w = window; + final SimplePlainQueue queue = this.queue; + final Subscriber> downstream = this.downstream; + UnicastProcessor window = this.window; int missed = 1; for (;;) { - for (;;) { - boolean term = terminated; // NOPMD - - boolean d = done; - - Object o = q.poll(); - - if (d && (o == null || o == NEXT)) { - window = null; - q.clear(); - Throwable err = error; - if (err != null) { - w.onError(err); + if (upstreamCancelled) { + queue.clear(); + window = null; + this.window = null; + } else { + boolean isDone = done; + Object o = queue.poll(); + boolean isEmpty = o == null; + + if (isDone && isEmpty) { + Throwable ex = error; + if (ex != null) { + if (window != null) { + window.onError(ex); + } + downstream.onError(ex); } else { - w.onComplete(); + if (window != null) { + window.onComplete(); + } + downstream.onComplete(); } - timer.dispose(); - return; - } - - if (o == null) { - break; + cleanupResources(); + upstreamCancelled = true; + continue; } + else if (!isEmpty) { - if (o == NEXT) { - w.onComplete(); - if (!term) { - w = UnicastProcessor.create(bufferSize); - window = w; - - long r = requested(); - if (r != 0L) { - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); - } - } else { + if (o == NEXT_WINDOW) { + if (window != null) { + window.onComplete(); window = null; - queue.clear(); - upstream.cancel(); - a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests.")); + this.window = null; + } + if (downstreamCancelled.get()) { timer.dispose(); - return; + } else { + if (requested.get() == emitted) { + upstream.cancel(); + cleanupResources(); + upstreamCancelled = true; + + downstream.onError(new MissingBackpressureException(missingBackpressureMessage(emitted))); + } else { + emitted++; + + windowCount.getAndIncrement(); + window = UnicastProcessor.create(bufferSize, windowRunnable); + this.window = window; + + WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + downstream.onNext(intercept); + + if (intercept.tryAbandon()) { + window.onComplete(); + } + } } - } else { - upstream.cancel(); + } else if (window != null) { + @SuppressWarnings("unchecked") + T item = (T)o; + window.onNext(item); } + continue; } - - w.onNext(NotificationLite.getValue(o)); } - missed = leave(-missed); + missed = addAndGet(-missed); if (missed == 0) { break; } } } + + @Override + void cleanupResources() { + timer.dispose(); + } + + final class WindowRunnable implements Runnable { + @Override + public void run() { + windowDone(); + } + } } static final class WindowExactBoundedSubscriber - extends QueueDrainSubscriber> - implements Subscription { - final long timespan; - final TimeUnit unit; + extends AbstractWindowSubscriber + implements Runnable { + private static final long serialVersionUID = -6130475889925953722L; + final Scheduler scheduler; - final int bufferSize; final boolean restartTimerOnMaxSize; final long maxSize; final Scheduler.Worker worker; long count; - long producerIndex; - - Subscription upstream; - UnicastProcessor window; - volatile boolean terminated; - - final SequentialDisposable timer = new SequentialDisposable(); + final SequentialDisposable timer; WindowExactBoundedSubscriber( Subscriber> actual, long timespan, TimeUnit unit, Scheduler scheduler, int bufferSize, long maxSize, boolean restartTimerOnMaxSize) { - super(actual, new MpscLinkedQueue()); - this.timespan = timespan; - this.unit = unit; + super(actual, timespan, unit, bufferSize); this.scheduler = scheduler; - this.bufferSize = bufferSize; this.maxSize = maxSize; this.restartTimerOnMaxSize = restartTimerOnMaxSize; if (restartTimerOnMaxSize) { @@ -302,142 +360,50 @@ static final class WindowExactBoundedSubscriber } else { worker = null; } + this.timer = new SequentialDisposable(); } @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.upstream, s)) { + void createFirstWindow() { + if (!downstreamCancelled.get()) { + if (requested.get() != 0L) { + emitted = 1; - this.upstream = s; - - Subscriber> a = downstream; - - a.onSubscribe(this); - - if (cancelled) { - return; - } + windowCount.getAndIncrement(); + window = UnicastProcessor.create(bufferSize, this); - UnicastProcessor w = UnicastProcessor.create(bufferSize); - window = w; + WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + downstream.onNext(intercept); - long r = requested(); - if (r != 0L) { - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); + Runnable boundaryTask = new WindowBoundaryRunnable(this, 1L); + if (restartTimerOnMaxSize) { + timer.replace(worker.schedulePeriodically(boundaryTask, timespan, timespan, unit)); + } else { + timer.replace(scheduler.schedulePeriodicallyDirect(boundaryTask, timespan, timespan, unit)); } - } else { - cancelled = true; - s.cancel(); - a.onError(new MissingBackpressureException("Could not deliver initial window due to lack of requests.")); - return; - } - - Disposable task; - ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this); - if (restartTimerOnMaxSize) { - task = worker.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit); - } else { - task = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit); - } - - if (timer.replace(task)) { - s.request(Long.MAX_VALUE); - } - } - } - - @Override - public void onNext(T t) { - if (terminated) { - return; - } - - if (fastEnter()) { - UnicastProcessor w = window; - w.onNext(t); - - long c = count + 1; - - if (c >= maxSize) { - producerIndex++; - count = 0; - - w.onComplete(); - long r = requested(); - - if (r != 0L) { - w = UnicastProcessor.create(bufferSize); - window = w; - downstream.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); - } - if (restartTimerOnMaxSize) { - Disposable tm = timer.get(); - - tm.dispose(); - Disposable task = worker.schedulePeriodically( - new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit); - timer.replace(task); - } - } else { - window = null; - upstream.cancel(); - downstream.onError(new MissingBackpressureException("Could not deliver window due to lack of requests")); - disposeTimer(); - return; + if (intercept.tryAbandon()) { + window.onComplete(); } + + upstream.request(Long.MAX_VALUE); } else { - count = c; - } + upstream.cancel(); + downstream.onError(new MissingBackpressureException(missingBackpressureMessage(emitted))); - if (leave(-1) == 0) { - return; - } - } else { - queue.offer(NotificationLite.next(t)); - if (!enter()) { - return; + cleanupResources(); + upstreamCancelled = true; } } - drainLoop(); } @Override - public void onError(Throwable t) { - error = t; - done = true; - if (enter()) { - drainLoop(); - } - - downstream.onError(t); - } - - @Override - public void onComplete() { - done = true; - if (enter()) { - drainLoop(); - } - - downstream.onComplete(); - } - - @Override - public void request(long n) { - requested(n); + public void run() { + windowDone(); } @Override - public void cancel() { - cancelled = true; - } - - public void disposeTimer() { + void cleanupResources() { timer.dispose(); Worker w = worker; if (w != null) { @@ -445,343 +411,278 @@ public void disposeTimer() { } } - void drainLoop() { - final SimplePlainQueue q = queue; - final Subscriber> a = downstream; - UnicastProcessor w = window; + void boundary(WindowBoundaryRunnable sender) { + queue.offer(sender); + drain(); + } - int missed = 1; - for (;;) { + @Override + void drain() { + if (getAndIncrement() != 0) { + return; + } - for (;;) { - if (terminated) { - upstream.cancel(); - q.clear(); - disposeTimer(); - return; - } + int missed = 1; + final SimplePlainQueue queue = this.queue; + final Subscriber> downstream = this.downstream; + UnicastProcessor window = this.window; - boolean d = done; + for (;;) { - Object o = q.poll(); + if (upstreamCancelled) { + queue.clear(); + window = null; + this.window = null; + } else { - boolean empty = o == null; - boolean isHolder = o instanceof ConsumerIndexHolder; + boolean isDone = done; + Object o = queue.poll(); + boolean isEmpty = o == null; - if (d && (empty || isHolder)) { - window = null; - q.clear(); - Throwable err = error; - if (err != null) { - w.onError(err); + if (isDone && isEmpty) { + Throwable ex = error; + if (ex != null) { + if (window != null) { + window.onError(ex); + } + downstream.onError(ex); } else { - w.onComplete(); + if (window != null) { + window.onComplete(); + } + downstream.onComplete(); } - disposeTimer(); - return; - } - - if (empty) { - break; - } - - if (isHolder) { - ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o; - if (!restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) { - w.onComplete(); - count = 0; - w = UnicastProcessor.create(bufferSize); - window = w; - - long r = requested(); - if (r != 0L) { - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); - } + cleanupResources(); + upstreamCancelled = true; + continue; + } else if (!isEmpty) { + if (o instanceof WindowBoundaryRunnable) { + WindowBoundaryRunnable boundary = (WindowBoundaryRunnable) o; + if (boundary.index == emitted || !restartTimerOnMaxSize) { + this.count = 0; + window = createNewWindow(window); + } + } else if (window != null) { + @SuppressWarnings("unchecked") + T item = (T)o; + window.onNext(item); + + long count = this.count + 1; + if (count == maxSize) { + this.count = 0; + window = createNewWindow(window); } else { - window = null; - queue.clear(); - upstream.cancel(); - a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests.")); - disposeTimer(); - return; + this.count = count; } } + continue; } + } - w.onNext(NotificationLite.getValue(o)); - long c = count + 1; - - if (c >= maxSize) { - producerIndex++; - count = 0; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } - w.onComplete(); + UnicastProcessor createNewWindow(UnicastProcessor window) { + if (window != null) { + window.onComplete(); + window = null; + } - long r = requested(); + if (downstreamCancelled.get()) { + cleanupResources(); + } else { + long emitted = this.emitted; + if (requested.get() == emitted) { + upstream.cancel(); + cleanupResources(); + upstreamCancelled = true; - if (r != 0L) { - w = UnicastProcessor.create(bufferSize); - window = w; - downstream.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); - } + downstream.onError(new MissingBackpressureException(missingBackpressureMessage(emitted))); + } else { + this.emitted = ++emitted; - if (restartTimerOnMaxSize) { - Disposable tm = timer.get(); - tm.dispose(); + windowCount.getAndIncrement(); + window = UnicastProcessor.create(bufferSize, this); + this.window = window; - Disposable task = worker.schedulePeriodically( - new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit); - timer.replace(task); - } + WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + downstream.onNext(intercept); - } else { - window = null; - upstream.cancel(); - downstream.onError(new MissingBackpressureException("Could not deliver window due to lack of requests")); - disposeTimer(); - return; - } - } else { - count = c; + if (restartTimerOnMaxSize) { + timer.update(worker.schedulePeriodically(new WindowBoundaryRunnable(this, emitted), timespan, timespan, unit)); } - } - missed = leave(-missed); - if (missed == 0) { - break; + if (intercept.tryAbandon()) { + window.onComplete(); + } } } + + return window; } - static final class ConsumerIndexHolder implements Runnable { - final long index; + static final class WindowBoundaryRunnable implements Runnable { + final WindowExactBoundedSubscriber parent; - ConsumerIndexHolder(long index, WindowExactBoundedSubscriber parent) { - this.index = index; + + final long index; + + WindowBoundaryRunnable(WindowExactBoundedSubscriber parent, long index) { this.parent = parent; + this.index = index; } @Override public void run() { - WindowExactBoundedSubscriber p = parent; - - if (!p.cancelled) { - p.queue.offer(this); - } else { - p.terminated = true; - } - if (p.enter()) { - p.drainLoop(); - } + parent.boundary(this); } } } static final class WindowSkipSubscriber - extends QueueDrainSubscriber> - implements Subscription, Runnable { - final long timespan; + extends AbstractWindowSubscriber + implements Runnable { + private static final long serialVersionUID = -7852870764194095894L; + final long timeskip; - final TimeUnit unit; final Scheduler.Worker worker; - final int bufferSize; final List> windows; - Subscription upstream; - - volatile boolean terminated; - WindowSkipSubscriber(Subscriber> actual, long timespan, long timeskip, TimeUnit unit, Worker worker, int bufferSize) { - super(actual, new MpscLinkedQueue()); - this.timespan = timespan; + super(actual, timespan, unit, bufferSize); this.timeskip = timeskip; - this.unit = unit; this.worker = worker; - this.bufferSize = bufferSize; this.windows = new LinkedList>(); } @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.upstream, s)) { - - this.upstream = s; + void createFirstWindow() { + if (!downstreamCancelled.get()) { + if (requested.get() != 0L) { + emitted = 1; - downstream.onSubscribe(this); + windowCount.getAndIncrement(); + UnicastProcessor window = UnicastProcessor.create(bufferSize, this); + windows.add(window); - if (cancelled) { - return; - } + WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + downstream.onNext(intercept); - long r = requested(); - if (r != 0L) { - final UnicastProcessor w = UnicastProcessor.create(bufferSize); - windows.add(w); + worker.schedule(new WindowBoundaryRunnable(this, false), timespan, unit); + worker.schedulePeriodically(new WindowBoundaryRunnable(this, true), timeskip, timeskip, unit); - downstream.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); + if (intercept.tryAbandon()) { + window.onComplete(); + windows.remove(window); } - worker.schedule(new Completion(w), timespan, unit); - - worker.schedulePeriodically(this, timeskip, timeskip, unit); - - s.request(Long.MAX_VALUE); + upstream.request(Long.MAX_VALUE); } else { - s.cancel(); - downstream.onError(new MissingBackpressureException("Could not emit the first window due to lack of requests")); - } - } - } + upstream.cancel(); + downstream.onError(new MissingBackpressureException(missingBackpressureMessage(emitted))); - @Override - public void onNext(T t) { - if (fastEnter()) { - for (UnicastProcessor w : windows) { - w.onNext(t); - } - if (leave(-1) == 0) { - return; - } - } else { - queue.offer(t); - if (!enter()) { - return; + cleanupResources(); + upstreamCancelled = true; } } - drainLoop(); } @Override - public void onError(Throwable t) { - error = t; - done = true; - if (enter()) { - drainLoop(); - } - - downstream.onError(t); + void cleanupResources() { + worker.dispose(); } @Override - public void onComplete() { - done = true; - if (enter()) { - drainLoop(); + void drain() { + if (getAndIncrement() != 0) { + return; } - downstream.onComplete(); - } - - @Override - public void request(long n) { - requested(n); - } - - @Override - public void cancel() { - cancelled = true; - } - - void complete(UnicastProcessor w) { - queue.offer(new SubjectWork(w, false)); - if (enter()) { - drainLoop(); - } - } - - @SuppressWarnings("unchecked") - void drainLoop() { - final SimplePlainQueue q = queue; - final Subscriber> a = downstream; - final List> ws = windows; - int missed = 1; + final SimplePlainQueue queue = this.queue; + final Subscriber> downstream = this.downstream; + final List> windows = this.windows; for (;;) { - - for (;;) { - if (terminated) { - upstream.cancel(); - q.clear(); - ws.clear(); - worker.dispose(); - return; - } - - boolean d = done; - - Object v = q.poll(); - - boolean empty = v == null; - boolean sw = v instanceof SubjectWork; - - if (d && (empty || sw)) { - q.clear(); - Throwable e = error; - if (e != null) { - for (UnicastProcessor w : ws) { - w.onError(e); + if (upstreamCancelled) { + queue.clear(); + windows.clear(); + } else { + boolean isDone = done; + Object o = queue.poll(); + boolean isEmpty = o == null; + + if (isDone && isEmpty) { + Throwable ex = error; + if (ex != null) { + for (UnicastProcessor window : windows) { + window.onError(ex); } + downstream.onError(ex); } else { - for (UnicastProcessor w : ws) { - w.onComplete(); + for (UnicastProcessor window : windows) { + window.onComplete(); } + downstream.onComplete(); } - ws.clear(); - worker.dispose(); - return; - } - - if (empty) { - break; - } - - if (sw) { - SubjectWork work = (SubjectWork)v; - - if (work.open) { - if (cancelled) { - continue; - } - - long r = requested(); - if (r != 0L) { - final UnicastProcessor w = UnicastProcessor.create(bufferSize); - ws.add(w); - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); + cleanupResources(); + upstreamCancelled = true; + continue; + } else if (!isEmpty) { + if (o == WINDOW_OPEN) { + if (!downstreamCancelled.get()) { + long emitted = this.emitted; + if (requested.get() != emitted) { + this.emitted = ++emitted; + + windowCount.getAndIncrement(); + UnicastProcessor window = UnicastProcessor.create(bufferSize, this); + windows.add(window); + + WindowSubscribeIntercept intercept = new WindowSubscribeIntercept(window); + downstream.onNext(intercept); + + worker.schedule(new WindowBoundaryRunnable(this, false), timespan, unit); + + if (intercept.tryAbandon()) { + window.onComplete(); + } + } else { + upstream.cancel(); + Throwable ex = new MissingBackpressureException(missingBackpressureMessage(emitted)); + for (UnicastProcessor window : windows) { + window.onError(ex); + } + downstream.onError(ex); + + cleanupResources(); + upstreamCancelled = true; + continue; } - - worker.schedule(new Completion(w), timespan, unit); - } else { - a.onError(new MissingBackpressureException("Can't emit window due to lack of requests")); + } + } else if (o == WINDOW_CLOSE) { + if (!windows.isEmpty()) { + windows.remove(0).onComplete(); } } else { - ws.remove(work.w); - work.w.onComplete(); - if (ws.isEmpty() && cancelled) { - terminated = true; + @SuppressWarnings("unchecked") + T item = (T)o; + for (UnicastProcessor window : windows) { + window.onNext(item); } } - } else { - for (UnicastProcessor w : ws) { - w.onNext((T)v); - } + continue; } } - - missed = leave(-missed); + missed = addAndGet(-missed); if (missed == 0) { break; } @@ -790,39 +691,37 @@ void drainLoop() { @Override public void run() { - - UnicastProcessor w = UnicastProcessor.create(bufferSize); - - SubjectWork sw = new SubjectWork(w, true); - if (!cancelled) { - queue.offer(sw); - } - if (enter()) { - drainLoop(); - } + windowDone(); } - static final class SubjectWork { - final UnicastProcessor w; - final boolean open; - SubjectWork(UnicastProcessor w, boolean open) { - this.w = w; - this.open = open; - } + void boundary(boolean isOpen) { + queue.offer(isOpen ? WINDOW_OPEN : WINDOW_CLOSE); + drain(); } - final class Completion implements Runnable { - private final UnicastProcessor processor; + static final Object WINDOW_OPEN = new Object(); + static final Object WINDOW_CLOSE = new Object(); + + static final class WindowBoundaryRunnable implements Runnable { + + final WindowSkipSubscriber parent; - Completion(UnicastProcessor processor) { - this.processor = processor; + final boolean isOpen; + + WindowBoundaryRunnable(WindowSkipSubscriber parent, boolean isOpen) { + this.parent = parent; + this.isOpen = isOpen; } @Override public void run() { - complete(processor); + parent.boundary(isOpen); } } } + static String missingBackpressureMessage(long index) { + return "Unable to emit the next window (#" + index + ") due to lack of requests. Please make sure the downstream is ready to consume windows."; + } + } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/WindowSubscribeIntercept.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/WindowSubscribeIntercept.java new file mode 100644 index 0000000000..4fcbdee225 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/WindowSubscribeIntercept.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.processors.FlowableProcessor; + +/** + * Wrapper for a FlowableProcessor that detects an incoming subscriber. + * @param the element type of the flow. + * @since 3.0.0 + */ +final class WindowSubscribeIntercept extends Flowable { + + final FlowableProcessor window; + + final AtomicBoolean once; + + WindowSubscribeIntercept(FlowableProcessor source) { + this.window = source; + this.once = new AtomicBoolean(); + } + + @Override + protected void subscribeActual(Subscriber s) { + window.subscribe(s); + once.set(true); + } + + boolean tryAbandon() { + return !once.get() && once.compareAndSet(false, true); + } +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithSizeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithSizeTest.java index 27eefd437d..4536d7a8e4 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithSizeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithSizeTest.java @@ -19,7 +19,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.junit.Test; import org.reactivestreams.*; @@ -37,20 +37,12 @@ public class FlowableWindowWithSizeTest extends RxJavaTest { private static List> toLists(Flowable> observables) { - final List> lists = new ArrayList>(); - Flowable.concat(observables.map(new Function, Flowable>>() { + return observables.flatMapSingle(new Function, SingleSource>>() { @Override - public Flowable> apply(Flowable xs) { - return xs.toList().toFlowable(); + public SingleSource> apply(Flowable w) throws Throwable { + return w.toList(); } - })) - .blockingForEach(new Consumer>() { - @Override - public void accept(List xs) { - lists.add(xs); - } - }); - return lists; + }).toList().blockingGet(); } @Test @@ -311,7 +303,7 @@ public Flowable> apply(Flowable t) { return t.toList().toFlowable(); } }) - .concatMap(new Function>, Publisher>>() { + .concatMapEager(new Function>, Publisher>>() { @Override public Publisher> apply(Flowable> v) { return v; @@ -455,4 +447,166 @@ public void accept(Flowable w) throws Exception { to[0].assertFailure(TestException.class, 1); } + + @Test + public void cancellingWindowCancelsUpstreamSize() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.window(10) + .take(1) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(Flowable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertResult(1); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamSize() { + PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestSubscriber> ts = pp.window(10) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + + inner.get().test().assertResult(1); + } + + @Test + public void cancellingWindowCancelsUpstreamSkip() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.window(5, 10) + .take(1) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(Flowable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertResult(1); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamSkip() { + PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestSubscriber> ts = pp.window(5, 10) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + + inner.get().test().assertResult(1); + } + + @Test + public void cancellingWindowCancelsUpstreamOverlap() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.window(5, 3) + .take(1) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(Flowable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertResult(1); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamOverlap() { + PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestSubscriber> ts = pp.window(5, 3) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + + inner.get().test().assertResult(1); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithTimeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithTimeTest.java index 82ac4bb67f..eb72dab5f7 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithTimeTest.java @@ -514,14 +514,24 @@ public void overlapBackpressure2() { PublishProcessor pp = PublishProcessor.create(); + final TestSubscriber tsInner = new TestSubscriber(); + TestSubscriber> ts = pp.window(2, 1, TimeUnit.SECONDS, scheduler) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable w) throws Throwable { + w.subscribe(tsInner); + } + }) // avoid abandonment .test(1L); scheduler.advanceTimeBy(2, TimeUnit.SECONDS); ts.assertError(MissingBackpressureException.class); - TestHelper.assertError(errors, 0, MissingBackpressureException.class); + tsInner.assertError(MissingBackpressureException.class); + + assertTrue(errors.isEmpty()); } finally { RxJavaPlugins.reset(); } @@ -820,6 +830,12 @@ public void countRestartsOnTimeTick() { FlowableProcessor ps = PublishProcessor.create(); TestSubscriber> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable w) throws Throwable { + w.subscribe(); + } + }) // avoid abandonment .test(); // window #1 @@ -1159,5 +1175,155 @@ public void accept(Flowable v) throws Exception { assertFalse("The doOnNext got interrupted!", isInterrupted.get()); } + + @Test + public void cancellingWindowCancelsUpstreamExactTime() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.window(10, TimeUnit.MINUTES) + .take(1) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(Flowable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertResult(1); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamExactTime() { + PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestSubscriber> ts = pp.window(10, TimeUnit.MINUTES) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + + ts + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + inner.get().test().assertResult(); + } + + @Test + public void cancellingWindowCancelsUpstreamExactTimeAndSize() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.window(10, TimeUnit.MINUTES, 100) + .take(1) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(Flowable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertResult(1); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamExactTimeAndSize() { + PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestSubscriber> ts = pp.window(10, TimeUnit.MINUTES, 100) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + + ts + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + inner.get().test().assertResult(); + } + + @Test + public void cancellingWindowCancelsUpstreamExactTimeSkip() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.window(10, 15, TimeUnit.MINUTES) + .take(1) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(Flowable w) throws Throwable { + return w.take(1); + } + }) + .test(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts + .assertResult(1); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + } + + @Test + public void windowAbandonmentCancelsUpstreamExactTimeSkip() { + PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> inner = new AtomicReference>(); + + TestSubscriber> ts = pp.window(10, 15, TimeUnit.MINUTES) + .take(1) + .doOnNext(new Consumer>() { + @Override + public void accept(Flowable v) throws Throwable { + inner.set(v); + } + }) + .test(); + + assertFalse("Processor still has subscribers!", pp.hasSubscribers()); + + ts + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + + inner.get().test().assertResult(); + } }