From 1974414eaa63970618df86fc4cce8fd5d4920e6d Mon Sep 17 00:00:00 2001 From: Slava Fomin Date: Mon, 11 May 2020 16:21:31 +0300 Subject: [PATCH] Read all before closing client connection --- .../proxy/impl/ProxyToServerConnection.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java index 3cb87c38c..fb5d50776 100644 --- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java +++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java @@ -13,6 +13,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.handler.codec.http.FullHttpResponse; @@ -92,6 +93,8 @@ public class ProxyToServerConnection extends ProxyConnection { private volatile ChainedProxy chainedProxy; private final Queue availableChainedProxies; + private ChannelPromise lastUpstreamReadProcessed; + /** * The filters to apply to response/chunks received from server. */ @@ -262,6 +265,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } else { ReferenceCountUtil.retain(httpResponse); + lastUpstreamReadProcessed = channel.newPromise(); + proxyServer.getMessageProcessingExecutor() .execute(() -> { try { @@ -274,6 +279,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { exceptionCaught(ctx, e); } finally { ReferenceCountUtil.release(httpResponse); + lastUpstreamReadProcessed.setSuccess(); } }); } @@ -468,7 +474,16 @@ protected void disconnected() { LOG.error("Unable to record connectionFailed", e); } } - clientConnection.serverDisconnected(this); + final ProxyToServerConnection serverConnection = this; + + if (lastUpstreamReadProcessed == null) { + clientConnection.serverDisconnected(serverConnection); + } else { + lastUpstreamReadProcessed.addListener(future -> + clientConnection.serverDisconnected(serverConnection)); + } + + } @Override