Skip to content

Commit

Permalink
3.x: Fix Flowable.window (size, time) cancellation and abandonment (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Dec 16, 2019
1 parent c045188 commit df7f1cd
Show file tree
Hide file tree
Showing 6 changed files with 944 additions and 582 deletions.
64 changes: 64 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -17407,6 +17407,10 @@ public final Flowable<T> unsubscribeOn(Scheduler scheduler) {
* propagates the notification from the source Publisher.
* <p>
* <img width="640" height="400" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window3.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window will only contain one element. The behavior is
* a tradeoff between no-dataloss and ensuring upstream cancellation can happen.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure of its inner and outer subscribers, however, the inner Publisher uses an
Expand Down Expand Up @@ -17436,6 +17440,11 @@ public final Flowable<Flowable<T>> window(long count) {
* and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="365" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window4.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff between no-dataloss and ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure of its inner and outer subscribers, however, the inner Publisher uses an
Expand Down Expand Up @@ -17468,6 +17477,11 @@ public final Flowable<Flowable<T>> window(long count, long skip) {
* and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="365" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window4.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff between no-dataloss and ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure of its inner and outer subscribers, however, the inner Publisher uses an
Expand Down Expand Up @@ -17506,6 +17520,11 @@ public final Flowable<Flowable<T>> window(long count, long skip, int bufferSize)
* current window and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window7.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
Expand Down Expand Up @@ -17541,6 +17560,11 @@ public final Flowable<Flowable<T>> window(long timespan, long timeskip, TimeUnit
* current window and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window7.s.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
Expand Down Expand Up @@ -17578,6 +17602,11 @@ public final Flowable<Flowable<T>> window(long timespan, long timeskip, TimeUnit
* current window and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window7.s.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
Expand Down Expand Up @@ -17622,6 +17651,11 @@ public final Flowable<Flowable<T>> window(long timespan, long timeskip, TimeUnit
* Publisher emits the current window and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window5.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
Expand Down Expand Up @@ -17656,6 +17690,11 @@ public final Flowable<Flowable<T>> window(long timespan, TimeUnit unit) {
* emits the current window and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window6.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
Expand Down Expand Up @@ -17694,6 +17733,11 @@ public final Flowable<Flowable<T>> window(long timespan, TimeUnit unit,
* emits the current window and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window6.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
Expand Down Expand Up @@ -17733,6 +17777,11 @@ public final Flowable<Flowable<T>> window(long timespan, TimeUnit unit,
* Publisher emits the current window and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window5.s.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
Expand Down Expand Up @@ -17771,6 +17820,11 @@ public final Flowable<Flowable<T>> window(long timespan, TimeUnit unit,
* current window and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window6.s.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
Expand Down Expand Up @@ -17811,6 +17865,11 @@ public final Flowable<Flowable<T>> window(long timespan, TimeUnit unit,
* current window and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window6.s.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
Expand Down Expand Up @@ -17853,6 +17912,11 @@ public final Flowable<Flowable<T>> window(long timespan, TimeUnit unit,
* current window and propagates the notification from the source Publisher.
* <p>
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window6.s.png" alt="">
* <p>
* Note that ignoring windows or subscribing later (i.e., on another thread) will result in
* so-called window abandonment where a window may not contain any elements. In this case, subsequent
* elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
* a tradeoff for ensuring upstream cancellation can happen under some race conditions.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.processors.*;

public final class FlowableWindow<T> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final long size;
Expand Down Expand Up @@ -92,13 +92,15 @@ public void onNext(T t) {
long i = index;

UnicastProcessor<T> w = window;
WindowSubscribeIntercept<T> intercept = null;
if (i == 0) {
getAndIncrement();

w = UnicastProcessor.<T>create(bufferSize, this);
window = w;

downstream.onNext(w);
intercept = new WindowSubscribeIntercept<T>(w);
downstream.onNext(intercept);
}

i++;
Expand All @@ -112,6 +114,10 @@ public void onNext(T t) {
} else {
index = i;
}

if (intercept != null && intercept.tryAbandon()) {
intercept.window.onComplete();
}
}

@Override
Expand Down Expand Up @@ -205,14 +211,16 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
long i = index;

WindowSubscribeIntercept<T> intercept = null;
UnicastProcessor<T> w = window;
if (i == 0) {
getAndIncrement();

w = UnicastProcessor.<T>create(bufferSize, this);
window = w;

downstream.onNext(w);
intercept = new WindowSubscribeIntercept<T>(w);
downstream.onNext(intercept);
}

i++;
Expand All @@ -231,6 +239,10 @@ public void onNext(T t) {
} else {
index = i;
}

if (intercept != null && intercept.tryAbandon()) {
intercept.window.onComplete();
}
}

@Override
Expand Down Expand Up @@ -352,16 +364,14 @@ public void onNext(T t) {

long i = index;

UnicastProcessor<T> newWindow = null;
if (i == 0) {
if (!cancelled) {
getAndIncrement();

UnicastProcessor<T> w = UnicastProcessor.<T>create(bufferSize, this);

windows.offer(w);
newWindow = UnicastProcessor.<T>create(bufferSize, this);

queue.offer(w);
drain();
windows.offer(newWindow);
}
}

Expand All @@ -371,6 +381,11 @@ public void onNext(T t) {
w.onNext(t);
}

if (newWindow != null) {
queue.offer(newWindow);
drain();
}

long p = produced + 1;
if (p == size) {
produced = p - skip;
Expand Down Expand Up @@ -431,39 +446,59 @@ void drain() {
final SpscLinkedArrayQueue<UnicastProcessor<T>> q = queue;
int missed = 1;

outer:
for (;;) {

long r = requested.get();
long e = 0;
if (cancelled) {
UnicastProcessor<T> up = null;
while ((up = q.poll()) != null) {
up.onComplete();
}
} else {
long r = requested.get();
long e = 0;

while (e != r) {
boolean d = done;
while (e != r) {
boolean d = done;

UnicastProcessor<T> t = q.poll();
UnicastProcessor<T> t = q.poll();

boolean empty = t == null;
boolean empty = t == null;

if (checkTerminated(d, empty, a, q)) {
return;
}
if (cancelled) {
continue outer;
}

if (empty) {
break;
}
if (checkTerminated(d, empty, a, q)) {
return;
}

a.onNext(t);
if (empty) {
break;
}

e++;
}
WindowSubscribeIntercept<T> intercept = new WindowSubscribeIntercept<T>(t);
a.onNext(intercept);

if (e == r) {
if (checkTerminated(done, q.isEmpty(), a, q)) {
return;
if (intercept.tryAbandon()) {
t.onComplete();
}
e++;
}

if (e == r) {
if (cancelled) {
continue outer;
}

if (checkTerminated(done, q.isEmpty(), a, q)) {
return;
}
}
}

if (e != 0L && r != Long.MAX_VALUE) {
requested.addAndGet(-e);
if (e != 0L && r != Long.MAX_VALUE) {
requested.addAndGet(-e);
}
}

missed = wip.addAndGet(-missed);
Expand All @@ -474,11 +509,6 @@ void drain() {
}

boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a, SpscLinkedArrayQueue<?> q) {
if (cancelled) {
q.clear();
return true;
}

if (d) {
Throwable e = error;

Expand Down Expand Up @@ -520,6 +550,7 @@ public void cancel() {
if (once.compareAndSet(false, true)) {
run();
}
drain();
}

@Override
Expand Down
Loading

0 comments on commit df7f1cd

Please sign in to comment.