diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatArray.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatArray.java index caccf81c79..3972a7b31c 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatArray.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatArray.java @@ -52,7 +52,7 @@ static final class ConcatInnerObserver extends AtomicInteger implements Completa @Override public void onSubscribe(Disposable d) { - sd.update(d); + sd.replace(d); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatIterable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatIterable.java index 624047a627..47f4fad3bc 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatIterable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableConcatIterable.java @@ -64,7 +64,7 @@ static final class ConcatInnerObserver extends AtomicInteger implements Completa @Override public void onSubscribe(Disposable d) { - sd.update(d); + sd.replace(d); } @Override diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java index abe412b831..a5d9b3a279 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAndThenTest.java @@ -15,7 +15,13 @@ import io.reactivex.Completable; import io.reactivex.Maybe; +import io.reactivex.functions.Action; +import io.reactivex.schedulers.Schedulers; + +import java.util.concurrent.CountDownLatch; + import org.junit.Test; +import static org.junit.Assert.*; public class CompletableAndThenTest { @Test(expected = NullPointerException.class) @@ -63,4 +69,39 @@ public void andThenMaybeError() { .assertError(RuntimeException.class) .assertErrorMessage("bla"); } + + @Test + public void andThenNoInterrupt() throws InterruptedException { + for (int k = 0; k < 100; k++) { + final int count = 10; + final CountDownLatch latch = new CountDownLatch(count); + final boolean[] interrupted = { false }; + + for (int i = 0; i < count; i++) { + Completable.complete() + .subscribeOn(Schedulers.io()) + .observeOn(Schedulers.io()) + .andThen(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + try { + Thread.sleep(30); + } catch (InterruptedException e) { + System.out.println("Interrupted! " + Thread.currentThread()); + interrupted[0] = true; + } + } + })) + .subscribe(new Action() { + @Override + public void run() throws Exception { + latch.countDown(); + } + }); + } + + latch.await(); + assertFalse("The second Completable was interrupted!", interrupted[0]); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableConcatTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableConcatTest.java index efc76109a5..9c2c080104 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableConcatTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableConcatTest.java @@ -16,6 +16,7 @@ import static org.junit.Assert.*; import java.util.*; +import java.util.concurrent.CountDownLatch; import org.junit.Test; import org.reactivestreams.*; @@ -23,7 +24,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.*; -import io.reactivex.functions.Function; +import io.reactivex.functions.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; @@ -254,4 +255,41 @@ public void run() { TestHelper.race(r1, r2, Schedulers.single()); } } + + @Test + public void noInterrupt() throws InterruptedException { + for (int k = 0; k < 100; k++) { + final int count = 10; + final CountDownLatch latch = new CountDownLatch(count); + final boolean[] interrupted = { false }; + + for (int i = 0; i < count; i++) { + Completable c0 = Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + try { + Thread.sleep(30); + } catch (InterruptedException e) { + System.out.println("Interrupted! " + Thread.currentThread()); + interrupted[0] = true; + } + } + }); + Completable.concat(Arrays.asList(Completable.complete() + .subscribeOn(Schedulers.io()) + .observeOn(Schedulers.io()), + c0) + ) + .subscribe(new Action() { + @Override + public void run() throws Exception { + latch.countDown(); + } + }); + } + + latch.await(); + assertFalse("The second Completable was interrupted!", interrupted[0]); + } + } }