From 7058a76748012ccb379f0cd467832093e5b20e9b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 4 Jun 2014 16:09:16 -0700 Subject: [PATCH] TrampolineScheduler & Unsubscribe Unsubscribing should prevent new additions to a Worker, but not prevent already scheduled work, and definitely not affect other Workers using the same thread (by modifying the ThreadLocal as it was doing). See the unit test for details of how unsubscribing 1 Worker could prevent work from being done on a completely separate Worker. --- .../rx/schedulers/TrampolineScheduler.java | 4 -- .../schedulers/TrampolineSchedulerTest.java | 64 +++++++++++++++++++ 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java b/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java index a79255aa79..180be6ef8d 100644 --- a/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java @@ -83,9 +83,6 @@ private Subscription enqueue(Action0 action, long execTime) { if (exec) { while (!queue.isEmpty()) { - if (innerSubscription.isUnsubscribed()) { - return Subscriptions.empty(); - } queue.poll().action.call(); } @@ -108,7 +105,6 @@ public void call() { @Override public void unsubscribe() { - QUEUE.set(null); // this assumes we are calling unsubscribe from the same thread innerSubscription.unsubscribe(); } diff --git a/rxjava-core/src/test/java/rx/schedulers/TrampolineSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/TrampolineSchedulerTest.java index a904675f13..b8c2a88647 100644 --- a/rxjava-core/src/test/java/rx/schedulers/TrampolineSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/TrampolineSchedulerTest.java @@ -15,12 +15,18 @@ */ package rx.schedulers; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Arrays; + import org.junit.Test; import rx.Observable; import rx.Scheduler; +import rx.Scheduler.Worker; +import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; @@ -55,4 +61,62 @@ public void call(String t) { } }); } + + @Test + public void testNestedTrampolineWithUnsubscribe() { + final ArrayList workDone = new ArrayList(); + Worker worker = Schedulers.trampoline().createWorker(); + worker.schedule(new Action0() { + + @Override + public void call() { + doWorkOnNewTrampoline("A", workDone); + } + + }); + + final Worker worker2 = Schedulers.trampoline().createWorker(); + worker2.schedule(new Action0() { + + @Override + public void call() { + doWorkOnNewTrampoline("B", workDone); + // we unsubscribe worker2 ... it should not affect work scheduled on a separate Trampline.Worker + worker2.unsubscribe(); + } + + }); + + assertEquals(6, workDone.size()); + assertEquals(Arrays.asList("A.1", "A.B.1", "A.B.2", "B.1", "B.B.1", "B.B.2"), workDone); + } + + private static void doWorkOnNewTrampoline(final String key, final ArrayList workDone) { + Worker worker = Schedulers.trampoline().createWorker(); + worker.schedule(new Action0() { + + @Override + public void call() { + String msg = key + ".1"; + workDone.add(msg); + System.out.println(msg); + Worker worker3 = Schedulers.trampoline().createWorker(); + worker3.schedule(createPrintAction(key + ".B.1", workDone)); + worker3.schedule(createPrintAction(key + ".B.2", workDone)); + } + + }); + } + + private static Action0 createPrintAction(final String message, final ArrayList workDone) { + return new Action0() { + + @Override + public void call() { + System.out.println(message); + workDone.add(message); + } + + }; + } }