diff --git a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2ConnectionWriter.java b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2ConnectionWriter.java index 998a3febee8..9b65f4ef925 100755 --- a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2ConnectionWriter.java +++ b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2ConnectionWriter.java @@ -151,6 +151,8 @@ private T withStreamLock(Callable callable) { } finally { streamLock.unlock(); } + } catch (RuntimeException e) { + throw e; } catch (Exception e) { throw new RuntimeException(e); } diff --git a/nima/http2/webserver/src/main/java/io/helidon/nima/http2/webserver/Http2Connection.java b/nima/http2/webserver/src/main/java/io/helidon/nima/http2/webserver/Http2Connection.java index 8e8ab3922fa..43cd860ce2d 100755 --- a/nima/http2/webserver/src/main/java/io/helidon/nima/http2/webserver/Http2Connection.java +++ b/nima/http2/webserver/src/main/java/io/helidon/nima/http2/webserver/Http2Connection.java @@ -16,6 +16,8 @@ package io.helidon.nima.http2.webserver; +import java.io.UncheckedIOException; +import java.net.SocketException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -165,7 +167,9 @@ public void handle() throws InterruptedException { sendErrorDetails ? e.getMessage() : ""); connectionWriter.write(frame.toFrameData(clientSettings, 0, Http2Flag.NoFlags.create())); state = State.FINISHED; - } catch (CloseConnectionException | InterruptedException e) { + } catch (CloseConnectionException + | InterruptedException + | UncheckedIOException e) { throw e; } catch (Throwable e) { if (state == State.FINISHED) { @@ -489,7 +493,7 @@ private void ackSettings() { stream.headers(upgradeHeaders, !hasEntity); upgradeHeaders = null; ctx.executor() - .submit(new StreamRunnable(streams, stream, stream.streamId())); + .submit(new StreamRunnable(streams, stream, Thread.currentThread())); } } @@ -592,7 +596,7 @@ private void doHeaders() { // we now have all information needed to execute ctx.executor() - .submit(new StreamRunnable(streams, stream, stream.streamId())); + .submit(new StreamRunnable(streams, stream, Thread.currentThread())); } private void pingFrame() { @@ -768,21 +772,26 @@ private enum State { UNKNOWN } - private static final class StreamRunnable implements Runnable { - private final Map streams; - private final Http2Stream stream; - private final int streamId; - - private StreamRunnable(Map streams, Http2Stream stream, int streamId) { - this.streams = streams; - this.stream = stream; - this.streamId = streamId; - } + private record StreamRunnable(Map streams, + Http2Stream stream, + Thread handlerThread) implements Runnable { @Override public void run() { - stream.run(); - streams.remove(stream.streamId()); + try { + stream.run(); + } catch (UncheckedIOException e) { + // Broken connection + if (e.getCause() instanceof SocketException) { + // Interrupt handler thread + handlerThread.interrupt(); + LOGGER.log(DEBUG, "Socket error on writer thread", e); + } else { + throw e; + } + } finally { + streams.remove(stream.streamId()); + } } } diff --git a/nima/tests/integration/http2/server/src/test/java/io/helidon/nima/tests/integration/http2/webserver/CutConnectionTest.java b/nima/tests/integration/http2/server/src/test/java/io/helidon/nima/tests/integration/http2/webserver/CutConnectionTest.java new file mode 100644 index 00000000000..b71af81b910 --- /dev/null +++ b/nima/tests/integration/http2/server/src/test/java/io/helidon/nima/tests/integration/http2/webserver/CutConnectionTest.java @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.nima.tests.integration.http2.webserver; + +import io.helidon.common.http.Http; +import io.helidon.logging.common.LogConfig; +import io.helidon.nima.http2.webserver.Http2Route; +import io.helidon.nima.webserver.WebServer; +import io.helidon.nima.webserver.http.ServerRequest; +import io.helidon.nima.webserver.http.ServerResponse; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.SocketException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class CutConnectionTest { + private static final AssertingHandler ASSERTING_HANDLER = new AssertingHandler(); + private static final Logger NIMA_LOGGER = Logger.getLogger("io.helidon.nima"); + private static final int TIME_OUT_SEC = 10; + + static { + LogConfig.configureRuntime(); + } + + private final HttpClient client; + + public CutConnectionTest() { + client = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .connectTimeout(Duration.ofSeconds(20)) + .build(); + } + + + private static void stream(ServerRequest req, ServerResponse res) throws InterruptedException, IOException { + try (OutputStream os = res.outputStream()) { + for (int i = 0; i < 1000; i++) { + Thread.sleep(1); + os.write("TEST".getBytes()); + os.flush(); + } + } + } + + @Test + void testStringRoute() throws Exception { + CompletableFuture receivedFirstChunk = new CompletableFuture<>(); + ExecutorService exec = Executors.newSingleThreadExecutor(); + Level originalLevel = Level.INFO; + try { + originalLevel = NIMA_LOGGER.getLevel(); + NIMA_LOGGER.setLevel(Level.FINE); + NIMA_LOGGER.addHandler(ASSERTING_HANDLER); + WebServer server = WebServer.builder() + .host("localhost") + .routing(r -> r.route(Http2Route.route(Http.Method.GET, "/stream", CutConnectionTest::stream))) + .build(); + server.start(); + + URI uri = new URI("http://localhost:" + server.port()).resolve("/stream"); + + exec.submit(() -> { + try { + HttpResponse response = client.send(HttpRequest.newBuilder() + .timeout(Duration.ofSeconds(20)) + .uri(uri) + .GET() + .build(), HttpResponse.BodyHandlers.ofInputStream()); + try (InputStream is = response.body()) { + byte[] chunk; + for (int read = 0; read != -1; read = is.read(chunk)) { + receivedFirstChunk.complete(null); + chunk = new byte[4]; + } + } + } catch (IOException | InterruptedException e) { + // ignored + } + }); + receivedFirstChunk.get(TIME_OUT_SEC, TimeUnit.SECONDS); + exec.shutdownNow(); + assertThat(exec.awaitTermination(TIME_OUT_SEC, TimeUnit.SECONDS), is(true)); + server.stop(); + SocketClosedLog log = ASSERTING_HANDLER.socketClosedLog.get(TIME_OUT_SEC, TimeUnit.SECONDS); + assertThat(log.record.getLevel(), is(Level.FINE)); + } finally { + NIMA_LOGGER.removeHandler(ASSERTING_HANDLER); + NIMA_LOGGER.setLevel(originalLevel); + } + } + + + private record SocketClosedLog(LogRecord record, SocketException e) { + + } + + /** + * DEBUG level logging for attempts to write to closed socket is expected: + *
{@code
+     * 023.05.23 14:51:53 FINE io.helidon.nima.http2.webserver.Http2Connection !thread!: Socket error on writer thread
+     * java.io.UncheckedIOException: java.net.SocketException: Socket closed
+     * 	at io.helidon.common.buffers.FixedBufferData.writeTo(FixedBufferData.java:74)
+     * 	at io.helidon.common.buffers.CompositeArrayBufferData.writeTo(CompositeArrayBufferData.java:41)
+     * 	at io.helidon.common.socket.PlainSocket.write(PlainSocket.java:127)
+     *  ...
+     * Caused by: java.net.SocketException: Socket closed
+     * 	at java.base/sun.nio.ch.NioSocketImpl.ensureOpenAndConnected(NioSocketImpl.java:163)
+     *  ...
+     * 	at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1120)
+     * 	at io.helidon.common.buffers.FixedBufferData.writeTo(FixedBufferData.java:71)
+     * 	...
+     * 	}
+ */ + + private static class AssertingHandler extends Handler { + + CompletableFuture socketClosedLog = new CompletableFuture<>(); + + @Override + public void publish(LogRecord record) { + Throwable t = record.getThrown(); + if (t == null) return; + while (t.getCause() != null) { + t = t.getCause(); + } + if (t instanceof SocketException e) { + socketClosedLog.complete(new SocketClosedLog(record, e)); + } + } + + @Override + public void flush() { + + } + + @Override + public void close() throws SecurityException { + + } + } +}