Skip to content

Commit dd05408

Browse files
authoredApr 6, 2023
[fix][proxy] Fix connection read timeout handling in Pulsar Proxy (#20014)
1 parent f76beda commit dd05408

File tree

4 files changed

+10
-48
lines changed

4 files changed

+10
-48
lines changed
 

‎pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.netty.handler.ssl.SslContext;
4444
import io.netty.handler.ssl.SslHandler;
4545
import io.netty.handler.ssl.SslProvider;
46+
import io.netty.handler.timeout.ReadTimeoutHandler;
4647
import io.netty.util.CharsetUtil;
4748
import java.net.InetSocketAddress;
4849
import java.util.Arrays;
@@ -205,7 +206,7 @@ protected void initChannel(SocketChannel ch) {
205206
int brokerProxyReadTimeoutMs = service.getConfiguration().getBrokerProxyReadTimeoutMs();
206207
if (brokerProxyReadTimeoutMs > 0) {
207208
ch.pipeline().addLast("readTimeoutHandler",
208-
new ProxyReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
209+
new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
209210
}
210211
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
211212
service.getConfiguration().getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0,
@@ -362,6 +363,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
362363

363364
if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel == 0) {
364365
if (!isTlsOutboundChannel && !DirectProxyHandler.this.proxyConnection.isTlsInboundChannel) {
366+
if (ctx.pipeline().get("readTimeoutHandler") != null) {
367+
ctx.pipeline().remove("readTimeoutHandler");
368+
}
365369
ProxyConnection.spliceNIC2NIC((EpollSocketChannel) ctx.channel(),
366370
(EpollSocketChannel) inboundChannel, ProxyConnection.SPLICE_BYTES)
367371
.addListener(future -> {

‎pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java

+3
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,9 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
263263

264264
if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel == 0) {
265265
if (!directProxyHandler.isTlsOutboundChannel && !isTlsInboundChannel) {
266+
if (ctx.pipeline().get("readTimeoutHandler") != null) {
267+
ctx.pipeline().remove("readTimeoutHandler");
268+
}
266269
spliceNIC2NIC((EpollSocketChannel) ctx.channel(),
267270
(EpollSocketChannel) directProxyHandler.outboundChannel, SPLICE_BYTES)
268271
.addListener(future -> {

‎pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyReadTimeoutHandler.java

-46
This file was deleted.

‎pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.netty.handler.ssl.SslContext;
2626
import io.netty.handler.ssl.SslHandler;
2727
import io.netty.handler.ssl.SslProvider;
28+
import io.netty.handler.timeout.ReadTimeoutHandler;
2829
import java.util.concurrent.TimeUnit;
2930
import org.apache.pulsar.common.protocol.Commands;
3031
import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder;
@@ -106,7 +107,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
106107
}
107108
if (brokerProxyReadTimeoutMs > 0) {
108109
ch.pipeline().addLast("readTimeoutHandler",
109-
new ProxyReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
110+
new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
110111
}
111112
if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
112113
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());

0 commit comments

Comments
 (0)
Please sign in to comment.