Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the issue that GroupBy may not call 'unsubscribe' #1967

Merged
merged 4 commits into from
Dec 17, 2014
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 71 additions & 16 deletions src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package rx.internal.operators;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand All @@ -34,6 +36,7 @@
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;

/**
* Groups the items emitted by an Observable according to a specified criterion, and emits these
Expand Down Expand Up @@ -76,6 +79,13 @@ static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
final Func1<? super T, ? extends R> elementSelector;
final Subscriber<? super GroupedObservable<K, R>> child;

// We should not call `unsubscribe()` until `groups.isEmpty() && child.isUnsubscribed()` is true.
// Use `WIP_FOR_UNSUBSCRIBE_UPDATER` to monitor these statuses and call `unsubscribe()` properly.
// Should check both when `child.unsubscribe` is called and any group is removed.
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> WIP_FOR_UNSUBSCRIBE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "wipForUnsubscribe");
volatile int wipForUnsubscribe = 1;

public GroupBySubscriber(
Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends R> elementSelector,
Expand All @@ -84,6 +94,16 @@ public GroupBySubscriber(
this.keySelector = keySelector;
this.elementSelector = elementSelector;
this.child = child;
child.add(Subscriptions.create(new Action0() {

@Override
public void call() {
if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(self) == 0) {
self.unsubscribe();
}
}

}));
}

private static class GroupState<K, T> {
Expand All @@ -107,7 +127,13 @@ public Observer<T> getObserver() {
private static final NotificationLite<Object> nl = NotificationLite.instance();

volatile int completionEmitted;
volatile int terminated;

private static final int UNTERMINATED = 0;
private static final int TERMINATED_WITH_COMPLETED = 1;
private static final int TERMINATED_WITH_ERROR = 2;

// Must be one of `UNTERMINATED`, `TERMINATED_WITH_COMPLETED`, `TERMINATED_WITH_ERROR`
volatile int terminated = UNTERMINATED;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> COMPLETION_EMITTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "completionEmitted");
Expand All @@ -130,15 +156,15 @@ public void onStart() {

@Override
public void onCompleted() {
if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) {
if (TERMINATED_UPDATER.compareAndSet(this, UNTERMINATED, TERMINATED_WITH_COMPLETED)) {
// if we receive onCompleted from our parent we onComplete children
// for each group check if it is ready to accept more events if so pass the oncomplete through else buffer it.
for (GroupState<K, T> group : groups.values()) {
emitItem(group, nl.completed());
}

// special case (no groups emitted ... or all unsubscribed)
if (groups.size() == 0) {
if (groups.isEmpty()) {
// we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted
if (COMPLETION_EMITTED_UPDATER.compareAndSet(this, 0, 1)) {
child.onCompleted();
Expand All @@ -149,9 +175,19 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) {
// we immediately tear everything down if we receive an error
child.onError(e);
if (TERMINATED_UPDATER.compareAndSet(this, UNTERMINATED, TERMINATED_WITH_ERROR)) {
// It's safe to access all groups and emit the error.
// onNext and onError are in sequence so no group will be created in the loop.
for (GroupState<K, T> group : groups.values()) {
emitItem(group, nl.error(e));
}
try {
// we immediately tear everything down if we receive an error
child.onError(e);
} finally {
// We have not chained the subscribers, so need to call it explicitly.
unsubscribe();
}
}
}

Expand Down Expand Up @@ -187,7 +223,9 @@ public void onNext(T t) {
}
group = createNewGroup(key);
}
emitItem(group, nl.next(t));
if (group != null) {
emitItem(group, nl.next(t));
}
} catch (Throwable e) {
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
Expand Down Expand Up @@ -236,6 +274,11 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
o.onError(e);
// eagerly cleanup instead of waiting for unsubscribe
if (once.compareAndSet(false, true)) {
// done once per instance, either onComplete or onUnSubscribe
cleanupGroup(key);
}
}

@Override
Expand All @@ -250,7 +293,17 @@ public void onNext(T t) {
}
});

GroupState<K, T> putIfAbsent = groups.putIfAbsent(key, groupState);
GroupState<K, T> putIfAbsent;
for (;;) {
int wip = wipForUnsubscribe;
if (wip <= 0) {
return null;
}
if (WIP_FOR_UNSUBSCRIBE_UPDATER.compareAndSet(this, wip, wip + 1)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for making the group emission thread-safe with a concurrent unsubscribe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can take a look at the original discussion in #1959

putIfAbsent = groups.putIfAbsent(key, groupState);
break;
}
}
if (putIfAbsent != null) {
// this shouldn't happen (because we receive onNext sequentially) and would mean we have a bug
throw new IllegalStateException("Group already existed while creating a new one");
Expand All @@ -264,7 +317,7 @@ private void cleanupGroup(Object key) {
GroupState<K, T> removed;
removed = groups.remove(key);
if (removed != null) {
if (removed.buffer.size() > 0) {
if (!removed.buffer.isEmpty()) {
BUFFERED_COUNT.addAndGet(self, -removed.buffer.size());
}
completeInner();
Expand Down Expand Up @@ -342,16 +395,18 @@ private void drainIfPossible(GroupState<K, T> groupState) {
}

private void completeInner() {
// if we have no outstanding groups (all completed or unsubscribe) and terminated/unsubscribed on outer
if (groups.size() == 0 && (terminated == 1 || child.isUnsubscribed())) {
// A group is removed, so check if we need to call `unsubscribe`
if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(this) == 0) {
// It means `groups.isEmpty() && child.isUnsubscribed()` is true
unsubscribe();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this is being done here and not inside the conditional block below where it already calls unsubscribe on line 352/403.

We shouldn't call unsubscribe before emitting onCompleted.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't call unsubscribe before emitting onCompleted.

If it becomes 0, it means an asynchronous unsubscribe has already been called from:

            child.add(Subscriptions.create(new Action0() {

                @Override
                public void call() {
                    if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(self) == 0) {
                        self.unsubscribe();
                    }
                }

            }));

So now child.isUnsubscribed must be true and onCompleted won't be called.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is very nuanced. So what you're saying is that the last complete from an inner has occurred and is about to onComplete the outer but was beaten to the onCompleted by an unsubscribe on the outer?

Can you please add comments explaining this as none of us will remember that logic when we come to this in many months.

}
// if we have no outstanding groups (all completed or unsubscribe) and terminated on outer
if (groups.isEmpty() && terminated == TERMINATED_WITH_COMPLETED) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be an if/else? If we just unsubscribed in the previous lines I don't think we should ever go through this flow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

// completionEmitted ensures we only emit onCompleted once
if (COMPLETION_EMITTED_UPDATER.compareAndSet(this, 0, 1)) {

if (child.isUnsubscribed()) {
// if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
unsubscribe();
if (!child.isUnsubscribed()) {
child.onCompleted();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this isUnsubscribed check only possible under a race condition between an onComplete and unsubscribe that comes after the check a few lines above? Do we really need this check any longer now that the WIP_FOR_UNSUBSCRIBE_UPDATER exists?

Note the comment a few lines above that we would have invoked unsubscribe if groups.isEmpty() and child.isUnsubscribed() == true. Those are two of the three conditions that would get us here IF the unsubscribe happens after we're past those lines. Thus it would be a race wit TERMINATED_WITH_COMPLETED and unsubscribe.

Under that circumstance do we need the isUnsubscribed check? It seems fine to emit the onCompleted in that case since we are truly completed. The unsubscribe() would propagate up from the registered Action on line 102 if I understand correctly since this method would have decremented already.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

}
child.onCompleted();
}
}
}
Expand Down
69 changes: 69 additions & 0 deletions src/test/java/rx/internal/operators/OperatorGroupByTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -1385,4 +1386,72 @@ public void call(String s) {
assertEquals(null, key[0]);
assertEquals(Arrays.asList("a", "b", "c"), values);
}

@Test
public void testGroupByUnsubscribe() {
final Subscription s = mock(Subscription.class);
Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.add(s);
}
}
);
o.groupBy(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer integer) {
return null;
}
}).subscribe().unsubscribe();
verify(s).unsubscribe();
}

@Test
public void testGroupByShouldPropagateError() {
final Throwable e = new RuntimeException("Oops");
final TestSubscriber<Integer> inner1 = new TestSubscriber<Integer>();
final TestSubscriber<Integer> inner2 = new TestSubscriber<Integer>();

final TestSubscriber<GroupedObservable<Integer, Integer>> outer
= new TestSubscriber<GroupedObservable<Integer, Integer>>(new Subscriber<GroupedObservable<Integer, Integer>>() {

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(GroupedObservable<Integer, Integer> o) {
if (o.getKey() == 0) {
o.subscribe(inner1);
} else {
o.subscribe(inner2);
}
}
});
Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(0);
subscriber.onNext(1);
subscriber.onError(e);
}
}
).groupBy(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer i) {
return i % 2;
}
}).subscribe(outer);
assertEquals(Arrays.asList(e), outer.getOnErrorEvents());
assertEquals(Arrays.asList(e), inner1.getOnErrorEvents());
assertEquals(Arrays.asList(e), inner2.getOnErrorEvents());
}
}