diff --git a/common/testing/junit5/src/main/java/io/helidon/common/testing/junit5/MatcherWithRetry.java b/common/testing/junit5/src/main/java/io/helidon/common/testing/junit5/MatcherWithRetry.java index 295df8105be..a25491c979a 100644 --- a/common/testing/junit5/src/main/java/io/helidon/common/testing/junit5/MatcherWithRetry.java +++ b/common/testing/junit5/src/main/java/io/helidon/common/testing/junit5/MatcherWithRetry.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Oracle and/or its affiliates. + * Copyright (c) 2022, 2023 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,4 +67,16 @@ public static T assertThatWithRetry(String reason, Supplier actualSupplie throw new AssertionError(description.toString()); } + + /** + * Checks the matcher, possibly multiple times after configured delays, invoking the supplier of the matched value each time, + * until either the matcher passes or the maximum retry expires. + * @param actualSupplier {@code Supplier} that furnishes the value to submit to the matcher + * @param matcher Hamcrest matcher which evaluates the supplied value + * @return the supplied value + * @param type of the supplied value + */ + public static T assertThatWithRetry(Supplier actualSupplier, Matcher matcher) { + return assertThatWithRetry("", actualSupplier, matcher); + } } diff --git a/nima/tests/integration/webserver/webserver/src/test/java/io/helidon/nima/tests/integration/server/WebServerStopIdleTest.java b/nima/tests/integration/webserver/webserver/src/test/java/io/helidon/nima/tests/integration/server/WebServerStopIdleTest.java new file mode 100644 index 00000000000..17d8faba09c --- /dev/null +++ b/nima/tests/integration/webserver/webserver/src/test/java/io/helidon/nima/tests/integration/server/WebServerStopIdleTest.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.tests.integration.server; + +import io.helidon.common.http.Http; +import io.helidon.nima.webclient.http1.Http1Client; +import io.helidon.nima.webserver.WebServer; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +class WebServerStopIdleTest { + + @Test + void stopWhenIdleExpectTimelyStop() { + WebServer webServer = WebServer.builder() + .routing(router -> router.get("ok", (req, res) -> res.send("ok"))) + .build(); + webServer.start(); + + Http1Client client = Http1Client.builder() + .baseUri("http://localhost:" + webServer.port()) + .build(); + try (var response = client.get("/ok").request()) { + assertThat(response.status(), is(Http.Status.OK_200)); + assertThat(response.entity().as(String.class), is("ok")); + } + + long startMillis = System.currentTimeMillis(); + webServer.stop(); + int stopExecutionTimeInMillis = (int) (System.currentTimeMillis() - startMillis); + assertThat(stopExecutionTimeInMillis, is(lessThan(500))); + } +} diff --git a/nima/webserver/webserver/src/test/java/io/helidon/nima/webserver/ExecutorShutdownTest.java b/nima/tests/integration/webserver/webserver/src/test/java/io/helidon/nima/tests/integration/server/WebServerStopOnlyTest.java similarity index 50% rename from nima/webserver/webserver/src/test/java/io/helidon/nima/webserver/ExecutorShutdownTest.java rename to nima/tests/integration/webserver/webserver/src/test/java/io/helidon/nima/tests/integration/server/WebServerStopOnlyTest.java index 325906cbb83..aa616960a68 100644 --- a/nima/webserver/webserver/src/test/java/io/helidon/nima/webserver/ExecutorShutdownTest.java +++ b/nima/tests/integration/webserver/webserver/src/test/java/io/helidon/nima/tests/integration/server/WebServerStopOnlyTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Oracle and/or its affiliates. + * Copyright (c) 2023 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,25 +13,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.helidon.nima.webserver; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +package io.helidon.nima.tests.integration.server; +import io.helidon.nima.webserver.WebServer; import org.junit.jupiter.api.Test; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; + +class WebServerStopOnlyTest { -class ExecutorShutdownTest { @Test - void testShutdown() { - try (ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual() - .allowSetThreadLocals(true) - .inheritInheritableThreadLocals(false) - .factory())) { - ServerListener.shutdownExecutor(executor); - assertThat(executor.isShutdown(), is(true)); - } + void stoWhenNoRequestsExpectTimelyStop() { + WebServer webServer = WebServer.builder() + .routing(router -> router.get("ok", (req, res) -> res.send("ok"))) + .build(); + webServer.start(); + + long startMillis = System.currentTimeMillis(); + webServer.stop(); + int stopExecutionTimeInMillis = (int) (System.currentTimeMillis() - startMillis); + assertThat(stopExecutionTimeInMillis, is(lessThan(500))); } } diff --git a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ConnectionHandler.java b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ConnectionHandler.java index f1f6bc2b7a2..4325d964d4d 100644 --- a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ConnectionHandler.java +++ b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ConnectionHandler.java @@ -39,7 +39,7 @@ * Representation of a single channel between client and server. * Everything in this class runs in the channel reader virtual thread */ -class ConnectionHandler implements Runnable { +class ConnectionHandler implements InterruptableTask { private static final System.Logger LOGGER = System.getLogger(ConnectionHandler.class.getName()); private final ConnectionProviders connectionProviders; @@ -82,6 +82,11 @@ class ConnectionHandler implements Runnable { maxPayloadSize); } + @Override + public boolean canInterrupt() { + return connection instanceof InterruptableTask task && task.canInterrupt(); + } + @Override public final void run() { Thread.currentThread().setName("[" + socket.socketId() + " " + socket.childSocketId() + "] Nima socket"); diff --git a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/HelidonTaskExecutor.java b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/HelidonTaskExecutor.java new file mode 100644 index 00000000000..b7bbc8e10d6 --- /dev/null +++ b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/HelidonTaskExecutor.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.webserver; + +import java.io.Closeable; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * A simplified {@link java.util.concurrent.ExecutorService} that can execute + * {@link InterruptableTask}s and can be efficiently terminated. A thread that is + * waiting to read on an open connection cannot be efficiently stopped. This + * executor will query the thread and interrupt it if possible. It is important + * to efficiently shut down the Nima webserver in certain environments. + */ +interface HelidonTaskExecutor extends Closeable { + + /** + * Executes a task. + * + * @param task an interruptable task + * @param type ov value returned by task + * @return a future for a value returned by the task + */ + Future execute(InterruptableTask task); + + /** + * Verifies if the executor is terminated. + * + * @return outcome of test + */ + boolean isTerminated(); + + /** + * Terminate executor waiting for any running task to complete for a specified + * timeout period. It will only wait for those {@link InterruptableTask}s that + * are not interruptable. + * + * @param timeout timeout period + * @param unit timeout period unit + * @return outcome of shutdown process + */ + boolean terminate(long timeout, TimeUnit unit); + + /** + * Force termination by forcefully interrupting all tasks. Shall only be called + * if {@link #terminate} returns {@code false}. + */ + void forceTerminate(); +} diff --git a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/InterruptableTask.java b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/InterruptableTask.java new file mode 100644 index 00000000000..1b5f3e493de --- /dev/null +++ b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/InterruptableTask.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.webserver; + +import java.util.concurrent.Callable; + +/** + * An interruptable task that can implements both {@link Runnable} and + * {@link Callable}. + * + * @param type of value returned by task + */ +public interface InterruptableTask extends Runnable, Callable { + + /** + * Signals if a task can be interrupted at the time this method is called. + * + * @return outcome of interruptable test + */ + boolean canInterrupt(); + + @Override + default void run() { + try { + call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + default T call() throws Exception { + run(); + return null; + } +} diff --git a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ServerListener.java b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ServerListener.java index f21b1930058..da9340e1f32 100644 --- a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ServerListener.java +++ b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ServerListener.java @@ -44,6 +44,7 @@ import io.helidon.nima.webserver.http.DirectHandlers; import io.helidon.nima.webserver.spi.ServerConnectionSelector; +import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.ERROR; import static java.lang.System.Logger.Level.INFO; import static java.lang.System.Logger.Level.TRACE; @@ -57,7 +58,7 @@ class ServerListener { private final String socketName; private final ListenerConfiguration listenerConfig; private final Router router; - private final ExecutorService readerExecutor; + private final HelidonTaskExecutor readerExecutor; private final ExecutorService sharedExecutor; private final Thread serverThread; private final DirectHandlers simpleHandlers; @@ -93,7 +94,7 @@ class ServerListener { .name("server-" + socketName + "-listener") .unstarted(this::listen); this.simpleHandlers = simpleHandlers; - this.readerExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual() + this.readerExecutor = ThreadPerTaskExecutor.create(Thread.ofVirtual() .allowSetThreadLocals(true) .inheritInheritableThreadLocals(inheritThreadLocals) .factory()); @@ -131,11 +132,30 @@ void stop() { } running = false; try { - // Attempt to wait until existing channels finish execution - shutdownExecutor(readerExecutor); - shutdownExecutor(sharedExecutor); - + // Stop listening for connections serverSocket.close(); + + // Shutdown reader executor + readerExecutor.terminate(EXECUTOR_SHUTDOWN_MILLIS, TimeUnit.MILLISECONDS); + if (!readerExecutor.isTerminated()) { + LOGGER.log(DEBUG, "Some tasks in reader executor did not terminate gracefully"); + readerExecutor.forceTerminate(); + } + + // Shutdown shared executor + try { + sharedExecutor.shutdown(); + boolean done = sharedExecutor.awaitTermination(EXECUTOR_SHUTDOWN_MILLIS, TimeUnit.MILLISECONDS); + if (!done) { + List running = sharedExecutor.shutdownNow(); + if (!running.isEmpty()) { + LOGGER.log(DEBUG, running.size() + " tasks in shared executor did not terminate gracefully"); + } + } + } catch (InterruptedException e) { + // falls through + } + } catch (IOException e) { LOGGER.log(INFO, "Exception thrown on socket close", e); } @@ -252,7 +272,7 @@ private void listen() { listenerConfig.maxPayloadSize(), simpleHandlers); - readerExecutor.submit(handler); + readerExecutor.execute(handler); } catch (RejectedExecutionException e) { LOGGER.log(ERROR, "Executor rejected handler for new connection"); } catch (Exception e) { @@ -277,23 +297,4 @@ private void listen() { LOGGER.log(INFO, String.format("[%s] %s socket closed.", serverChannelId, socketName)); closeFuture.complete(null); } - - /** - * Shutdown an executor by waiting for a period of time. - * - * @param executor executor to shut down - */ - static void shutdownExecutor(ExecutorService executor) { - try { - boolean terminate = executor.awaitTermination(EXECUTOR_SHUTDOWN_MILLIS, TimeUnit.MILLISECONDS); - if (!terminate) { - List running = executor.shutdownNow(); - if (!running.isEmpty()) { - LOGGER.log(INFO, running.size() + " channel tasks did not terminate gracefully"); - } - } - } catch (InterruptedException e) { - LOGGER.log(INFO, "InterruptedException caught while shutting down channel tasks"); - } - } } diff --git a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ThreadPerTaskExecutor.java b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ThreadPerTaskExecutor.java new file mode 100644 index 00000000000..1f68aa7ab6d --- /dev/null +++ b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/ThreadPerTaskExecutor.java @@ -0,0 +1,250 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.webserver; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * An implementation of {@link HelidonTaskExecutor}. Implementation is a simplified + * version of ThreadPerTaskExecutor in the JDK library. Upon termination, this + * executor shall interrupt all tasks whose {@link InterruptableTask#canInterrupt()} + * method return {@code true} in an attempt to stop as fast as possible. + */ +class ThreadPerTaskExecutor implements HelidonTaskExecutor { + + private final ThreadFactory factory; + private final Map threadTasks = new ConcurrentHashMap<>(); + private final CountDownLatch terminationSignal = new CountDownLatch(1); + + // states: RUNNING -> SHUTDOWN -> TERMINATED + private static final int RUNNING = 0; + private static final int SHUTDOWN = 1; + private static final int TERMINATED = 2; + private final AtomicInteger state = new AtomicInteger(); + + private ThreadPerTaskExecutor(ThreadFactory factory) { + this.factory = Objects.requireNonNull(factory); + } + + static HelidonTaskExecutor create(ThreadFactory factory) { + return new ThreadPerTaskExecutor(factory); + } + + @Override + public Future execute(InterruptableTask task) { + return submit(task); + } + + @Override + public boolean isTerminated() { + return state.get() >= TERMINATED; + } + + @Override + public boolean terminate(long timeout, TimeUnit unit) { + if (isTerminated()) { + return true; + } + if (state.compareAndSet(RUNNING, SHUTDOWN)) { + // attempt to stop interruptable tasks first + Set interrupted = tryStopInterruptableTasks(); + interrupted.forEach(threadTasks::remove); + if (threadTasks.isEmpty()) { + return state.compareAndSet(SHUTDOWN, TERMINATED); + } + try { + boolean result = terminationSignal.await(timeout, unit); + return result && state.compareAndSet(SHUTDOWN, TERMINATED); + } catch (InterruptedException e) { + // falls through + } + } + return false; + } + + @Override + public void forceTerminate() { + if (!isTerminated()) { + if (state.get() == RUNNING) { + throw new IllegalArgumentException("Must call terminate(long, TimeUnit) first" + + " to attempt graceful termination"); + } + if (state.compareAndSet(SHUTDOWN, TERMINATED)) { + // force interruption of all tasks + threadTasks.keySet().forEach(Thread::interrupt); + } + } + } + + /** + * Sends interrupt signal to all tasks that can be interrupted at this time + * and returns that set. + */ + private Set tryStopInterruptableTasks() { + return threadTasks.entrySet().stream() + .filter(entry -> entry.getKey().isAlive() && entry.getKey().getState() == Thread.State.WAITING) + .filter(entry -> { + // send interrupt signal to thread if possible + if (entry.getValue() instanceof InterruptableTask task && task.canInterrupt()) { + entry.getKey().interrupt(); + return true; + } + return false; + }) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + + @Override + public void close() { + terminate(0, TimeUnit.SECONDS); + forceTerminate(); + } + + /** + * Creates a thread to run the given task. + */ + private Thread newThread(Runnable task) { + Thread thread = factory.newThread(task); + if (thread == null) { + throw new RejectedExecutionException(); + } + return thread; + } + + /** + * Notify the executor that the task executed by the given thread is complete. + * If the executor has been shutdown then this method will count down the + * termination signal. + */ + private void taskComplete(Thread thread) { + threadTasks.remove(thread); + if (state.get() == SHUTDOWN && threadTasks.isEmpty()) { + terminationSignal.countDown(); + } + } + + /** + * Adds a thread to the set of threads and starts it. + * + * @throws RejectedExecutionException if task not started + */ + private void start(Thread thread, Object task) { + assert thread.getState() == Thread.State.NEW; + + // keeps track of thread/task association + threadTasks.put(thread, task); + + boolean started = false; + try { + if (state.get() == RUNNING) { + thread.start(); + started = true; + } + } finally { + if (!started) { + taskComplete(thread); + } + } + + // throw REE if thread not started and no exception thrown + if (!started) { + throw new RejectedExecutionException(); + } + } + + private Future submit(Callable task) { + Objects.requireNonNull(task); + ensureNotShutdown(); + var future = new ThreadPerTaskExecutor.ThreadBoundFuture<>(this, task); + Thread thread = future.thread(); + start(thread, task); + return future; + } + + /** + * Throws RejectedExecutionException if the executor has been shutdown. + */ + private void ensureNotShutdown() { + if (state.get() >= SHUTDOWN) { + // shutdown or terminated + throw new RejectedExecutionException(); + } + } + + /** + * A Future for a task that runs in its own thread. The thread is + * created (but not started) when the Future is created. The thread + * is interrupted when the future is cancelled. The executor is + * notified when the task completes. + */ + private static class ThreadBoundFuture extends CompletableFuture implements Runnable { + + private final ThreadPerTaskExecutor executor; + private final Callable task; + private final Thread thread; + + ThreadBoundFuture(ThreadPerTaskExecutor executor, Callable task) { + this.executor = executor; + this.task = task; + this.thread = executor.newThread(this); + } + + Thread thread() { + return thread; + } + + @Override + public void run() { + if (Thread.currentThread() != thread) { + // should not happen except where something casts this object + // to a Runnable and invokes the run method. + throw new WrongThreadException(); + } + try { + T result = task.call(); + complete(result); + } catch (Throwable e) { + completeExceptionally(e); + } finally { + executor.taskComplete(thread); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = super.cancel(mayInterruptIfRunning); + if (cancelled && mayInterruptIfRunning) { + thread.interrupt(); + } + return cancelled; + } + } +} + diff --git a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/http1/Http1Connection.java b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/http1/Http1Connection.java index 535a4ddb0e9..916bd76dcea 100644 --- a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/http1/Http1Connection.java +++ b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/http1/Http1Connection.java @@ -39,6 +39,7 @@ import io.helidon.nima.http.encoding.ContentEncodingContext; import io.helidon.nima.webserver.CloseConnectionException; import io.helidon.nima.webserver.ConnectionContext; +import io.helidon.nima.webserver.InterruptableTask; import io.helidon.nima.webserver.http.DirectTransportRequest; import io.helidon.nima.webserver.http.HttpRouting; import io.helidon.nima.webserver.http1.spi.Http1Upgrader; @@ -50,7 +51,7 @@ /** * HTTP/1.1 server connection. */ -public class Http1Connection implements ServerConnection { +public class Http1Connection implements ServerConnection, InterruptableTask { private static final System.Logger LOGGER = System.getLogger(Http1Connection.class.getName()); private final ConnectionContext ctx; @@ -73,6 +74,8 @@ public class Http1Connection implements ServerConnection { private long currentEntitySize; private long currentEntitySizeRead; + private volatile boolean currentlyReadingPrologue; + /** * Create a new connection. * @@ -98,13 +101,20 @@ public Http1Connection(ConnectionContext ctx, this.maxPayloadSize = ctx.maxPayloadSize(); } + @Override + public boolean canInterrupt() { + return currentlyReadingPrologue; + } + @Override public void handle() throws InterruptedException { try { // handle connection until an exception (or explicit connection close) while (true) { // prologue (first line of request) + currentlyReadingPrologue = true; HttpPrologue prologue = http1prologue.readPrologue(); + currentlyReadingPrologue = false; recvListener.prologue(ctx, prologue); currentEntitySize = 0; currentEntitySizeRead = 0; diff --git a/nima/webserver/webserver/src/test/java/io/helidon/nima/webserver/ThreadPerTaskExecutorTest.java b/nima/webserver/webserver/src/test/java/io/helidon/nima/webserver/ThreadPerTaskExecutorTest.java new file mode 100644 index 00000000000..beea0637d15 --- /dev/null +++ b/nima/webserver/webserver/src/test/java/io/helidon/nima/webserver/ThreadPerTaskExecutorTest.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.nima.webserver; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import static io.helidon.common.testing.junit5.MatcherWithRetry.assertThatWithRetry; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.is; + +class ThreadPerTaskExecutorTest { + + @Test + void testTerminate() { + HelidonTaskExecutor executor = newExecutor(); + assertThat(executor.isTerminated(), is(false)); + executor.terminate(1, TimeUnit.SECONDS); + assertThat(executor.isTerminated(), is(true)); + } + + @Test + void testTerminateInterruptableTask() throws Exception { + HelidonTaskExecutor executor = newExecutor(); + + // Submit task and ensure it is waiting + Task task = new Task(true); + executor.execute(task); + task.waitUntilStarted(); + assertThatWithRetry(task.thread()::getState, is(Thread.State.WAITING)); + + // There should be no need to force termination + assertThat(executor.isTerminated(), is(false)); + executor.terminate(1, TimeUnit.SECONDS); + assertThat(executor.isTerminated(), is(true)); + } + + @Test + void testTerminateNonInterruptableTask() throws Exception { + HelidonTaskExecutor executor = newExecutor(); + + // Submit task and ensure it is waiting + Task task = new Task(false); + executor.execute(task); + task.waitUntilStarted(); + assertThatWithRetry(task.thread()::getState, is(Thread.State.WAITING)); + + // Force termination should be required + assertThat(executor.isTerminated(), is(false)); + executor.terminate(1, TimeUnit.SECONDS); + assertThat(executor.isTerminated(), is(false)); + executor.forceTerminate(); + assertThat(executor.isTerminated(), is(true)); + } + + private static HelidonTaskExecutor newExecutor() { + return ThreadPerTaskExecutor.create(Thread.ofVirtual() + .allowSetThreadLocals(true) + .inheritInheritableThreadLocals(false) + .factory()); + } + + /** + * An interruptable task. + */ + private static class Task implements InterruptableTask { + + private Thread thread; + private final boolean canInterrupt; + private final Barrier barrier; + private final CountDownLatch started = new CountDownLatch(1); + + Task(boolean canInterrupt) { + this.canInterrupt = canInterrupt; + this.barrier = new Barrier(); + } + + @Override + public boolean canInterrupt() { + return canInterrupt; + } + + void waitUntilStarted() throws InterruptedException { + started.await(); + } + + @Override + public void run() { + try { + thread = Thread.currentThread(); + started.countDown(); + barrier.waitOn(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + Thread thread() { + return thread; + } + + Barrier barrier() { + return barrier; + } + } + + /** + * A barrier is used to force a thread to wait (block) until it is retracted. + */ + private static class Barrier { + private final CompletableFuture future = new CompletableFuture<>(); + + void waitOn() throws ExecutionException, InterruptedException { + future.get(); + } + + void retract() { + future.complete(null); + } + } +}