Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement LambdaConsumerIntrospection #5590

Merged
merged 10 commits into from
Sep 12, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.observers.HasDefaultErrorConsumer;
import io.reactivex.plugins.RxJavaPlugins;

public final class CallbackCompletableObserver
extends AtomicReference<Disposable> implements CompletableObserver, Disposable, Consumer<Throwable> {
extends AtomicReference<Disposable>
implements CompletableObserver, Disposable, Consumer<Throwable>, HasDefaultErrorConsumer {


private static final long serialVersionUID = -4361286194466301354L;
Expand Down Expand Up @@ -82,4 +84,9 @@ public void dispose() {
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}

@Override
public boolean hasMissingErrorConsumer() {
return onError == this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.HasDefaultErrorConsumer;
import io.reactivex.plugins.RxJavaPlugins;

public final class ConsumerSingleObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable {
implements SingleObserver<T>, Disposable, HasDefaultErrorConsumer {


private static final long serialVersionUID = -7012088219455310787L;
Expand Down Expand Up @@ -74,4 +76,9 @@ public void dispose() {
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}

@Override
public boolean hasMissingErrorConsumer() {
return onError == Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.OnErrorNotImplementedException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.observers.HasDefaultErrorConsumer;
import io.reactivex.plugins.RxJavaPlugins;

public final class EmptyCompletableObserver
extends AtomicReference<Disposable>
implements CompletableObserver, Disposable {
implements CompletableObserver, Disposable, HasDefaultErrorConsumer {


private static final long serialVersionUID = -7545121636549663526L;
Expand Down Expand Up @@ -55,4 +56,8 @@ public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}

@Override
public boolean hasMissingErrorConsumer() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.HasDefaultErrorConsumer;
import io.reactivex.plugins.RxJavaPlugins;

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, HasDefaultErrorConsumer {

private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
Expand Down Expand Up @@ -101,4 +104,9 @@ public void dispose() {
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}

@Override
public boolean hasMissingErrorConsumer() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A test would be great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return onError == Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.HasDefaultErrorConsumer;
import io.reactivex.plugins.RxJavaPlugins;

/**
Expand All @@ -29,7 +31,7 @@
*/
public final class MaybeCallbackObserver<T>
extends AtomicReference<Disposable>
implements MaybeObserver<T>, Disposable {
implements MaybeObserver<T>, Disposable, HasDefaultErrorConsumer {


private static final long serialVersionUID = -6076952298809384986L;
Expand Down Expand Up @@ -96,5 +98,8 @@ public void onComplete() {
}
}


@Override
public boolean hasMissingErrorConsumer() {
return onError == Functions.ON_ERROR_MISSING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.HasDefaultErrorConsumer;
import org.reactivestreams.Subscription;

import io.reactivex.FlowableSubscriber;
Expand All @@ -24,7 +26,8 @@
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class LambdaSubscriber<T> extends AtomicReference<Subscription> implements FlowableSubscriber<T>, Subscription, Disposable {
public final class LambdaSubscriber<T> extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Subscription, Disposable, HasDefaultErrorConsumer {

private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
Expand Down Expand Up @@ -115,4 +118,9 @@ public void request(long n) {
public void cancel() {
SubscriptionHelper.cancel(this);
}

@Override
public boolean hasMissingErrorConsumer() {
return onError == Functions.ON_ERROR_MISSING;
}
}
20 changes: 20 additions & 0 deletions src/main/java/io/reactivex/observers/HasDefaultErrorConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.reactivex.observers;

import io.reactivex.annotations.Experimental;

/**
* An interface that indicates that the implementing type has default implementations for error consumption.
*
* @since 2.1.4 - experimental
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@since 2.1.4 - experimental

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Experimental
public interface HasDefaultErrorConsumer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should find a better name for this interface for several reasons:

  1. HasDefaultErrorConsumer sounds like marker interface that you need to use with instanceof to get boolean result.
  2. Interface HasDefaultErrorConsumer and method name hasMissingErrorConsumer() are inconsistent.
  3. Extensibility — do we plan to extend it with other functions to check onNext/onComplete implementations?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd definitely get rid of "default" and "missing" from both. The negated method is really confusing. There's already Has* types which do similar things so I don't agree that it needs changed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, last sentence dropped from my comment for some reason.

Depending on answer to №3 I have following interface names on my mind ObserverIntrospection/SubscriberIntrospection or ErrorConsumerIntrospection, hope someone has better suggestions :)

The negated method is really confusing.

Huh, indeed! Didn't notice that until you pointed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I let you people work out the naming.

Copy link
Contributor Author

@ZacSweers ZacSweers Sep 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the method, how about just a simple isNotImplemented() or isThrowing()?

Assuming the interface itself gets named HasErrorConsumer?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

People will most likely mistreat CompositeObserver and think of it as of a combination of observers because that's how CompositeDisposable and CompositeException work.

ObserverInstrumentation/ObserverIntrospection? Such a name also more clearly delivers the intention of consume-only public API

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Composite is misleading and will show up in content assist when the developer types Composite

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to get to the disposable/exception variant mentioned.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd call it LambdaConsumerIntrospection and hasCustomOnError()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good points. The Introspection moniker feels off to me, mostly because I consider introspection an act and don't necessarily think of it as a noun/type at first. I don't know what a better word would be though. Updated in bae194c


/**
* @return {@code true} if the implementation is missing an error consumer and thus using a throwing default
* implementation. Returns {@code false} if a concrete error consumer implementation was supplied.
*/
@Experimental
boolean hasMissingErrorConsumer();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.reactivex.internal.observers;

import io.reactivex.internal.functions.Functions;
import org.junit.Test;

import static org.junit.Assert.*;

public final class CallbackCompletableObserverTest {

@Test
public void hasMissingErrorConsumer() {
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.EMPTY_ACTION);

assertTrue(o.hasMissingErrorConsumer());
}

@Test
public void isNotMissingErrorConsumer() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: doesNotHaveMissingErrorConsumer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made them clearer in 500df33

CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION);

assertFalse(o.hasMissingErrorConsumer());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.reactivex.internal.observers;

import io.reactivex.internal.functions.Functions;
import org.junit.Test;

import static org.junit.Assert.*;

public final class ConsumerSingleObserverTest {

@Test
public void hasMissingErrorConsumer() {
ConsumerSingleObserver<Integer> o = new ConsumerSingleObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING);

assertTrue(o.hasMissingErrorConsumer());
}

@Test
public void isNotMissingErrorConsumer() {
ConsumerSingleObserver<Integer> o = new ConsumerSingleObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer());

assertFalse(o.hasMissingErrorConsumer());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.reactivex.internal.observers;

import org.junit.Test;

import static org.junit.Assert.assertTrue;

public final class EmptyCompletableObserverTest {

@Test
public void hasMissingErrorConsumer() {
EmptyCompletableObserver o = new EmptyCompletableObserver();

assertTrue(o.hasMissingErrorConsumer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.*;

import io.reactivex.internal.functions.Functions;
import org.junit.Test;

import io.reactivex.Observable;
Expand Down Expand Up @@ -342,4 +343,24 @@ public void accept(Disposable s) throws Exception {

assertTrue(errors.toString(), errors.get(0) instanceof TestException);
}

@Test
public void hasMissingErrorConsumer() {
LambdaObserver<Integer> o = new LambdaObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION,
Functions.<Disposable>emptyConsumer());

assertTrue(o.hasMissingErrorConsumer());
}

@Test
public void isNotMissingErrorConsumer() {
LambdaObserver<Integer> o = new LambdaObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION,
Functions.<Disposable>emptyConsumer());

assertFalse(o.hasMissingErrorConsumer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@

package io.reactivex.internal.operators.maybe;

import static org.junit.Assert.*;

import java.util.List;

import org.junit.Test;

import io.reactivex.TestHelper;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.plugins.RxJavaPlugins;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.*;

public class MaybeCallbackObserverTest {

Expand Down Expand Up @@ -121,4 +120,22 @@ public void run() throws Exception {
RxJavaPlugins.reset();
}
}

@Test
public void hasMissingErrorConsumer() {
MaybeCallbackObserver<Integer> o = new MaybeCallbackObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION);

assertTrue(o.hasMissingErrorConsumer());
}

@Test
public void isNotMissingErrorConsumer() {
MaybeCallbackObserver<Integer> o = new MaybeCallbackObserver<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION);

assertFalse(o.hasMissingErrorConsumer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@

package io.reactivex.internal.subscribers;

import static org.junit.Assert.*;

import java.util.*;

import org.junit.Test;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.operators.flowable.FlowableInternalHelper;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import org.junit.Test;
import org.reactivestreams.*;

import java.util.*;

import static org.junit.Assert.*;

public class LambdaSubscriberTest {

Expand Down Expand Up @@ -347,4 +348,24 @@ public void accept(Subscription s) throws Exception {

assertTrue(errors.toString(), errors.get(0) instanceof TestException);
}

@Test
public void hasMissingErrorConsumer() {
LambdaSubscriber<Integer> o = new LambdaSubscriber<Integer>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION,
FlowableInternalHelper.RequestMax.INSTANCE);

assertTrue(o.hasMissingErrorConsumer());
}

@Test
public void isNotMissingErrorConsumer() {
LambdaSubscriber<Integer> o = new LambdaSubscriber<Integer>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION,
FlowableInternalHelper.RequestMax.INSTANCE);

assertFalse(o.hasMissingErrorConsumer());
}
}