Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Fix replay() not resetting when the connection is disposed #6921

Merged
merged 1 commit into from
Feb 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void connect(Consumer<? super Disposable> connection) {
}

// create a new subscriber-to-source
ReplaySubscriber<T> u = new ReplaySubscriber<>(buf);
ReplaySubscriber<T> u = new ReplaySubscriber<>(buf, current);
// try setting it as the current subscriber-to-source
if (!current.compareAndSet(ps, u)) {
// did not work, perhaps a new subscriber arrived
Expand Down Expand Up @@ -249,9 +249,13 @@ static final class ReplaySubscriber<T>
/** Tracks the amount already requested from the upstream. */
long requestedFromUpstream;

/** The current connection. */
final AtomicReference<ReplaySubscriber<T>> current;

@SuppressWarnings("unchecked")
ReplaySubscriber(ReplayBuffer<T> buffer) {
ReplaySubscriber(ReplayBuffer<T> buffer, AtomicReference<ReplaySubscriber<T>> current) {
this.buffer = buffer;
this.current = current;
this.management = new AtomicInteger();
this.subscribers = new AtomicReference<>(EMPTY);
this.shouldConnect = new AtomicBoolean();
Expand All @@ -266,9 +270,7 @@ public boolean isDisposed() {
@Override
public void dispose() {
subscribers.set(TERMINATED);
// unlike OperatorPublish, we can't null out the terminated so
// late subscribers can still get replay
// current.compareAndSet(ReplaySubscriber.this, null);
current.compareAndSet(ReplaySubscriber.this, null);
// we don't care if it fails because it means the current has
// been replaced in the meantime
SubscriptionHelper.cancel(this);
Expand Down Expand Up @@ -1198,7 +1200,7 @@ public void subscribe(Subscriber<? super T> child) {
return;
}
// create a new subscriber to source
ReplaySubscriber<T> u = new ReplaySubscriber<>(buf);
ReplaySubscriber<T> u = new ReplaySubscriber<>(buf, curr);
// let's try setting it as the current subscriber-to-source
if (!curr.compareAndSet(null, u)) {
// didn't work, maybe someone else did it or the current subscriber
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void connect(Consumer<? super Disposable> connection) {
// create a new subscriber-to-source
ReplayBuffer<T> buf = bufferFactory.call();

ReplayObserver<T> u = new ReplayObserver<>(buf);
ReplayObserver<T> u = new ReplayObserver<>(buf, current);
// try setting it as the current subscriber-to-source
if (!current.compareAndSet(ps, u)) {
// did not work, perhaps a new subscriber arrived
Expand Down Expand Up @@ -240,8 +240,12 @@ static final class ReplayObserver<T>
*/
final AtomicBoolean shouldConnect;

ReplayObserver(ReplayBuffer<T> buffer) {
/** The current connection. */
final AtomicReference<ReplayObserver<T>> current;

ReplayObserver(ReplayBuffer<T> buffer, AtomicReference<ReplayObserver<T>> current) {
this.buffer = buffer;
this.current = current;

this.observers = new AtomicReference<>(EMPTY);
this.shouldConnect = new AtomicBoolean();
Expand All @@ -255,9 +259,7 @@ public boolean isDisposed() {
@Override
public void dispose() {
observers.set(TERMINATED);
// unlike OperatorPublish, we can't null out the terminated so
// late observers can still get replay
// current.compareAndSet(ReplayObserver.this, null);
current.compareAndSet(ReplayObserver.this, null);
// we don't care if it fails because it means the current has
// been replaced in the meantime
DisposableHelper.dispose(this);
Expand Down Expand Up @@ -1004,7 +1006,7 @@ public void subscribe(Observer<? super T> child) {
// create a new subscriber to source
ReplayBuffer<T> buf = bufferFactory.call();

ReplayObserver<T> u = new ReplayObserver<>(buf);
ReplayObserver<T> u = new ReplayObserver<>(buf, curr);
// let's try setting it as the current subscriber-to-source
if (!curr.compareAndSet(null, u)) {
// didn't work, maybe someone else did it or the current subscriber
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1776,4 +1776,31 @@ public void onError(Throwable t) {

ts1.assertEmpty();
}

@Test
public void disposeNoNeedForReset() {
PublishProcessor<Integer> pp = PublishProcessor.create();

ConnectableFlowable<Integer> cf = pp.publish();

TestSubscriber<Integer> ts = cf.test();

Disposable d = cf.connect();

pp.onNext(1);

d.dispose();

ts = cf.test();

ts.assertEmpty();

cf.connect();

ts.assertEmpty();

pp.onNext(2);

ts.assertValuesOnly(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1923,19 +1923,6 @@ public ReplayBuffer<Integer> get() throws Exception {
.assertFailure(TestException.class);
}

@Test
public void currentDisposedWhenConnecting() {
FlowableReplay<Integer> fr = (FlowableReplay<Integer>)FlowableReplay.create(Flowable.<Integer>never(), 16, true);
fr.connect();

fr.current.get().dispose();
assertTrue(fr.current.get().isDisposed());

fr.connect();

assertFalse(fr.current.get().isDisposed());
}

@Test
public void noBoundedRetentionViaThreadLocal() throws Exception {
Flowable<byte[]> source = Flowable.range(1, 200)
Expand Down Expand Up @@ -2275,4 +2262,85 @@ public void timeAndSizeNoTerminalTruncationOnTimechange() {
.assertComplete()
.assertNoErrors();
}

@Test
public void disposeNoNeedForResetSizeBound() {
PublishProcessor<Integer> pp = PublishProcessor.create();

ConnectableFlowable<Integer> cf = pp.replay(10, true);

TestSubscriber<Integer> ts = cf.test();

Disposable d = cf.connect();

pp.onNext(1);

d.dispose();

ts = cf.test();

ts.assertEmpty();

cf.connect();

ts.assertEmpty();

pp.onNext(2);

ts.assertValuesOnly(2);
}

@Test
public void disposeNoNeedForResetTimeBound() {
PublishProcessor<Integer> pp = PublishProcessor.create();

ConnectableFlowable<Integer> cf = pp.replay(10, TimeUnit.MINUTES, Schedulers.single(), true);

TestSubscriber<Integer> ts = cf.test();

Disposable d = cf.connect();

pp.onNext(1);

d.dispose();

ts = cf.test();

ts.assertEmpty();

cf.connect();

ts.assertEmpty();

pp.onNext(2);

ts.assertValuesOnly(2);
}

@Test
public void disposeNoNeedForResetTimeAndSIzeBound() {
PublishProcessor<Integer> pp = PublishProcessor.create();

ConnectableFlowable<Integer> cf = pp.replay(10, 10, TimeUnit.MINUTES, Schedulers.single(), true);

TestSubscriber<Integer> ts = cf.test();

Disposable d = cf.connect();

pp.onNext(1);

d.dispose();

ts = cf.test();

ts.assertEmpty();

cf.connect();

ts.assertEmpty();

pp.onNext(2);

ts.assertValuesOnly(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1988,19 +1988,6 @@ public ReplayBuffer<Integer> get() throws Exception {
.assertFailure(TestException.class);
}

@Test
public void currentDisposedWhenConnecting() {
FlowableReplay<Integer> fr = (FlowableReplay<Integer>)FlowableReplay.create(Flowable.<Integer>never(), 16, false);
fr.connect();

fr.current.get().dispose();
assertTrue(fr.current.get().isDisposed());

fr.connect();

assertFalse(fr.current.get().isDisposed());
}

@Test
public void noBoundedRetentionViaThreadLocal() throws Exception {
Flowable<byte[]> source = Flowable.range(1, 200)
Expand Down Expand Up @@ -2191,4 +2178,112 @@ public void cancel() {

ts.assertResult();
}

@Test
public void disposeNoNeedForReset() {
PublishProcessor<Integer> pp = PublishProcessor.create();

ConnectableFlowable<Integer> cf = pp.replay();

TestSubscriber<Integer> ts = cf.test();

Disposable d = cf.connect();

pp.onNext(1);

d.dispose();

ts = cf.test();

ts.assertEmpty();

cf.connect();

ts.assertEmpty();

pp.onNext(2);

ts.assertValuesOnly(2);
}

@Test
public void disposeNoNeedForResetSizeBound() {
PublishProcessor<Integer> pp = PublishProcessor.create();

ConnectableFlowable<Integer> cf = pp.replay(10);

TestSubscriber<Integer> ts = cf.test();

Disposable d = cf.connect();

pp.onNext(1);

d.dispose();

ts = cf.test();

ts.assertEmpty();

cf.connect();

ts.assertEmpty();

pp.onNext(2);

ts.assertValuesOnly(2);
}

@Test
public void disposeNoNeedForResetTimeBound() {
PublishProcessor<Integer> pp = PublishProcessor.create();

ConnectableFlowable<Integer> cf = pp.replay(10, TimeUnit.MINUTES);

TestSubscriber<Integer> ts = cf.test();

Disposable d = cf.connect();

pp.onNext(1);

d.dispose();

ts = cf.test();

ts.assertEmpty();

cf.connect();

ts.assertEmpty();

pp.onNext(2);

ts.assertValuesOnly(2);
}

@Test
public void disposeNoNeedForResetTimeAndSIzeBound() {
PublishProcessor<Integer> pp = PublishProcessor.create();

ConnectableFlowable<Integer> cf = pp.replay(10, 10, TimeUnit.MINUTES);

TestSubscriber<Integer> ts = cf.test();

Disposable d = cf.connect();

pp.onNext(1);

d.dispose();

ts = cf.test();

ts.assertEmpty();

cf.connect();

ts.assertEmpty();

pp.onNext(2);

ts.assertValuesOnly(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -866,4 +866,31 @@ public void disposeResets() {

to.assertValuesOnly(1);
}

@Test
public void disposeNoNeedForReset() {
PublishSubject<Integer> ps = PublishSubject.create();

ConnectableObservable<Integer> co = ps.publish();

TestObserver<Integer> to = co.test();

Disposable d = co.connect();

ps.onNext(1);

d.dispose();

to = co.test();

to.assertEmpty();

co.connect();

to.assertEmpty();

ps.onNext(2);

to.assertValuesOnly(2);
}
}
Loading