Skip to content

Commit 516437e

Browse files
authored
[fix][websocket] Fix webSocketPingDurationSeconds config (#19256)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent e3b76d4 commit 516437e

File tree

4 files changed

+21
-17
lines changed

4 files changed

+21
-17
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+6
Original file line numberDiff line numberDiff line change
@@ -2613,6 +2613,12 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
26132613
)
26142614
private int webSocketSessionIdleTimeoutMillis = 300000;
26152615

2616+
@FieldContext(
2617+
category = CATEGORY_WEBSOCKET,
2618+
doc = "Interval of time to sending the ping to keep alive in WebSocket proxy. "
2619+
+ "This value greater than 0 means enabled")
2620+
private int webSocketPingDurationSeconds = -1;
2621+
26162622
@FieldContext(
26172623
category = CATEGORY_WEBSOCKET,
26182624
doc = "The maximum size of a text message during parsing in WebSocket proxy."

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java

+6
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
811811
)
812812
private boolean webSocketServiceEnabled = false;
813813

814+
@FieldContext(
815+
category = CATEGORY_WEBSOCKET,
816+
doc = "Interval of time to sending the ping to keep alive in WebSocket proxy. "
817+
+ "This value greater than 0 means enabled")
818+
private int webSocketPingDurationSeconds = -1;
819+
814820
@FieldContext(
815821
category = CATEGORY_WEBSOCKET,
816822
doc = "Name of the cluster to which this broker belongs to"

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java

+9-13
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.apache.pulsar.common.util.Codec;
5757
import org.apache.pulsar.common.util.ObjectMapperFactory;
5858
import org.apache.pulsar.websocket.data.ConsumerCommand;
59-
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
6059
import org.eclipse.jetty.websocket.api.Session;
6160
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
6261
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@@ -192,18 +191,15 @@ private void closePingFuture() {
192191
@Override
193192
public void onWebSocketConnect(Session session) {
194193
super.onWebSocketConnect(session);
195-
WebSocketProxyConfiguration webSocketProxyConfig = service.getWebSocketProxyConfig();
196-
if (webSocketProxyConfig != null) {
197-
int webSocketPingDurationSeconds = webSocketProxyConfig.getWebSocketPingDurationSeconds();
198-
if (webSocketPingDurationSeconds > 0) {
199-
pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
200-
try {
201-
session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
202-
} catch (IOException e) {
203-
log.warn("[{}] WebSocket send ping", getSession().getRemoteAddress(), e);
204-
}
205-
}, webSocketPingDurationSeconds, webSocketPingDurationSeconds, TimeUnit.SECONDS);
206-
}
194+
int webSocketPingDurationSeconds = service.getConfig().getWebSocketPingDurationSeconds();
195+
if (webSocketPingDurationSeconds > 0) {
196+
pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
197+
try {
198+
session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
199+
} catch (IOException e) {
200+
log.warn("[{}] WebSocket send ping", getSession().getRemoteAddress(), e);
201+
}
202+
}, webSocketPingDurationSeconds, webSocketPingDurationSeconds, TimeUnit.SECONDS);
207203
}
208204
log.info("[{}] New WebSocket session on topic {}", session.getRemoteAddress(), topic);
209205
}

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java

-4
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ public class WebSocketService implements Closeable {
6969
private MetadataStoreExtended configMetadataStore;
7070
private ServiceConfiguration config;
7171

72-
@Getter
73-
private WebSocketProxyConfiguration webSocketProxyConfig;
74-
7572
@Getter
7673
private Optional<CryptoKeyReader> cryptoKeyReader = Optional.empty();
7774

@@ -83,7 +80,6 @@ public class WebSocketService implements Closeable {
8380

8481
public WebSocketService(WebSocketProxyConfiguration config) {
8582
this(createClusterData(config), PulsarConfigurationLoader.convertFrom(config));
86-
this.webSocketProxyConfig = config;
8783
}
8884

8985
public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {

0 commit comments

Comments
 (0)