Skip to content

Commit

Permalink
6871 Writing to closed socket with a silent error (#6887)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec authored Jun 7, 2023
1 parent 3dfa7ce commit bc225b5
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ private <T> T withStreamLock(Callable<T> callable) {
} finally {
streamLock.unlock();
}
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.helidon.nima.http2.webserver;

import java.io.UncheckedIOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -165,7 +167,9 @@ public void handle() throws InterruptedException {
sendErrorDetails ? e.getMessage() : "");
connectionWriter.write(frame.toFrameData(clientSettings, 0, Http2Flag.NoFlags.create()));
state = State.FINISHED;
} catch (CloseConnectionException | InterruptedException e) {
} catch (CloseConnectionException
| InterruptedException
| UncheckedIOException e) {
throw e;
} catch (Throwable e) {
if (state == State.FINISHED) {
Expand Down Expand Up @@ -489,7 +493,7 @@ private void ackSettings() {
stream.headers(upgradeHeaders, !hasEntity);
upgradeHeaders = null;
ctx.executor()
.submit(new StreamRunnable(streams, stream, stream.streamId()));
.submit(new StreamRunnable(streams, stream, Thread.currentThread()));
}
}

Expand Down Expand Up @@ -592,7 +596,7 @@ private void doHeaders() {

// we now have all information needed to execute
ctx.executor()
.submit(new StreamRunnable(streams, stream, stream.streamId()));
.submit(new StreamRunnable(streams, stream, Thread.currentThread()));
}

private void pingFrame() {
Expand Down Expand Up @@ -768,21 +772,26 @@ private enum State {
UNKNOWN
}

private static final class StreamRunnable implements Runnable {
private final Map<Integer, StreamContext> streams;
private final Http2Stream stream;
private final int streamId;

private StreamRunnable(Map<Integer, StreamContext> streams, Http2Stream stream, int streamId) {
this.streams = streams;
this.stream = stream;
this.streamId = streamId;
}
private record StreamRunnable(Map<Integer, StreamContext> streams,
Http2Stream stream,
Thread handlerThread) implements Runnable {

@Override
public void run() {
stream.run();
streams.remove(stream.streamId());
try {
stream.run();
} catch (UncheckedIOException e) {
// Broken connection
if (e.getCause() instanceof SocketException) {
// Interrupt handler thread
handlerThread.interrupt();
LOGGER.log(DEBUG, "Socket error on writer thread", e);
} else {
throw e;
}
} finally {
streams.remove(stream.streamId());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.tests.integration.http2.webserver;

import io.helidon.common.http.Http;
import io.helidon.logging.common.LogConfig;
import io.helidon.nima.http2.webserver.Http2Route;
import io.helidon.nima.webserver.WebServer;
import io.helidon.nima.webserver.http.ServerRequest;
import io.helidon.nima.webserver.http.ServerResponse;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class CutConnectionTest {
private static final AssertingHandler ASSERTING_HANDLER = new AssertingHandler();
private static final Logger NIMA_LOGGER = Logger.getLogger("io.helidon.nima");
private static final int TIME_OUT_SEC = 10;

static {
LogConfig.configureRuntime();
}

private final HttpClient client;

public CutConnectionTest() {
client = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.connectTimeout(Duration.ofSeconds(20))
.build();
}


private static void stream(ServerRequest req, ServerResponse res) throws InterruptedException, IOException {
try (OutputStream os = res.outputStream()) {
for (int i = 0; i < 1000; i++) {
Thread.sleep(1);
os.write("TEST".getBytes());
os.flush();
}
}
}

@Test
void testStringRoute() throws Exception {
CompletableFuture<Void> receivedFirstChunk = new CompletableFuture<>();
ExecutorService exec = Executors.newSingleThreadExecutor();
Level originalLevel = Level.INFO;
try {
originalLevel = NIMA_LOGGER.getLevel();
NIMA_LOGGER.setLevel(Level.FINE);
NIMA_LOGGER.addHandler(ASSERTING_HANDLER);
WebServer server = WebServer.builder()
.host("localhost")
.routing(r -> r.route(Http2Route.route(Http.Method.GET, "/stream", CutConnectionTest::stream)))
.build();
server.start();

URI uri = new URI("http://localhost:" + server.port()).resolve("/stream");

exec.submit(() -> {
try {
HttpResponse<InputStream> response = client.send(HttpRequest.newBuilder()
.timeout(Duration.ofSeconds(20))
.uri(uri)
.GET()
.build(), HttpResponse.BodyHandlers.ofInputStream());
try (InputStream is = response.body()) {
byte[] chunk;
for (int read = 0; read != -1; read = is.read(chunk)) {
receivedFirstChunk.complete(null);
chunk = new byte[4];
}
}
} catch (IOException | InterruptedException e) {
// ignored
}
});
receivedFirstChunk.get(TIME_OUT_SEC, TimeUnit.SECONDS);
exec.shutdownNow();
assertThat(exec.awaitTermination(TIME_OUT_SEC, TimeUnit.SECONDS), is(true));
server.stop();
SocketClosedLog log = ASSERTING_HANDLER.socketClosedLog.get(TIME_OUT_SEC, TimeUnit.SECONDS);
assertThat(log.record.getLevel(), is(Level.FINE));
} finally {
NIMA_LOGGER.removeHandler(ASSERTING_HANDLER);
NIMA_LOGGER.setLevel(originalLevel);
}
}


private record SocketClosedLog(LogRecord record, SocketException e) {

}

/**
* DEBUG level logging for attempts to write to closed socket is expected:
* <pre>{@code
* 023.05.23 14:51:53 FINE io.helidon.nima.http2.webserver.Http2Connection !thread!: Socket error on writer thread
* java.io.UncheckedIOException: java.net.SocketException: Socket closed
* at io.helidon.common.buffers.FixedBufferData.writeTo(FixedBufferData.java:74)
* at io.helidon.common.buffers.CompositeArrayBufferData.writeTo(CompositeArrayBufferData.java:41)
* at io.helidon.common.socket.PlainSocket.write(PlainSocket.java:127)
* ...
* Caused by: java.net.SocketException: Socket closed
* at java.base/sun.nio.ch.NioSocketImpl.ensureOpenAndConnected(NioSocketImpl.java:163)
* ...
* at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1120)
* at io.helidon.common.buffers.FixedBufferData.writeTo(FixedBufferData.java:71)
* ...
* }</pre>
*/

private static class AssertingHandler extends Handler {

CompletableFuture<SocketClosedLog> socketClosedLog = new CompletableFuture<>();

@Override
public void publish(LogRecord record) {
Throwable t = record.getThrown();
if (t == null) return;
while (t.getCause() != null) {
t = t.getCause();
}
if (t instanceof SocketException e) {
socketClosedLog.complete(new SocketClosedLog(record, e));
}
}

@Override
public void flush() {

}

@Override
public void close() throws SecurityException {

}
}
}

0 comments on commit bc225b5

Please sign in to comment.