-
Notifications
You must be signed in to change notification settings - Fork 673
Support a handler for checking connection status using Ping frame in HTTP/2 #3612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 1.2.x
Are you sure you want to change the base?
Changes from all commits
168b146
7680dcf
f5aae94
9cd9f61
f245d91
d7c0342
ba1025b
e16cb7d
8987335
f9807c3
b9dd4bf
db7586c
d6c6a97
84bb3fc
36c039c
76f9828
8e7e733
8664765
0d5b0ef
33f15aa
c23df4b
1642979
e9ed832
90124bc
fa6b4c3
2aea90f
d446748
ea8a7df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package reactor.netty.examples.documentation.http.server.liveness; | ||
|
||
import reactor.netty.DisposableServer; | ||
import reactor.netty.http.Http2SslContextSpec; | ||
import reactor.netty.http.HttpProtocol; | ||
import reactor.netty.http.server.HttpServer; | ||
|
||
import java.io.File; | ||
import java.time.Duration; | ||
|
||
public class Application { | ||
|
||
public static void main(String[] args) { | ||
File cert = new File("certificate.crt"); | ||
File key = new File("private.key"); | ||
|
||
Http2SslContextSpec http2SslContextSpec = Http2SslContextSpec.forServer(cert, key); | ||
|
||
DisposableServer server = | ||
HttpServer.create() | ||
.port(8080) | ||
.protocol(HttpProtocol.H2) | ||
.secure(spec -> spec.sslContext(http2SslContextSpec)) | ||
.idleTimeout(Duration.ofSeconds(1)) //<1> | ||
.http2Settings( | ||
builder -> builder.pingAckTimeout(Duration.ofMillis(600)) // <2> | ||
.pingScheduleInterval(Duration.ofMillis(300)) // <3> | ||
.pingAckDropThreshold(2) // <4> | ||
) | ||
.bindNow(); | ||
|
||
server.onDispose() | ||
.block(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,292 @@ | ||
/* | ||
* Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package reactor.netty.http; | ||
|
||
import io.netty.channel.Channel; | ||
import io.netty.channel.ChannelFuture; | ||
import io.netty.channel.ChannelFutureListener; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.handler.codec.http2.Http2DataFrame; | ||
import io.netty.handler.codec.http2.Http2FrameCodec; | ||
import io.netty.handler.codec.http2.Http2FrameWriter; | ||
import io.netty.handler.codec.http2.Http2PingFrame; | ||
import reactor.util.Logger; | ||
import reactor.util.Loggers; | ||
import reactor.util.annotation.Nullable; | ||
|
||
import java.time.Duration; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
|
||
import static java.util.concurrent.TimeUnit.NANOSECONDS; | ||
|
||
/** | ||
* Supports connection health checks using HTTP/2 Ping Frames. | ||
* | ||
* <p> Http2ConnectionLiveness sends a ping frame at the specified interval when no frame is being read or written, | ||
* ensuring the connection health is monitored. If a ping ACK frame is not received within the configured interval, | ||
* the connection will be closed.</p> | ||
* | ||
* <p>Ping frame checking will not be performed while a read or write operation is in progress.</p> | ||
* | ||
* <p>Be cautious when setting a very short interval, as it may cause the connection to be closed, | ||
* even if the keep-alive setting is enabled.</p> | ||
* | ||
* <p>If no interval is specified, no ping frame checking will be performed.</p> | ||
* | ||
* @author raccoonback | ||
* @since 1.2.5 | ||
*/ | ||
public final class Http2ConnectionLiveness implements HttpConnectionLiveness { | ||
|
||
static final Logger log = Loggers.getLogger(Http2ConnectionLiveness.class); | ||
|
||
private ScheduledFuture<?> pingScheduler; | ||
|
||
private final ChannelFutureListener pingWriteListener = new PingWriteListener(); | ||
private final Http2FrameWriter http2FrameWriter; | ||
private final long pingAckTimeoutNanos; | ||
private final long pingScheduleIntervalNanos; | ||
private final int pingAckDropThreshold; | ||
|
||
private int pingAckDropCount; | ||
private long lastSentPingData; | ||
private long lastReceivedPingTime; | ||
private long lastSendingPingTime; | ||
private boolean isPingAckPending; | ||
|
||
/** | ||
* Constructs a new {@code Http2ConnectionLiveness} instance. | ||
* | ||
* @param http2FrameCodec the HTTP/2 frame codec | ||
* @param pingAckTimeout the ping ACK timeout duration | ||
* @param pingScheduleInterval the ping schedule interval duration | ||
* @param pingAckDropThreshold the ping ACK drop threshold | ||
*/ | ||
public Http2ConnectionLiveness( | ||
Http2FrameCodec http2FrameCodec, | ||
@Nullable Duration pingAckTimeout, | ||
@Nullable Duration pingScheduleInterval, | ||
@Nullable Integer pingAckDropThreshold | ||
) { | ||
this.http2FrameWriter = http2FrameCodec.encoder() | ||
.frameWriter(); | ||
|
||
if (pingAckTimeout != null) { | ||
this.pingAckTimeoutNanos = pingAckTimeout.toNanos(); | ||
} | ||
else { | ||
this.pingAckTimeoutNanos = 0L; | ||
} | ||
|
||
if (pingScheduleInterval != null) { | ||
this.pingScheduleIntervalNanos = pingScheduleInterval.toNanos(); | ||
} | ||
else { | ||
this.pingScheduleIntervalNanos = 0L; | ||
} | ||
|
||
if (pingAckDropThreshold != null) { | ||
this.pingAckDropThreshold = pingAckDropThreshold; | ||
} | ||
else { | ||
this.pingAckDropThreshold = 0; | ||
} | ||
} | ||
|
||
/** | ||
* Checks the liveness of the connection and schedules a ping if necessary. | ||
* | ||
* @param ctx the {@link ChannelHandlerContext} of the connection | ||
*/ | ||
@Override | ||
@SuppressWarnings("FutureReturnValueIgnored") | ||
public void check(ChannelHandlerContext ctx) { | ||
if (isPingIntervalConfigured()) { | ||
if (pingScheduler == null) { | ||
isPingAckPending = false; | ||
pingAckDropCount = 0; | ||
pingScheduler = ctx.executor() | ||
.schedule( | ||
new PingTimeoutTask(ctx), | ||
pingAckTimeoutNanos, | ||
NANOSECONDS | ||
); | ||
} | ||
|
||
return; | ||
} | ||
|
||
ctx.close(); | ||
} | ||
|
||
/** | ||
* Receives a message from the peer and processes it if it is a ping frame. | ||
* | ||
* @param msg the message received from the peer | ||
*/ | ||
@Override | ||
public void receive(Object msg) { | ||
if (msg instanceof Http2PingFrame) { | ||
Http2PingFrame frame = (Http2PingFrame) msg; | ||
if (frame.ack() && frame.content() == lastSentPingData) { | ||
lastReceivedPingTime = System.nanoTime(); | ||
} | ||
} | ||
|
||
if (msg instanceof Http2DataFrame) { | ||
cancel(); | ||
} | ||
} | ||
|
||
/** | ||
* Cancels the scheduled ping task. | ||
*/ | ||
@Override | ||
public void cancel() { | ||
if (pingScheduler != null) { | ||
pingScheduler.cancel(false); | ||
pingScheduler = null; | ||
} | ||
} | ||
Comment on lines
+158
to
+164
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The scheduler is canceled when the channel becomes inactive or an exception occurs. |
||
|
||
private boolean isPingIntervalConfigured() { | ||
return pingAckTimeoutNanos > 0 | ||
&& pingScheduleIntervalNanos > 0; | ||
} | ||
|
||
/** | ||
* A task that handles ping timeouts. | ||
*/ | ||
class PingTimeoutTask implements Runnable { | ||
|
||
private final ChannelHandlerContext ctx; | ||
|
||
PingTimeoutTask(ChannelHandlerContext ctx) { | ||
this.ctx = ctx; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
Channel channel = ctx.channel(); | ||
if (channel == null || !channel.isOpen()) { | ||
return; | ||
} | ||
|
||
if (!isPingAckPending) { | ||
if (log.isDebugEnabled()) { | ||
log.debug("Attempting to send a ping frame to the channel: {}", channel); | ||
} | ||
|
||
writePing(ctx); | ||
pingScheduler = invokeNextSchedule(); | ||
return; | ||
} | ||
Comment on lines
+189
to
+197
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If not waiting for a Ping ACK, a Ping frame is sent. |
||
|
||
if (isOutOfTimeRange()) { | ||
countPingDrop(); | ||
|
||
if (isExceedAckDropThreshold()) { | ||
if (log.isInfoEnabled()) { | ||
log.info("Closing the channel due to delayed ping frame response (timeout: {} ns). {}", pingAckTimeoutNanos, channel); | ||
} | ||
|
||
close(); | ||
return; | ||
} | ||
|
||
if (log.isInfoEnabled()) { | ||
log.info("Dropping ping ACK frame in channel (ping data: {}). channel: {}", lastSentPingData, channel); | ||
} | ||
|
||
writePing(ctx); | ||
pingScheduler = invokeNextSchedule(); | ||
return; | ||
} | ||
Comment on lines
+199
to
+218
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the Ping ACK is not received within pingAckTimeoutNanos, but the retry count has not yet reached the pingAckDropThreshold, a retry is attempted. |
||
|
||
isPingAckPending = false; | ||
pingAckDropCount = 0; | ||
pingScheduler = invokeNextSchedule(); | ||
Comment on lines
+220
to
+222
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the Ping ACK is not received within pingAckTimeoutNanos and retries have reached the pingAckDropThreshold, |
||
} | ||
|
||
private void writePing(ChannelHandlerContext ctx) { | ||
lastSentPingData = ThreadLocalRandom.current().nextLong(); | ||
|
||
http2FrameWriter | ||
.writePing(ctx, false, lastSentPingData, ctx.newPromise()) | ||
.addListener(pingWriteListener); | ||
ctx.flush(); | ||
} | ||
Comment on lines
+225
to
+232
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New data is generated, and a Ping frame is sent. |
||
|
||
private boolean isOutOfTimeRange() { | ||
return pingAckTimeoutNanos < Math.abs(lastReceivedPingTime - lastSendingPingTime); | ||
} | ||
Comment on lines
+234
to
+236
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It determines whether the Ping ACK was received within |
||
|
||
private void countPingDrop() { | ||
pingAckDropCount++; | ||
} | ||
|
||
private boolean isExceedAckDropThreshold() { | ||
return pingAckDropCount > pingAckDropThreshold; | ||
} | ||
|
||
private ScheduledFuture<?> invokeNextSchedule() { | ||
return ctx.executor() | ||
.schedule( | ||
this, | ||
pingScheduleIntervalNanos, | ||
NANOSECONDS | ||
); | ||
} | ||
|
||
private void close() { | ||
ctx.close() | ||
.addListener(future -> { | ||
if (future.isSuccess()) { | ||
if (log.isDebugEnabled()) { | ||
log.debug("Channel closed after liveness check: {}", ctx.channel()); | ||
} | ||
} | ||
else if (log.isDebugEnabled()) { | ||
log.debug("Failed to close the channel: {}. Cause: {}", ctx.channel(), future.cause()); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
/** | ||
* A listener that handles the completion of ping frame writes. | ||
*/ | ||
private class PingWriteListener implements ChannelFutureListener { | ||
|
||
@Override | ||
public void operationComplete(ChannelFuture future) { | ||
if (future.isSuccess()) { | ||
if (log.isDebugEnabled()) { | ||
log.debug("Successfully wrote PING frame to the channel: {}", future.channel()); | ||
} | ||
|
||
isPingAckPending = true; | ||
lastSendingPingTime = System.nanoTime(); | ||
} | ||
else { | ||
if (log.isDebugEnabled()) { | ||
log.debug("Failed to write PING frame to the channel: {}", future.channel()); | ||
} | ||
} | ||
} | ||
} | ||
Comment on lines
+273
to
+291
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the new Ping frame is successfully sent, the status is changed to 'ping awaiting', and |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The scheduler for checking the connection status based on HTTP/2 Ping frames operates only when
pingAckTimeout
,pingScheduleInterval
, andpingAckDropThreshold
are all configured.If they are not configured, the channel is closed immediately.