diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java index d301fdaf85..da069c0c02 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/HttpRequestOperation.java @@ -156,6 +156,14 @@ public Observable execute(NettyConnection nettyConnection) { Requests.doFinally(response, cause -> { if (nettyConnection.isConnected()) { removeProxyBridgeHandlers(nettyConnection); + + // MK: Request is no longer ongoing. The request body onComplete event has arrived, + // and the EMPTY_LAST_CONTENT chunk has been queued to Netty Client executor. + // So the channel remains open, and the log is never written. + // + // Next, the connection is returned to the pool. The next connection gets the + // connection and sends the headers. Bang! the Netty HTTP is out of sync! + // if (requestIsOngoing(requestRequestBodyChunkSubscriber.get())) { LOGGER.warn("Origin responded too quickly to an ongoing request, or it was cancelled. Connection={}, Request={}.", new Object[]{nettyConnection.channel(), this.request}); @@ -237,25 +245,25 @@ private WriteRequestToOrigin(Subscriber responseFromOr public void write() { Channel originChannel = this.nettyConnection.channel(); if (originChannel.isActive()) { - io.netty.handler.codec.http.HttpRequest msg = makeRequest(request); + io.netty.handler.codec.http.HttpRequest messageHeaders = makeRequest(request); - originChannel.writeAndFlush(msg) - .addListener(subscribeToResponseBody()); + originChannel.writeAndFlush(messageHeaders) + .addListener(subscribeToRequestBody()); } else { responseFromOriginObserver.onError(new TransportLostException(originChannel.remoteAddress(), nettyConnection.getOrigin())); } } - private ChannelFutureListener subscribeToResponseBody() { - return future -> { - if (future.isSuccess()) { - future.channel().read(); + private ChannelFutureListener subscribeToRequestBody() { + return headersFuture -> { + if (headersFuture.isSuccess()) { + headersFuture.channel().read(); Observable bufferObservable = toObservable(request.body()).map(Buffers::toByteBuf); bufferObservable.subscribe(requestBodyChunkSubscriber); } else { String channelIdentifier = String.format("%s -> %s", nettyConnection.channel().localAddress(), nettyConnection.channel().remoteAddress()); LOGGER.error(format("Failed to send request headers. origin=%s connection=%s request=%s", - nettyConnection.getOrigin(), channelIdentifier, request), future.cause()); + nettyConnection.getOrigin(), channelIdentifier, request), headersFuture.cause()); responseFromOriginObserver.onError(new TransportLostException(nettyConnection.channel().remoteAddress(), nettyConnection.getOrigin())); } }; @@ -290,8 +298,11 @@ public void onStart() { @Override public void onCompleted() { - completed = true; - channel.writeAndFlush(EMPTY_LAST_CONTENT); + // MK: This comes from a server thread, and Netty therefore queues it to the Client thread executor. + // But "completed" flag is set to true regardless. Really it should be considered "completed" + // only after Netty acknowledges writeAndFlush. + channel.writeAndFlush(EMPTY_LAST_CONTENT) + .addListener(future -> completed = true); } @Override diff --git a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java index fdfca95059..ab9a0b2c5e 100644 --- a/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java +++ b/components/client/src/main/java/com/hotels/styx/client/netty/connectionpool/NettyToStyxResponsePropagator.java @@ -31,7 +31,6 @@ import io.netty.channel.EventLoop; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.timeout.IdleStateEvent; import org.slf4j.Logger; diff --git a/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java b/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java index 9129290c56..37f9d15571 100644 --- a/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java +++ b/components/server/src/main/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandler.java @@ -63,6 +63,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import static com.hotels.styx.api.HttpHeaderNames.CONNECTION; import static com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH; import static com.hotels.styx.api.HttpResponseStatus.BAD_GATEWAY; import static com.hotels.styx.api.HttpResponseStatus.BAD_REQUEST; @@ -517,6 +518,7 @@ private LiveHttpResponse exceptionToResponse(Throwable exception, LiveHttpReques .build() .newBuilder(), request) .header(CONTENT_LENGTH, message.getBytes(UTF_8).length) + .header(CONNECTION, "close") .build(); } diff --git a/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandlerTest.java b/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandlerTest.java index 752da95d78..6ec5ea6f8d 100644 --- a/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandlerTest.java +++ b/components/server/src/test/java/com/hotels/styx/server/netty/connectors/HttpPipelineHandlerTest.java @@ -1,5 +1,5 @@ /* - Copyright (C) 2013-2018 Expedia Inc. + Copyright (C) 2013-2019 Expedia Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -47,12 +47,14 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import reactor.core.publisher.Mono; import rx.Observable; import rx.subjects.PublishSubject; import javax.net.ssl.SSLHandshakeException; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -601,11 +603,15 @@ public void pluginPipelineThrowsAnExceptionInAcceptingRequestsState() throws Exc handler.channelRead0(ctx, request); - verify(responseWriter).write(HttpResponse.response(INTERNAL_SERVER_ERROR) - .header(CONTENT_LENGTH, 29) - .body("Site temporarily unavailable.", UTF_8) - .build() - .stream()); + ArgumentCaptor captor = ArgumentCaptor.forClass(LiveHttpResponse.class); + verify(responseWriter).write(captor.capture()); + + HttpResponse response = Mono.from(captor.getValue().aggregate(100)).block(); + + assertThat(response.status(), is(INTERNAL_SERVER_ERROR)); + assertThat(response.header(CONNECTION), is(Optional.of("close"))); + assertThat(response.header(CONTENT_LENGTH), is(Optional.of("29"))); + assertThat(response.bodyAs(UTF_8), is("Site temporarily unavailable.")); verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), eq(request)); verify(errorListener).proxyErrorOccurred(request, InetSocketAddress.createUnresolved("localhost", 2), INTERNAL_SERVER_ERROR, cause); @@ -622,6 +628,7 @@ public void requestTimeoutExceptionOccursInAcceptingRequestsStateAndTcpConnectio verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), eq(null)); verify(responseWriter).write(response(REQUEST_TIMEOUT) .header(CONTENT_LENGTH, 15) + .header(CONNECTION, "close") .build()); } @@ -636,6 +643,7 @@ public void tooLongFrameExceptionOccursInIdleStateAndTcpConnectionRemainsActive( verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), eq(request)); verify(responseWriter).write(response(REQUEST_ENTITY_TOO_LARGE) .header(CONTENT_LENGTH, 24) + .header(CONNECTION, "close") .build()); } @@ -650,6 +658,7 @@ public void badRequestExceptionExceptionOccursInIdleStateAndTcpConnectionRemains verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), eq(request)); verify(responseWriter).write(response(BAD_REQUEST) .header(CONTENT_LENGTH, 11) + .header(CONNECTION, "close") .build()); } @@ -679,11 +688,16 @@ public void responseObservableEmitsContentOverflowExceptionInWaitingForResponseS responseObservable.onError(new ContentOverflowException("Request Send Error")); assertThat(responseUnsubscribed.get(), is(true)); - verify(responseWriter).write(HttpResponse.response(BAD_GATEWAY) - .header(CONTENT_LENGTH, "29") - .body("Site temporarily unavailable.", UTF_8) - .build() - .stream()); + + ArgumentCaptor captor = ArgumentCaptor.forClass(LiveHttpResponse.class); + verify(responseWriter).write(captor.capture()); + + HttpResponse response = Mono.from(captor.getValue().aggregate(100)).block(); + assertThat(response.status(), is(BAD_GATEWAY)); + assertThat(response.header(CONNECTION), is(Optional.of("close"))); + assertThat(response.header(CONTENT_LENGTH), is(Optional.of("29"))); + assertThat(response.bodyAs(UTF_8), is("Site temporarily unavailable.")); + verify(responseEnhancer).enhance(any(LiveHttpResponse.Transformer.class), eq(request)); writerFuture.complete(null); @@ -705,11 +719,15 @@ public void mapsStyxClientExceptionToInternalServerErrorInWaitingForResponseStat responseObservable.onError(new StyxClientException("Client error occurred", new RuntimeException("Something went wrong"))); assertThat(responseUnsubscribed.get(), is(true)); - verify(responseWriter).write(HttpResponse.response(INTERNAL_SERVER_ERROR) - .header(CONTENT_LENGTH, "29") - .body("Site temporarily unavailable.", UTF_8) - .build() - .stream()); + + ArgumentCaptor captor = ArgumentCaptor.forClass(LiveHttpResponse.class); + verify(responseWriter).write(captor.capture()); + + HttpResponse response = Mono.from(captor.getValue().aggregate(100)).block(); + assertThat(response.status(), is(INTERNAL_SERVER_ERROR)); + assertThat(response.header(CONNECTION), is(Optional.of("close"))); + assertThat(response.header(CONTENT_LENGTH), is(Optional.of("29"))); + assertThat(response.bodyAs(UTF_8), is("Site temporarily unavailable.")); writerFuture.complete(null); verify(statsCollector).onComplete(request.id(), 500); diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadFramingSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadFramingSpec.scala index d709e5250d..9cbdc06cbc 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadFramingSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadFramingSpec.scala @@ -1,5 +1,5 @@ /* - Copyright (C) 2013-2018 Expedia Inc. + Copyright (C) 2013-2019 Expedia Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -15,12 +15,13 @@ */ package com.hotels.styx.proxy +import java.util.Optional import java.util.concurrent.TimeUnit.SECONDS import com.github.tomakehurst.wiremock.client.WireMock._ import com.google.common.base.Charsets.UTF_8 import com.google.common.net.HostAndPort._ -import com.hotels.styx._ +import com.hotels.styx.{DefaultStyxConfiguration, StyxProxySpec, api} import com.hotels.styx.api.HttpRequest.get import com.hotels.styx.api.HttpResponseStatus.{BAD_REQUEST, OK} import com.hotels.styx.support.backends.FakeHttpServer @@ -93,6 +94,7 @@ class BadFramingSpec extends FunSpec val response = decodedRequest(request) response.status() should be(BAD_REQUEST) + response.header(api.HttpHeaderNames.CONNECTION) shouldBe Optional.of("close") } @@ -105,6 +107,7 @@ class BadFramingSpec extends FunSpec val response = decodedRequest(request) response.status() should be(BAD_REQUEST) + response.header(api.HttpHeaderNames.CONNECTION) shouldBe Optional.of("close") } @@ -117,6 +120,7 @@ class BadFramingSpec extends FunSpec val response = decodedRequest(request) response.status() should be(BAD_REQUEST) + response.header(api.HttpHeaderNames.CONNECTION) shouldBe Optional.of("close") } diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadRequestsSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadRequestsSpec.scala index 133a2984dc..082e9acddf 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadRequestsSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadRequestsSpec.scala @@ -32,6 +32,7 @@ import com.hotels.styx.support.matchers.RegExMatcher.matchesRegex import com.hotels.styx.utils.HttpTestClient import io.netty.buffer.Unpooled import io.netty.channel.{Channel, ChannelInitializer} +import io.netty.handler.codec.http.HttpHeaderNames.CONNECTION import io.netty.handler.codec.http.HttpMethod.GET import io.netty.handler.codec.http.HttpResponseStatus._ import io.netty.handler.codec.http.HttpVersion.HTTP_1_1 @@ -89,6 +90,7 @@ class BadRequestsSpec extends FunSpec assert(response.status == BAD_REQUEST, s"\nExpecting 400 Bad Request in message: \n$response \n\n$content\n\n") assert(content == BAD_REQUEST.reasonPhrase()) assertThat(response.headers().get(STYX_INFO_DEFAULT), matchesRegex("noJvmRouteSet;")) + assertThat(response.headers().get(CONNECTION), is("close")) assertThat(loggingSupport.log(), hasItem(loggingEvent(ERROR, "Failure status=\"400 Bad Request\"", "io.netty.handler.codec.DecoderException", "com.hotels.styx.server.BadRequestException.*"))) eventually(timeout(5 seconds)) { @@ -109,6 +111,7 @@ class BadRequestsSpec extends FunSpec assert(response.status == BAD_REQUEST, s"\nExpecting 400 Bad Request in message: \n$response \n\n$content\n\n") assertThat(response.headers().get(STYX_INFO_DEFAULT), matchesRegex("noJvmRouteSet;")) assert(content == BAD_REQUEST.reasonPhrase()) + assertThat(response.headers().get(CONNECTION), is("close")) } } @@ -130,6 +133,7 @@ class BadRequestsSpec extends FunSpec val content = response.content().toString(UTF_8) assert(response.status == BAD_REQUEST, s"\nExpecting 400 Bad Request in message: \n$response \n\n$content\n\n") assertThat(response.headers().get(STYX_INFO_DEFAULT), matchesRegex("noJvmRouteSet;")) + assertThat(response.headers().get(CONNECTION), is("close")) assertThat(loggingSupport.log(), hasItem(loggingEvent(ERROR, "Failure status=\"400 Bad Request\"", "io.netty.handler.codec.DecoderException", "com.hotels.styx.server.BadRequestException: Bad Host header. .*"))) } @@ -171,6 +175,7 @@ class BadRequestsSpec extends FunSpec val content = response.content().toString(UTF_8) assert(response.status == REQUEST_TIMEOUT, s"\nExpecting 408 Request Timeout in message: \n$response \n\n$content\n\n") assertThat(response.headers().get(STYX_INFO_DEFAULT), matchesRegex("noJvmRouteSet;[0-9a-f-]+")) + assertThat(response.headers().get(CONNECTION), is("close")) assertThat(loggingSupport.log(), hasItem(loggingEvent(ERROR, "Failure status=\"408 Request Timeout\"", "com.hotels.styx.server.RequestTimeoutException", "message=DefaultHttpRequest.decodeResult: success.*"))) } } diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadResponseFromOriginSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadResponseFromOriginSpec.scala index c2231b1b28..332feccbad 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadResponseFromOriginSpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/BadResponseFromOriginSpec.scala @@ -15,7 +15,10 @@ */ package com.hotels.styx.proxy +import java.util.Optional + import com.google.common.base.Charsets.UTF_8 +import com.hotels.styx.api.HttpHeaderNames.CONNECTION import com.hotels.styx.api.HttpRequest.get import com.hotels.styx.api.HttpResponseStatus.BAD_GATEWAY import com.hotels.styx.client.StyxHeaderConfig.STYX_INFO_DEFAULT @@ -108,6 +111,8 @@ class BadResponseFromOriginSpec extends FunSpec response.status() should be(BAD_GATEWAY) assertThat(response.headers().get(STYX_INFO_DEFAULT), matches(matchesRegex("noJvmRouteSet;[0-9a-f-]+"))) response.bodyAs(UTF_8) should be("Site temporarily unavailable.") + response.header(CONNECTION) should be(Optional.of("close")) + eventually(timeout(7.seconds)) { styxServer.metricsSnapshot.count("styx.response.status.502").get should be(1) } diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/ProxySpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/ProxySpec.scala index 639326f624..0b996136a5 100644 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/ProxySpec.scala +++ b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/ProxySpec.scala @@ -16,6 +16,7 @@ package com.hotels.styx.proxy import java.nio.charset.StandardCharsets.UTF_8 +import java.util.Optional import com.github.tomakehurst.wiremock.client.WireMock._ import com.github.tomakehurst.wiremock.client.{RequestPatternBuilder, WireMock} @@ -186,6 +187,7 @@ class ProxySpec extends FunSpec val resp = decodedRequest(req) assert(resp.status() == SERVICE_UNAVAILABLE) + assert(resp.header(CONNECTION) == Optional.of("close")) } } @@ -203,6 +205,7 @@ class ProxySpec extends FunSpec println("body: " + resp.bodyAs(UTF_8)) assert(resp.status() == BAD_GATEWAY) + assert(resp.header(CONNECTION) == Optional.of("close")) } } diff --git a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/RoutingPipelineSpec.scala b/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/RoutingPipelineSpec.scala deleted file mode 100644 index 195bf12ffa..0000000000 --- a/system-tests/e2e-suite/src/test/scala/com/hotels/styx/proxy/RoutingPipelineSpec.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - Copyright (C) 2013-2018 Expedia Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package com.hotels.styx.proxy - -import com.hotels.styx.StyxProxySpec -import com.hotels.styx.routing.handlers.StaticResponseHandler.StaticResponseConfig -//import com.hotels.styx.support.configuration.routing.{ConfigBlock, InterceptorPipelineConfig, StaticResponseConfig} -import com.hotels.styx.support.configuration.{ProxyConfig, StyxConfig} -import org.scalatest.{BeforeAndAfter, FunSpec} - -// -// -//class RoutingPipelineSpec extends FunSpec with StyxProxySpec with BeforeAndAfter { -// -// private val y = ConfigBlock("StaticResponse", "", StaticResponseConfig(200, "Hello, world")) -// -//// val pipeline = InterceptorPipelineConfig(List("rewrite", "log"), y) -// -// override def styxConfig: StyxConfig = StyxConfig( -// proxyConfig = ProxyConfig(requestTimeoutMillis = 300), -// -// ) -//} -//