Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Worker createWorker() {
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run) {
run.run();
RxJavaPlugins.onSchedule(run).run();
return EmptyDisposable.INSTANCE;
}

Expand All @@ -58,7 +58,7 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
try {
unit.sleep(delay);
run.run();
RxJavaPlugins.onSchedule(run).run();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
RxJavaPlugins.onError(ex);
Expand Down
72 changes: 72 additions & 0 deletions src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import io.reactivex.internal.functions.Functions;
import io.reactivex.plugins.RxJavaPlugins;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
Expand All @@ -41,6 +43,7 @@ public abstract class AbstractSchedulerTests {

/**
* The scheduler to test.
*
* @return the Scheduler instance
*/
protected abstract Scheduler getScheduler();
Expand Down Expand Up @@ -576,6 +579,7 @@ public void schedulePeriodicallyDirectZeroPeriod() throws Exception {
try {
sd.replace(s.schedulePeriodicallyDirect(new Runnable() {
int count;

@Override
public void run() {
if (++count == 10) {
Expand Down Expand Up @@ -610,6 +614,7 @@ public void schedulePeriodicallyZeroPeriod() throws Exception {
try {
sd.replace(w.schedulePeriodically(new Runnable() {
int count;

@Override
public void run() {
if (++count == 10) {
Expand All @@ -626,4 +631,71 @@ public void run() {
}
}
}

private void assertRunnableDecorated(Runnable scheduleCall) throws InterruptedException {
try {
final CountDownLatch decoratedCalled = new CountDownLatch(1);

RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() {
@Override
public Runnable apply(final Runnable actual) throws Exception {
return new Runnable() {
@Override
public void run() {
decoratedCalled.countDown();
actual.run();
}
};
}
});

scheduleCall.run();

assertTrue(decoratedCalled.await(5, TimeUnit.SECONDS));
} finally {
RxJavaPlugins.reset();
}
}

@Test(timeout = 6000)
public void scheduleDirectDecoratesRunnable() throws InterruptedException {
assertRunnableDecorated(new Runnable() {
@Override
public void run() {
getScheduler().scheduleDirect(Functions.EMPTY_RUNNABLE);
}
});
}

@Test(timeout = 6000)
public void scheduleDirectWithDelayDecoratesRunnable() throws InterruptedException {
assertRunnableDecorated(new Runnable() {
@Override
public void run() {
getScheduler().scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS);
}
});
}

@Test(timeout = 6000)
public void schedulePeriodicallyDirectDecoratesRunnable() throws InterruptedException {
final Scheduler scheduler = getScheduler();
if (scheduler instanceof TrampolineScheduler) {
// Can't properly stop a trampolined periodic task.
return;
}

final AtomicReference<Disposable> disposable = new AtomicReference<Disposable>();

try {
assertRunnableDecorated(new Runnable() {
@Override
public void run() {
disposable.set(scheduler.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 1, 10000, TimeUnit.MILLISECONDS));
}
});
} finally {
disposable.get().dispose();
}
}
}