Skip to content

Commit

Permalink
Add connection: "close header" to error responses.
Browse files Browse the repository at this point in the history
Wait until EMPTY_LAST_CONTENT is actually before marking request completed.
  • Loading branch information
mikkokar committed Aug 7, 2019
1 parent 12b4d15 commit 1ef2f44
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ public Observable<LiveHttpResponse> execute(NettyConnection nettyConnection) {
Requests.doFinally(response, cause -> {
if (nettyConnection.isConnected()) {
removeProxyBridgeHandlers(nettyConnection);

// MK: Request is no longer ongoing. The request body onComplete event has arrived,
// and the EMPTY_LAST_CONTENT chunk has been queued to Netty Client executor.
// So the channel remains open, and the log is never written.
//
// Next, the connection is returned to the pool. The next connection gets the
// connection and sends the headers. Bang! the Netty HTTP is out of sync!
//
if (requestIsOngoing(requestRequestBodyChunkSubscriber.get())) {
LOGGER.warn("Origin responded too quickly to an ongoing request, or it was cancelled. Connection={}, Request={}.",
new Object[]{nettyConnection.channel(), this.request});
Expand Down Expand Up @@ -237,25 +245,25 @@ private WriteRequestToOrigin(Subscriber<? super LiveHttpResponse> responseFromOr
public void write() {
Channel originChannel = this.nettyConnection.channel();
if (originChannel.isActive()) {
io.netty.handler.codec.http.HttpRequest msg = makeRequest(request);
io.netty.handler.codec.http.HttpRequest messageHeaders = makeRequest(request);

originChannel.writeAndFlush(msg)
.addListener(subscribeToResponseBody());
originChannel.writeAndFlush(messageHeaders)
.addListener(subscribeToRequestBody());
} else {
responseFromOriginObserver.onError(new TransportLostException(originChannel.remoteAddress(), nettyConnection.getOrigin()));
}
}

private ChannelFutureListener subscribeToResponseBody() {
return future -> {
if (future.isSuccess()) {
future.channel().read();
private ChannelFutureListener subscribeToRequestBody() {
return headersFuture -> {
if (headersFuture.isSuccess()) {
headersFuture.channel().read();
Observable<ByteBuf> bufferObservable = toObservable(request.body()).map(Buffers::toByteBuf);
bufferObservable.subscribe(requestBodyChunkSubscriber);
} else {
String channelIdentifier = String.format("%s -> %s", nettyConnection.channel().localAddress(), nettyConnection.channel().remoteAddress());
LOGGER.error(format("Failed to send request headers. origin=%s connection=%s request=%s",
nettyConnection.getOrigin(), channelIdentifier, request), future.cause());
nettyConnection.getOrigin(), channelIdentifier, request), headersFuture.cause());
responseFromOriginObserver.onError(new TransportLostException(nettyConnection.channel().remoteAddress(), nettyConnection.getOrigin()));
}
};
Expand Down Expand Up @@ -290,8 +298,11 @@ public void onStart() {

@Override
public void onCompleted() {
completed = true;
channel.writeAndFlush(EMPTY_LAST_CONTENT);
// MK: This comes from a server thread, and Netty therefore queues it to the Client thread executor.
// But "completed" flag is set to true regardless. Really it should be considered "completed"
// only after Netty acknowledges writeAndFlush.
channel.writeAndFlush(EMPTY_LAST_CONTENT)
.addListener(future -> completed = true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static com.hotels.styx.api.HttpHeaderNames.CONNECTION;
import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH;
import static com.hotels.styx.api.HttpResponseStatus.BAD_GATEWAY;
import static com.hotels.styx.api.HttpResponseStatus.BAD_REQUEST;
Expand Down Expand Up @@ -517,6 +518,7 @@ private LiveHttpResponse exceptionToResponse(Throwable exception, LiveHttpReques
.build()
.newBuilder(), request)
.header(CONTENT_LENGTH, message.getBytes(UTF_8).length)
.header(CONNECTION, "close")
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Copyright (C) 2013-2019 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,12 +47,14 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.subjects.PublishSubject;

import javax.net.ssl.SSLHandshakeException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -601,11 +603,15 @@ public void pluginPipelineThrowsAnExceptionInAcceptingRequestsState() throws Exc

handler.channelRead0(ctx, request);

verify(responseWriter).write(HttpResponse.response(INTERNAL_SERVER_ERROR)
.header(CONTENT_LENGTH, 29)
.body("Site temporarily unavailable.", UTF_8)
.build()
.stream());
ArgumentCaptor<LiveHttpResponse> captor = ArgumentCaptor.forClass(LiveHttpResponse.class);
verify(responseWriter).write(captor.capture());

HttpResponse response = Mono.from(captor.getValue().aggregate(100)).block();

assertThat(response.status(), is(INTERNAL_SERVER_ERROR));
assertThat(response.header(CONNECTION), is(Optional.of("close")));
assertThat(response.header(CONTENT_LENGTH), is(Optional.of("29")));
assertThat(response.bodyAs(UTF_8), is("Site temporarily unavailable."));

verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), eq(request));
verify(errorListener).proxyErrorOccurred(request, InetSocketAddress.createUnresolved("localhost", 2), INTERNAL_SERVER_ERROR, cause);
Expand All @@ -622,6 +628,7 @@ public void requestTimeoutExceptionOccursInAcceptingRequestsStateAndTcpConnectio
verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), eq(null));
verify(responseWriter).write(response(REQUEST_TIMEOUT)
.header(CONTENT_LENGTH, 15)
.header(CONNECTION, "close")
.build());
}

Expand All @@ -636,6 +643,7 @@ public void tooLongFrameExceptionOccursInIdleStateAndTcpConnectionRemainsActive(
verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), eq(request));
verify(responseWriter).write(response(REQUEST_ENTITY_TOO_LARGE)
.header(CONTENT_LENGTH, 24)
.header(CONNECTION, "close")
.build());
}

Expand All @@ -650,6 +658,7 @@ public void badRequestExceptionExceptionOccursInIdleStateAndTcpConnectionRemains
verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), eq(request));
verify(responseWriter).write(response(BAD_REQUEST)
.header(CONTENT_LENGTH, 11)
.header(CONNECTION, "close")
.build());
}

Expand Down Expand Up @@ -679,11 +688,16 @@ public void responseObservableEmitsContentOverflowExceptionInWaitingForResponseS
responseObservable.onError(new ContentOverflowException("Request Send Error"));

assertThat(responseUnsubscribed.get(), is(true));
verify(responseWriter).write(HttpResponse.response(BAD_GATEWAY)
.header(CONTENT_LENGTH, "29")
.body("Site temporarily unavailable.", UTF_8)
.build()
.stream());

ArgumentCaptor<LiveHttpResponse> captor = ArgumentCaptor.forClass(LiveHttpResponse.class);
verify(responseWriter).write(captor.capture());

HttpResponse response = Mono.from(captor.getValue().aggregate(100)).block();
assertThat(response.status(), is(BAD_GATEWAY));
assertThat(response.header(CONNECTION), is(Optional.of("close")));
assertThat(response.header(CONTENT_LENGTH), is(Optional.of("29")));
assertThat(response.bodyAs(UTF_8), is("Site temporarily unavailable."));

verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), eq(request));

writerFuture.complete(null);
Expand All @@ -705,11 +719,15 @@ public void mapsStyxClientExceptionToInternalServerErrorInWaitingForResponseStat
responseObservable.onError(new StyxClientException("Client error occurred", new RuntimeException("Something went wrong")));

assertThat(responseUnsubscribed.get(), is(true));
verify(responseWriter).write(HttpResponse.response(INTERNAL_SERVER_ERROR)
.header(CONTENT_LENGTH, "29")
.body("Site temporarily unavailable.", UTF_8)
.build()
.stream());

ArgumentCaptor<LiveHttpResponse> captor = ArgumentCaptor.forClass(LiveHttpResponse.class);
verify(responseWriter).write(captor.capture());

HttpResponse response = Mono.from(captor.getValue().aggregate(100)).block();
assertThat(response.status(), is(INTERNAL_SERVER_ERROR));
assertThat(response.header(CONNECTION), is(Optional.of("close")));
assertThat(response.header(CONTENT_LENGTH), is(Optional.of("29")));
assertThat(response.bodyAs(UTF_8), is("Site temporarily unavailable."));

writerFuture.complete(null);
verify(statsCollector).onComplete(request.id(), 500);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (C) 2013-2018 Expedia Inc.
Copyright (C) 2013-2019 Expedia Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -15,12 +15,13 @@
*/
package com.hotels.styx.proxy

import java.util.Optional
import java.util.concurrent.TimeUnit.SECONDS

import com.github.tomakehurst.wiremock.client.WireMock._
import com.google.common.base.Charsets.UTF_8
import com.google.common.net.HostAndPort._
import com.hotels.styx._
import com.hotels.styx.{DefaultStyxConfiguration, StyxProxySpec, api}
import com.hotels.styx.api.HttpRequest.get
import com.hotels.styx.api.HttpResponseStatus.{BAD_REQUEST, OK}
import com.hotels.styx.support.backends.FakeHttpServer
Expand Down Expand Up @@ -93,6 +94,7 @@ class BadFramingSpec extends FunSpec

val response = decodedRequest(request)
response.status() should be(BAD_REQUEST)
response.header(api.HttpHeaderNames.CONNECTION) shouldBe Optional.of("close")
}


Expand All @@ -105,6 +107,7 @@ class BadFramingSpec extends FunSpec

val response = decodedRequest(request)
response.status() should be(BAD_REQUEST)
response.header(api.HttpHeaderNames.CONNECTION) shouldBe Optional.of("close")
}


Expand All @@ -117,6 +120,7 @@ class BadFramingSpec extends FunSpec

val response = decodedRequest(request)
response.status() should be(BAD_REQUEST)
response.header(api.HttpHeaderNames.CONNECTION) shouldBe Optional.of("close")
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.hotels.styx.support.matchers.RegExMatcher.matchesRegex
import com.hotels.styx.utils.HttpTestClient
import io.netty.buffer.Unpooled
import io.netty.channel.{Channel, ChannelInitializer}
import io.netty.handler.codec.http.HttpHeaderNames.CONNECTION
import io.netty.handler.codec.http.HttpMethod.GET
import io.netty.handler.codec.http.HttpResponseStatus._
import io.netty.handler.codec.http.HttpVersion.HTTP_1_1
Expand Down Expand Up @@ -89,6 +90,7 @@ class BadRequestsSpec extends FunSpec
assert(response.status == BAD_REQUEST, s"\nExpecting 400 Bad Request in message: \n$response \n\n$content\n\n")
assert(content == BAD_REQUEST.reasonPhrase())
assertThat(response.headers().get(STYX_INFO_DEFAULT), matchesRegex("noJvmRouteSet;"))
assertThat(response.headers().get(CONNECTION), is("close"))

assertThat(loggingSupport.log(), hasItem(loggingEvent(ERROR, "Failure status=\"400 Bad Request\"", "io.netty.handler.codec.DecoderException", "com.hotels.styx.server.BadRequestException.*")))
eventually(timeout(5 seconds)) {
Expand All @@ -109,6 +111,7 @@ class BadRequestsSpec extends FunSpec
assert(response.status == BAD_REQUEST, s"\nExpecting 400 Bad Request in message: \n$response \n\n$content\n\n")
assertThat(response.headers().get(STYX_INFO_DEFAULT), matchesRegex("noJvmRouteSet;"))
assert(content == BAD_REQUEST.reasonPhrase())
assertThat(response.headers().get(CONNECTION), is("close"))
}
}

Expand All @@ -130,6 +133,7 @@ class BadRequestsSpec extends FunSpec
val content = response.content().toString(UTF_8)
assert(response.status == BAD_REQUEST, s"\nExpecting 400 Bad Request in message: \n$response \n\n$content\n\n")
assertThat(response.headers().get(STYX_INFO_DEFAULT), matchesRegex("noJvmRouteSet;"))
assertThat(response.headers().get(CONNECTION), is("close"))

assertThat(loggingSupport.log(), hasItem(loggingEvent(ERROR, "Failure status=\"400 Bad Request\"", "io.netty.handler.codec.DecoderException", "com.hotels.styx.server.BadRequestException: Bad Host header. .*")))
}
Expand Down Expand Up @@ -171,6 +175,7 @@ class BadRequestsSpec extends FunSpec
val content = response.content().toString(UTF_8)
assert(response.status == REQUEST_TIMEOUT, s"\nExpecting 408 Request Timeout in message: \n$response \n\n$content\n\n")
assertThat(response.headers().get(STYX_INFO_DEFAULT), matchesRegex("noJvmRouteSet;[0-9a-f-]+"))
assertThat(response.headers().get(CONNECTION), is("close"))
assertThat(loggingSupport.log(), hasItem(loggingEvent(ERROR, "Failure status=\"408 Request Timeout\"", "com.hotels.styx.server.RequestTimeoutException", "message=DefaultHttpRequest.decodeResult: success.*")))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package com.hotels.styx.proxy

import java.util.Optional

import com.google.common.base.Charsets.UTF_8
import com.hotels.styx.api.HttpHeaderNames.CONNECTION
import com.hotels.styx.api.HttpRequest.get
import com.hotels.styx.api.HttpResponseStatus.BAD_GATEWAY
import com.hotels.styx.client.StyxHeaderConfig.STYX_INFO_DEFAULT
Expand Down Expand Up @@ -108,6 +111,8 @@ class BadResponseFromOriginSpec extends FunSpec
response.status() should be(BAD_GATEWAY)
assertThat(response.headers().get(STYX_INFO_DEFAULT), matches(matchesRegex("noJvmRouteSet;[0-9a-f-]+")))
response.bodyAs(UTF_8) should be("Site temporarily unavailable.")
response.header(CONNECTION) should be(Optional.of("close"))

eventually(timeout(7.seconds)) {
styxServer.metricsSnapshot.count("styx.response.status.502").get should be(1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.hotels.styx.proxy

import java.nio.charset.StandardCharsets.UTF_8
import java.util.Optional

import com.github.tomakehurst.wiremock.client.WireMock._
import com.github.tomakehurst.wiremock.client.{RequestPatternBuilder, WireMock}
Expand Down Expand Up @@ -186,6 +187,7 @@ class ProxySpec extends FunSpec

val resp = decodedRequest(req)
assert(resp.status() == SERVICE_UNAVAILABLE)
assert(resp.header(CONNECTION) == Optional.of("close"))
}
}

Expand All @@ -203,6 +205,7 @@ class ProxySpec extends FunSpec
println("body: " + resp.bodyAs(UTF_8))

assert(resp.status() == BAD_GATEWAY)
assert(resp.header(CONNECTION) == Optional.of("close"))
}
}

Expand Down

This file was deleted.

0 comments on commit 1ef2f44

Please sign in to comment.