Skip to content

Commit a405cb6

Browse files
authored
Ensure 100-Continue is handled when collecting client's metrics (#3895)
Fixes #3883 Signed-off-by: Violeta Georgieva <696661+violetagg@users.noreply.github.com>
1 parent 2171697 commit a405cb6

File tree

2 files changed

+71
-4
lines changed

2 files changed

+71
-4
lines changed

reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.netty.channel.ChannelPromise;
2424
import io.netty.handler.codec.http.HttpRequest;
2525
import io.netty.handler.codec.http.HttpResponse;
26+
import io.netty.handler.codec.http.HttpResponseStatus;
2627
import io.netty.handler.codec.http.LastHttpContent;
2728
import reactor.netty.channel.ChannelOperations;
2829
import reactor.util.annotation.Nullable;
@@ -73,6 +74,8 @@ abstract class AbstractHttpClientMetricsHandler extends ChannelDuplexHandler {
7374

7475
int lastWriteSeq;
7576

77+
boolean isNot100Continue;
78+
7679
protected AbstractHttpClientMetricsHandler(SocketAddress remoteAddress, @Nullable SocketAddress proxyAddress, @Nullable Function<String, String> uriTagValue) {
7780
this.proxyAddress = proxyAddress;
7881
this.remoteAddress = remoteAddress;
@@ -93,6 +96,7 @@ protected AbstractHttpClientMetricsHandler(AbstractHttpClientMetricsHandler copy
9396
this.uriTagValue = copy.uriTagValue;
9497
this.lastWriteSeq = copy.lastWriteSeq;
9598
this.lastReadSeq = copy.lastReadSeq;
99+
this.isNot100Continue = copy.isNot100Continue;
96100
}
97101

98102
@Override
@@ -139,14 +143,18 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
139143
public void channelRead(ChannelHandlerContext ctx, Object msg) {
140144
try {
141145
if (msg instanceof HttpResponse) {
142-
status = ((HttpResponse) msg).status().codeAsText().toString();
146+
HttpResponseStatus httpResponseStatus = ((HttpResponse) msg).status();
147+
isNot100Continue = httpResponseStatus.code() != HttpResponseStatus.CONTINUE.code();
148+
if (isNot100Continue) {
149+
status = httpResponseStatus.codeAsText().toString();
143150

144-
startRead((HttpResponse) msg);
151+
startRead((HttpResponse) msg);
152+
}
145153
}
146154

147155
dataReceived += extractProcessedDataFromBuffer(msg);
148156

149-
if (msg instanceof LastHttpContent) {
157+
if (isNot100Continue && msg instanceof LastHttpContent) {
150158
// Detect if we have received an early response before the request has been fully flushed.
151159
// In this case, invoke #recordWrite now (because next we will reset all class fields).
152160
lastReadSeq = (lastReadSeq + 1) & 0x7F_FF_FF_FF;

reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import io.netty.channel.group.ChannelGroup;
3131
import io.netty.channel.group.DefaultChannelGroup;
3232
import io.netty.channel.unix.DomainSocketAddress;
33+
import io.netty.handler.codec.http.HttpHeaderNames;
34+
import io.netty.handler.codec.http.HttpHeaderValues;
3335
import io.netty.handler.codec.http.HttpRequest;
3436
import io.netty.handler.codec.http.LastHttpContent;
3537
import io.netty.handler.codec.http2.Http2StreamChannel;
@@ -169,13 +171,14 @@ static void createSelfSignedCertificate() throws CertificateException {
169171
* <ul>
170172
* <li> /1 is used by testExistingEndpoint test</li>
171173
* <li> /2 is used by testExistingEndpoint, and testUriTagValueFunctionNotSharedForClient tests</li>
172-
* <li> /3 does not exists but is used by testNonExistingEndpoint, checkExpectationsNonExisting tests</li>
174+
* <li> /3 does not exist but is used by testNonExistingEndpoint, checkExpectationsNonExisting tests</li>
173175
* <li> /4 is used by testServerConnectionsMicrometer test</li>
174176
* <li> /5 is used by testServerConnectionsRecorder test</li>
175177
* <li> /6 is used by testServerConnectionsMicrometerConnectionClose test</li>
176178
* <li> /7 is used by testServerConnectionsRecorderConnectionClose test</li>
177179
* <li> /8 is used by testServerConnectionsWebsocketMicrometer test</li>
178180
* <li> /9 is used by testServerConnectionsWebsocketRecorder test</li>
181+
* <li> /10 is used by testIssue3883 for testing 100-Continue</li>
179182
* </ul>
180183
*/
181184
@BeforeEach
@@ -210,6 +213,10 @@ void setUp() {
210213
out.sendString(Mono.just("Hello World!").doOnNext(b -> checkServerConnectionsMicrometer(req)))))
211214
.get("/9", (req, res) -> res.sendWebsocket((in, out) ->
212215
out.sendString(Mono.just("Hello World!").doOnNext(b -> checkServerConnectionsRecorder(req)))))
216+
.post("/10", (req, res) -> req.receive()
217+
.aggregate()
218+
.asString()
219+
.flatMap(s -> res.header("Connection", "close").sendString(Mono.just(s)).then()))
213220
);
214221

215222
provider = ConnectionProvider.create("HttpMetricsHandlerTests", 1);
@@ -1076,6 +1083,58 @@ void testIssue3060ConnectTimeoutException(@SuppressWarnings("unused") HttpProtoc
10761083
assertTimer(registry, CLIENT_CONNECT_TIME, summaryTags).hasCountEqualTo(1);
10771084
}
10781085

1086+
@ParameterizedTest
1087+
@MethodSource("httpCompatibleProtocols")
1088+
void testIssue3883(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
1089+
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
1090+
CountDownLatch responseSent = new CountDownLatch(1); // response fully sent by the server
1091+
AtomicReference<CountDownLatch> responseSentRef = new AtomicReference<>(responseSent);
1092+
ResponseSentHandler responseSentHandler = ResponseSentHandler.INSTANCE;
1093+
disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
1094+
.doOnConnection(cnx -> responseSentHandler.register(responseSentRef, cnx.channel().pipeline()))
1095+
.bindNow();
1096+
1097+
AtomicReference<SocketAddress> serverAddress = new AtomicReference<>();
1098+
CountDownLatch clientCompleted = new CountDownLatch(1); // client received full response
1099+
AtomicReference<CountDownLatch> clientCompletedRef = new AtomicReference<>(clientCompleted);
1100+
httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols)
1101+
.doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress()))
1102+
.doAfterResponseSuccess((resp, conn) -> clientCompletedRef.get().countDown());
1103+
1104+
httpClient.headers(h -> h.add(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE))
1105+
.post()
1106+
.uri("/10")
1107+
.send(body)
1108+
.responseContent()
1109+
.aggregate()
1110+
.asString()
1111+
.as(StepVerifier::create)
1112+
.expectNext("Hello World!")
1113+
.expectComplete()
1114+
.verify(Duration.ofSeconds(5));
1115+
1116+
assertThat(responseSentRef.get().await(30, TimeUnit.SECONDS)).as("responseSentRef latch await").isTrue();
1117+
assertThat(clientCompletedRef.get().await(30, TimeUnit.SECONDS)).as("clientCompletedRef latch await").isTrue();
1118+
1119+
InetSocketAddress sa = (InetSocketAddress) serverAddress.get();
1120+
1121+
int[] numWrites = new int[]{14, 25};
1122+
int[] bytesWrite = new int[]{160, 243};
1123+
if ((serverProtocols.length == 1 && serverProtocols[0] == HttpProtocol.HTTP11) ||
1124+
(clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11)) {
1125+
numWrites = new int[]{14, 28};
1126+
bytesWrite = new int[]{151, 310};
1127+
}
1128+
else if (clientProtocols.length == 2 &&
1129+
Arrays.equals(clientProtocols, new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11})) {
1130+
numWrites = new int[]{17, 28};
1131+
bytesWrite = new int[]{315, 435};
1132+
}
1133+
1134+
checkExpectationsExisting("/10", sa.getHostString() + ":" + sa.getPort(), 1, serverCtx != null,
1135+
numWrites[0], bytesWrite[0]);
1136+
}
1137+
10791138
static Stream<Arguments> combinationsIssue2956() {
10801139
return Stream.of(
10811140
// isCustomRecorder, isHttp2

0 commit comments

Comments
 (0)