Skip to content

Commit

Permalink
Remove unused code and simplify execution path based on the test cove…
Browse files Browse the repository at this point in the history
…rage report
  • Loading branch information
trustin committed Jul 15, 2016
1 parent 65912a0 commit c9d5482
Show file tree
Hide file tree
Showing 14 changed files with 35 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private enum HttpPreference {
}
sslCtx = builder.build();
} catch (SSLException e) {
throw new IllegalStateException("failed to create a SslContext", e);
throw new IllegalStateException("failed to create an SslContext", e);
}
} else {
sslCtx = null;
Expand Down Expand Up @@ -368,7 +368,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
// because they are filled by Http2ClientUpgradeCodec.

final String host = HttpHeaderUtil.hostHeader(
remoteAddress.getHostString(), remoteAddress.getPort(), sslCtx != null);
remoteAddress.getHostString(), remoteAddress.getPort(), H1C.defaultPort());

upgradeReq.headers().set(HttpHeaderNames.HOST, host);
upgradeReq.headers().set(HttpHeaderNames.USER_AGENT, HttpHeaderUtil.USER_AGENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ final class HttpHeaderUtil {
// TODO(trustin): Add version information
static final AsciiString USER_AGENT = AsciiString.of("Armeria");

static String hostHeader(String host, int port, boolean useTls) {
final int defaultPort = useTls ? 443 : 80;

static String hostHeader(String host, int port, int defaultPort) {
if (port == defaultPort) {
return host;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,16 @@ public Future<Channel> apply(PoolKey key) {
}

void connect(SocketAddress remoteAddress, SessionProtocol protocol, Promise<Channel> sessionPromise) {

final Bootstrap bootstrap = bootstrap(protocol);
final ChannelFuture connectFuture = bootstrap.connect(remoteAddress);
final Channel ch = connectFuture.channel();

if (connectFuture.isDone()) {
notifySessionPromise(protocol, ch, connectFuture, sessionPromise);
} else {
connectFuture.addListener(
(Future<Void> future) -> notifySessionPromise(protocol, ch, future, sessionPromise));
}
connectFuture.addListener((ChannelFuture future) -> {
if (future.isSuccess()) {
initSession(protocol, future, sessionPromise);
} else {
sessionPromise.setFailure(future.cause());
}
});
}

private Bootstrap bootstrap(SessionProtocol sessionProtocol) {
Expand All @@ -100,35 +99,15 @@ protected void initChannel(Channel ch) throws Exception {
});
}

private void notifySessionPromise(SessionProtocol protocol, Channel ch,
Future<Void> connectFuture, Promise<Channel> sessionPromise) {
assert connectFuture.isDone();
if (connectFuture.isSuccess()) {
watchSessionActive(protocol, ch, sessionPromise);
} else {
sessionPromise.setFailure(connectFuture.cause());
}
}

private Future<Channel> watchSessionActive(SessionProtocol protocol, Channel ch,
Promise<Channel> sessionPromise) {
private void initSession(SessionProtocol protocol, ChannelFuture connectFuture,
Promise<Channel> sessionPromise) {
assert connectFuture.isSuccess();

EventLoop eventLoop = ch.eventLoop();

if (eventLoop.inEventLoop()) {
watchSessionActive0(protocol, ch, sessionPromise);
} else {
eventLoop.execute(() -> watchSessionActive0(protocol, ch, sessionPromise));
}
return sessionPromise;
}

private void watchSessionActive0(SessionProtocol protocol, final Channel ch,
Promise<Channel> sessionPromise) {

assert ch.eventLoop().inEventLoop();
final Channel ch = connectFuture.channel();
final EventLoop eventLoop = ch.eventLoop();
assert eventLoop.inEventLoop();

final ScheduledFuture<?> timeoutFuture = ch.eventLoop().schedule(() -> {
final ScheduledFuture<?> timeoutFuture = eventLoop.schedule(() -> {
if (sessionPromise.tryFailure(new SessionProtocolNegotiationException(
protocol, "connection established, but session creation timed out: " + ch))) {
ch.close();
Expand Down
7 changes: 1 addition & 6 deletions src/main/java/com/linecorp/armeria/common/Request.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,4 @@

package com.linecorp.armeria.common;

public interface Request {
@SuppressWarnings("unchecked")
default <T extends Request> T cast() {
return (T) this;
}
}
public interface Request {}
6 changes: 0 additions & 6 deletions src/main/java/com/linecorp/armeria/common/Response.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@
import com.linecorp.armeria.common.reactivestreams.RichPublisher;

public interface Response {

@SuppressWarnings("unchecked")
default <T extends Response> T cast() {
return (T) this;
}

default CompletableFuture<?> closeFuture() {
if (this instanceof RichPublisher) {
return ((RichPublisher<?>) this).closeFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ public final class HttpHeaderNames {

public static AsciiString of(String name) {
requireNonNull(name, "name");
final AsciiString asciiName = map.get(name.toLowerCase(Locale.US));
name = name.toLowerCase(Locale.US);
final AsciiString asciiName = map.get(name);
return asciiName != null ? asciiName : new AsciiString(name);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
if (readInprogess) {
// If there is still a read in compress we are sure we will see a channelReadComplete(...) call. Thus
// If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
// we only need to flush if we reach the explicitFlushAfterFlushes limit.
if (++flushPendingCount == explicitFlushAfterFlushes) {
flushPendingCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,4 @@ static HttpServer get(ChannelHandlerContext ctx) {

SessionProtocol protocol();
int unfinishedRequests();
ServerConfig config();
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,6 @@ public int unfinishedRequests() {
return unfinishedRequests;
}

@Override
public ServerConfig config() {
return config;
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (responseEncoder != null) {
Expand Down Expand Up @@ -272,7 +267,7 @@ private void handleRequest(ChannelHandlerContext ctx, DecodedHttpRequest req) th
final HttpResponse res;
try (PushHandle ignored = RequestContext.push(reqCtx)) {
req.init(reqCtx);
res = (HttpResponse) service.serve(reqCtx, req);
res = service.serve(reqCtx, req);
} catch (Throwable cause) {
reqLogBuilder.end(cause);
if (cause instanceof ResourceNotFoundException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc
return;
}

boolean success = false;
try {
final ArmeriaHttpTransport transport = new ArmeriaHttpTransport();
final HttpChannel httpChannel = new HttpChannel(
Expand All @@ -249,9 +250,11 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc
fillRequest(ctx, aReq, httpChannel.getRequest());

ctx.blockingTaskExecutor().execute(() -> invoke(ctx, res, transport, httpChannel));
} catch (Throwable t) {
logger.warn("{} Failed to invoke Jetty:", ctx, t);
res.close();
success = true;
} finally {
if (!success) {
res.close();
}
}
})).exceptionally(CompletionActions::log);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void testStateTransition() throws Exception {

// CLOSED
for (int i = 0; i < minimumRequestThreshold + 1; i++) {
ThriftReply future = stub.execute(ctx, req).cast();
ThriftReply future = stub.execute(ctx, req);
// The future is `failureRes` itself
assertThat(future.isCompletedExceptionally(), is(true));
// This is not a CircuitBreakerException
Expand All @@ -210,11 +210,11 @@ public void testStateTransition() throws Exception {
when(delegate.execute(ctx, req)).thenReturn(successRes);

// HALF OPEN
ThriftReply future2 = stub.execute(ctx, req).cast();
ThriftReply future2 = stub.execute(ctx, req);
assertThat(future2.get(), is(nullValue()));

// CLOSED
ThriftReply future3 = stub.execute(ctx, req).cast();
ThriftReply future3 = stub.execute(ctx, req);
assertThat(future3.get(), is(nullValue()));
}

Expand Down Expand Up @@ -314,7 +314,7 @@ public void testPerMethodScope() throws Exception {
}

// CLOSED (methodB)
ThriftReply future2 = stub.execute(ctxB, reqB).cast();
ThriftReply future2 = stub.execute(ctxB, reqB);
assertThat(future2.get(), is(nullValue()));
}

Expand Down Expand Up @@ -348,7 +348,7 @@ public void testExceptionFilter() throws Exception {

// CLOSED
for (int i = 0; i < minimumRequestThreshold + 1; i++) {
ThriftReply future = stub.execute(ctx, req).cast();
ThriftReply future = stub.execute(ctx, req);
// The future is `failedFuture` itself
assertThat(future.isCompletedExceptionally(), is(true));
// This is not a CircuitBreakerException
Expand All @@ -357,7 +357,7 @@ public void testExceptionFilter() throws Exception {
}

// OPEN
ThriftReply future1 = stub.execute(ctx, req).cast();
ThriftReply future1 = stub.execute(ctx, req);
// The circuit is still CLOSED
assertThat(future1.isCompletedExceptionally(), is(true));
assertThat(future1.getCause(), is(not(instanceOf(FailFastException.class))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private static StubCollector testRemoteInvocationWithSamplingRate(float sampling
TracingClientImpl stub = new TracingClientImpl(delegate, brave);

// do invoke
ThriftReply actualRes = stub.execute(ctx, req).cast();
ThriftReply actualRes = stub.execute(ctx, req);

assertThat(actualRes, is(res));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.junit.Test;

import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.server.ServerConfig;

import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
Expand Down Expand Up @@ -118,10 +117,5 @@ public SessionProtocol protocol() {
public int unfinishedRequests() {
return unfinishedRequests;
}

@Override
public ServerConfig config() {
return null;
}
}
}

0 comments on commit c9d5482

Please sign in to comment.