Skip to content

Commit

Permalink
2.x: fix Observable.zip to dispose eagerly (#5121)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbarr21 authored and akarnokd committed Feb 20, 2017
1 parent 9a342fd commit 4a32963
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
public void dispose() {
if (!cancelled) {
cancelled = true;
cancelSources();
if (getAndIncrement() == 0) {
clear();
}
Expand All @@ -126,9 +127,19 @@ public boolean isDisposed() {
return cancelled;
}

void clear() {
void cancel() {
clear();
cancelSources();
}

void cancelSources() {
for (ZipObserver<?, ?> zs : observers) {
zs.dispose();
}
}

void clear() {
for (ZipObserver<?, ?> zs : observers) {
zs.queue.clear();
}
}
Expand Down Expand Up @@ -168,7 +179,7 @@ public void drain() {
if (z.done && !delayError) {
Throwable ex = z.error;
if (ex != null) {
clear();
cancel();
a.onError(ex);
return;
}
Expand All @@ -186,7 +197,7 @@ public void drain() {
v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
clear();
cancel();
a.onError(ex);
return;
}
Expand All @@ -205,15 +216,15 @@ public void drain() {

boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean delayError, ZipObserver<?, ?> source) {
if (cancelled) {
clear();
cancel();
return true;
}

if (d) {
if (delayError) {
if (empty) {
Throwable e = source.error;
clear();
cancel();
if (e != null) {
a.onError(e);
} else {
Expand All @@ -224,12 +235,12 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean
} else {
Throwable e = source.error;
if (e != null) {
clear();
cancel();
a.onError(e);
return true;
} else
if (empty) {
clear();
cancel();
a.onComplete();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1392,4 +1392,39 @@ public List<Object> apply(Object t1, Object t2) throws Exception {
assertTrue(list.toString(), list.contains("RxSi"));
assertTrue(list.toString(), list.contains("RxCo"));
}
}}
}

@Test
public void eagerDispose() {
final PublishSubject<Integer> ps1 = PublishSubject.create();
final PublishSubject<Integer> ps2 = PublishSubject.create();

TestObserver<Integer> ts = new TestObserver<Integer>() {
@Override
public void onNext(Integer t) {
super.onNext(t);
cancel();
if (ps1.hasObservers()) {
onError(new IllegalStateException("ps1 not disposed"));
} else
if (ps2.hasObservers()) {
onError(new IllegalStateException("ps2 not disposed"));
} else {
onComplete();
}
}
};

Observable.zip(ps1, ps2, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return t1 + t2;
}
})
.subscribe(ts);

ps1.onNext(1);
ps2.onNext(2);
ts.assertResult(3);
}
}

0 comments on commit 4a32963

Please sign in to comment.