
Race condition between onNext and unsubscribe #3024

Closed
daschl opened this issue Jun 17, 2015 · 32 comments

@daschl
Contributor

daschl commented Jun 17, 2015

Hi folks,

I've seen some threads about this error message, but I'm still getting it on 1.0.10 - 1.0.12. Maybe it's something in my code, but I'm not sure what's going on. I have Couchbase-related code that looks like this:

bucket
    .async()
    .getFromReplica("id::0", ReplicaMode.ALL)
    .toBlocking()
    .first();

The exact semantics don't matter; the important part is that getFromReplica can return N responses, but we take only the first, which means we unsubscribe eagerly. I also need to free buffers and so forth, so @akarnokd recommended a lift operator to me. It works fine, but it fails when we unsubscribe eagerly. The code in question is:

public static <T> Observable<T> liftForAutoRelease(final Observable<T> source) {
    return source.lift(new Observable.Operator<T, T>() {
        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> child) {
            return new Subscriber<T>(child) {
                @Override
                public void onCompleted() {
                    child.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    child.onError(e);
                }

                @Override
                public void onNext(T t) {
                    if (!child.isUnsubscribed()) {
                        child.onNext(t);
                    } else {
                        ReferenceCountUtil.release(t);
                    }
                }
            };
        }
    });
}

And the source is an AsyncSubject made cold:

@Override
public <R extends CouchbaseResponse> Observable<R> send (final RequestFactory requestFactory) {
    return Buffers.liftForAutoRelease(Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            return sendHot(requestFactory.call());
        }
    }));
}

Now the dreaded error is:

java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:346)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:721)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:698)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
    at com.couchbase.client.core.utils.Buffers$2$1.onNext(Buffers.java:86)
    at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
    at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
    at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:208)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

This indicates that in my lift operator, even when isUnsubscribed() returns false at the check, the child can already be unsubscribed by the time I call onNext!
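The failure is a classic check-then-act race: the isUnsubscribed() check and the onNext() call are two separate steps, and another thread can unsubscribe in between. The minimal plain-Java sketch below is not RxJava code; Child and forward are hypothetical stand-ins. It makes the gap explicit by running a hook between the check and the act:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, not RxJava classes.
final class CheckThenAct {

    // A downstream whose queue becomes unusable once it is unsubscribed.
    static final class Child {
        private volatile boolean unsubscribed;
        final List<String> received = new ArrayList<>();

        boolean isUnsubscribed() {
            return unsubscribed;
        }

        void unsubscribe() {
            unsubscribed = true;
        }

        void onNext(String item) {
            // Mirrors the reported failure: the queue rejects items after unsubscribe.
            if (unsubscribed) {
                throw new IllegalStateException("This instance has been unsubscribed");
            }
            received.add(item);
        }
    }

    // Check, then act. 'between' simulates whatever another thread does in the gap.
    static void forward(Child child, String item, Runnable between) {
        if (!child.isUnsubscribed()) {
            between.run();
            child.onNext(item); // can still throw even though the check passed
        }
    }
}
```

Passing a hook that unsubscribes the child reproduces the exception deterministically. In the real system, the "hook" is simply another thread, which is why no amount of checking before onNext closes the window on its own.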

Any idea how to fix this?

@akarnokd
Member

Yes, there is always a small race window. What I'd do is put an onErrorResumeNext() before the merge step so when the exception is thrown, I call release(t) and return an empty() observable.

Btw, the new merge in #2928 suppresses the ISE if it happened due to unsubscription, so you won't get the exception, but you won't get the opportunity to release either.

@daschl
Contributor Author

daschl commented Jun 17, 2015

@akarnokd so this is not something I can fix in the lift or around it? I want to keep that method "self-contained", if you know what I mean.

@akarnokd
Member

If the exception pops up within your lifted Subscriber, then yes.

@daschl
Contributor Author

daschl commented Jun 17, 2015

@akarnokd that makes sense, the only worry I have with that is that my T is out of scope at this point?

public static <T> Observable<T> liftForAutoRelease(final Observable<T> source) {
    return source.lift(new Observable.Operator<T, T>() {

        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> child) {
            return new Subscriber<T>() {
                @Override
                public void onCompleted() {
                    child.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    child.onError(e);
                }

                @Override
                public void onNext(T t) {
                    if (!child.isUnsubscribed()) {
                        child.onNext(t);
                    } else {
                        ReferenceCountUtil.release(t);
                    }
                }
            };
        }
    }).onErrorResumeNext(new Func1<Throwable, Observable<? extends T>>() {
        @Override
        public Observable<? extends T> call(Throwable throwable) {
            if (throwable instanceof IllegalStateException) {
                ReferenceCountUtil.release(/*?*/); // the item is not in scope here
                return Observable.empty();
            }
            return Observable.error(throwable);
        }
    });
}

I could probably rethrow it as an exception that carries the message (the T) along.
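Carrying the item along with the exception could look like the sketch below. UndeliveredItemException and ReleaseOnError are hypothetical names, and the release callback stands in for something like ReferenceCountUtil.release. In the operator, the catch around child.onNext(t) would wrap t in this exception so that the onErrorResumeNext handler can release it and return an empty Observable.

```java
import java.util.function.Consumer;

// Hypothetical exception type: carries the item that could not be delivered so that an
// error handler further down (e.g. the function given to onErrorResumeNext) can release it.
final class UndeliveredItemException extends IllegalStateException {
    private final transient Object item;

    UndeliveredItemException(Object item, Throwable cause) {
        super("item could not be delivered after unsubscribe", cause);
        this.item = item;
    }

    Object item() {
        return item;
    }
}

// Hypothetical stand-in for the error-handler function.
final class ReleaseOnError {
    // Releases the carried item and reports true if the error was ours;
    // returns false if the error should be propagated unchanged.
    static boolean handle(Throwable error, Consumer<Object> release) {
        if (error instanceof UndeliveredItemException) {
            release.accept(((UndeliveredItemException) error).item());
            return true;  // swallowed: the real handler would return Observable.empty()
        }
        return false;     // not ours: the real handler would return Observable.error(error)
    }
}
```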

@akarnokd
Copy link
Member

public static <T> Observable<T> liftForAutoRelease(final Observable<T> source) {
    return source.lift(new Observable.Operator<T, T>() {

        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> child) {
            return new Subscriber<T>() {
                @Override
                public void onCompleted() {
                    child.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    child.onError(e);
                }

                @Override
                public void onNext(T t) {
                    if (!child.isUnsubscribed()) {
                        try {
                             child.onNext(t);
                        } catch (IllegalStateException ex) {
                             if (child.isUnsubscribed()) {
                                 ReferenceCountUtil.release(t);
                             } else {
                                 onError(ex);
                             }
                        }
                    } else {
                        ReferenceCountUtil.release(t);
                    }
                }
            };
        }
    });
}

@daschl
Contributor Author

daschl commented Jun 17, 2015

@akarnokd the error seems to be gone, but there is still something fishy. I still get leaks reported, and I also get "double releases". Can you think of a different race condition that would let the op pass into onNext (where a different component could release it) but then somehow also trigger the release after unsubscription?
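To make "double release" concrete, here is a minimal model of a reference-counted buffer. It loosely follows Netty's refCnt semantics but is simplified, and RefCounted is a hypothetical class, not Netty's API. Releasing at a count of zero fails, so if the operator's catch block releases an item that the downstream had already accepted and released, the second release trips. Conversely, an item that neither side releases shows up as a leak.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of a reference-counted buffer.
final class RefCounted {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
        return refCnt.get();
    }

    // Returns true when this call dropped the count to zero (memory would be freed here).
    boolean release() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("double release: refCnt is already 0");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                return current == 1;
            }
        }
    }
}
```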

@daschl
Contributor Author

daschl commented Jun 17, 2015

Also, what I don't understand is why other operators don't run into the unsubscribe issue. I checked similar implementations, and they "naively" call child.onNext(t), apparently without issues. I figured there must be something else I haven't thought of :)

@akarnokd
Member

It is hard to say what's going on in your system. You can also try and use subscription management:

MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
child.add(mas);

public void onNext(T t) {
    if (!child.isUnsubscribed()) {
        mas.set(Subscriptions.create(() -> ReferenceCountUtil.release(t)));
        try {
            child.onNext(t);
        } catch (IllegalStateException ex) {
            if (!child.isUnsubscribed()) {
               onError(ex);
            }
        }
        mas.set(Subscriptions.unsubscribed());   
    }
}
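The subscription-based approach amounts to parking a "release this item" action while the item is in flight. Here is a minimal sketch of that idea without RxJava, assuming a hypothetical InFlightSlot class: both the delivery path and the unsubscribe path claim the item with AtomicReference.getAndSet(null), so only one of them ever sees it.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical helper holding one in-flight item at a time.
final class InFlightSlot<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    void put(T item) {
        slot.set(item);
    }

    // Delivery path: claim the item and hand it downstream.
    // Returns false if the unsubscribe path already claimed (and released) it.
    boolean deliver(Consumer<? super T> downstream) {
        T item = slot.getAndSet(null);
        if (item == null) {
            return false;
        }
        downstream.accept(item);
        return true;
    }

    // Unsubscribe path: release whatever is still parked in the slot, if anything.
    void cancel(Consumer<? super T> release) {
        T item = slot.getAndSet(null);
        if (item != null) {
            release.accept(item);
        }
    }
}
```

This only narrows the race down to the hand-off itself: once deliver() has claimed the item, the downstream consumer has to own it even if it is unsubscribed a moment later. Otherwise the leak or double release simply moves one step further downstream.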

@daschl
Contributor Author

daschl commented Jun 18, 2015

@akarnokd hm, it didn't help. I'll dig into it again and see if I can spot something else. Köszönöm (thank you)!

@daschl
Contributor Author

daschl commented Jul 14, 2015

@akarnokd I did some more investigating and hit the same (or a similar) issue without any custom code, so maybe this time it is easier to reproduce.

First, for reference here is the code I'm using: https://github.com/couchbase/couchbase-java-client/blob/master/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java

The idea is that I perform N database reads and return the results. If all goes well, it's fine, but as soon as someone unsubscribes while the operations are still in flight, things fall apart. You can imagine someone saying "give me all results, but I only care about the quickest one". So the user would call .getFromReplica(...).first(), which fires off, for example, 3 requests, but unsubscribes after the first response arrives while the others are still travelling down the observable pipeline.
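The "quickest of N responses" case boils down to a first-arrival-wins gate in which every losing response still has to be released. Below is a sketch under that assumption. FirstWins is a hypothetical name; in the real code the consumer would be the downstream subscriber and the release callback something like ReferenceCountUtil.release.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical gate: the first response goes to the consumer, every later one is released.
final class FirstWins<T> {
    private final AtomicBoolean won = new AtomicBoolean();
    private final Consumer<? super T> consumer;
    private final Consumer<? super T> release;

    FirstWins(Consumer<? super T> consumer, Consumer<? super T> release) {
        this.consumer = consumer;
        this.release = release;
    }

    // Safe to call from several threads: compareAndSet lets exactly one caller win.
    void onResponse(T response) {
        if (won.compareAndSet(false, true)) {
            consumer.accept(response);
        } else {
            release.accept(response);
        }
    }
}
```

A single compareAndSet guarantees exactly one winner even when responses arrive on different threads, which is the property that checking isUnsubscribed() alone cannot provide.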

So the responses go down this path: https://github.com/couchbase/couchbase-java-client/blob/master/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java#L76

Here is the important part of the stack trace:

"cb-computations-4@1841" daemon prio=5 tid=0x20 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:346)
      at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:721)
      at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:698)
      at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
      at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onNext(OperatorOnErrorResumeNextViaFunction.java:89)
      at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
      at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
      at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
      at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:216)
      at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

You can see it passes through the filter, then through the onErrorResumeNext, also without issues, but then it has to go back into the flatMap (merge), and that is where the problem is. Could it be that merge's onNext does not properly check whether the whole thing has already been torn down?

I wonder if this is a bug in RxJava's merge, since the user has no handle and has to rely on flatMap to detect whether it has already been unsubscribed.
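For comparison, one way a queue can avoid throwing after teardown is to make "closed?" and "enqueue" a single atomic step and report failure to the caller, who still owns the item and can release it. This is only a sketch of that design. ClosableQueue is hypothetical and is not how OperatorMerge or RxRingBuffer are implemented.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical queue that refuses items after close() instead of throwing.
final class ClosableQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private boolean closed;

    // The closed check and the enqueue happen under one lock, so there is no gap between them.
    // A false return means the caller still owns the item and should release it.
    synchronized boolean offer(T item) {
        if (closed) {
            return false;
        }
        return items.offer(item);
    }

    // Closing hands back whatever is still queued so the owner can release it as well.
    synchronized Queue<T> close() {
        closed = true;
        Queue<T> leftovers = new ArrayDeque<>(items);
        items.clear();
        return leftovers;
    }

    synchronized T poll() {
        return items.poll();
    }
}
```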

@daschl
Contributor Author

daschl commented Jul 14, 2015

Also, I did try to chain in a lift() and release there, but there still seems to be a race between the unsubscribe and merge noticing it.

.filter(GetResponseFilter.INSTANCE)
.lift(new Observable.Operator<GetResponse, GetResponse>() {
    @Override
    public Subscriber<? super GetResponse> call(final Subscriber<? super GetResponse> child) {
        return new Subscriber<GetResponse>(child) {
            @Override
            public void onCompleted() {
                if (!child.isUnsubscribed()) {
                    child.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (!child.isUnsubscribed()) {
                    child.onError(e);
                }
            }

            @Override
            public void onNext(GetResponse getResponse) {
                if (!child.isUnsubscribed()) {
                    child.onNext(getResponse);
                } else {
                    getResponse.release(getResponse.refCnt());
                }
            }
        };
    }
})

@davidmoten
Collaborator

It sounds like you are not using the custom operator any more, but just after ReferenceCountUtil.release(t) you should call request(1) so the stream does not stall under backpressure (especially upstream of a flatMap). Oops, not an issue.

The last time I saw this problem was when the using operator upstream of flatMap called .subscribe instead of .unsafeSubscribe, so on termination it pulled the plug on all subscribers before getting to flatMap. See #2604. Not sure if this will help, though.

@davidmoten
Collaborator

@daschl any chance of distilling a failing unit test?

@daschl
Contributor Author

daschl commented Jul 14, 2015

@davidmoten yup will try to do that next :)

@headinthebox
Contributor

You can imagine someone saying "give me all results, but I only care about the quickest one"

@daschl That sounds like amb.

@daschl
Contributor Author

daschl commented Jul 14, 2015

@davidmoten I managed to repro it: https://gist.github.com/daschl/d2271725311ac716b004

but you need to keep it running for a while before it shows the stack trace. I'm not sure if the worker stuff is causing the issue or just highlighting it; I'll also try to find out what's going on.

@daschl
Contributor Author

daschl commented Jul 14, 2015

@headinthebox indeed! But I return an Observable to the user, who can decide whether they want all or just a subset of the values, so my code needs to handle all kinds of scenarios :)

@daschl
Contributor Author

daschl commented Jul 14, 2015

@davidmoten looks like the issue is not related to my custom code. I changed it to

        public Observable<String> send(final Request request) {
            final int rand = new Random().nextInt(50) + 50;
            Observable
                .timer(rand, TimeUnit.MILLISECONDS, Schedulers.computation())
                .forEach(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        try {
                            request.response().onNext(rand + "");
                            request.response().onCompleted();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                            request.response().onError(ex);
                        }
                    }
                });

            return request.response().observeOn(Schedulers.computation());
        }

and I get the same error there:

Exception in thread "RxComputationThreadPool-5" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:62)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:346)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:721)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:698)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:586)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:208)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:170)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    ... 7 more

@daschl
Contributor Author

daschl commented Jul 14, 2015

@davidmoten ah you can repro it much more reliably if you reduce the time for the events to happen:

            final int rand = new Random().nextInt(50);
            Observable
                .timer(rand, TimeUnit.MICROSECONDS, Schedulers.computation())

It shows up almost immediately in the logs.

@daschl
Contributor Author

daschl commented Jul 14, 2015

Updated the gist which shows it consistently and without all the scheduler jazz: https://gist.github.com/daschl/d2271725311ac716b004

@akarnokd
Member

I'm sorry, I can't look into this issue (or many others) until September, as I'm busy with an important task in my professional work.

@daschl
Contributor Author

daschl commented Jul 14, 2015

@akarnokd no worries, thanks for the heads up :)

@davidmoten
Collaborator

Thanks for the distilled test @daschl. I've had a play with it and am now reminded of how this race works. One encouraging thing is that the test does not fail (print exception stack traces) with the newly merged OperatorMerge from @akarnokd. I'm not aware of similar opportunities for races in the new version of OperatorMerge and we'll probably have to wait for @akarnokd to comment when he's able.

Your test case would be a nice addition to the merge unit tests once adjusted to run for a small fixed time and changed so that it halts when exception occurs.
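A reproducer adjusted as described above, running for a small fixed time and halting at the first exception, could use a harness like the one below. StressRunner is a hypothetical helper; the iteration callback would contain one run of the gist's scenario.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical harness: repeat one iteration until the time budget runs out
// or the first exception is thrown, and report which of the two happened.
final class StressRunner {

    static final class Result {
        final long iterations;    // completed iterations before stopping
        final Throwable failure;  // null if the budget ran out without a failure

        Result(long iterations, Throwable failure) {
            this.iterations = iterations;
            this.failure = failure;
        }
    }

    static Result runFor(long budget, TimeUnit unit, Callable<?> iteration) {
        long deadline = System.nanoTime() + unit.toNanos(budget);
        long count = 0;
        while (System.nanoTime() - deadline < 0) {
            try {
                iteration.call();
            } catch (Throwable t) {
                return new Result(count, t); // halt on the first failure
            }
            count++;
        }
        return new Result(count, null);
    }
}
```

A unit test would then assert that result.failure is null, so a regression fails fast instead of printing stack traces indefinitely.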

@daschl
Contributor Author

daschl commented Jul 15, 2015

@davidmoten yes, I was happy to see it finally merged. I'll give it a spin. I plan to bump RxJava for our client soon anyway, so if it helps, we might just run with that version once it's released.

@daschl
Contributor Author

daschl commented Jul 15, 2015

@davidmoten looks like 1.0.13-SNAPSHOT indeed fixes it! But I have a related issue: do you know how I can reliably detect that someone has unsubscribed? Using child.isUnsubscribed() in a lift operator doesn't cut it, because the unsubscribe can happen at any time (leading to ByteBuf leaks).

@davidmoten
Collaborator

An unsubscribe, say from the endpoint subscriber, propagates upstream, and there is no message type in the Observable's emissions that corresponds to an "unsubscribed" event being pushed downstream in response. You can overlay this, though, if you really want it: push materialized notifications (.materialize()) to the endpoint subscriber, where one of the onNext notifications carries the unsubscribed message.
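Here is a sketch of the in-band marker idea, assuming a hypothetical Signal type. RxJava 1.x's Notification only has OnNext, OnError and OnCompleted kinds, so the "unsubscribed" marker is modeled here as an extra kind rather than as a special OnNext payload, and errors are omitted for brevity. The hypothetical Endpoint delivers values until it sees the marker and releases anything that arrives afterwards.

```java
import java.util.List;

// Hypothetical notification type with an explicit "unsubscribed" marker.
final class Signal<T> {
    enum Kind { NEXT, COMPLETED, UNSUBSCRIBED }

    final Kind kind;
    final T value;

    private Signal(Kind kind, T value) {
        this.kind = kind;
        this.value = value;
    }

    static <T> Signal<T> next(T value) { return new Signal<>(Kind.NEXT, value); }
    static <T> Signal<T> completed() { return new Signal<>(Kind.COMPLETED, null); }
    static <T> Signal<T> unsubscribed() { return new Signal<>(Kind.UNSUBSCRIBED, null); }
}

// Hypothetical endpoint: values before the marker are delivered, values after it are released.
final class Endpoint {
    static <T> void consume(List<Signal<T>> signals, List<T> delivered, List<T> released) {
        boolean unsubscribed = false;
        for (Signal<T> s : signals) {
            switch (s.kind) {
                case NEXT:
                    (unsubscribed ? released : delivered).add(s.value);
                    break;
                case UNSUBSCRIBED:
                    unsubscribed = true;
                    break;
                case COMPLETED:
                    return;
            }
        }
    }
}
```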

@davidmoten
Collaborator

Incidentally, would Observable.using be of use to you? It has overloads for eager and non-eager unsubscribes.

@daschl
Contributor Author

daschl commented Jul 15, 2015

@davidmoten thanks for the info. I went with something a little different: I added a .cache(), which is fine in this case since at most 4 items are emitted, and my transcoding now always happens (that is, taking a ByteBuf and turning it into a GC-able object). That way I have no leaks, and caching at most 4 items is not too bad a tradeoff.
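The workaround amounts to decoding each buffer into an ordinary object immediately and releasing the buffer in the same step, so nothing cached, replayed or dropped downstream still holds pooled memory. Here is a sketch with a hypothetical PooledBuffer class standing in for a ByteBuf:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a pooled, reference-counted buffer.
final class PooledBuffer {
    private final byte[] bytes;
    private boolean released;

    PooledBuffer(String content) {
        this.bytes = content.getBytes(StandardCharsets.UTF_8);
    }

    String read() {
        if (released) throw new IllegalStateException("buffer already released");
        return new String(bytes, StandardCharsets.UTF_8);
    }

    void release() {
        if (released) throw new IllegalStateException("double release");
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

final class EagerDecode {
    // Decode into a GC-managed String and release the buffer right away,
    // so an early unsubscribe downstream cannot leak it.
    static String decode(PooledBuffer buffer) {
        try {
            return buffer.read();
        } finally {
            buffer.release();
        }
    }

    // The decoded (small) result set can then be cached and replayed freely.
    static List<String> decodeAll(List<PooledBuffer> buffers) {
        List<String> out = new ArrayList<>(buffers.size());
        for (PooledBuffer b : buffers) {
            out.add(decode(b));
        }
        return Collections.unmodifiableList(out);
    }
}
```

Because the cached values are plain Strings, replaying them to late subscribers or dropping them after an early unsubscribe carries no release obligation.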

@davidmoten
Collaborator

@daschl bear in mind that .cache behaviour will change a little in 1.0.13. See discussion in #3026.

@daschl
Contributor Author

daschl commented Jul 15, 2015

@davidmoten oh hm, so you are saying that in the current release it waits for onCompleted before emitting items downstream? I thought it would pass them through as they arrive, saving them on the side to replay.

@davidmoten
Collaborator

Yep. It is fixed in 1.0.13.


@daschl
Contributor Author

daschl commented Jul 15, 2015

Awesome, considering we'll upgrade anyway, it should be good. Thanks!

@akarnokd akarnokd closed this as completed Sep 3, 2015
4 participants