From 32f5dbb37e3c80c22c87440f9f5d18f09cbbbdc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Fri, 28 Feb 2020 23:26:52 +0100 Subject: [PATCH] 3.x: Fix replay() not resetting when the connection is disposed --- .../operators/flowable/FlowableReplay.java | 14 +- .../observable/ObservableReplay.java | 14 +- .../flowable/FlowablePublishTest.java | 27 ++++ .../FlowableReplayEagerTruncateTest.java | 94 ++++++++++++-- .../flowable/FlowableReplayTest.java | 121 ++++++++++++++++-- .../observable/ObservablePublishTest.java | 27 ++++ .../ObservableReplayEagerTruncateTest.java | 83 +++++++++++- .../observable/ObservableReplayTest.java | 27 ++++ 8 files changed, 368 insertions(+), 39 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java index 258db5f3bc..4423747632 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java @@ -179,7 +179,7 @@ public void connect(Consumer connection) { } // create a new subscriber-to-source - ReplaySubscriber u = new ReplaySubscriber<>(buf); + ReplaySubscriber u = new ReplaySubscriber<>(buf, current); // try setting it as the current subscriber-to-source if (!current.compareAndSet(ps, u)) { // did not work, perhaps a new subscriber arrived @@ -249,9 +249,13 @@ static final class ReplaySubscriber /** Tracks the amount already requested from the upstream. */ long requestedFromUpstream; + /** The current connection. */ + final AtomicReference> current; + @SuppressWarnings("unchecked") - ReplaySubscriber(ReplayBuffer buffer) { + ReplaySubscriber(ReplayBuffer buffer, AtomicReference> current) { this.buffer = buffer; + this.current = current; this.management = new AtomicInteger(); this.subscribers = new AtomicReference<>(EMPTY); this.shouldConnect = new AtomicBoolean(); @@ -266,9 +270,7 @@ public boolean isDisposed() { @Override public void dispose() { subscribers.set(TERMINATED); - // unlike OperatorPublish, we can't null out the terminated so - // late subscribers can still get replay - // current.compareAndSet(ReplaySubscriber.this, null); + current.compareAndSet(ReplaySubscriber.this, null); // we don't care if it fails because it means the current has // been replaced in the meantime SubscriptionHelper.cancel(this); @@ -1198,7 +1200,7 @@ public void subscribe(Subscriber child) { return; } // create a new subscriber to source - ReplaySubscriber u = new ReplaySubscriber<>(buf); + ReplaySubscriber u = new ReplaySubscriber<>(buf, curr); // let's try setting it as the current subscriber-to-source if (!curr.compareAndSet(null, u)) { // didn't work, maybe someone else did it or the current subscriber diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java index f7937b0509..3b7bff1673 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java @@ -174,7 +174,7 @@ public void connect(Consumer connection) { // create a new subscriber-to-source ReplayBuffer buf = bufferFactory.call(); - ReplayObserver u = new ReplayObserver<>(buf); + ReplayObserver u = new ReplayObserver<>(buf, current); // try setting it as the current subscriber-to-source if (!current.compareAndSet(ps, u)) { // did not work, perhaps a new subscriber arrived @@ -240,8 +240,12 @@ static final class ReplayObserver */ final AtomicBoolean shouldConnect; - ReplayObserver(ReplayBuffer buffer) { + /** The current connection. */ + final AtomicReference> current; + + ReplayObserver(ReplayBuffer buffer, AtomicReference> current) { this.buffer = buffer; + this.current = current; this.observers = new AtomicReference<>(EMPTY); this.shouldConnect = new AtomicBoolean(); @@ -255,9 +259,7 @@ public boolean isDisposed() { @Override public void dispose() { observers.set(TERMINATED); - // unlike OperatorPublish, we can't null out the terminated so - // late observers can still get replay - // current.compareAndSet(ReplayObserver.this, null); + current.compareAndSet(ReplayObserver.this, null); // we don't care if it fails because it means the current has // been replaced in the meantime DisposableHelper.dispose(this); @@ -1004,7 +1006,7 @@ public void subscribe(Observer child) { // create a new subscriber to source ReplayBuffer buf = bufferFactory.call(); - ReplayObserver u = new ReplayObserver<>(buf); + ReplayObserver u = new ReplayObserver<>(buf, curr); // let's try setting it as the current subscriber-to-source if (!curr.compareAndSet(null, u)) { // didn't work, maybe someone else did it or the current subscriber diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java index 6e701e5fc3..a3fb489e9f 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java @@ -1776,4 +1776,31 @@ public void onError(Throwable t) { ts1.assertEmpty(); } + + @Test + public void disposeNoNeedForReset() { + PublishProcessor pp = PublishProcessor.create(); + + ConnectableFlowable cf = pp.publish(); + + TestSubscriber ts = cf.test(); + + Disposable d = cf.connect(); + + pp.onNext(1); + + d.dispose(); + + ts = cf.test(); + + ts.assertEmpty(); + + cf.connect(); + + ts.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayEagerTruncateTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayEagerTruncateTest.java index ce2cf5a564..62612f886b 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayEagerTruncateTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayEagerTruncateTest.java @@ -1923,19 +1923,6 @@ public ReplayBuffer get() throws Exception { .assertFailure(TestException.class); } - @Test - public void currentDisposedWhenConnecting() { - FlowableReplay fr = (FlowableReplay)FlowableReplay.create(Flowable.never(), 16, true); - fr.connect(); - - fr.current.get().dispose(); - assertTrue(fr.current.get().isDisposed()); - - fr.connect(); - - assertFalse(fr.current.get().isDisposed()); - } - @Test public void noBoundedRetentionViaThreadLocal() throws Exception { Flowable source = Flowable.range(1, 200) @@ -2275,4 +2262,85 @@ public void timeAndSizeNoTerminalTruncationOnTimechange() { .assertComplete() .assertNoErrors(); } + + @Test + public void disposeNoNeedForResetSizeBound() { + PublishProcessor pp = PublishProcessor.create(); + + ConnectableFlowable cf = pp.replay(10, true); + + TestSubscriber ts = cf.test(); + + Disposable d = cf.connect(); + + pp.onNext(1); + + d.dispose(); + + ts = cf.test(); + + ts.assertEmpty(); + + cf.connect(); + + ts.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(2); + } + + @Test + public void disposeNoNeedForResetTimeBound() { + PublishProcessor pp = PublishProcessor.create(); + + ConnectableFlowable cf = pp.replay(10, TimeUnit.MINUTES, Schedulers.single(), true); + + TestSubscriber ts = cf.test(); + + Disposable d = cf.connect(); + + pp.onNext(1); + + d.dispose(); + + ts = cf.test(); + + ts.assertEmpty(); + + cf.connect(); + + ts.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(2); + } + + @Test + public void disposeNoNeedForResetTimeAndSIzeBound() { + PublishProcessor pp = PublishProcessor.create(); + + ConnectableFlowable cf = pp.replay(10, 10, TimeUnit.MINUTES, Schedulers.single(), true); + + TestSubscriber ts = cf.test(); + + Disposable d = cf.connect(); + + pp.onNext(1); + + d.dispose(); + + ts = cf.test(); + + ts.assertEmpty(); + + cf.connect(); + + ts.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayTest.java index e151dd07f1..2a799af71f 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayTest.java @@ -1988,19 +1988,6 @@ public ReplayBuffer get() throws Exception { .assertFailure(TestException.class); } - @Test - public void currentDisposedWhenConnecting() { - FlowableReplay fr = (FlowableReplay)FlowableReplay.create(Flowable.never(), 16, false); - fr.connect(); - - fr.current.get().dispose(); - assertTrue(fr.current.get().isDisposed()); - - fr.connect(); - - assertFalse(fr.current.get().isDisposed()); - } - @Test public void noBoundedRetentionViaThreadLocal() throws Exception { Flowable source = Flowable.range(1, 200) @@ -2191,4 +2178,112 @@ public void cancel() { ts.assertResult(); } + + @Test + public void disposeNoNeedForReset() { + PublishProcessor pp = PublishProcessor.create(); + + ConnectableFlowable cf = pp.replay(); + + TestSubscriber ts = cf.test(); + + Disposable d = cf.connect(); + + pp.onNext(1); + + d.dispose(); + + ts = cf.test(); + + ts.assertEmpty(); + + cf.connect(); + + ts.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(2); + } + + @Test + public void disposeNoNeedForResetSizeBound() { + PublishProcessor pp = PublishProcessor.create(); + + ConnectableFlowable cf = pp.replay(10); + + TestSubscriber ts = cf.test(); + + Disposable d = cf.connect(); + + pp.onNext(1); + + d.dispose(); + + ts = cf.test(); + + ts.assertEmpty(); + + cf.connect(); + + ts.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(2); + } + + @Test + public void disposeNoNeedForResetTimeBound() { + PublishProcessor pp = PublishProcessor.create(); + + ConnectableFlowable cf = pp.replay(10, TimeUnit.MINUTES); + + TestSubscriber ts = cf.test(); + + Disposable d = cf.connect(); + + pp.onNext(1); + + d.dispose(); + + ts = cf.test(); + + ts.assertEmpty(); + + cf.connect(); + + ts.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(2); + } + + @Test + public void disposeNoNeedForResetTimeAndSIzeBound() { + PublishProcessor pp = PublishProcessor.create(); + + ConnectableFlowable cf = pp.replay(10, 10, TimeUnit.MINUTES); + + TestSubscriber ts = cf.test(); + + Disposable d = cf.connect(); + + pp.onNext(1); + + d.dispose(); + + ts = cf.test(); + + ts.assertEmpty(); + + cf.connect(); + + ts.assertEmpty(); + + pp.onNext(2); + + ts.assertValuesOnly(2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservablePublishTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservablePublishTest.java index 16cb03d00e..a83372c03e 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservablePublishTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservablePublishTest.java @@ -866,4 +866,31 @@ public void disposeResets() { to.assertValuesOnly(1); } + + @Test + public void disposeNoNeedForReset() { + PublishSubject ps = PublishSubject.create(); + + ConnectableObservable co = ps.publish(); + + TestObserver to = co.test(); + + Disposable d = co.connect(); + + ps.onNext(1); + + d.dispose(); + + to = co.test(); + + to.assertEmpty(); + + co.connect(); + + to.assertEmpty(); + + ps.onNext(2); + + to.assertValuesOnly(2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayEagerTruncateTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayEagerTruncateTest.java index fe2c59d088..cf473d6e78 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayEagerTruncateTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayEagerTruncateTest.java @@ -30,7 +30,7 @@ import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.core.Scheduler.Worker; -import io.reactivex.rxjava3.disposables.*; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.Functions; @@ -1976,4 +1976,85 @@ public void timeAndSizeNoTerminalTruncationOnTimechange() { .assertComplete() .assertNoErrors(); } + + @Test + public void disposeNoNeedForResetSizeBound() { + PublishSubject ps = PublishSubject.create(); + + ConnectableObservable co = ps.replay(10, true); + + TestObserver to = co.test(); + + Disposable d = co.connect(); + + ps.onNext(1); + + d.dispose(); + + to = co.test(); + + to.assertEmpty(); + + co.connect(); + + to.assertEmpty(); + + ps.onNext(2); + + to.assertValuesOnly(2); + } + + @Test + public void disposeNoNeedForResetTimeBound() { + PublishSubject ps = PublishSubject.create(); + + ConnectableObservable co = ps.replay(10, TimeUnit.MINUTES, Schedulers.single(), true); + + TestObserver to = co.test(); + + Disposable d = co.connect(); + + ps.onNext(1); + + d.dispose(); + + to = co.test(); + + to.assertEmpty(); + + co.connect(); + + to.assertEmpty(); + + ps.onNext(2); + + to.assertValuesOnly(2); + } + + @Test + public void disposeNoNeedForResetTimeAndSIzeBound() { + PublishSubject ps = PublishSubject.create(); + + ConnectableObservable co = ps.replay(10, 10, TimeUnit.MINUTES, Schedulers.single(), true); + + TestObserver to = co.test(); + + Disposable d = co.connect(); + + ps.onNext(1); + + d.dispose(); + + to = co.test(); + + to.assertEmpty(); + + co.connect(); + + to.assertEmpty(); + + ps.onNext(2); + + to.assertValuesOnly(2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayTest.java index cbaf961149..94a7a6087f 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayTest.java @@ -1724,4 +1724,31 @@ public void resetWhileActiveIsNoOp() { co.reset(); } + + @Test + public void disposeNoNeedForReset() { + PublishSubject ps = PublishSubject.create(); + + ConnectableObservable co = ps.replay(); + + TestObserver to = co.test(); + + Disposable d = co.connect(); + + ps.onNext(1); + + d.dispose(); + + to = co.test(); + + to.assertEmpty(); + + co.connect(); + + to.assertEmpty(); + + ps.onNext(2); + + to.assertValuesOnly(2); + } }