From d1af6be08bbeee65caf9c5d4264d9c2c7dc94bf5 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 21 Dec 2015 19:39:11 +0100 Subject: [PATCH] 1.x: fix Completable.using not disposing the resource if the factory crashes during the subscription phase. This PR fixes the cases when the Completable factory throws an exception or returns null and the resource is not disposed before reporting error to the subscriber. --- src/main/java/rx/Completable.java | 25 ++++- src/test/java/rx/CompletableTest.java | 134 ++++++++++++++++++++++++++ 2 files changed, 157 insertions(+), 2 deletions(-) diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index aa222b9e8e..fb864ff14c 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -16,13 +16,13 @@ package rx; -import java.util.Iterator; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import rx.Observable.OnSubscribe; import rx.annotations.Experimental; -import rx.exceptions.Exceptions; +import rx.exceptions.*; import rx.functions.*; import rx.internal.operators.*; import rx.internal.util.*; @@ -864,12 +864,33 @@ public void call(final CompletableSubscriber s) { try { cs = completableFunc1.call(resource); } catch (Throwable e) { + try { + disposer.call(resource); + } catch (Throwable ex) { + Exceptions.throwIfFatal(e); + Exceptions.throwIfFatal(ex); + + s.onSubscribe(Subscriptions.unsubscribed()); + s.onError(new CompositeException(Arrays.asList(e, ex))); + return; + } + Exceptions.throwIfFatal(e); + s.onSubscribe(Subscriptions.unsubscribed()); s.onError(e); return; } if (cs == null) { + try { + disposer.call(resource); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + s.onSubscribe(Subscriptions.unsubscribed()); + s.onError(new CompositeException(Arrays.asList(new NullPointerException("The completable supplied is null"), ex))); + return; + } s.onSubscribe(Subscriptions.unsubscribed()); s.onError(new NullPointerException("The completable supplied is null")); return; diff --git a/src/test/java/rx/CompletableTest.java b/src/test/java/rx/CompletableTest.java index 09d71a9ff7..7c9b2fe70b 100644 --- a/src/test/java/rx/CompletableTest.java +++ b/src/test/java/rx/CompletableTest.java @@ -31,6 +31,9 @@ import rx.subjects.PublishSubject; import rx.subscriptions.*; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + /** * Test Completable methods and operators. */ @@ -3410,4 +3413,135 @@ public void endWithFlowableError() { ts.assertError(TestException.class); ts.assertNotCompleted(); } + + @Test + public void usingFactoryThrows() { + @SuppressWarnings("unchecked") + Action1 onDispose = mock(Action1.class); + + TestSubscriber ts = TestSubscriber.create(); + + Completable.using(new Func0() { + @Override + public Integer call() { + return 1; + } + }, + new Func1() { + @Override + public Completable call(Integer t) { + throw new TestException(); + } + }, onDispose).subscribe(ts); + + verify(onDispose).call(1); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(TestException.class); + } + + @Test + public void usingFactoryAndDisposerThrow() { + Action1 onDispose = new Action1() { + @Override + public void call(Integer t) { + throw new TestException(); + } + }; + + TestSubscriber ts = TestSubscriber.create(); + + Completable.using(new Func0() { + @Override + public Integer call() { + return 1; + } + }, + new Func1() { + @Override + public Completable call(Integer t) { + throw new TestException(); + } + }, onDispose).subscribe(ts); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(CompositeException.class); + + CompositeException ex = (CompositeException)ts.getOnErrorEvents().get(0); + + List listEx = ex.getExceptions(); + + assertEquals(2, listEx.size()); + + assertTrue(listEx.get(0).toString(), listEx.get(0) instanceof TestException); + assertTrue(listEx.get(1).toString(), listEx.get(1) instanceof TestException); + } + + @Test + public void usingFactoryReturnsNull() { + @SuppressWarnings("unchecked") + Action1 onDispose = mock(Action1.class); + + TestSubscriber ts = TestSubscriber.create(); + + Completable.using(new Func0() { + @Override + public Integer call() { + return 1; + } + }, + new Func1() { + @Override + public Completable call(Integer t) { + return null; + } + }, onDispose).subscribe(ts); + + verify(onDispose).call(1); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(NullPointerException.class); + } + + @Test + public void usingFactoryReturnsNullAndDisposerThrows() { + Action1 onDispose = new Action1() { + @Override + public void call(Integer t) { + throw new TestException(); + } + }; + + TestSubscriber ts = TestSubscriber.create(); + + Completable.using(new Func0() { + @Override + public Integer call() { + return 1; + } + }, + new Func1() { + @Override + public Completable call(Integer t) { + return null; + } + }, onDispose).subscribe(ts); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(CompositeException.class); + + CompositeException ex = (CompositeException)ts.getOnErrorEvents().get(0); + + List listEx = ex.getExceptions(); + + assertEquals(2, listEx.size()); + + assertTrue(listEx.get(0).toString(), listEx.get(0) instanceof NullPointerException); + assertTrue(listEx.get(1).toString(), listEx.get(1) instanceof TestException); + } + } \ No newline at end of file