Skip to content

Commit

Permalink
2.x: more detailed no-multi-subscribe with std consumers error message (
Browse files Browse the repository at this point in the history
#5301)

* 2.x: more detailed no-multi-subscribe with std consumers error message

* Improve coverage of the new content
  • Loading branch information
akarnokd authored Apr 20, 2017
1 parent bab3071 commit b17bc49
Show file tree
Hide file tree
Showing 26 changed files with 830 additions and 34 deletions.
150 changes: 150 additions & 0 deletions src/main/java/io/reactivex/internal/util/EndConsumerHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.util;

import java.util.concurrent.atomic.*;

import org.reactivestreams.Subscription;

import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.ProtocolViolationException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Utility class to help report multiple subscriptions with the same
* consumer type instead of the internal "Disposable already set!" message
* that is practically reserved for internal operators and indicate bugs in them.
*/
public final class EndConsumerHelper {

/**
* Utility class.
*/
private EndConsumerHelper() {
throw new IllegalStateException("No instances!");
}

/**
* Ensures that the upstream Disposable is null and returns true, otherwise
* disposes the next Disposable and if the upstream is not the shared
* disposed instance, reports a ProtocolViolationException due to
* multiple subscribe attempts.
* @param upstream the upstream current value
* @param next the Disposable to check for nullness and dispose if necessary
* @param observer the class of the consumer to have a personalized
* error message if the upstream already contains a non-cancelled Disposable.
* @return true if successful, false if the upstream was non null
*/
public static boolean validate(Disposable upstream, Disposable next, Class<?> observer) {
ObjectHelper.requireNonNull(next, "next is null");
if (upstream != null) {
next.dispose();
if (upstream != DisposableHelper.DISPOSED) {
reportDoubleSubscription(observer);
}
return false;
}
return true;
}

/**
* Atomically updates the target upstream AtomicReference from null to the non-null
* next Disposable, otherwise disposes next and reports a ProtocolViolationException
* if the AtomicReference doesn't contain the shared disposed indicator.
* @param upstream the target AtomicReference to update
* @param next the Disposable to set on it atomically
* @param observer the class of the consumer to have a personalized
* error message if the upstream already contains a non-cancelled Disposable.
* @return true if successful, false if the content of the AtomicReference was non null
*/
public static boolean setOnce(AtomicReference<Disposable> upstream, Disposable next, Class<?> observer) {
ObjectHelper.requireNonNull(next, "next is null");
if (!upstream.compareAndSet(null, next)) {
next.dispose();
if (upstream.get() != DisposableHelper.DISPOSED) {
reportDoubleSubscription(observer);
}
return false;
}
return true;
}

/**
* Ensures that the upstream Subscription is null and returns true, otherwise
* cancels the next Subscription and if the upstream is not the shared
* cancelled instance, reports a ProtocolViolationException due to
* multiple subscribe attempts.
* @param upstream the upstream current value
* @param next the Subscription to check for nullness and cancel if necessary
* @param subscriber the class of the consumer to have a personalized
* error message if the upstream already contains a non-cancelled Subscription.
* @return true if successful, false if the upstream was non null
*/
public static boolean validate(Subscription upstream, Subscription next, Class<?> subscriber) {
ObjectHelper.requireNonNull(next, "next is null");
if (upstream != null) {
next.cancel();
if (upstream != SubscriptionHelper.CANCELLED) {
reportDoubleSubscription(subscriber);
}
return false;
}
return true;
}

/**
* Atomically updates the target upstream AtomicReference from null to the non-null
* next Subscription, otherwise cancels next and reports a ProtocolViolationException
* if the AtomicReference doesn't contain the shared cancelled indicator.
* @param upstream the target AtomicReference to update
* @param next the Subscription to set on it atomically
* @param subscriber the class of the consumer to have a personalized
* error message if the upstream already contains a non-cancelled Subscription.
* @return true if successful, false if the content of the AtomicReference was non null
*/
public static boolean setOnce(AtomicReference<Subscription> upstream, Subscription next, Class<?> subscriber) {
ObjectHelper.requireNonNull(next, "next is null");
if (!upstream.compareAndSet(null, next)) {
next.cancel();
if (upstream.get() != SubscriptionHelper.CANCELLED) {
reportDoubleSubscription(subscriber);
}
return false;
}
return true;
}

/**
* Builds the error message with the consumer class.
* @param consumer the class of the consumer
* @return the error message string
*/
public static String composeMessage(String consumer) {
return "It is not allowed to subscribe with a(n) " + consumer + " multiple times. "
+ "Please create a fresh instance of " + consumer + " and subscribe that to the target source instead.";
}

/**
* Report a ProtocolViolationException with a personalized message referencing
* the simple type name of the consumer class and report it via
* RxJavaPlugins.onError.
* @param consumer the class of the consumer
*/
public static void reportDoubleSubscription(Class<?> consumer) {
RxJavaPlugins.onError(new ProtocolViolationException(composeMessage(consumer.getName())));
}
}
5 changes: 3 additions & 2 deletions src/main/java/io/reactivex/observers/DefaultObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.util.EndConsumerHelper;

/**
* Abstract base implementation of an {@link io.reactivex.Observer Observer} with support for cancelling a
Expand All @@ -30,7 +31,7 @@
*
* <p>Like all other consumers, {@code DefaultObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
*
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
Expand Down Expand Up @@ -67,7 +68,7 @@ public abstract class DefaultObserver<T> implements Observer<T> {
private Disposable s;
@Override
public final void onSubscribe(@NonNull Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
if (EndConsumerHelper.validate(this.s, s, getClass())) {
this.s = s;
onStart();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.util.EndConsumerHelper;

/**
* An abstract {@link CompletableObserver} that allows asynchronous cancellation by implementing Disposable.
Expand All @@ -27,7 +28,7 @@
*
* <p>Like all other consumers, {@code DisposableCompletableObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
*
* <p>Implementation of {@link #onStart()}, {@link #onError(Throwable)} and
* {@link #onComplete()} are not allowed to throw any unchecked exceptions.
Expand Down Expand Up @@ -55,7 +56,7 @@ public abstract class DisposableCompletableObserver implements CompletableObserv

@Override
public final void onSubscribe(@NonNull Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
onStart();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.util.EndConsumerHelper;

/**
* An abstract {@link MaybeObserver} that allows asynchronous cancellation by implementing Disposable.
Expand All @@ -31,7 +32,7 @@
*
* <p>Like all other consumers, {@code DisposableMaybeObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
*
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)} and
* {@link #onComplete()} are not allowed to throw any unchecked exceptions.
Expand Down Expand Up @@ -66,7 +67,7 @@ public abstract class DisposableMaybeObserver<T> implements MaybeObserver<T>, Di

@Override
public final void onSubscribe(@NonNull Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
onStart();
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/reactivex/observers/DisposableObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.util.EndConsumerHelper;

/**
* An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable.
Expand All @@ -30,7 +31,7 @@
*
* <p>Like all other consumers, {@code DisposableObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
*
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
Expand Down Expand Up @@ -69,7 +70,7 @@ public abstract class DisposableObserver<T> implements Observer<T>, Disposable {

@Override
public final void onSubscribe(@NonNull Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
onStart();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.util.EndConsumerHelper;

/**
* An abstract {@link SingleObserver} that allows asynchronous cancellation by implementing Disposable.
Expand All @@ -27,7 +28,7 @@
*
* <p>Like all other consumers, {@code DisposableSingleObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
*
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)} and {@link #onError(Throwable)}
* are not allowed to throw any unchecked exceptions.
Expand Down Expand Up @@ -58,7 +59,7 @@ public abstract class DisposableSingleObserver<T> implements SingleObserver<T>,

@Override
public final void onSubscribe(@NonNull Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
onStart();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.EndConsumerHelper;

/**
* An abstract {@link CompletableObserver} that allows asynchronous cancellation of its subscription and associated resources.
Expand All @@ -44,7 +45,7 @@
*
* <p>Like all other consumers, {@code ResourceCompletableObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
*
* <p>Implementation of {@link #onStart()}, {@link #onError(Throwable)}
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
Expand Down Expand Up @@ -92,7 +93,7 @@ public final void add(@NonNull Disposable resource) {

@Override
public final void onSubscribe(@NonNull Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
onStart();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.EndConsumerHelper;

/**
* An abstract {@link MaybeObserver} that allows asynchronous cancellation of its subscription and associated resources.
Expand Down Expand Up @@ -48,7 +49,7 @@
*
* <p>Like all other consumers, {@code ResourceMaybeObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
*
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)}
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
Expand Down Expand Up @@ -102,7 +103,7 @@ public final void add(@NonNull Disposable resource) {

@Override
public final void onSubscribe(@NonNull Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
onStart();
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/reactivex/observers/ResourceObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.EndConsumerHelper;

/**
* An abstract {@link Observer} that allows asynchronous cancellation of its subscription and associated resources.
Expand All @@ -41,7 +42,7 @@
*
* <p>Like all other consumers, {@code ResourceObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
*
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
Expand Down Expand Up @@ -100,7 +101,7 @@ public final void add(@NonNull Disposable resource) {

@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
onStart();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.EndConsumerHelper;

/**
* An abstract {@link SingleObserver} that allows asynchronous cancellation of its subscription
Expand All @@ -45,7 +46,7 @@
*
* <p>Like all other consumers, {@code ResourceSingleObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
*
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)} and {@link #onError(Throwable)}
* are not allowed to throw any unchecked exceptions.
Expand Down Expand Up @@ -95,7 +96,7 @@ public final void add(@NonNull Disposable resource) {

@Override
public final void onSubscribe(@NonNull Disposable s) {
if (DisposableHelper.setOnce(this.s, s)) {
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
onStart();
}
}
Expand Down
Loading

0 comments on commit b17bc49

Please sign in to comment.