Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first perf fixes #423

Merged
merged 1 commit into from
Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,9 @@
* @param <T> the type of the {@code Subscriber}
*/
public class SingleSubscriberHolder<T> {
private static final IllegalStateException ALREADY_CLOSED = new IllegalStateException("Publisher already closed.");
private static final IllegalStateException CANCELLED = new IllegalStateException("Canceled before any subscriber is "
+ "registered!");

private final CompletableFuture<Flow.Subscriber<? super T>> subscriber = new CompletableFuture<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
Expand Down Expand Up @@ -77,7 +80,7 @@ public boolean register(Flow.Subscriber<? super T> subscriber) {
* handler (e.g. in a previous invocation of this method).
*/
public void close(Consumer<Flow.Subscriber<? super T>> completionHandler) {
if (!subscriber.completeExceptionally(new IllegalStateException("Publisher already closed."))
if (!subscriber.completeExceptionally(ALREADY_CLOSED)
&& closed.compareAndSet(false, true)) {

try {
Expand All @@ -95,7 +98,7 @@ public void close(Consumer<Flow.Subscriber<? super T>> completionHandler) {
* Hard cancel - nothing is send to the subscriber but subscription is considered as canceled.
*/
public void cancel() {
subscriber.completeExceptionally(new IllegalStateException("Canceled before any subscriber is registered!"));
subscriber.completeExceptionally(CANCELLED);
closed.set(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
Expand All @@ -61,6 +62,8 @@ class BareResponseImpl implements BareResponse {

// See HttpConversionUtil.ExtensionHeaderNames
private static final String HTTP_2_HEADER_PREFIX = "x-http2";
private static final SocketClosedException CLOSED = new SocketClosedException("Response channel is closed!");
private static final LastHttpContent LAST_HTTP_CONTENT = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);

private final boolean keepAlive;
private final ChannelHandlerContext ctx;
Expand Down Expand Up @@ -103,7 +106,7 @@ class BareResponseImpl implements BareResponse {
.closeFuture()
// to make this work, when programmatically closing the channel, the responseFuture must be closed beforehand!
.addListener(channelFuture -> responseFuture
.completeExceptionally(new SocketClosedException("Response channel is closed!")));
.completeExceptionally(CLOSED));
this.keepAlive = HttpUtil.isKeepAlive(request);
this.requestHeaders = request.headers();
}
Expand Down Expand Up @@ -189,26 +192,26 @@ private void completeInternal(Throwable throwable) {
ctx.channel().read();
}

ctx.writeAndFlush(new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER))
.addListener(completeOnFailureListener("An exception occurred when writing last http content."))
.addListener(preventMaskingExceptionOnFailureListener(throwable))
.addListener(completeOnSuccessListener(throwable))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
writeLastContent(throwable, ChannelFutureListener.CLOSE_ON_FAILURE);
});
} else {
// If keep-alive is off, close the connection once the content is fully written.
runOnOutboundEventLoopThread(() -> {
LOGGER.finest(() -> log("Closing with an empty buffer; keep-alive: " + keepAlive));

ctx.writeAndFlush(new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER))
.addListener(completeOnFailureListener("An exception occurred when writing last http content."))
.addListener(preventMaskingExceptionOnFailureListener(throwable))
.addListener(completeOnSuccessListener(throwable))
.addListener(ChannelFutureListener.CLOSE);
writeLastContent(throwable, ChannelFutureListener.CLOSE);
});
}
}

private void writeLastContent(final Throwable throwable, final ChannelFutureListener closeAction) {
ctx.writeAndFlush(LAST_HTTP_CONTENT)
.addListener(completeOnFailureListener("An exception occurred when writing last http content."))
.addListener(preventMaskingExceptionOnFailureListener(throwable))
.addListener(completeOnSuccessListener(throwable))
.addListener(closeAction);
}

private GenericFutureListener<Future<? super Void>> completeOnFailureListener(String message) {
return future -> {
if (!future.isSuccess()) {
Expand Down
71 changes: 40 additions & 31 deletions webserver/webserver/src/main/java/io/helidon/webserver/Request.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
Expand All @@ -35,7 +36,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

import io.helidon.common.CollectionsHelper;
import io.helidon.common.http.ContextualRegistry;
Expand Down Expand Up @@ -71,7 +71,7 @@ abstract class Request implements ServerRequest {
/**
* Creates new instance.
*
* @param req bare request from HTTP SPI implementation.
* @param req bare request from HTTP SPI implementation.
* @param webServer relevant server.
*/
Request(BareRequest req, WebServer webServer) {
Expand Down Expand Up @@ -105,10 +105,10 @@ abstract class Request implements ServerRequest {
*/
static Charset requestContentCharset(ServerRequest request) {
return request.headers()
.contentType()
.flatMap(MediaType::charset)
.map(Charset::forName)
.orElse(DEFAULT_CHARSET);
.contentType()
.flatMap(MediaType::charset)
.map(Charset::forName)
.orElse(DEFAULT_CHARSET);
}

protected abstract Tracer tracer();
Expand Down Expand Up @@ -229,20 +229,27 @@ class Content implements io.helidon.common.http.Content {

private Content() {
this.originalPublisher = bareRequest.bodyPublisher();
this.readers = new LinkedList<>();
this.readers = appendDefaultReaders(new LinkedList<>());
this.filters = new ArrayList<>();
this.readersLock = new ReentrantReadWriteLock();
this.filtersLock = new ReentrantReadWriteLock();
}

private Content(Content orig) {
this.originalPublisher = orig.originalPublisher;
this.readers = orig.readers;
this.readers = appendDefaultReaders(orig.readers);
this.filters = orig.filters;
this.readersLock = orig.readersLock;
this.filtersLock = orig.filtersLock;
}

private Deque<InternalReader<?>> appendDefaultReaders(final Deque<InternalReader<?>> readers) {
readers.addLast(reader(String.class, stringContentReader()));
readers.addLast(reader(byte[].class, ContentReaders.byteArrayReader()));
readers.addLast(reader(InputStream.class, ContentReaders.inputStreamReader()));
return readers;
}

@Override
public void registerFilter(Function<Flow.Publisher<DataChunk>, Flow.Publisher<DataChunk>> function) {

Expand Down Expand Up @@ -280,30 +287,25 @@ private <T> InternalReader<T> reader(Class<T> clazz, Reader<T> reader) {

@Override
@SuppressWarnings("unchecked")
public <T> CompletionStage<T> as(Class<T> type) {
public <T> CompletionStage<T> as(final Class<T> type) {
Span readSpan = createReadSpan(type);
CompletionStage<T> result;
try {
readersLock.readLock().lock();

result = readersWithDefaults()
.filter(reader -> reader.accept(type))
.findFirst()
// in this context, the cast is absurd, although it's needed;
// one can create a predicate matching an incompatible class
.map(reader -> (CompletionStage<? extends T>) (((Reader<T>) reader).apply(chainPublishers(), type)))
.orElse(failedFuture(new IllegalArgumentException("No reader found for class: " + type)));
result = (CompletionStage<T>) readerFor(type).apply(chainPublishers(), type);
} catch (IllegalArgumentException e) {
result = failedFuture(e);
} catch (Exception e) {
result = failedFuture(new IllegalArgumentException("Transformation failed!", e));
} finally {
readersLock.readLock().unlock();
}
// Close span
result.thenRun(readSpan::finish)
.exceptionally(t -> {
finishSpanWithError(readSpan, t);
return null;
});
.exceptionally(t -> {
finishSpanWithError(readSpan, t);
return null;
});
return result;
}

Expand All @@ -327,16 +329,23 @@ private <T> Span createReadSpan(Class<T> type) {
return spanBuilder.start();
}

private Stream<InternalReader<?>> readersWithDefaults() {
return Stream.concat(readers.stream(), Stream.of(
reader(String.class, stringContentReader()),
reader(byte[].class, ContentReaders.byteArrayReader()),
reader(InputStream.class, ContentReaders.inputStreamReader())));
@SuppressWarnings("unchecked")
private <T> Reader<T> readerFor(final Class<T> type) {
return (Reader<T>) readers.stream()
.filter(reader -> reader.accept(type))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No reader found for class: " + type));
}

private Reader<String> stringContentReader() {
Charset charset = requestContentCharset(Request.this);
return ContentReaders.stringReader(charset);
try {
Charset charset = requestContentCharset(Request.this);
return ContentReaders.stringReader(charset);
} catch (final UnsupportedCharsetException e) {
return (publisher, clazz) -> {
throw e;
};
}
}

@Override
Expand Down Expand Up @@ -405,9 +414,9 @@ static class Path implements ServerRequest.Path {
/**
* Creates new instance.
*
* @param path actual relative URI path.
* @param rawPath actual relative URI path without any decoding.
* @param params resolved path parameters.
* @param path actual relative URI path.
* @param rawPath actual relative URI path without any decoding.
* @param params resolved path parameters.
* @param absolutePath absolute path.
*/
Path(String path, String rawPath, Map<String, String> params, Path absolutePath) {
Expand Down