Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A draft for support h2 client ping #3301 #3528

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright (c) 2021-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.http.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.DefaultHttp2PingFrame;
import io.netty.handler.codec.http2.Http2PingFrame;
import io.netty.util.concurrent.ScheduledFuture;
import reactor.util.Logger;
import reactor.util.Loggers;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* This is handler periodically sending a PING frame to the HTTP2 server. When the PING frame not acknowledged after
* max times,this handler will close the connection.User can config the interval between PING frames,the timeout in for
* a PING frame to be acknowledged and the maximum PING frame not acknowledged before close connection.
* @author chentong
* @since 1.2.1
*/
public class H2ClientHeartbeatHandler extends SimpleChannelInboundHandler<Http2PingFrame> {
private static final Logger log = Loggers.getLogger(Http3Codec.class);

private final AtomicBoolean lastPingResult = new AtomicBoolean(true);
private final AtomicInteger pingFailedTimes = new AtomicInteger(0);
private final Map<Long, Boolean> pingResultMap = new ConcurrentHashMap<>();
private final AtomicLong counter = new AtomicLong(1);
private ScheduledFuture<?> pingTask;
private final Duration pingTime;
private final Duration pingTimeout;
private final int maxFailedTimes;

public H2ClientHeartbeatHandler(Duration heartbeatTime, Duration heartbeatTimeout, int maxFailedTimes) {
super();
this.pingTime = heartbeatTime;
this.pingTimeout = heartbeatTimeout;
this.maxFailedTimes = maxFailedTimes;
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (log.isDebugEnabled()) {
log.debug("channel active begin to send PING frame");
}
this.pingTask = ctx.executor()
.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
long pintContent = counter.getAndIncrement();
DefaultHttp2PingFrame defaultHttp2PingFrame = new DefaultHttp2PingFrame(pintContent, false);
if (log.isDebugEnabled()) {
log.debug("begin send PINT frame to {} with content {}", ctx.channel().remoteAddress(), defaultHttp2PingFrame.content());
}
ctx.writeAndFlush(defaultHttp2PingFrame);
ctx.executor().schedule(new HeartbeatCheckTask(ctx, pintContent), pingTimeout.getSeconds(), TimeUnit.SECONDS);
}
}, 5, pingTime.getSeconds(), TimeUnit.SECONDS);

ctx.fireChannelActive();
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2PingFrame msg) throws Exception {
if (msg.ack()) {
if (log.isDebugEnabled()) {
log.debug("get pong frame with content {} from {} ", msg.content(), ctx.channel().remoteAddress());
pingResultMap.put(msg.content(), true);
}
} else {
if (log.isDebugEnabled()) {
log.debug("get ping frame with content {} from {} ", msg.content(), ctx.channel().remoteAddress());
}
}
}

private final class HeartbeatCheckTask implements Runnable {
private final ChannelHandlerContext ctx;
private final long id;

public HeartbeatCheckTask(ChannelHandlerContext ctx, long id) {
this.ctx = ctx;
this.id = id;
}

@Override
public void run() {
boolean pingResult = pingResultMap.getOrDefault(id, false);
if (pingResult) {
lastPingResult.compareAndSet(false, true);
pingFailedTimes.set(0);
} else {
lastPingResult.compareAndSet(true, false);
pingFailedTimes.getAndIncrement();
}

// heartbeat failed begin to cloud channel
if (!lastPingResult.get() && pingFailedTimes.getAndIncrement() >= maxFailedTimes) {
log.warn("begin to close channel {}", ctx.channel().id());
pingTask.cancel(true);

if (ctx.channel().isOpen()) {
ctx.channel().close();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,92 @@ public final HttpClient noSSL() {
return this;
}

/**
* Enable or Disable heartbeat support for the client.
*
* @param enableH2Heartbeat enableH2Heartbeat true if heartbeat should be enabled (default: false)
* @return a new {@link HttpClient}
*/
public final HttpClient enableH2Heartbeat(boolean enableH2Heartbeat) {
HttpClient dup = duplicate();
dup.configuration().h2Heartbeat = enableH2Heartbeat;
return dup;
}

public final HttpClient enableH2Heartbeat(Supplier<Boolean> enableH2Heartbeat) {
HttpClient dup = duplicate();
dup.configuration().h2Heartbeat = enableH2Heartbeat.get();
return dup;
}

/**
* The interval between PING frames.
*
* @param heartbeatTime the interval between two PING frames
* @return a new {@link HttpClient}
*/
public final HttpClient h2HeartbeatTime(Duration heartbeatTime) {
Objects.requireNonNull(heartbeatTime, "heartbeatTime");
HttpClient dup = duplicate();
dup.configuration().h2HeartbeatTime = heartbeatTime;
return dup;
}

public final HttpClient h2HeartbeatTime(Supplier<Duration> heartbeatTime) {
Objects.requireNonNull(heartbeatTime, "heartbeatTime");
HttpClient dup = duplicate();
dup.configuration().h2HeartbeatTime = heartbeatTime.get();
return dup;
}

/**
* The timeout for a PING frame to be acknowledged. If client does not receive an acknowledgment within this time,
* it will considered PING request failed.
*
* @param heartbeatTimeout the timeout of a PING frame
* @return @return a new {@link HttpClient}
*/
public final HttpClient h2HeartbeatTimeout(Duration heartbeatTimeout) {
Objects.requireNonNull(heartbeatTimeout, "heartbeatTimeout");
HttpClient dup = duplicate();
dup.configuration().h2HeartbeatTimeout = heartbeatTimeout;
return dup;
}

public final HttpClient h2HeartbeatTimeout(Supplier<Duration> heartbeatTimeout) {
Objects.requireNonNull(heartbeatTimeout, "heartbeatTimeout");
HttpClient dup = duplicate();
dup.configuration().h2HeartbeatTimeout = heartbeatTimeout.get();
return dup;
}

/**
* The max times for PING frame not acknowledged,when more than this value clint will close the connection.
*
* @param maxFailedTimes the max time for PING failed before close connection
* @return @return a new {@link HttpClient}
*/
public final HttpClient maxH2HeartbeatFailedTimes(int maxFailedTimes) {
if (maxFailedTimes <= 0) {
throw new IllegalArgumentException("max failed times must be big than zero");
}

HttpClient dup = duplicate();
dup.configuration().maxH2HeartbeatFailedTimes = maxFailedTimes;
return dup;
}

public final HttpClient maxH2HeartbeatFailedTimes(Supplier<Integer> maxFailedTimes) {
Objects.requireNonNull(maxFailedTimes, "maxFailedTimes");
if (maxFailedTimes.get() <= 0) {
throw new IllegalArgumentException("max failed times must be big than zero");
}

HttpClient dup = duplicate();
dup.configuration().maxH2HeartbeatFailedTimes = maxFailedTimes.get();
return dup;
}

@Override
public final HttpClient observe(ConnectionObserver observer) {
return super.observe(observer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@ public WebsocketClientSpec websocketClientSpec() {
String uriStr;
Function<String, String> uriTagValue;
WebsocketClientSpec websocketClientSpec;
boolean h2Heartbeat;
Duration h2HeartbeatTime;
Duration h2HeartbeatTimeout;
int maxH2HeartbeatFailedTimes;

HttpClientConfig(HttpConnectionProvider connectionProvider, Map<ChannelOption<?>, ?> options,
Supplier<? extends SocketAddress> remoteAddress) {
Expand Down Expand Up @@ -426,6 +430,10 @@ public WebsocketClientSpec websocketClientSpec() {
this.uriStr = parent.uriStr;
this.uriTagValue = parent.uriTagValue;
this.websocketClientSpec = parent.websocketClientSpec;
this.h2Heartbeat = parent.h2Heartbeat;
this.h2HeartbeatTime = parent.h2HeartbeatTime;
this.h2HeartbeatTimeout = parent.h2HeartbeatTimeout;
this.maxH2HeartbeatFailedTimes = parent.maxH2HeartbeatFailedTimes;
}

@Override
Expand Down Expand Up @@ -1053,6 +1061,10 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig
final SocketAddress proxyAddress;
final SslProvider sslProvider;
final Function<String, String> uriTagValue;
final boolean heartbeat;
final Duration heartbeatTime;
final Duration heartbeatTimeout;
final int maxH2HeartbeatFailedTimes;

HttpClientChannelInitializer(HttpClientConfig config) {
this.acceptGzip = config.acceptGzip;
Expand All @@ -1064,6 +1076,10 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig
this.proxyAddress = config.proxyProvider() != null ? config.proxyProvider().getProxyAddress() : null;
this.sslProvider = config.sslProvider;
this.uriTagValue = config.uriTagValue;
this.heartbeat = config.h2Heartbeat;
this.heartbeatTime = config.h2HeartbeatTime;
this.heartbeatTimeout = config.h2HeartbeatTimeout;
this.maxH2HeartbeatFailedTimes = config.maxH2HeartbeatFailedTimes;
}

@Override
Expand Down Expand Up @@ -1099,6 +1115,9 @@ else if ((protocols & h2c) == h2c) {
configureHttp2Pipeline(channel.pipeline(), decoder, http2Settings, observer);
}
}
if (heartbeat && (protocols & h2) == h2) {
channel.pipeline().addLast(new H2ClientHeartbeatHandler(heartbeatTime, heartbeatTimeout, maxH2HeartbeatFailedTimes));
}
}
}

Expand Down
Loading