Skip to content

Commit

Permalink
Move reset to RoutingResponse
Browse files Browse the repository at this point in the history
Introduce commit to RoutingResponse
Correctly terminate HTTP/2 streams when an error is thrown
Update tests with try with resources
  • Loading branch information
tomas-langer committed Dec 21, 2022
1 parent 85666d8 commit 2d25ff2
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public OutputStream writeResponseStatusAndHeaders(long contentLengthParam,
if (contentLength > 0) {
res.header(Header.create(Header.CONTENT_LENGTH, String.valueOf(contentLength)));
}
this.outputStream = res.outputStream();
this.outputStream = new NoFlushOutputStream(res.outputStream());
return outputStream;
}

Expand Down Expand Up @@ -386,4 +386,37 @@ public void await() {
}
}
}

private static class NoFlushOutputStream extends OutputStream {
private final OutputStream delegate;

private NoFlushOutputStream(OutputStream delegate) {
this.delegate = delegate;
}

@Override
public void write(byte[] b) throws IOException {
delegate.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
delegate.write(b, off, len);
}

@Override
public void flush() {
// intentional no-op, flush did not work nicely with Jersey
}

@Override
public void close() throws IOException {
delegate.close();
}

@Override
public void write(int b) throws IOException {
delegate.write(b);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,6 @@ public boolean isSent() {
return isSent;
}

@Override
public boolean reset() {
if (isSent) {
return false;
}
headers.clear();
streamingEntity = false;
outputStream = null;
return true;
}

@Override
public OutputStream outputStream() {
if (isSent) {
Expand Down Expand Up @@ -166,6 +155,24 @@ public boolean hasEntity() {
return isSent || streamingEntity;
}

@Override
public boolean reset() {
if (isSent || outputStream != null && outputStream.bytesWritten > 0) {
return false;
}
headers.clear();
streamingEntity = false;
outputStream = null;
return true;
}

@Override
public void commit() {
if (outputStream != null) {
outputStream.commit();
}
}

private static class BlockingOutputStream extends OutputStream {

private final ServerResponseHeaders headers;
Expand Down Expand Up @@ -210,8 +217,19 @@ public void write(byte[] b, int off, int len) throws IOException {
write(BufferData.create(b, off, len));
}

@Override
public void flush() throws IOException {
if (firstByte && firstBuffer != null) {
write(BufferData.empty());
}
}

@Override
public void close() {
// does nothing, we expect commit(), so we can reset response when no bytes were written to response
}

void commit() {
if (closed) {
return;
}
Expand Down Expand Up @@ -292,6 +310,7 @@ private void sendHeadersAndPrepare() {
headers.setIfAbsent(Header.create(Header.DATE, true, false, Http.DateTime.rfc1123String()));

Http2Headers http2Headers = Http2Headers.create(headers);
http2Headers.status(status);
http2Headers.validateResponse();
int written = writer.writeHeaders(http2Headers,
streamId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,10 @@ public void run() {
+ ctx.childSocketId() + " ] - " + streamId);
try {
handle();
} catch (SocketWriterException e) {
throw new CloseConnectionException(e.getMessage(), e);
} catch (CloseConnectionException | UncheckedIOException e) {
throw e;
} catch (SocketWriterException | CloseConnectionException | UncheckedIOException e) {
Http2RstStream rst = new Http2RstStream(Http2ErrorCode.STREAM_CLOSED);
writer.write(rst.toFrameData(serverSettings, streamId, Http2Flag.NoFlags.create()), flowControl);
// no sense in throwing an exception, as this is invoked from an executor service directly
} catch (RequestException e) {
DirectHandler handler = ctx.directHandlers().handler(e.eventType());
DirectHandler.TransportResponse response = handler.handle(e.request(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@

package io.helidon.nima.tests.integration.http2.webserver;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;

import io.helidon.common.configurable.Resource;
import io.helidon.common.http.Http;
import io.helidon.common.pki.KeyConfig;
Expand All @@ -29,41 +39,38 @@
import io.helidon.nima.webserver.http.HttpRouting;
import io.helidon.nima.webserver.http.ServerRequest;
import io.helidon.nima.webserver.http.ServerResponse;

import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;

import java.io.OutputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;

import static io.helidon.common.http.Http.Method.GET;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

@ServerTest
class Http2ErrorHandlingWithOutputStreamTest {

private static final Http.HeaderName MAIN_HEADER_NAME = Http.Header.create("main-handler");
private static final Http.HeaderName ERROR_HEADER_NAME = Http.Header.create("error-handler");

private static HttpClient httpClient;
private final int plainPort;
private final int tlsPort;
private static HttpClient httpClient;


Http2ErrorHandlingWithOutputStreamTest(WebServer server) {
this.plainPort = server.port();
this.tlsPort = server.port("https");
}

public static <T> Matcher<? super Optional<T>> emptyOptional() {
return new EmptyOptionalMatcher<>();
}

@SetUpServer
static void setUpServer(WebServer.Builder serverBuilder) {
KeyConfig privateKeyConfig = KeyConfig.keystoreBuilder()
Expand All @@ -77,7 +84,7 @@ static void setUpServer(WebServer.Builder serverBuilder) {
.build();

serverBuilder.socket("https",
socketBuilder -> socketBuilder.tls(tls));
socketBuilder -> socketBuilder.tls(tls));
httpClient = http2Client();
}

Expand All @@ -100,7 +107,6 @@ static void router(HttpRouting.Builder router) {
}))
.route(Http2Route.route(GET, "get-outputStream-writeTwiceThenError", (req, res) -> {
res.status(Http.Status.OK_200);
res.header(Http.Header.create("status"), "200");
res.header(MAIN_HEADER_NAME, "x");
OutputStream os = res.outputStream();
os.write("writeOnce".getBytes(StandardCharsets.UTF_8));
Expand All @@ -115,31 +121,16 @@ static void router(HttpRouting.Builder router) {
os.flush();
throw new CustomException();
}))
.route(Http2Route.route(GET, "", ((req, res) ->
res.send("ok"))));
}

private static HttpClient http2Client() {
return HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.connectTimeout(Duration.ofSeconds(30))
.build();
}

private HttpResponse<String> request(String uriSuffix) {
try {
return httpClient.send(httpRequest(uriSuffix), HttpResponse.BodyHandlers.ofString());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private HttpRequest httpRequest(String uriSuffix) {
return HttpRequest.newBuilder()
.timeout(Duration.ofSeconds(30))
.uri(URI.create("http://localhost:" + plainPort + uriSuffix))
.GET()
.build();
.get("get-outputStream-tryWithResources", (req, res) -> {
res.header(MAIN_HEADER_NAME, "x");
try (OutputStream os = res.outputStream()) {
os.write("This should not be sent immediately".getBytes(StandardCharsets.UTF_8));
throw new CustomException();
}
})
.route(Http2Route.route(GET, "", (
(req, res) ->
res.send("ok"))));
}

@Test
Expand All @@ -160,7 +151,6 @@ void testGetOutputStreamThenError_expect_CustomErrorHandlerMessage() {
assertThat(response.headers().firstValue(MAIN_HEADER_NAME.lowerCase()), is(emptyOptional()));
}


@Test
void testGetOutputStreamWriteOnceThenError_expect_CustomErrorHandlerMessage() {
var response = request("/get-outputStream-writeOnceThenError");
Expand All @@ -171,35 +161,54 @@ void testGetOutputStreamWriteOnceThenError_expect_CustomErrorHandlerMessage() {
assertThat(response.headers().firstValue(MAIN_HEADER_NAME.lowerCase()), is(emptyOptional()));
}

// ------------------
// This test hangs
// ------------------
// @Test
// void testGetOutputStreamWriteTwiceThenError_expect_invalidResponse() {
// try {
// var response = request("/get-outputStream-writeTwiceThenError");
// assertEquals(500, response.statusCode());
// //String body = response.body();
// } catch (Exception e) {
// System.err.println("never get here ---------------");
// e.printStackTrace();
// }
// }

// ------------------
// This test hangs
// ------------------
// @Test
// void testGetOutputStreamWriteFlushThenError_expect_invalidResponse() {
// try {
// var response = request("/get-outputStream-writeFlushThenError");
// assertEquals(500, response.statusCode());
// //String body = response.body();
// } catch (Exception e) {
// System.err.println("never get here ---------------");
// e.printStackTrace();
// }
// }
@Test
void testGetOutputStreamWriteTwiceThenError_expect_invalidResponse() {
RuntimeException r = assertThrows(RuntimeException.class, () -> request("/get-outputStream-writeTwiceThenError"));
assertThat(r.getCause(), instanceOf(IOException.class));
// stream should have been reset during processing
assertThat(r.getMessage(), containsString("RST_STREAM"));
}

@Test
void testGetOutputStreamWriteFlushThenError_expect_invalidResponse() {
RuntimeException r = assertThrows(RuntimeException.class, () -> request("/get-outputStream-writeFlushThenError"));
assertThat(r.getCause(), instanceOf(IOException.class));
// stream should have been reset during processing
assertThat(r.getMessage(), containsString("RST_STREAM"));
}

@Test
void testGetOutputStreamTryWithResourcesThenError_expect_CustomErrorHandlerMessage() {
var response = request("/get-outputStream-tryWithResources");

assertEquals(418, response.statusCode());
assertThat(response.body(), is("TeaPotIAm"));
assertThat(response.headers().firstValue(ERROR_HEADER_NAME.lowerCase()), is(Optional.of("err")));
assertThat(response.headers().firstValue(MAIN_HEADER_NAME.lowerCase()), is(emptyOptional()));
}

private static HttpClient http2Client() {
return HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.connectTimeout(Duration.ofSeconds(30))
.build();
}

private HttpResponse<String> request(String uriSuffix) {
try {
return httpClient.send(httpRequest(uriSuffix), HttpResponse.BodyHandlers.ofString());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private HttpRequest httpRequest(String uriSuffix) {
return HttpRequest.newBuilder()
.timeout(Duration.ofSeconds(30))
.uri(URI.create("http://localhost:" + plainPort + uriSuffix))
.GET()
.build();
}

private static class CustomRoutingHandler implements ErrorHandler<CustomException> {
@Override
Expand All @@ -214,10 +223,6 @@ private static class CustomException extends RuntimeException {

}

public static <T> Matcher<? super Optional<T>> emptyOptional() {
return new EmptyOptionalMatcher<>();
}

static class EmptyOptionalMatcher<T> extends BaseMatcher<Optional<T>> {

private Optional<T> optionalActual;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ java.util.logging.ConsoleHandler.level=FINEST
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS.%1$tL %5$s%6$s%n
# Global logging level. Can be overridden by specific loggers
.level=INFO
io.helidon.nima.level=FINEST
io.helidon.nima.level=INFO
Loading

0 comments on commit 2d25ff2

Please sign in to comment.