Skip to content

Commit cbe8e84

Browse files
committed
Netty Expect 100-Continue client: chunked/buffered input (with example)
Signed-off-by: Maxim Nesen <maxim.nesen@oracle.com>
1 parent 382f69e commit cbe8e84

File tree

10 files changed

+977
-220
lines changed

10 files changed

+977
-220
lines changed
Lines changed: 93 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2023, 2025 Oracle and/or its affiliates. All rights reserved.
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -16,111 +16,130 @@
1616

1717
package org.glassfish.jersey.netty.connector;
1818

19-
import io.netty.channel.Channel;
20-
import io.netty.channel.ChannelFuture;
2119
import io.netty.channel.ChannelHandlerContext;
2220
import io.netty.channel.ChannelInboundHandlerAdapter;
23-
import io.netty.handler.codec.http.DefaultFullHttpRequest;
24-
import io.netty.handler.codec.http.HttpHeaderNames;
25-
import io.netty.handler.codec.http.HttpRequest;
21+
import io.netty.handler.codec.http.FullHttpMessage;
2622
import io.netty.handler.codec.http.HttpResponse;
2723
import io.netty.handler.codec.http.HttpResponseStatus;
28-
import io.netty.handler.codec.http.HttpUtil;
29-
import org.glassfish.jersey.client.ClientRequest;
24+
import io.netty.handler.codec.http.LastHttpContent;
3025

3126
import javax.ws.rs.ProcessingException;
27+
import java.io.IOException;
28+
import java.util.ArrayList;
3229
import java.util.Arrays;
3330
import java.util.List;
34-
import java.util.concurrent.CompletableFuture;
35-
import java.util.concurrent.ExecutionException;
36-
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.CountDownLatch;
3732
import java.util.concurrent.TimeoutException;
3833

3934
public class JerseyExpectContinueHandler extends ChannelInboundHandlerAdapter {
4035

41-
private boolean isExpected;
36+
private ExpectationState currentState = ExpectationState.IDLE;
4237

43-
private static final List<HttpResponseStatus> statusesToBeConsidered = Arrays.asList(HttpResponseStatus.CONTINUE,
44-
HttpResponseStatus.UNAUTHORIZED, HttpResponseStatus.EXPECTATION_FAILED,
45-
HttpResponseStatus.METHOD_NOT_ALLOWED, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
38+
private static final List<HttpResponseStatus> finalErrorStatuses = Arrays.asList(HttpResponseStatus.UNAUTHORIZED,
39+
HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
40+
private static final List<HttpResponseStatus> reSendErrorStatuses = Arrays.asList(
41+
HttpResponseStatus.METHOD_NOT_ALLOWED,
42+
HttpResponseStatus.EXPECTATION_FAILED);
4643

47-
private CompletableFuture<HttpResponseStatus> expectedFuture = new CompletableFuture<>();
44+
private static final List<HttpResponseStatus> errorStatuses = new ArrayList<>(finalErrorStatuses);
45+
private static final List<HttpResponseStatus> statusesToBeConsidered = new ArrayList<>(reSendErrorStatuses);
46+
47+
static {
48+
errorStatuses.addAll(reSendErrorStatuses);
49+
statusesToBeConsidered.addAll(finalErrorStatuses);
50+
statusesToBeConsidered.add(HttpResponseStatus.CONTINUE);
51+
}
52+
53+
private HttpResponseStatus status = null;
54+
55+
private CountDownLatch latch = null;
56+
57+
private boolean propagateLastMessage = false;
4858

4959
@Override
5060
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
51-
if (isExpected && msg instanceof HttpResponse) {
52-
final HttpResponse response = (HttpResponse) msg;
53-
if (statusesToBeConsidered.contains(response.status())) {
54-
expectedFuture.complete(response.status());
55-
}
56-
if (!HttpResponseStatus.CONTINUE.equals(response.status())) {
61+
62+
if (checkExpectResponse(msg)) {
63+
currentState = ExpectationState.AWAITING;
64+
}
65+
switch (currentState) {
66+
case AWAITING:
67+
final HttpResponse response = (HttpResponse) msg;
68+
status = response.status();
69+
70+
boolean handshakeDone = processErrorStatuses(status, ctx) || msg instanceof FullHttpMessage;
71+
currentState = (handshakeDone) ? ExpectationState.IDLE : ExpectationState.FINISHING;
72+
processLatch();
73+
return;
74+
case FINISHING:
75+
if (msg instanceof LastHttpContent) {
76+
currentState = ExpectationState.IDLE;
77+
if (propagateLastMessage) {
78+
propagateLastMessage = false;
79+
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
80+
}
81+
}
82+
return;
83+
default:
5784
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
58-
} else {
59-
ctx.pipeline().remove(JerseyExpectContinueHandler.class);
60-
}
61-
} else {
62-
if (!isExpected) {
63-
ctx.pipeline().remove(JerseyExpectContinueHandler.class);
64-
}
65-
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
6685
}
6786
}
6887

69-
CompletableFuture<HttpResponseStatus> processExpect100ContinueRequest(HttpRequest nettyRequest,
70-
ClientRequest jerseyRequest,
71-
Channel ch,
72-
Integer timeout)
73-
throws InterruptedException, ExecutionException, TimeoutException {
74-
//check for 100-Continue presence/availability
75-
final Expect100ContinueConnectorExtension expect100ContinueExtension
76-
= new Expect100ContinueConnectorExtension();
77-
78-
final DefaultFullHttpRequest nettyRequestHeaders =
79-
new DefaultFullHttpRequest(nettyRequest.protocolVersion(), nettyRequest.method(), nettyRequest.uri());
80-
nettyRequestHeaders.headers().setAll(nettyRequest.headers());
81-
82-
if (!nettyRequestHeaders.headers().contains(HttpHeaderNames.HOST)) {
83-
nettyRequestHeaders.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
88+
private boolean checkExpectResponse(Object msg) {
89+
if (currentState == ExpectationState.IDLE && latch != null && msg instanceof HttpResponse) {
90+
return statusesToBeConsidered.contains(((HttpResponse) msg).status());
8491
}
92+
return false;
93+
}
8594

86-
//If Expect:100-continue feature is enabled and client supports it, the nettyRequestHeaders will be
87-
//enriched with the 'Expect:100-continue' header.
88-
expect100ContinueExtension.invoke(jerseyRequest, nettyRequestHeaders);
89-
90-
final ChannelFuture expect100ContinueFuture = (HttpUtil.is100ContinueExpected(nettyRequestHeaders))
91-
// Send only head of the HTTP request enriched with Expect:100-continue header.
92-
? ch.writeAndFlush(nettyRequestHeaders)
93-
// Expect:100-Continue either is not supported or is turned off
94-
: null;
95-
isExpected = expect100ContinueFuture != null;
96-
if (!isExpected) {
97-
ch.pipeline().remove(JerseyExpectContinueHandler.class);
98-
} else {
99-
final HttpResponseStatus status = expectedFuture
100-
.get(timeout, TimeUnit.MILLISECONDS);
101-
102-
processExpectationStatus(status);
95+
boolean processErrorStatuses(HttpResponseStatus status, ChannelHandlerContext ctx)
96+
throws InterruptedException {
97+
if (reSendErrorStatuses.contains(status)) {
98+
propagateLastMessage = true;
10399
}
104-
return expectedFuture;
100+
return (finalErrorStatuses.contains(status));
105101
}
106102

107-
private void processExpectationStatus(HttpResponseStatus status)
108-
throws TimeoutException {
103+
boolean processExpectationStatus()
104+
throws TimeoutException, IOException {
105+
if (status == null) {
106+
throw new TimeoutException(); // continue without expectations
107+
}
109108
if (!statusesToBeConsidered.contains(status)) {
110109
throw new ProcessingException(LocalizationMessages
111110
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);
112111
}
113-
if (!expectedFuture.isDone() || HttpResponseStatus.EXPECTATION_FAILED.equals(status)) {
114-
isExpected = false;
115-
throw new TimeoutException(); // continue without expectations
112+
113+
if (finalErrorStatuses.contains(status)) {
114+
throw new IOException(LocalizationMessages
115+
.EXPECT_100_CONTINUE_FAILED_REQUEST_FAILED(), null);
116116
}
117-
if (!HttpResponseStatus.CONTINUE.equals(status)) {
118-
throw new ProcessingException(LocalizationMessages
119-
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);
117+
118+
if (reSendErrorStatuses.contains(status)) {
119+
throw new TimeoutException(LocalizationMessages
120+
.EXPECT_100_CONTINUE_FAILED_REQUEST_SHOULD_BE_RESENT()); // Re-send request without expectations
121+
}
122+
123+
return true;
124+
}
125+
126+
void resetHandler() {
127+
latch = null;
128+
}
129+
130+
void attachCountDownLatch(CountDownLatch latch) {
131+
this.latch = latch;
132+
}
133+
134+
private void processLatch() {
135+
if (latch != null) {
136+
latch.countDown();
120137
}
121138
}
122139

123-
boolean isExpected() {
124-
return isExpected;
140+
private enum ExpectationState {
141+
AWAITING,
142+
FINISHING,
143+
IDLE
125144
}
126-
}
145+
}

connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.concurrent.CompletableFuture;
3535
import java.util.concurrent.CompletionException;
3636
import java.util.concurrent.CountDownLatch;
37-
import java.util.concurrent.ExecutionException;
3837
import java.util.concurrent.ExecutorService;
3938
import java.util.concurrent.Future;
4039
import java.util.concurrent.TimeUnit;
@@ -69,6 +68,7 @@
6968
import io.netty.handler.codec.http.HttpRequest;
7069
import io.netty.handler.codec.http.HttpUtil;
7170
import io.netty.handler.codec.http.HttpVersion;
71+
import io.netty.handler.codec.http.LastHttpContent;
7272
import io.netty.handler.proxy.HttpProxyHandler;
7373
import io.netty.handler.proxy.ProxyHandler;
7474
import io.netty.handler.ssl.ApplicationProtocolConfig;
@@ -256,6 +256,8 @@ protected void execute(final ClientRequest jerseyRequest, final Set<URI> redirec
256256
}
257257
}
258258

259+
final JerseyExpectContinueHandler expect100ContinueHandler = new JerseyExpectContinueHandler();
260+
259261
if (chan == null) {
260262
Integer connectTimeout = jerseyRequest.resolveProperty(ClientProperties.CONNECT_TIMEOUT, 0);
261263
Bootstrap b = new Bootstrap();
@@ -328,8 +330,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
328330
final Integer maxInitialLineLength = ClientProperties.getValue(config.getProperties(),
329331
NettyClientProperties.MAX_INITIAL_LINE_LENGTH,
330332
NettyClientProperties.DEFAULT_INITIAL_LINE_LENGTH);
331-
332333
p.addLast(new HttpClientCodec(maxInitialLineLength, maxHeaderSize, maxChunkSize));
334+
p.addLast(EXPECT_100_CONTINUE_HANDLER, expect100ContinueHandler);
333335
p.addLast(new ChunkedWriteHandler());
334336
p.addLast(new HttpContentDecompressor());
335337
}
@@ -358,11 +360,10 @@ protected void initChannel(SocketChannel ch) throws Exception {
358360
final Channel ch = chan;
359361
JerseyClientHandler clientHandler =
360362
new JerseyClientHandler(jerseyRequest, responseAvailable, responseDone, redirectUriHistory, this);
361-
final JerseyExpectContinueHandler expect100ContinueHandler = new JerseyExpectContinueHandler();
363+
362364
// read timeout makes sense really as an inactivity timeout
363365
ch.pipeline().addLast(READ_TIMEOUT_HANDLER,
364366
new IdleStateHandler(0, 0, timeout, TimeUnit.MILLISECONDS));
365-
ch.pipeline().addLast(EXPECT_100_CONTINUE_HANDLER, expect100ContinueHandler);
366367
ch.pipeline().addLast(REQUEST_HANDLER, clientHandler);
367368

368369
responseDone.whenComplete((_r, th) -> {
@@ -445,22 +446,11 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
445446
// // Set later after the entity is "written"
446447
// break;
447448
}
448-
try {
449-
expect100ContinueHandler.processExpect100ContinueRequest(nettyRequest, jerseyRequest,
450-
ch, expect100ContinueTimeout);
451-
} catch (ExecutionException e) {
452-
responseDone.completeExceptionally(e);
453-
} catch (TimeoutException e) {
454-
//Expect:100-continue allows timeouts by the spec
455-
//just removing the pipeline from processing
456-
if (ch.pipeline().context(JerseyExpectContinueHandler.class) != null) {
457-
ch.pipeline().remove(EXPECT_100_CONTINUE_HANDLER);
458-
}
459-
}
460449

461450
final CountDownLatch headersSet = new CountDownLatch(1);
462451
final CountDownLatch contentLengthSet = new CountDownLatch(1);
463452

453+
464454
jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
465455
@Override
466456
public OutputStream getOutputStream(int contentLength) throws IOException {
@@ -485,7 +475,6 @@ public void run() {
485475

486476
try {
487477
jerseyRequest.writeEntity();
488-
489478
if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
490479
nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, entityWriter.getLength());
491480
contentLengthSet.countDown();
@@ -505,12 +494,36 @@ public void run() {
505494
});
506495

507496
headersSet.await();
508-
if (!expect100ContinueHandler.isExpected()) {
509-
// Send the HTTP request. Expect:100-continue processing is not applicable
510-
// in this case.
497+
new Expect100ContinueConnectorExtension().invoke(jerseyRequest, nettyRequest);
498+
499+
boolean continueExpected = HttpUtil.is100ContinueExpected(nettyRequest);
500+
boolean expectationsFailed = false;
501+
502+
if (continueExpected) {
503+
final CountDownLatch expect100ContinueLatch = new CountDownLatch(1);
504+
expect100ContinueHandler.attachCountDownLatch(expect100ContinueLatch);
505+
//send expect request, sync and wait till either response or timeout received
511506
entityWriter.writeAndFlush(nettyRequest);
507+
expect100ContinueLatch.await(expect100ContinueTimeout, TimeUnit.MILLISECONDS);
508+
try {
509+
expect100ContinueHandler.processExpectationStatus();
510+
} catch (TimeoutException e) {
511+
//Expect:100-continue allows timeouts by the spec
512+
//so, send request directly without Expect header.
513+
expectationsFailed = true;
514+
} finally {
515+
//restore request and handler to the original state.
516+
HttpUtil.set100ContinueExpected(nettyRequest, false);
517+
expect100ContinueHandler.resetHandler();
518+
}
512519
}
513520

521+
if (!continueExpected || expectationsFailed) {
522+
if (expectationsFailed) {
523+
ch.pipeline().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).sync();
524+
}
525+
entityWriter.writeAndFlush(nettyRequest);
526+
}
514527
if (HttpUtil.isTransferEncodingChunked(nettyRequest)) {
515528
entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput()));
516529
} else {

connectors/netty-connector/src/main/resources/org/glassfish/jersey/netty/connector/localization.properties

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
2+
# Copyright (c) 2016, 2025 Oracle and/or its affiliates. All rights reserved.
33
#
44
# This program and the accompanying materials are made available under the
55
# terms of the Eclipse Public License v. 2.0, which is available at
@@ -23,3 +23,5 @@ redirect.error.determining.location="Error determining redirect location: ({0}).
2323
redirect.infinite.loop="Infinite loop in chained redirects detected."
2424
redirect.limit.reached="Max chained redirect limit ({0}) exceeded."
2525
unexpected.value.for.expect.100.continue.statuses=Unexpected value: ("{0}").
26+
expect.100.continue.failed.request.should.be.resent=Expect 100-continue failed. Request should be resent.
27+
expect.100.continue.failed.request.failed=Expect 100-continue failed. Request failed.

0 commit comments

Comments
 (0)