Skip to content

Commit

Permalink
2.x: Fix refCount termination-reconnect race (#6187)
Browse files Browse the repository at this point in the history
* 2.x: Fix refCount termination-reconnect race

* Add/restore coverage

* Update ResettableConnectable interface and definitions
  • Loading branch information
akarnokd authored Aug 30, 2018
1 parent 2e566fb commit c7d91c6
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.disposables;

import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.observables.ConnectableObservable;

/**
* Interface allowing conditional resetting of connections in {@link ConnectableObservable}s
* and {@link ConnectableFlowable}s.
* @since 2.2.2 - experimental
*/
@Experimental
public interface ResettableConnectable {

/**
* Reset the connectable source only if the given {@link Disposable} {@code connection} instance
* is still representing a connection established by a previous {@code connect()} connection.
* <p>
* For example, an immediately previous connection should reset the connectable source:
* <pre><code>
* Disposable d = connectable.connect();
*
* ((ResettableConnectable)connectable).resetIf(d);
* </code></pre>
* However, if the connection indicator {@code Disposable} is from a much earlier connection,
* it should not affect the current connection:
* <pre><code>
* Disposable d1 = connectable.connect();
* d.dispose();
*
* Disposable d2 = connectable.connect();
*
* ((ResettableConnectable)connectable).resetIf(d);
*
* assertFalse(d2.isDisposed());
* </code></pre>
* @param connection the disposable received from a previous {@code connect()} call.
*/
void resetIf(Disposable connection);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected void subscribeActual(Subscriber<? super T> s) {
void cancel(RefConnection rc) {
SequentialDisposable sd;
synchronized (this) {
if (connection == null) {
if (connection == null || connection != rc) {
return;
}
long c = rc.subscriberCount - 1;
Expand All @@ -116,13 +116,17 @@ void cancel(RefConnection rc) {

void terminated(RefConnection rc) {
synchronized (this) {
if (connection != null) {
if (connection != null && connection == rc) {
connection = null;
if (rc.timer != null) {
rc.timer.dispose();
}
}
if (--rc.subscriberCount == 0) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
} else if (source instanceof ResettableConnectable) {
((ResettableConnectable)source).resetIf(rc.get());
}
}
}
Expand All @@ -132,9 +136,12 @@ void timeout(RefConnection rc) {
synchronized (this) {
if (rc.subscriberCount == 0 && rc == connection) {
connection = null;
Disposable connectionObject = rc.get();
DisposableHelper.dispose(rc);
if (source instanceof Disposable) {
((Disposable)source).dispose();
} else if (source instanceof ResettableConnectable) {
((ResettableConnectable)source).resetIf(connectionObject);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.reactivex.exceptions.Exceptions;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.ResettableConnectable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.HasUpstreamPublisher;
import io.reactivex.internal.subscribers.SubscriberResourceWrapper;
Expand All @@ -32,7 +33,7 @@
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Timed;

public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T>, Disposable {
public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T>, ResettableConnectable {
/** The source observable. */
final Flowable<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
Expand Down Expand Up @@ -161,15 +162,10 @@ protected void subscribeActual(Subscriber<? super T> s) {
onSubscribe.subscribe(s);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void dispose() {
current.lazySet(null);
}

@Override
public boolean isDisposed() {
Disposable d = current.get();
return d == null || d.isDisposed();
public void resetIf(Disposable connectionObject) {
current.compareAndSet((ReplaySubscriber)connectionObject, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected void subscribeActual(Observer<? super T> observer) {
void cancel(RefConnection rc) {
SequentialDisposable sd;
synchronized (this) {
if (connection == null) {
if (connection == null || connection != rc) {
return;
}
long c = rc.subscriberCount - 1;
Expand All @@ -113,13 +113,17 @@ void cancel(RefConnection rc) {

void terminated(RefConnection rc) {
synchronized (this) {
if (connection != null) {
if (connection != null && connection == rc) {
connection = null;
if (rc.timer != null) {
rc.timer.dispose();
}
}
if (--rc.subscriberCount == 0) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
} else if (source instanceof ResettableConnectable) {
((ResettableConnectable)source).resetIf(rc.get());
}
}
}
Expand All @@ -129,9 +133,12 @@ void timeout(RefConnection rc) {
synchronized (this) {
if (rc.subscriberCount == 0 && rc == connection) {
connection = null;
Disposable connectionObject = rc.get();
DisposableHelper.dispose(rc);
if (source instanceof Disposable) {
((Disposable)source).dispose();
} else if (source instanceof ResettableConnectable) {
((ResettableConnectable)source).resetIf(connectionObject);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Timed;

public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T>, Disposable {
public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T>, ResettableConnectable {
/** The source observable. */
final ObservableSource<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
Expand Down Expand Up @@ -159,15 +159,10 @@ public ObservableSource<T> source() {
return source;
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void dispose() {
current.lazySet(null);
}

@Override
public boolean isDisposed() {
Disposable d = current.get();
return d == null || d.isDisposed();
public void resetIf(Disposable connectionObject) {
current.compareAndSet((ReplayObserver)connectionObject, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,15 +788,17 @@ public void replayIsUnsubscribed() {
ConnectableFlowable<Integer> cf = Flowable.just(1)
.replay();

assertTrue(((Disposable)cf).isDisposed());
if (cf instanceof Disposable) {
assertTrue(((Disposable)cf).isDisposed());

Disposable connection = cf.connect();
Disposable connection = cf.connect();

assertFalse(((Disposable)cf).isDisposed());
assertFalse(((Disposable)cf).isDisposed());

connection.dispose();
connection.dispose();

assertTrue(((Disposable)cf).isDisposed());
assertTrue(((Disposable)cf).isDisposed());
}
}

static final class BadFlowableSubscribe extends ConnectableFlowable<Object> {
Expand Down Expand Up @@ -1325,5 +1327,71 @@ public void cancelTerminateStateExclusion() {
rc.connected = true;
o.connection = rc;
o.cancel(rc);

o.connection = rc;
o.cancel(new RefConnection(o));
}

@Test
public void replayRefCountShallBeThreadSafe() {
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
Flowable<Integer> flowable = Flowable.just(1).replay(1).refCount();

TestSubscriber<Integer> ts1 = flowable
.subscribeOn(Schedulers.io())
.test();

TestSubscriber<Integer> ts2 = flowable
.subscribeOn(Schedulers.io())
.test();

ts1
.withTag("" + i)
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);

ts2
.withTag("" + i)
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
}
}

static final class TestConnectableFlowable<T> extends ConnectableFlowable<T>
implements Disposable {

volatile boolean disposed;

@Override
public void dispose() {
disposed = true;
}

@Override
public boolean isDisposed() {
return disposed;
}

@Override
public void connect(Consumer<? super Disposable> connection) {
// not relevant
}

@Override
protected void subscribeActual(Subscriber<? super T> subscriber) {
// not relevant
}
}

@Test
public void timeoutDisposesSource() {
FlowableRefCount<Object> o = (FlowableRefCount<Object>)new TestConnectableFlowable<Object>().refCount();

RefConnection rc = new RefConnection(o);
o.connection = rc;

o.timeout(rc);

assertTrue(((Disposable)o.source).isDisposed());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -765,15 +765,17 @@ public void replayIsUnsubscribed() {
ConnectableObservable<Integer> co = Observable.just(1).concatWith(Observable.<Integer>never())
.replay();

assertTrue(((Disposable)co).isDisposed());
if (co instanceof Disposable) {
assertTrue(((Disposable)co).isDisposed());

Disposable connection = co.connect();
Disposable connection = co.connect();

assertFalse(((Disposable)co).isDisposed());
assertFalse(((Disposable)co).isDisposed());

connection.dispose();
connection.dispose();

assertTrue(((Disposable)co).isDisposed());
assertTrue(((Disposable)co).isDisposed());
}
}

static final class BadObservableSubscribe extends ConnectableObservable<Object> {
Expand Down Expand Up @@ -1239,6 +1241,8 @@ public void cancelTerminateStateExclusion() {

o.cancel(null);

o.cancel(new RefConnection(o));

RefConnection rc = new RefConnection(o);
o.connection = null;
rc.subscriberCount = 0;
Expand Down Expand Up @@ -1274,5 +1278,71 @@ public void cancelTerminateStateExclusion() {
rc.connected = true;
o.connection = rc;
o.cancel(rc);

o.connection = rc;
o.cancel(new RefConnection(o));
}

@Test
public void replayRefCountShallBeThreadSafe() {
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
Observable<Integer> observable = Observable.just(1).replay(1).refCount();

TestObserver<Integer> observer1 = observable
.subscribeOn(Schedulers.io())
.test();

TestObserver<Integer> observer2 = observable
.subscribeOn(Schedulers.io())
.test();

observer1
.withTag("" + i)
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);

observer2
.withTag("" + i)
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
}
}

static final class TestConnectableObservable<T> extends ConnectableObservable<T>
implements Disposable {

volatile boolean disposed;

@Override
public void dispose() {
disposed = true;
}

@Override
public boolean isDisposed() {
return disposed;
}

@Override
public void connect(Consumer<? super Disposable> connection) {
// not relevant
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
// not relevant
}
}

@Test
public void timeoutDisposesSource() {
ObservableRefCount<Object> o = (ObservableRefCount<Object>)new TestConnectableObservable<Object>().refCount();

RefConnection rc = new RefConnection(o);
o.connection = rc;

o.timeout(rc);

assertTrue(((Disposable)o.source).isDisposed());
}
}

0 comments on commit c7d91c6

Please sign in to comment.