From cde023994dc4b8207866a63b1052fce48b9aa81d Mon Sep 17 00:00:00 2001 From: Bryan Atsatt Date: Tue, 19 Feb 2019 13:23:59 -0800 Subject: [PATCH] Fixes for unnecessary exception and DefaultLastHttpContent instantiation. --- .../reactive/SingleSubscriberHolder.java | 9 ++- .../helidon/webserver/BareResponseImpl.java | 25 ++++--- .../java/io/helidon/webserver/Request.java | 71 +++++++++++-------- 3 files changed, 60 insertions(+), 45 deletions(-) diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleSubscriberHolder.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleSubscriberHolder.java index 997ce4057cf..8686572c06b 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleSubscriberHolder.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleSubscriberHolder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,9 @@ * @param the type of the {@code Subscriber} */ public class SingleSubscriberHolder { + private static final IllegalStateException ALREADY_CLOSED = new IllegalStateException("Publisher already closed."); + private static final IllegalStateException CANCELLED = new IllegalStateException("Canceled before any subscriber is " + + "registered!"); private final CompletableFuture> subscriber = new CompletableFuture<>(); private final AtomicBoolean closed = new AtomicBoolean(false); @@ -77,7 +80,7 @@ public boolean register(Flow.Subscriber subscriber) { * handler (e.g. in a previous invocation of this method). */ public void close(Consumer> completionHandler) { - if (!subscriber.completeExceptionally(new IllegalStateException("Publisher already closed.")) + if (!subscriber.completeExceptionally(ALREADY_CLOSED) && closed.compareAndSet(false, true)) { try { @@ -95,7 +98,7 @@ public void close(Consumer> completionHandler) { * Hard cancel - nothing is send to the subscriber but subscription is considered as canceled. */ public void cancel() { - subscriber.completeExceptionally(new IllegalStateException("Canceled before any subscriber is registered!")); + subscriber.completeExceptionally(CANCELLED); closed.set(true); } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java index 60593968bd9..afab6caae90 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java @@ -45,6 +45,7 @@ import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @@ -61,6 +62,8 @@ class BareResponseImpl implements BareResponse { // See HttpConversionUtil.ExtensionHeaderNames private static final String HTTP_2_HEADER_PREFIX = "x-http2"; + private static final SocketClosedException CLOSED = new SocketClosedException("Response channel is closed!"); + private static final LastHttpContent LAST_HTTP_CONTENT = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER); private final boolean keepAlive; private final ChannelHandlerContext ctx; @@ -103,7 +106,7 @@ class BareResponseImpl implements BareResponse { .closeFuture() // to make this work, when programmatically closing the channel, the responseFuture must be closed beforehand! .addListener(channelFuture -> responseFuture - .completeExceptionally(new SocketClosedException("Response channel is closed!"))); + .completeExceptionally(CLOSED)); this.keepAlive = HttpUtil.isKeepAlive(request); this.requestHeaders = request.headers(); } @@ -189,26 +192,26 @@ private void completeInternal(Throwable throwable) { ctx.channel().read(); } - ctx.writeAndFlush(new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER)) - .addListener(completeOnFailureListener("An exception occurred when writing last http content.")) - .addListener(preventMaskingExceptionOnFailureListener(throwable)) - .addListener(completeOnSuccessListener(throwable)) - .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + writeLastContent(throwable, ChannelFutureListener.CLOSE_ON_FAILURE); }); } else { // If keep-alive is off, close the connection once the content is fully written. runOnOutboundEventLoopThread(() -> { LOGGER.finest(() -> log("Closing with an empty buffer; keep-alive: " + keepAlive)); - ctx.writeAndFlush(new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER)) - .addListener(completeOnFailureListener("An exception occurred when writing last http content.")) - .addListener(preventMaskingExceptionOnFailureListener(throwable)) - .addListener(completeOnSuccessListener(throwable)) - .addListener(ChannelFutureListener.CLOSE); + writeLastContent(throwable, ChannelFutureListener.CLOSE); }); } } + private void writeLastContent(final Throwable throwable, final ChannelFutureListener closeAction) { + ctx.writeAndFlush(LAST_HTTP_CONTENT) + .addListener(completeOnFailureListener("An exception occurred when writing last http content.")) + .addListener(preventMaskingExceptionOnFailureListener(throwable)) + .addListener(completeOnSuccessListener(throwable)) + .addListener(closeAction); + } + private GenericFutureListener> completeOnFailureListener(String message) { return future -> { if (!future.isSuccess()) { diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/Request.java b/webserver/webserver/src/main/java/io/helidon/webserver/Request.java index 3ffd8dd3e26..5d8a13bec2b 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/Request.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/Request.java @@ -20,6 +20,7 @@ import java.net.URI; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.charset.UnsupportedCharsetException; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; @@ -35,7 +36,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.function.Predicate; -import java.util.stream.Stream; import io.helidon.common.CollectionsHelper; import io.helidon.common.http.ContextualRegistry; @@ -71,7 +71,7 @@ abstract class Request implements ServerRequest { /** * Creates new instance. * - * @param req bare request from HTTP SPI implementation. + * @param req bare request from HTTP SPI implementation. * @param webServer relevant server. */ Request(BareRequest req, WebServer webServer) { @@ -105,10 +105,10 @@ abstract class Request implements ServerRequest { */ static Charset requestContentCharset(ServerRequest request) { return request.headers() - .contentType() - .flatMap(MediaType::charset) - .map(Charset::forName) - .orElse(DEFAULT_CHARSET); + .contentType() + .flatMap(MediaType::charset) + .map(Charset::forName) + .orElse(DEFAULT_CHARSET); } protected abstract Tracer tracer(); @@ -229,7 +229,7 @@ class Content implements io.helidon.common.http.Content { private Content() { this.originalPublisher = bareRequest.bodyPublisher(); - this.readers = new LinkedList<>(); + this.readers = appendDefaultReaders(new LinkedList<>()); this.filters = new ArrayList<>(); this.readersLock = new ReentrantReadWriteLock(); this.filtersLock = new ReentrantReadWriteLock(); @@ -237,12 +237,19 @@ private Content() { private Content(Content orig) { this.originalPublisher = orig.originalPublisher; - this.readers = orig.readers; + this.readers = appendDefaultReaders(orig.readers); this.filters = orig.filters; this.readersLock = orig.readersLock; this.filtersLock = orig.filtersLock; } + private Deque> appendDefaultReaders(final Deque> readers) { + readers.addLast(reader(String.class, stringContentReader())); + readers.addLast(reader(byte[].class, ContentReaders.byteArrayReader())); + readers.addLast(reader(InputStream.class, ContentReaders.inputStreamReader())); + return readers; + } + @Override public void registerFilter(Function, Flow.Publisher> function) { @@ -280,19 +287,14 @@ private InternalReader reader(Class clazz, Reader reader) { @Override @SuppressWarnings("unchecked") - public CompletionStage as(Class type) { + public CompletionStage as(final Class type) { Span readSpan = createReadSpan(type); CompletionStage result; try { readersLock.readLock().lock(); - - result = readersWithDefaults() - .filter(reader -> reader.accept(type)) - .findFirst() - // in this context, the cast is absurd, although it's needed; - // one can create a predicate matching an incompatible class - .map(reader -> (CompletionStage) (((Reader) reader).apply(chainPublishers(), type))) - .orElse(failedFuture(new IllegalArgumentException("No reader found for class: " + type))); + result = (CompletionStage) readerFor(type).apply(chainPublishers(), type); + } catch (IllegalArgumentException e) { + result = failedFuture(e); } catch (Exception e) { result = failedFuture(new IllegalArgumentException("Transformation failed!", e)); } finally { @@ -300,10 +302,10 @@ public CompletionStage as(Class type) { } // Close span result.thenRun(readSpan::finish) - .exceptionally(t -> { - finishSpanWithError(readSpan, t); - return null; - }); + .exceptionally(t -> { + finishSpanWithError(readSpan, t); + return null; + }); return result; } @@ -327,16 +329,23 @@ private Span createReadSpan(Class type) { return spanBuilder.start(); } - private Stream> readersWithDefaults() { - return Stream.concat(readers.stream(), Stream.of( - reader(String.class, stringContentReader()), - reader(byte[].class, ContentReaders.byteArrayReader()), - reader(InputStream.class, ContentReaders.inputStreamReader()))); + @SuppressWarnings("unchecked") + private Reader readerFor(final Class type) { + return (Reader) readers.stream() + .filter(reader -> reader.accept(type)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No reader found for class: " + type)); } private Reader stringContentReader() { - Charset charset = requestContentCharset(Request.this); - return ContentReaders.stringReader(charset); + try { + Charset charset = requestContentCharset(Request.this); + return ContentReaders.stringReader(charset); + } catch (final UnsupportedCharsetException e) { + return (publisher, clazz) -> { + throw e; + }; + } } @Override @@ -405,9 +414,9 @@ static class Path implements ServerRequest.Path { /** * Creates new instance. * - * @param path actual relative URI path. - * @param rawPath actual relative URI path without any decoding. - * @param params resolved path parameters. + * @param path actual relative URI path. + * @param rawPath actual relative URI path without any decoding. + * @param params resolved path parameters. * @param absolutePath absolute path. */ Path(String path, String rawPath, Map params, Path absolutePath) {