Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal to implement a more efficient webserver shutdown strategy #5876

Merged
merged 8 commits into from
Jan 27, 2023
Merged
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,4 +67,16 @@ public static <T> T assertThatWithRetry(String reason, Supplier<T> actualSupplie

throw new AssertionError(description.toString());
}

/**
* Checks the matcher, possibly multiple times after configured delays, invoking the supplier of the matched value each time,
* until either the matcher passes or the maximum retry expires.
* @param actualSupplier {@code Supplier} that furnishes the value to submit to the matcher
* @param matcher Hamcrest matcher which evaluates the supplied value
* @return the supplied value
* @param <T> type of the supplied value
*/
public static <T> T assertThatWithRetry(Supplier<T> actualSupplier, Matcher<? super T> matcher) {
return assertThatWithRetry("", actualSupplier, matcher);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.tests.integration.server;

import io.helidon.common.http.Http;
import io.helidon.nima.webclient.http1.Http1Client;
import io.helidon.nima.webserver.WebServer;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThan;

class WebServerStopIdleTest {

@Test
void stopWhenIdleExpectTimelyStop() {
WebServer webServer = WebServer.builder()
.routing(router -> router.get("ok", (req, res) -> res.send("ok")))
.build();
webServer.start();

Http1Client client = Http1Client.builder()
.baseUri("http://localhost:" + webServer.port())
.build();
try (var response = client.get("/ok").request()) {
assertThat(response.status(), is(Http.Status.OK_200));
assertThat(response.entity().as(String.class), is("ok"));
}

long startMillis = System.currentTimeMillis();
webServer.stop();
int stopExecutionTimeInMillis = (int) (System.currentTimeMillis() - startMillis);
assertThat(stopExecutionTimeInMillis, is(lessThan(500)));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,25 +13,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.nima.webserver;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
package io.helidon.nima.tests.integration.server;

import io.helidon.nima.webserver.WebServer;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThan;

class WebServerStopOnlyTest {

class ExecutorShutdownTest {
@Test
void testShutdown() {
try (ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
.allowSetThreadLocals(true)
.inheritInheritableThreadLocals(false)
.factory())) {
ServerListener.shutdownExecutor(executor);
assertThat(executor.isShutdown(), is(true));
}
void stoWhenNoRequestsExpectTimelyStop() {
WebServer webServer = WebServer.builder()
.routing(router -> router.get("ok", (req, res) -> res.send("ok")))
.build();
webServer.start();

long startMillis = System.currentTimeMillis();
webServer.stop();
int stopExecutionTimeInMillis = (int) (System.currentTimeMillis() - startMillis);
assertThat(stopExecutionTimeInMillis, is(lessThan(500)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* Representation of a single channel between client and server.
* Everything in this class runs in the channel reader virtual thread
*/
class ConnectionHandler implements Runnable {
class ConnectionHandler implements InterruptableTask<Void> {
private static final System.Logger LOGGER = System.getLogger(ConnectionHandler.class.getName());

private final ConnectionProviders connectionProviders;
Expand Down Expand Up @@ -82,6 +82,11 @@ class ConnectionHandler implements Runnable {
maxPayloadSize);
}

@Override
public boolean canInterrupt() {
return connection instanceof InterruptableTask<?> task && task.canInterrupt();
}

@Override
public final void run() {
Thread.currentThread().setName("[" + socket.socketId() + " " + socket.childSocketId() + "] Nima socket");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.webserver;

import java.io.Closeable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* A simplified {@link java.util.concurrent.ExecutorService} that can execute
* {@link InterruptableTask}s and can be efficiently terminated. A thread that is
* waiting to read on an open connection cannot be efficiently stopped. This
* executor will query the thread and interrupt it if possible. It is important
* to efficiently shut down the Nima webserver in certain environments.
*/
interface HelidonTaskExecutor extends Closeable {

/**
* Executes a task.
*
* @param task an interruptable task
* @param <T> type ov value returned by task
* @return a future for a value returned by the task
*/
<T> Future<T> execute(InterruptableTask<T> task);

/**
* Verifies if the executor is terminated.
*
* @return outcome of test
*/
boolean isTerminated();

/**
* Terminate executor waiting for any running task to complete for a specified
* timeout period. It will only wait for those {@link InterruptableTask}s that
* are not interruptable.
*
* @param timeout timeout period
* @param unit timeout period unit
* @return outcome of shutdown process
*/
boolean terminate(long timeout, TimeUnit unit);

/**
* Force termination by forcefully interrupting all tasks. Shall only be called
* if {@link #terminate} returns {@code false}.
*/
void forceTerminate();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.webserver;

import java.util.concurrent.Callable;

/**
* An interruptable task that can implements both {@link Runnable} and
* {@link Callable}.
*
* @param <T> type of value returned by task
*/
public interface InterruptableTask<T> extends Runnable, Callable<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misspelled; should be Interruptible


/**
* Signals if a task can be interrupted at the time this method is called.
*
* @return outcome of interruptable test
*/
boolean canInterrupt();

@Override
default void run() {
try {
call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
default T call() throws Exception {
run();
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.helidon.nima.webserver.http.DirectHandlers;
import io.helidon.nima.webserver.spi.ServerConnectionSelector;

import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.INFO;
import static java.lang.System.Logger.Level.TRACE;
Expand All @@ -57,7 +58,7 @@ class ServerListener {
private final String socketName;
private final ListenerConfiguration listenerConfig;
private final Router router;
private final ExecutorService readerExecutor;
private final HelidonTaskExecutor readerExecutor;
private final ExecutorService sharedExecutor;
private final Thread serverThread;
private final DirectHandlers simpleHandlers;
Expand Down Expand Up @@ -93,7 +94,7 @@ class ServerListener {
.name("server-" + socketName + "-listener")
.unstarted(this::listen);
this.simpleHandlers = simpleHandlers;
this.readerExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
this.readerExecutor = ThreadPerTaskExecutor.create(Thread.ofVirtual()
.allowSetThreadLocals(true)
.inheritInheritableThreadLocals(inheritThreadLocals)
.factory());
Expand Down Expand Up @@ -131,11 +132,30 @@ void stop() {
}
running = false;
try {
// Attempt to wait until existing channels finish execution
shutdownExecutor(readerExecutor);
shutdownExecutor(sharedExecutor);

// Stop listening for connections
serverSocket.close();

// Shutdown reader executor
readerExecutor.terminate(EXECUTOR_SHUTDOWN_MILLIS, TimeUnit.MILLISECONDS);
if (!readerExecutor.isTerminated()) {
LOGGER.log(DEBUG, "Some tasks in reader executor did not terminate gracefully");
readerExecutor.forceTerminate();
}

// Shutdown shared executor
try {
sharedExecutor.shutdown();
boolean done = sharedExecutor.awaitTermination(EXECUTOR_SHUTDOWN_MILLIS, TimeUnit.MILLISECONDS);
if (!done) {
List<Runnable> running = sharedExecutor.shutdownNow();
if (!running.isEmpty()) {
LOGGER.log(DEBUG, running.size() + " tasks in shared executor did not terminate gracefully");
}
}
} catch (InterruptedException e) {
// falls through
}

} catch (IOException e) {
LOGGER.log(INFO, "Exception thrown on socket close", e);
}
Expand Down Expand Up @@ -252,7 +272,7 @@ private void listen() {
listenerConfig.maxPayloadSize(),
simpleHandlers);

readerExecutor.submit(handler);
readerExecutor.execute(handler);
} catch (RejectedExecutionException e) {
LOGGER.log(ERROR, "Executor rejected handler for new connection");
} catch (Exception e) {
Expand All @@ -277,23 +297,4 @@ private void listen() {
LOGGER.log(INFO, String.format("[%s] %s socket closed.", serverChannelId, socketName));
closeFuture.complete(null);
}

/**
* Shutdown an executor by waiting for a period of time.
*
* @param executor executor to shut down
*/
static void shutdownExecutor(ExecutorService executor) {
try {
boolean terminate = executor.awaitTermination(EXECUTOR_SHUTDOWN_MILLIS, TimeUnit.MILLISECONDS);
if (!terminate) {
List<Runnable> running = executor.shutdownNow();
if (!running.isEmpty()) {
LOGGER.log(INFO, running.size() + " channel tasks did not terminate gracefully");
}
}
} catch (InterruptedException e) {
LOGGER.log(INFO, "InterruptedException caught while shutting down channel tasks");
}
}
}
Loading