Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
trustin committed May 30, 2016
1 parent 8208c28 commit e55ed01
Show file tree
Hide file tree
Showing 157 changed files with 3,095 additions and 15,234 deletions.
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@
<logback.version>1.1.7</logback.version>
<metrics.version>3.1.2</metrics.version>
<netty.version>4.1.0.CR7</netty.version>
<reactive-streams.version>1.0.0</reactive-streams.version>
<reactive-streams-commons.version>0.4.0.BUILD-SNAPSHOT</reactive-streams-commons.version>
<slf4j.version>1.7.21</slf4j.version>
<tomcat.version>8.0.33</tomcat.version>
<jetty.alpnAgent.version>2.0.1</jetty.alpnAgent.version>
Expand Down Expand Up @@ -140,6 +142,19 @@
<classifier>linux-x86_64</classifier>
</dependency>

<!-- Reactive Streams API -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>${reactive-streams.version}</version>
</dependency>
<!-- Reactive Streams Commons -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactive-streams-commons</artifactId>
<version>${reactive-streams-commons.version}</version>
</dependency>

<!-- ALPN -->
<dependency>
<groupId>org.eclipse.jetty.alpn</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private void configureAsHttp(Channel ch) {
new Http2ClientUpgradeCodec(newHttp2ConnectionHandler(ch));
HttpClientUpgradeHandler http2UpgradeHandler =
new HttpClientUpgradeHandler(http1Codec, http2ClientUpgradeCodec,
options.maxFrameLength());
options.maxContentLength());

pipeline.addLast(http1Codec);
pipeline.addLast(new WorkaroundHandler());
Expand Down Expand Up @@ -296,7 +296,7 @@ void finishSuccessfully(ChannelPipeline pipeline, SessionProtocol protocol) {
switch (protocol) {
case H1:
case H1C:
addBeforeSessionHandler(pipeline, new HttpObjectAggregator(options.maxFrameLength()));
addBeforeSessionHandler(pipeline, new HttpObjectAggregator(options.maxContentLength()));
break;
case H2:
case H2C:
Expand Down Expand Up @@ -364,10 +364,11 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Note: There's no need to fill Connection, Upgrade, and HTTP2-Settings headers here
// because they are filled by Http2ClientUpgradeCodec.

final String host = HttpHostHeaderUtil.hostHeader(
final String host = HttpHeaderUtil.hostHeader(
remoteAddress.getHostString(), remoteAddress.getPort(), sslCtx != null);

upgradeReq.headers().set(HttpHeaderNames.HOST, host);
upgradeReq.headers().set(HttpHeaderNames.USER_AGENT, HttpHeaderUtil.USER_AGENT);

ctx.writeAndFlush(upgradeReq);
ctx.fireChannelActive();
Expand Down Expand Up @@ -528,7 +529,7 @@ private Http2ConnectionHandler newHttp2ConnectionHandler(Channel ch) {
conn.addListener(new Http2GoAwayListener(ch));
final InboundHttp2ToHttpAdapter listener = new InboundHttp2ToHttpAdapterBuilder(conn)
.propagateSettings(true).validateHttpHeaders(validateHeaders)
.maxContentLength(options.maxFrameLength()).build();
.maxContentLength(options.maxContentLength()).build();

Http2FrameReader reader = new DefaultHttp2FrameReader(validateHeaders);
Http2FrameWriter writer = new DefaultHttp2FrameWriter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

package com.linecorp.armeria.client;

final class HttpHostHeaderUtil {
import io.netty.util.AsciiString;

final class HttpHeaderUtil {

static final AsciiString USER_AGENT = AsciiString.of("Armeria client");

static String hostHeader(String host, int port, boolean useTls) {
final int defaultPort = useTls ? 443 : 80;
Expand All @@ -28,5 +32,5 @@ static String hostHeader(String host, int port, boolean useTls) {
return new StringBuilder(host.length() + 6).append(host).append(':').append(port).toString();
}

private HttpHostHeaderUtil() {}
private HttpHeaderUtil() {}
}
16 changes: 11 additions & 5 deletions src/main/java/com/linecorp/armeria/client/HttpSessionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
Expand All @@ -65,7 +66,8 @@ class HttpSessionHandler extends ChannelDuplexHandler implements HttpSession {
*/
private static final int MAX_NUM_REQUESTS_SENT = 536870912;

private static final String ARMERIA_USER_AGENT = "armeria client";
private static final AsciiString SCHEME_HTTPS = AsciiString.of("https");
private static final AsciiString SCHEME_HTTP = AsciiString.of("http");

static HttpSession get(Channel ch) {
final HttpSessionHandler sessionHandler = ch.pipeline().get(HttpSessionHandler.class);
Expand Down Expand Up @@ -458,8 +460,8 @@ private FullHttpRequest convertToHttpRequest(Invocation invocation) {
HttpHeaders headers = request.headers();

headers.set(HttpHeaderNames.HOST, hostHeader(ctx));
headers.set(ExtensionHeaderNames.SCHEME.text(), protocol.uriText());
headers.set(HttpHeaderNames.USER_AGENT, ARMERIA_USER_AGENT);
headers.set(ExtensionHeaderNames.SCHEME.text(), scheme(protocol));
headers.set(HttpHeaderNames.USER_AGENT, HttpHeaderUtil.USER_AGENT);
headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);

ByteBuf contentBuf = request.content();
Expand All @@ -484,8 +486,12 @@ private FullHttpRequest convertToHttpRequest(Invocation invocation) {

private static String hostHeader(ServiceInvocationContext ctx) {
final int port = ((InetSocketAddress) ctx.remoteAddress()).getPort();
return HttpHostHeaderUtil.hostHeader(ctx.host(), port,
ctx.scheme().sessionProtocol().isTls());
return HttpHeaderUtil.hostHeader(ctx.host(), port,
ctx.scheme().sessionProtocol().isTls());
}

private static AsciiString scheme(SessionProtocol protocol) {
return protocol.isTls() ? SCHEME_HTTPS : SCHEME_HTTP;
}

static class Invocation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ protected RemoteInvokerOption<Object> newConstant(int id, String name) {
public static final RemoteInvokerOption<Duration> IDLE_TIMEOUT = valueOf("IDLE_TIMEOUT");

/**
* The maximum allowed length of the frame (or the content) decoded at the session layer. e.g. the
* content of an HTTP request.
* The maximum allowed length of the content decoded at the session layer.
* e.g. the content of an HTTP request.
*/
public static final RemoteInvokerOption<Integer> MAX_FRAME_LENGTH = valueOf("MAX_FRAME_LENGTH");
public static final RemoteInvokerOption<Integer> MAX_CONTENT_LENGTH = valueOf("MAX_CONTENT_LENGTH");

/**
* The maximum number of concurrent in-progress invocations.
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/com/linecorp/armeria/client/RemoteInvokerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static com.linecorp.armeria.client.RemoteInvokerOption.EVENT_LOOP_GROUP;
import static com.linecorp.armeria.client.RemoteInvokerOption.IDLE_TIMEOUT;
import static com.linecorp.armeria.client.RemoteInvokerOption.MAX_CONCURRENCY;
import static com.linecorp.armeria.client.RemoteInvokerOption.MAX_FRAME_LENGTH;
import static com.linecorp.armeria.client.RemoteInvokerOption.MAX_CONTENT_LENGTH;
import static com.linecorp.armeria.client.RemoteInvokerOption.POOL_HANDLER_DECORATOR;
import static com.linecorp.armeria.client.RemoteInvokerOption.TRUST_MANAGER_FACTORY;
import static com.linecorp.armeria.client.RemoteInvokerOption.USE_HTTP2_PREFACE;
Expand Down Expand Up @@ -53,7 +53,7 @@ public class RemoteInvokerOptions extends AbstractOptions {

private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofMillis(3200);
private static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofSeconds(10);
private static final int DEFAULT_MAX_FRAME_LENGTH = 10 * 1024 * 1024; // 10 MB
private static final int DEFAULT_MAX_CONTENT_LENGTH = 10 * 1024 * 1024; // 10 MB
private static final Integer DEFAULT_MAX_CONCURRENCY = Integer.MAX_VALUE;
private static final Boolean DEFAULT_USE_HTTP2_PREFACE =
"true".equals(System.getProperty("com.linecorp.armeria.defaultUseHttp2Preface", "false"));
Expand All @@ -65,7 +65,7 @@ public class RemoteInvokerOptions extends AbstractOptions {
private static final RemoteInvokerOptionValue<?>[] DEFAULT_OPTION_VALUES = {
CONNECT_TIMEOUT.newValue(DEFAULT_CONNECTION_TIMEOUT),
IDLE_TIMEOUT.newValue(DEFAULT_IDLE_TIMEOUT),
MAX_FRAME_LENGTH.newValue(DEFAULT_MAX_FRAME_LENGTH),
MAX_CONTENT_LENGTH.newValue(DEFAULT_MAX_CONTENT_LENGTH),
MAX_CONCURRENCY.newValue(DEFAULT_MAX_CONCURRENCY),
USE_HTTP2_PREFACE.newValue(DEFAULT_USE_HTTP2_PREFACE)
};
Expand Down Expand Up @@ -97,8 +97,8 @@ private static <T> RemoteInvokerOptionValue<T> validateValue(RemoteInvokerOption

if (option == CONNECT_TIMEOUT) {
validateConnectionTimeout((Duration) value);
} else if (option == MAX_FRAME_LENGTH) {
validateMaxFrameLength((Integer) value);
} else if (option == MAX_CONTENT_LENGTH) {
validateMaxContentLength((Integer) value);
} else if (option == IDLE_TIMEOUT) {
validateIdleTimeout((Duration) value);
} else if (option == MAX_CONCURRENCY) {
Expand All @@ -108,11 +108,11 @@ private static <T> RemoteInvokerOptionValue<T> validateValue(RemoteInvokerOption
return optionValue;
}

private static int validateMaxFrameLength(int maxFrameLength) {
if (maxFrameLength <= 0) {
throw new IllegalArgumentException("maxFrameLength: " + maxFrameLength + " (expected: > 0)");
private static int validateMaxContentLength(int maxContentLength) {
if (maxContentLength <= 0) {
throw new IllegalArgumentException("maxContentLength: " + maxContentLength + " (expected: > 0)");
}
return maxFrameLength;
return maxContentLength;
}

private static Duration validateConnectionTimeout(Duration connectionTimeout) {
Expand Down Expand Up @@ -215,8 +215,8 @@ public long idleTimeoutMillis() {
return idleTimeout().toMillis();
}

public int maxFrameLength() {
return getOrElse(MAX_FRAME_LENGTH, DEFAULT_MAX_FRAME_LENGTH);
public int maxContentLength() {
return getOrElse(MAX_CONTENT_LENGTH, DEFAULT_MAX_CONTENT_LENGTH);
}

public int maxConcurrency() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2016 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.http;

import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;

import com.linecorp.armeria.common.util.Exceptions;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.handler.codec.http2.Http2StreamVisitor;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;

public abstract class AbstractHttp2ConnectionHandler extends Http2ConnectionHandler {

/**
* XXX(trustin): Don't know why, but {@link Http2ConnectionHandler} does not close the last stream
* on a cleartext connection, so we make sure all streams are closed.
*/
private static final Http2StreamVisitor closeAllStreams = stream -> {
if (stream.state() != State.CLOSED) {
stream.close();
}
return true;
};

private boolean closing;
private boolean handlingConnectionError;

protected AbstractHttp2ConnectionHandler(
Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);
}

public boolean isClosing() {
return closing;
}

@Override
protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Http2Exception http2Ex) {
if (handlingConnectionError) {
return;
}

handlingConnectionError = true;

// TODO(trustin): Remove this once Http2ConnectionHandler.goAway() sends better debugData.
// See https://github.com/netty/netty/issues/5160
if (http2Ex == null) {
http2Ex = new Http2Exception(INTERNAL_ERROR, goAwayDebugData(null, cause), cause);
} else if (http2Ex instanceof ClosedStreamCreationException) {
final ClosedStreamCreationException e = (ClosedStreamCreationException) http2Ex;
http2Ex = new ClosedStreamCreationException(e.error(), goAwayDebugData(e, cause), cause);
} else {
http2Ex = new Http2Exception(
http2Ex.error(), goAwayDebugData(http2Ex, cause), cause, http2Ex.shutdownHint());
}

super.onConnectionError(ctx, cause, http2Ex);
}

private static String goAwayDebugData(Http2Exception http2Ex, Throwable cause) {
final StringBuilder buf = new StringBuilder(256);
final String type;
final String message;

if (http2Ex != null) {
type = http2Ex.getClass().getName();
message = http2Ex.getMessage();
} else {
type = null;
message = null;
}

buf.append("type: ");
buf.append(type != null ? type : "n/a");
buf.append(", message: ");
buf.append(message != null ? message : "n/a");
buf.append(", cause: ");
buf.append(cause != null ? Exceptions.traceText(cause) : "n/a");

return buf.toString();
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
closing = true;

// TODO(trustin): Remove this line once https://github.com/netty/netty/issues/4210 is fixed.
connection().forEachActiveStream(closeAllStreams);

onCloseRequest(ctx);
super.close(ctx, promise);
}

protected abstract void onCloseRequest(ChannelHandlerContext ctx) throws Exception;
}
Loading

0 comments on commit e55ed01

Please sign in to comment.