diff --git a/reactor-core/src/main/java/reactor/core/publisher/BlockingOptionalMonoSubscriber.java b/reactor-core/src/main/java/reactor/core/publisher/BlockingOptionalMonoSubscriber.java index d39a449b21..6805753c90 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/BlockingOptionalMonoSubscriber.java +++ b/reactor-core/src/main/java/reactor/core/publisher/BlockingOptionalMonoSubscriber.java @@ -39,6 +39,7 @@ import org.reactivestreams.Subscription; import reactor.core.Disposable; import reactor.core.Exceptions; +import reactor.core.scheduler.Schedulers; import reactor.util.annotation.Nullable; /** @@ -109,6 +110,9 @@ public final void dispose() { * @return an Optional representing the first value (or empty Optional if the source is empty) */ final Optional blockingGet() { + if (Schedulers.isInNonBlockingThread()) { + throw new IllegalStateException("blockOptional() is blocking, which is not supported in thread " + Thread.currentThread().getName()); + } if (getCount() != 0) { try { await(); @@ -142,6 +146,9 @@ final Optional blockingGet() { * @return an Optional representing the first value (or empty Optional if the source is empty) */ final Optional blockingGet(long timeout, TimeUnit unit) { + if (Schedulers.isInNonBlockingThread()) { + throw new IllegalStateException("blockOptional() is blocking, which is not supported in thread " + Thread.currentThread().getName()); + } if (getCount() != 0) { try { if (!await(timeout, unit)) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/BlockingSingleSubscriber.java b/reactor-core/src/main/java/reactor/core/publisher/BlockingSingleSubscriber.java index 6c663e66dd..674c866a52 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/BlockingSingleSubscriber.java +++ b/reactor-core/src/main/java/reactor/core/publisher/BlockingSingleSubscriber.java @@ -22,6 +22,7 @@ import org.reactivestreams.Subscription; import reactor.core.Disposable; import reactor.core.Exceptions; +import reactor.core.scheduler.Schedulers; import reactor.util.annotation.Nullable; /** @@ -72,6 +73,9 @@ public final void dispose() { */ @Nullable final T blockingGet() { + if (Schedulers.isInNonBlockingThread()) { + throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName()); + } if (getCount() != 0) { try { await(); @@ -103,6 +107,9 @@ final T blockingGet() { */ @Nullable final T blockingGet(long timeout, TimeUnit unit) { + if (Schedulers.isInNonBlockingThread()) { + throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName()); + } if (getCount() != 0) { try { if (!await(timeout, unit)) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 7bac9bacfe..a667699b11 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -7733,6 +7733,9 @@ public final Iterable toIterable(int batchSize) { */ public final Iterable toIterable(int batchSize, @Nullable Supplier> queueProvider) { + if (Schedulers.isInNonBlockingThread()) { + throw new IllegalStateException("toIterable() is blocking, which is not supported in thread " + Thread.currentThread().getName()); + } final Supplier> provider; if(queueProvider == null){ provider = Queues.get(batchSize); @@ -7768,6 +7771,9 @@ public final Stream toStream() { * @return a {@link Stream} of unknown size with onClose attached to {@link Subscription#cancel()} */ public final Stream toStream(int batchSize) { + if (Schedulers.isInNonBlockingThread()) { + throw new IllegalStateException("toStream() is blocking, which is not supported in thread " + Thread.currentThread().getName()); + } final Supplier> provider; provider = Queues.get(batchSize); return new BlockingIterable<>(this, batchSize, provider).stream(); diff --git a/reactor-core/src/main/java/reactor/core/scheduler/ElasticScheduler.java b/reactor-core/src/main/java/reactor/core/scheduler/ElasticScheduler.java index e45df68aa6..7d14d27833 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/ElasticScheduler.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/ElasticScheduler.java @@ -186,8 +186,8 @@ public Disposable schedulePeriodically(Runnable task, long initialDelay, long pe public String toString() { StringBuilder ts = new StringBuilder(Schedulers.ELASTIC) .append('('); - if (factory instanceof Schedulers.SchedulerThreadFactory) { - ts.append('\"').append(((Schedulers.SchedulerThreadFactory) factory).get()).append('\"'); + if (factory instanceof ReactorThreadFactory) { + ts.append('\"').append(((ReactorThreadFactory) factory).get()).append('\"'); } ts.append(')'); return ts.toString(); diff --git a/reactor-core/src/main/java/reactor/core/scheduler/NonBlocking.java b/reactor-core/src/main/java/reactor/core/scheduler/NonBlocking.java new file mode 100644 index 0000000000..2dc5ddb5ea --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/scheduler/NonBlocking.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.scheduler; + +import java.util.concurrent.ThreadFactory; + +/** + * A marker interface that is detected on {@link Thread Threads} while executing Reactor + * blocking APIs, resulting in these calls throwing an exception. + *

+ * Extend {@link AbstractReactorThreadFactory} for a {@link ThreadFactory} that can easily + * create such threads, and optionally name them and further configure them. + *

+ * See {@link Schedulers#isBlockingCurrentThreadOk()} and + * {@link Schedulers#isBlockingCurrentThreadOk(Thread)} for a check that includes detecting + * this marker interface. + * + * @author Simon Baslé + */ +public interface NonBlocking { } diff --git a/reactor-core/src/main/java/reactor/core/scheduler/ParallelScheduler.java b/reactor-core/src/main/java/reactor/core/scheduler/ParallelScheduler.java index 194bee4c57..592fdecdd8 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/ParallelScheduler.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/ParallelScheduler.java @@ -174,8 +174,8 @@ public Disposable schedulePeriodically(Runnable task, public String toString() { StringBuilder ts = new StringBuilder(Schedulers.PARALLEL) .append('(').append(n); - if (factory instanceof Schedulers.SchedulerThreadFactory) { - ts.append(",\"").append(((Schedulers.SchedulerThreadFactory) factory).get()).append('\"'); + if (factory instanceof ReactorThreadFactory) { + ts.append(",\"").append(((ReactorThreadFactory) factory).get()).append('\"'); } ts.append(')'); return ts.toString(); diff --git a/reactor-core/src/main/java/reactor/core/scheduler/ReactorThreadFactory.java b/reactor-core/src/main/java/reactor/core/scheduler/ReactorThreadFactory.java new file mode 100644 index 0000000000..31c573176e --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/scheduler/ReactorThreadFactory.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.scheduler; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.Supplier; + +import org.jetbrains.annotations.NotNull; +import reactor.util.annotation.Nullable; + +/** + * The standard Reactor {@link ThreadFactory Thread factories} to be used by {@link Scheduler}, + * creating {@link Thread} with a prefix (which can be retrieved with the {@link #get()} method). + * + * @author Simon Baslé + */ +class ReactorThreadFactory implements ThreadFactory, + Supplier, + Thread.UncaughtExceptionHandler { + + final private String name; + final private AtomicLong counterReference; + final private boolean daemon; + final private boolean rejectBlocking; + + @Nullable + final private BiConsumer uncaughtExceptionHandler; + + ReactorThreadFactory(String name, + AtomicLong counterReference, + boolean daemon, + boolean rejectBlocking, + @Nullable BiConsumer uncaughtExceptionHandler) { + this.name = name; + this.counterReference = counterReference; + this.daemon = daemon; + this.rejectBlocking = rejectBlocking; + this.uncaughtExceptionHandler = uncaughtExceptionHandler; + } + + @Override + public final Thread newThread(@NotNull Runnable runnable) { + String newThreadName = name + "-" + counterReference.incrementAndGet(); + Thread t = rejectBlocking + ? new NonBlockingThread(runnable, newThreadName) + : new Thread(runnable, newThreadName); + if (daemon) { + t.setDaemon(true); + } + if (uncaughtExceptionHandler != null) { + t.setUncaughtExceptionHandler(this); + } + return t; + } + + @Override + public void uncaughtException(Thread t, Throwable e) { + if (uncaughtExceptionHandler == null) { + return; + } + + uncaughtExceptionHandler.accept(t,e); + } + + /** + * Get the prefix used for new {@link Thread Threads} created by this {@link ThreadFactory}. + * The factory can also be seen as a {@link Supplier Supplier<String>}. + * + * @return the thread name prefix + */ + @Override + public final String get() { + return name; + } + + static final class NonBlockingThread extends Thread implements NonBlocking { + + public NonBlockingThread(Runnable target, String name) { + super(target, name); + } + } +} diff --git a/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java b/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java index 4350798633..f0107f04e3 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java @@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; -import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import reactor.core.Disposable; @@ -127,6 +127,17 @@ public static Scheduler elastic() { return cache(CACHED_ELASTIC, ELASTIC, ELASTIC_SUPPLIER); } + /** + * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based + * workers and is suited for parallel work. + * + * @return default instance of a {@link Scheduler} that hosts a fixed pool of single-threaded + * ExecutorService-based workers + */ + public static Scheduler parallel() { + return cache(CACHED_PARALLEL, PARALLEL, PARALLEL_SUPPLIER); + } + /** * Executes tasks on the caller's thread immediately. * @@ -192,8 +203,8 @@ public static Scheduler newElastic(String name, int ttlSeconds) { */ public static Scheduler newElastic(String name, int ttlSeconds, boolean daemon) { return newElastic(ttlSeconds, - new SchedulerThreadFactory(name, daemon, ElasticScheduler.COUNTER)); - + new ReactorThreadFactory(name, ElasticScheduler.COUNTER, daemon, false, + Schedulers::defaultUncaughtException)); } /** @@ -217,7 +228,8 @@ public static Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) /** * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based - * workers and is suited for parallel work. + * workers and is suited for parallel work. This type of {@link Scheduler} detects and + * rejects usage of blocking Reactor APIs. * * @param name Thread prefix * @@ -231,7 +243,8 @@ public static Scheduler newParallel(String name) { /** * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based - * workers and is suited for parallel work. + * workers and is suited for parallel work. This type of {@link Scheduler} detects and + * rejects usage of blocking Reactor APIs. * * @param name Thread prefix * @param parallelism Number of pooled workers. @@ -245,7 +258,8 @@ public static Scheduler newParallel(String name, int parallelism) { /** * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based - * workers and is suited for parallel work. + * workers and is suited for parallel work. This type of {@link Scheduler} detects and + * rejects usage of blocking Reactor APIs. * * @param name Thread prefix * @param parallelism Number of pooled workers. @@ -257,7 +271,8 @@ public static Scheduler newParallel(String name, int parallelism) { */ public static Scheduler newParallel(String name, int parallelism, boolean daemon) { return newParallel(parallelism, - new SchedulerThreadFactory(name, daemon, ParallelScheduler.COUNTER)); + new ReactorThreadFactory(name, ParallelScheduler.COUNTER, daemon, + true, Schedulers::defaultUncaughtException)); } /** @@ -277,7 +292,8 @@ public static Scheduler newParallel(int parallelism, ThreadFactory threadFactory /** * {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is - * suited for parallel work. + * suited for parallel work. This type of {@link Scheduler} detects and rejects usage + * * of blocking Reactor APIs. * * @param name Component and thread name prefix * @@ -290,7 +306,8 @@ public static Scheduler newSingle(String name) { /** * {@link Scheduler} that hosts a single-threaded ExecutorService-based worker and is - * suited for parallel work. + * suited for parallel work. This type of {@link Scheduler} detects and rejects usage + * of blocking Reactor APIs. * * @param name Component and thread name prefix * @param daemon false if the {@link Scheduler} requires an explicit {@link @@ -300,8 +317,8 @@ public static Scheduler newSingle(String name) { * worker */ public static Scheduler newSingle(String name, boolean daemon) { - return newSingle(new SchedulerThreadFactory(name, daemon, - SingleScheduler.COUNTER)); + return newSingle(new ReactorThreadFactory(name, SingleScheduler.COUNTER, daemon, + true, Schedulers::defaultUncaughtException)); } /** @@ -334,14 +351,25 @@ public static void onHandleError(BiConsumer c) { } /** - * {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based - * workers and is suited for parallel work. + * Check if calling a Reactor blocking API in the current {@link Thread} is forbidden + * or not, by checking if the thread implements {@link NonBlocking} (in which case it is + * forbidden and this method returns {@code true}). * - * @return default instance of a {@link Scheduler} that hosts a fixed pool of single-threaded - * ExecutorService-based workers + * @return {@code true} if blocking is forbidden in this thread, {@code false} otherwise */ - public static Scheduler parallel() { - return cache(CACHED_PARALLEL, PARALLEL, PARALLEL_SUPPLIER); + public static boolean isInNonBlockingThread() { + return Thread.currentThread() instanceof NonBlocking; + } + + /** + * Check if calling a Reactor blocking API in the given {@link Thread} is forbidden + * or not, by checking if the thread implements {@link NonBlocking} (in which case it is + * forbidden and this method returns {@code true}). + * + * @return {@code true} if blocking is forbidden in that thread, {@code false} otherwise + */ + public static boolean isNonBlockingThread(Thread t) { + return t instanceof NonBlocking; } /** @@ -542,37 +570,9 @@ static CachedScheduler cache(AtomicReference reference, String static final Logger log = Loggers.getLogger(Schedulers.class); - static final class SchedulerThreadFactory - implements ThreadFactory, Supplier, Thread.UncaughtExceptionHandler { - - final String name; - final boolean daemon; - final AtomicLong COUNTER; - - SchedulerThreadFactory(String name, boolean daemon, AtomicLong counter) { - this.name = name; - this.daemon = daemon; - this.COUNTER = counter; - } - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, name + "-" + COUNTER.incrementAndGet()); - t.setDaemon(daemon); - t.setUncaughtExceptionHandler(this); - return t; - } - - @Override - public void uncaughtException(Thread t, Throwable e) { - log.error("Scheduler worker in group " + t.getThreadGroup().getName() + - " failed with an uncaught exception", e); - } - - @Override - public String get() { - return name; - } + static final void defaultUncaughtException(Thread t, Throwable e) { + Schedulers.log.error("Scheduler worker in group " + t.getThreadGroup().getName() + + " failed with an uncaught exception", e); } static void handleError(Throwable ex) { diff --git a/reactor-core/src/main/java/reactor/core/scheduler/SingleScheduler.java b/reactor-core/src/main/java/reactor/core/scheduler/SingleScheduler.java index 9f4f95a72c..63cee4868e 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/SingleScheduler.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/SingleScheduler.java @@ -140,8 +140,8 @@ public Disposable schedulePeriodically(Runnable task, public String toString() { StringBuilder ts = new StringBuilder(Schedulers.SINGLE) .append('('); - if (factory instanceof Schedulers.SchedulerThreadFactory) { - ts.append('\"').append(((Schedulers.SchedulerThreadFactory) factory).get()).append('\"'); + if (factory instanceof ReactorThreadFactory) { + ts.append('\"').append(((ReactorThreadFactory) factory).get()).append('\"'); } return ts.append(')').toString(); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/BlockingTests.java b/reactor-core/src/test/java/reactor/core/publisher/BlockingTests.java index 8acc486588..9de673c493 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/BlockingTests.java +++ b/reactor-core/src/test/java/reactor/core/publisher/BlockingTests.java @@ -17,11 +17,15 @@ package reactor.core.publisher; import java.time.Duration; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Stream; +import org.assertj.core.api.Assertions; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -29,21 +33,25 @@ import reactor.core.Exceptions; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; public class BlockingTests { static Scheduler scheduler; + static Scheduler nonBlockingScheduler; @BeforeClass public static void before() { scheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor()); + nonBlockingScheduler = Schedulers.newSingle("nonBlockingScheduler"); } @AfterClass public static void after() { scheduler.dispose(); + nonBlockingScheduler.dispose(); } @Test @@ -224,4 +232,148 @@ public void monoBlockOptionalDoesntCancel() { assertThat(cancelCount.get()).isEqualTo(0); } + + @Test + public void fluxBlockFirstForbidden() { + Function badMapper = v -> Flux.just(v).hide() + .blockFirst(); + Function badMapperTimeout = v -> Flux.just(v).hide() + .blockFirst(Duration.ofMillis(100)); + + Mono forbiddenSequence1 = Mono.just("data") + .publishOn(nonBlockingScheduler) + .map(badMapper); + + StepVerifier.create(forbiddenSequence1) + .expectErrorSatisfies(e -> assertThat(e) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-")) + .verify(); + + Mono forbiddenSequence2 = Mono.just("data") + .publishOn(nonBlockingScheduler) + .map(badMapperTimeout); + + StepVerifier.create(forbiddenSequence2) + .expectErrorSatisfies(e -> assertThat(e) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-")) + .verify(); + } + + @Test + public void fluxBlockLastForbidden() { + Function badMapper = v -> Flux.just(v).hide() + .blockLast(); + Function badMapperTimeout = v -> Flux.just(v).hide() + .blockLast(Duration.ofMillis(100)); + + Mono forbiddenSequence1 = Mono.just("data") + .publishOn(nonBlockingScheduler) + .map(badMapper); + + StepVerifier.create(forbiddenSequence1) + .expectErrorSatisfies(e -> assertThat(e) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-")) + .verify(); + + Mono forbiddenSequence2 = Mono.just("data") + .publishOn(nonBlockingScheduler) + .map(badMapperTimeout); + + StepVerifier.create(forbiddenSequence2) + .expectErrorSatisfies(e -> assertThat(e) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-")) + .verify(); + } + + @Test + public void monoBlockForbidden() { + Function badMapper = v -> Mono.just(v).hide() + .block(); + Function badMapperTimeout = v -> Mono.just(v).hide() + .block(Duration.ofMillis(100)); + + Mono forbiddenSequence1 = Mono.just("data") + .publishOn(nonBlockingScheduler) + .map(badMapper); + + StepVerifier.create(forbiddenSequence1) + .expectErrorSatisfies(e -> assertThat(e) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-")) + .verify(); + + Mono forbiddenSequence2 = Mono.just("data") + .publishOn(nonBlockingScheduler) + .map(badMapperTimeout); + + StepVerifier.create(forbiddenSequence2) + .expectErrorSatisfies(e -> assertThat(e) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread nonBlockingScheduler-")) + .verify(); + } + + @Test + public void monoBlockOptionalForbidden() { + Function> badMapper = v -> Mono.just(v).hide() + .blockOptional(); + Function> badMapperTimeout = v -> Mono.just(v).hide() + .blockOptional(Duration.ofMillis(100)); + + Mono> forbiddenSequence1 = Mono.just("data") + .publishOn(nonBlockingScheduler) + .map(badMapper); + + StepVerifier.create(forbiddenSequence1) + .expectErrorSatisfies(e -> assertThat(e) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("blockOptional() is blocking, which is not supported in thread nonBlockingScheduler-")) + .verify(); + + Mono> forbiddenSequence2 = Mono.just("data") + .publishOn(nonBlockingScheduler) + .map(badMapperTimeout); + + StepVerifier.create(forbiddenSequence2) + .expectErrorSatisfies(e -> assertThat(e) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("blockOptional() is blocking, which is not supported in thread nonBlockingScheduler-")) + .verify(); + } + + @Test + public void fluxToIterableForbidden() { + Function badMapper = v -> Flux.range(1, v) + .toIterable(); + + Mono forbiddenSequence = Mono.just(3) + .publishOn(nonBlockingScheduler) + .map(badMapper); + + StepVerifier.create(forbiddenSequence) + .expectErrorSatisfies(e -> assertThat(e) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("toIterable() is blocking, which is not supported in thread nonBlockingScheduler-")) + .verify(); + } + + @Test + public void fluxToStreamForbidden() { + Function badMapper = v -> Flux.range(1, v) + .toStream(); + + Mono forbiddenSequence = Mono.just(3) + .publishOn(nonBlockingScheduler) + .map(badMapper); + + StepVerifier.create(forbiddenSequence) + .expectErrorSatisfies(e -> assertThat(e) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("toStream() is blocking, which is not supported in thread nonBlockingScheduler-")) + .verify(); + } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxSubscribeOnTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxSubscribeOnTest.java index 8d04603c9e..e37d75ba58 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxSubscribeOnTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxSubscribeOnTest.java @@ -16,7 +16,10 @@ package reactor.core.publisher; import java.time.Duration; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -265,38 +268,58 @@ public void forceScheduledRequests() { public void gh507() { Scheduler s = Schedulers.newSingle("subscribe"); Scheduler s2 = Schedulers.newParallel("receive"); - - Flux.from((Publisher) subscriber -> { - subscriber.onSubscribe(new Subscription() { - private int totalCount; - - @Override - public void request(long n) { - for (int i = 0; i < n; i++) { - if (totalCount++ < 317) { - subscriber.onNext(String.valueOf(totalCount)); - } - else { - subscriber.onComplete(); + AtomicBoolean interrupted = new AtomicBoolean(); + AtomicBoolean timedOut = new AtomicBoolean(); + + try { + Flux.from((Publisher) subscriber -> { + subscriber.onSubscribe(new Subscription() { + private int totalCount; + + @Override + public void request(long n) { + for (int i = 0; i < n; i++) { + if (totalCount++ < 317) { + subscriber.onNext(String.valueOf(totalCount)); + } + else { + subscriber.onComplete(); + } } } - } - @Override - public void cancel() { - // do nothing - } - }); - }) - .subscribeOn(s) - .limitRate(10) - .doOnNext(d -> { - Mono.fromCallable(() -> d) - .subscribeOn(s2) - .block(); - }) - .blockLast(); - - s.dispose(); + @Override + public void cancel() { + // do nothing + } + }); + }) + .subscribeOn(s) + .limitRate(10) + .doOnNext(d -> { + CountDownLatch latch = new CountDownLatch(1); + Mono.fromCallable(() -> d) + .subscribeOn(s2) + .doFinally(it -> latch.countDown()) + .subscribe(); + + try { + if (!latch.await(5, TimeUnit.SECONDS)) { + timedOut.set(true); + } + } + catch (InterruptedException e) { + interrupted.set(true); + } + }) + .blockLast(Duration.ofSeconds(2)); + + assertThat(interrupted).as("interrupted").isFalse(); + assertThat(timedOut).as("latch timeout").isFalse(); + } + finally { + s.dispose(); + s2.dispose(); + } } } diff --git a/reactor-core/src/test/java/reactor/core/scheduler/SchedulersTest.java b/reactor-core/src/test/java/reactor/core/scheduler/SchedulersTest.java index 307e831317..661aa9d032 100644 --- a/reactor-core/src/test/java/reactor/core/scheduler/SchedulersTest.java +++ b/reactor-core/src/test/java/reactor/core/scheduler/SchedulersTest.java @@ -69,17 +69,17 @@ final static class TestSchedulers implements Schedulers.Factory { } public final Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) { - assertThat(((Schedulers.SchedulerThreadFactory)threadFactory).get()).isEqualTo("unused"); + assertThat(((ReactorThreadFactory)threadFactory).get()).isEqualTo("unused"); return elastic; } public final Scheduler newParallel(int parallelism, ThreadFactory threadFactory) { - assertThat(((Schedulers.SchedulerThreadFactory)threadFactory).get()).isEqualTo("unused"); + assertThat(((ReactorThreadFactory)threadFactory).get()).isEqualTo("unused"); return parallel; } public final Scheduler newSingle(ThreadFactory threadFactory) { - assertThat(((Schedulers.SchedulerThreadFactory)threadFactory).get()).isEqualTo("unused"); + assertThat(((ReactorThreadFactory)threadFactory).get()).isEqualTo("unused"); return single; } } @@ -92,6 +92,118 @@ public void resetSchedulers() { Schedulers.resetFactory(); } + @Test + public void parallelSchedulerDefaultNonBlocking() throws InterruptedException { + Scheduler scheduler = Schedulers.newParallel("parallelSchedulerDefaultNonBlocking"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference errorRef = new AtomicReference<>(); + try { + scheduler.schedule(() -> { + try { + Mono.just("foo") + .hide() + .block(); + } + catch (Throwable t) { + errorRef.set(t); + } + finally { + latch.countDown(); + } + }); + latch.await(); + } + finally { + scheduler.dispose(); + } + + assertThat(errorRef.get()) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallelSchedulerDefaultNonBlocking-"); + } + + @Test + public void singleSchedulerDefaultNonBlocking() throws InterruptedException { + Scheduler scheduler = Schedulers.newSingle("singleSchedulerDefaultNonBlocking"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference errorRef = new AtomicReference<>(); + try { + scheduler.schedule(() -> { + try { + Mono.just("foo") + .hide() + .block(); + } + catch (Throwable t) { + errorRef.set(t); + } + finally { + latch.countDown(); + } + }); + latch.await(); + } + finally { + scheduler.dispose(); + } + + assertThat(errorRef.get()) + .isInstanceOf(IllegalStateException.class) + .hasMessageStartingWith("block()/blockFirst()/blockLast() are blocking, which is not supported in thread singleSchedulerDefaultNonBlocking-"); + } + + @Test + public void elasticSchedulerDefaultBlockingOk() throws InterruptedException { + Scheduler scheduler = Schedulers.newElastic("elasticSchedulerDefaultNonBlocking"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference errorRef = new AtomicReference<>(); + try { + scheduler.schedule(() -> { + try { + Mono.just("foo") + .hide() + .block(); + } + catch (Throwable t) { + errorRef.set(t); + } + finally { + latch.countDown(); + } + }); + latch.await(); + } + finally { + scheduler.dispose(); + } + + assertThat(errorRef.get()).isNull(); + } + + @Test + public void isInNonBlockingThreadFalse() { + assertThat(Thread.currentThread()).isNotInstanceOf(NonBlocking.class); + + assertThat(Schedulers.isInNonBlockingThread()).as("isInNonBlockingThread").isFalse(); + } + + @Test + public void isNonBlockingThreadInstanceOf() { + Thread nonBlocking = new ReactorThreadFactory.NonBlockingThread(() -> {}, "isNonBlockingThreadInstanceOf_nonBlocking"); + Thread thread = new Thread(() -> {}, "isNonBlockingThreadInstanceOf_blocking"); + + assertThat(Schedulers.isNonBlockingThread(nonBlocking)).as("nonBlocking").isTrue(); + assertThat(Schedulers.isNonBlockingThread(thread)).as("thread").isFalse(); + } + + @Test + public void isInNonBlockingThreadTrue() { + new ReactorThreadFactory.NonBlockingThread(() -> assertThat(Schedulers.isInNonBlockingThread()) + .as("isInNonBlockingThread") + .isFalse(), + "isInNonBlockingThreadTrue"); + } + @Test public void handleErrorWithJvmFatalForwardsToUncaughtHandlerFusedCallable() { AtomicBoolean handlerCaught = new AtomicBoolean(); diff --git a/src/docs/asciidoc/apdx-operatorChoice.adoc b/src/docs/asciidoc/apdx-operatorChoice.adoc index a9a1f5beb5..036a9a14af 100644 --- a/src/docs/asciidoc/apdx-operatorChoice.adoc +++ b/src/docs/asciidoc/apdx-operatorChoice.adoc @@ -271,6 +271,9 @@ TIP: Note that this returns a `Flux>`, each inner `GroupedFlux [[which.blocking]] == Going Back to the Synchronous World +Note: all of these methods except `Mono#toFuture` will throw an `UnsupportedOperatorException` if called from +within a `Scheduler` marked as "non-blocking only" (by default `parallel()` and `single()`). + * I have a `Flux` and I want to: ** block until I can get the first element: `Flux#blockFirst` *** ...with a timeout: `Flux#blockFirst(Duration)` diff --git a/src/docs/asciidoc/coreFeatures.adoc b/src/docs/asciidoc/coreFeatures.adoc index c6ddf98248..b358b55ba7 100644 --- a/src/docs/asciidoc/coreFeatures.adoc +++ b/src/docs/asciidoc/coreFeatures.adoc @@ -212,6 +212,15 @@ it does not tie up other resources. See <>. * a fixed pool of workers that is tuned for parallel work (`Schedulers.parallel()`). It creates as many workers as you have CPU cores. +WARNING: While `elastic` is made to help with legacy blocking code if it cannot be avoided, +`single` and `parallel` are not. As a consequence, the use of Reactor blocking APIs +(`block()`, `blockFirst()`, `blockLast()`, `toIterable()` and `toStream()`) inside the +default single and parallel Schedulers will result in an `IllegalStateException` being thrown. + + + + +Custom `Schedulers` can also be marked as "non blocking only" by creating instances of `Thread` +that implement the `NonBlocking` marker interface. + Additionally, you can create a `Scheduler` out of any pre-existing `ExecutorService` by using `Schedulers.fromExecutorService(ExecutorService)`. (You can also create one from an `Executor`, although doing so is discouraged.) You can also create new instances of the