Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OperatorMerge - RxRingBuffer throws IllegalStateException again #2604

Closed
davidmoten opened this issue Feb 5, 2015 · 15 comments
Closed

OperatorMerge - RxRingBuffer throws IllegalStateException again #2604

davidmoten opened this issue Feb 5, 2015 · 15 comments

Comments

@davidmoten
Copy link
Collaborator

With 1.0.5, I'm getting an IllegalStateException from OperatorMerge. This is repeatable for me but involves a big input data set being processed in chunks using flatMap/onSubscribe so I can't give you a quick test case yet. Hopefully just this description is enough otherwise I'll try and distill a test. I think it is happening close to completion.

java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:346)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:721)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:698)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
        at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleScalarSynchronousObservableWithRequestLimits(OperatorMerge.java:280)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleScalarSynchronousObservable(OperatorMerge.java:243)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:176)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:120)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleScalarSynchronousObservableWithRequestLimits(OperatorMerge.java:280)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.handleScalarSynchronousObservable(OperatorMerge.java:243)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:176)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:120)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.drainScalarValueQueue(OperatorMerge.java:396)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.drainQueuesIfNeeded(OperatorMerge.java:343)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.access$500(OperatorMerge.java:120)
        at rx.internal.operators.OperatorMerge$MergeProducer.request(OperatorMerge.java:549)
        at rx.Subscriber.request(Subscriber.java:130)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.drainQueuesIfNeeded(OperatorMerge.java:350)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.access$500(OperatorMerge.java:120)
        at rx.internal.operators.OperatorMerge$MergeProducer.request(OperatorMerge.java:549)
        at rx.Subscriber.request(Subscriber.java:130)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.drainQueuesIfNeeded(OperatorMerge.java:350)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.access$500(OperatorMerge.java:120)
        at rx.internal.operators.OperatorMerge$MergeProducer.request(OperatorMerge.java:549)
        at rx.internal.operators.OperatorSubscribeOn$1$1$1$1$1.call(OperatorSubscribeOn.java:94)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: au.gov.amsa.ais.Timestamped.class
        at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:58)
        ... 27 more
@benjchristensen
Copy link
Member

benjchristensen commented Feb 5, 2015 via email

@davidmoten davidmoten changed the title OperatorMerge - RxRingBuffer throws IllegalStateException when onError occurs OperatorMerge - RxRingBuffer throws IllegalStateException again Feb 5, 2015
@davidmoten
Copy link
Collaborator Author

yes 1.0.5

@davidmoten
Copy link
Collaborator Author

I've updated this issue. No error is occuring in upstream. The only error occuring is the IllegalStateException.

@benjchristensen
Copy link
Member

sigh I had hoped we'd fixed this issue in 1.0.5.

@davidmoten
Copy link
Collaborator Author

not sure what fixes went through for that. i see that #1963 was closed but didn't notice record of the resolution (like a PR number).

@davidmoten
Copy link
Collaborator Author

In #2584 @akarnokd mentions that there is still a small window where this can happen. I imagine he'll enlighten us.

@akarnokd
Copy link
Member

akarnokd commented Feb 5, 2015

The only fix would be not to throw but just exit. We can't distinguish between an upstream ignoring unsubscription and an upstream pushing just after an unsubscription happened.

@davidmoten
Copy link
Collaborator Author

So in my case, I wouldn't expect what you describe because the operators should be consuming all of the input stream. None of the observables should be interrupted by unsubscription before completion. Do you have an alternative theory?

@akarnokd
Copy link
Member

akarnokd commented Feb 5, 2015

Something is unsubscribing because otherwise the queue wouldn't be null.

@davidmoten
Copy link
Collaborator Author

righto, I'll have a close look tomorrow

@davidmoten
Copy link
Collaborator Author

Got it. I think its an OperatorMerge bug. The unsubscribe is originating from upstream due to the use of using. Here is a failing unit test for your debugging pleasure.

    @Test(timeout = 3000)
    public void testMergeEagerUnsubscribeFromUpstream() {
        Func0<Object> resourceFactory = new Func0<Object>() {
            @Override
            public Object call() {
                return new Object();
            }
        };
        Func1<Object, Observable<Integer>> observableFactory = new Func1<Object, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Object o) {
                return Observable.range(1, 1000000);
            }
        };
        Action1<Object> disposeAction = new Action1<Object>() {
            @Override
            public void call(Object t) {
                // do nothing
            }
        };
        Observable.using(resourceFactory, observableFactory, disposeAction)
                .toList()
                .flatMap(new Func1<List<Integer>, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(List<Integer> list) {
                        return Observable.from(list).subscribeOn(
                                Schedulers.computation());
                    }
                }).count().toBlocking().single();
    }

@akarnokd
Copy link
Member

akarnokd commented Feb 6, 2015

Thanks. This is a nasty one. using does subscribe() on the source which then wraps the downstream into a SafeSubscriber which then cancels all work stared by merge immediately after toList sends onCompleted(). So either using should do unsafeSubscribe or toList() should break the chaining of subscribers via the Subscriber constructor.
This may affect other operators that do something async after an onError and onCompleted.

return new Subscriber<T>(child) { .. }

should become something like this:

class InnerSubscriber<T> extends Subscriber<T> {
    void requestMore(long n) {
        request(n);
    }
    // ...
}
InnerSubscriber<T> s = new InnerSubscriber<T>();
child.setProducer(s::requestMore);
child.add(s);
return s;

@davidmoten
Copy link
Collaborator Author

unsafeSubscribe in using sounds alright to me. Any reasons preventing us doing that?

@davidmoten
Copy link
Collaborator Author

I've rewritten OnSubscribeUsing for my applications use so this problem doesn't happen. I use unsafeSubscribe and optional dispose on termination event (which I always set to true) in addition to unsubscription dispose though there is protection to ensure that disposal only occurs once. This sorts the problem for me. If people are happy with this direction as opposed to handling things downstream instead I'll put in a PR (and write the as yet unwritten unit tests).

public final class OnSubscribeUsing2<T, Resource> implements OnSubscribe<T> {

    private final Func0<Resource> resourceFactory;
    private final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory;
    private final Action1<? super Resource> dispose;
    private final boolean disposeEagerly;

    public OnSubscribeUsing2(
            Func0<Resource> resourceFactory,
            Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
            Action1<? super Resource> dispose, boolean disposeEagerly) {
        this.resourceFactory = resourceFactory;
        this.observableFactory = observableFactory;
        this.dispose = dispose;
        this.disposeEagerly = disposeEagerly;
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {

        try {
            // create the resource
            final Resource resource = resourceFactory.call();
            // create an action that disposes only once
            final Action0 disposeOnceOnly = createOnceOnlyDisposeAction(resource);
            // dispose on unsubscription
            subscriber.add(Subscriptions.create(disposeOnceOnly));
            // create the observable
            final Observable<? extends T> source = observableFactory
            // create the observable
                    .call(resource);
            final Observable<? extends T> observable;
            // supplement with on termination disposal if requested
            if (disposeEagerly)
                observable = source
                // dispose on completion or error
                        .doOnTerminate(disposeOnceOnly);
            else
                observable = source;
            try {
                // start
                observable.unsafeSubscribe(subscriber);
            } catch (Throwable e) {
                Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
                if (disposeError != null)
                    subscriber.onError(new CompositeException(Arrays.asList(e,
                            disposeError)));
                else
                    // propagate error
                    subscriber.onError(e);
            }
        } catch (Throwable e) {
            // then propagate error
            subscriber.onError(e);
        }
    }

    private Throwable disposeEagerlyIfRequested(final Action0 disposeOnceOnly) {
        if (disposeEagerly)
            try {
                disposeOnceOnly.call();
                return null;
            } catch (Throwable e) {
                return e;
            }
        else
            return null;
    }

    private Action0 createOnceOnlyDisposeAction(final Resource resource) {
        return new Action0() {

            final AtomicBoolean disposed = new AtomicBoolean(false);

            @Override
            public void call() {
                // only want dispose called once
                if (disposed.compareAndSet(false, true))
                    dispose.call(resource);
            }
        };
    }
}

@akarnokd
Copy link
Member

I'm closing this as the related PR has been merged some days ago.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants