Skip to content

Commit d710cda

Browse files
authored
[grid] Add event bus heartbeat to prevent steal connection (#16444)
Signed-off-by: Viet Nguyen Duc <nguyenducviet4496@gmail.com>
1 parent 627a283 commit d710cda

File tree

6 files changed

+172
-7
lines changed

6 files changed

+172
-7
lines changed

java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.net.Inet6Address;
2121
import java.net.InetAddress;
2222
import java.net.UnknownHostException;
23+
import java.time.Duration;
2324
import java.util.concurrent.ExecutorService;
2425
import java.util.concurrent.Executors;
2526
import java.util.logging.Level;
@@ -43,7 +44,11 @@ class BoundZmqEventBus implements EventBus {
4344
private final ExecutorService executor;
4445

4546
BoundZmqEventBus(
46-
ZContext context, String publishConnection, String subscribeConnection, Secret secret) {
47+
ZContext context,
48+
String publishConnection,
49+
String subscribeConnection,
50+
Secret secret,
51+
Duration heartbeatPeriod) {
4752
String address = new NetworkUtils().getHostAddress();
4853
Addresses xpubAddr = deriveAddresses(address, publishConnection);
4954
Addresses xsubAddr = deriveAddresses(address, subscribeConnection);
@@ -53,11 +58,13 @@ class BoundZmqEventBus implements EventBus {
5358
xpub = context.createSocket(SocketType.XPUB);
5459
xpub.setIPv6(xpubAddr.isIPv6);
5560
xpub.setImmediate(true);
61+
ZmqUtils.configureHeartbeat(xpub, heartbeatPeriod, "XPUB");
5662
xpub.bind(xpubAddr.bindTo);
5763

5864
xsub = context.createSocket(SocketType.XSUB);
5965
xsub.setIPv6(xsubAddr.isIPv6);
6066
xsub.setImmediate(true);
67+
ZmqUtils.configureHeartbeat(xsub, heartbeatPeriod, "XSUB");
6168
xsub.bind(xsubAddr.bindTo);
6269

6370
executor =
@@ -68,8 +75,9 @@ class BoundZmqEventBus implements EventBus {
6875
return thread;
6976
});
7077
executor.submit(() -> ZMQ.proxy(xsub, xpub, null));
71-
72-
delegate = new UnboundZmqEventBus(context, xpubAddr.advertise, xsubAddr.advertise, secret);
78+
delegate =
79+
new UnboundZmqEventBus(
80+
context, xpubAddr.advertise, xsubAddr.advertise, secret, heartbeatPeriod);
7381
}
7482

7583
@Override

java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.net.URI;
2828
import java.net.URISyntaxException;
2929
import java.net.UnknownHostException;
30+
import java.time.Duration;
3031
import java.time.temporal.ChronoUnit;
3132
import java.util.ArrayList;
3233
import java.util.LinkedList;
@@ -76,7 +77,11 @@ class UnboundZmqEventBus implements EventBus {
7677
private ZMQ.Socket sub;
7778

7879
UnboundZmqEventBus(
79-
ZContext context, String publishConnection, String subscribeConnection, Secret secret) {
80+
ZContext context,
81+
String publishConnection,
82+
String subscribeConnection,
83+
Secret secret,
84+
Duration heartbeatPeriod) {
8085
Require.nonNull("Secret", secret);
8186
StringBuilder builder = new StringBuilder();
8287
try (JsonOutput out = JSON.newOutput(builder)) {
@@ -136,11 +141,13 @@ class UnboundZmqEventBus implements EventBus {
136141
() -> {
137142
sub = context.createSocket(SocketType.SUB);
138143
sub.setIPv6(isSubAddressIPv6(publishConnection));
144+
ZmqUtils.configureHeartbeat(sub, heartbeatPeriod, "SUB");
139145
sub.connect(publishConnection);
140146
sub.subscribe(new byte[0]);
141147

142148
pub = context.createSocket(SocketType.PUB);
143149
pub.setIPv6(isSubAddressIPv6(subscribeConnection));
150+
ZmqUtils.configureHeartbeat(pub, heartbeatPeriod, "PUB");
144151
pub.connect(subscribeConnection);
145152
});
146153
// Connections are already established

java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.net.URI;
2323
import java.net.URISyntaxException;
24+
import java.time.Duration;
2425
import java.util.function.Consumer;
2526
import org.openqa.selenium.events.EventBus;
2627
import org.openqa.selenium.events.EventListener;
@@ -38,17 +39,34 @@
3839
public class ZeroMqEventBus {
3940

4041
private static final String EVENTS_SECTION = "events";
42+
private static final int DEFAULT_HEARTBEAT_PERIOD_SECONDS = 60;
4143

4244
private ZeroMqEventBus() {
4345
// Use the create method.
4446
}
4547

4648
public static EventBus create(
4749
ZContext context, String publish, String subscribe, boolean bind, Secret secret) {
50+
return create(
51+
context,
52+
publish,
53+
subscribe,
54+
bind,
55+
secret,
56+
Duration.ofSeconds(DEFAULT_HEARTBEAT_PERIOD_SECONDS));
57+
}
58+
59+
public static EventBus create(
60+
ZContext context,
61+
String publish,
62+
String subscribe,
63+
boolean bind,
64+
Secret secret,
65+
Duration heartbeatPeriod) {
4866
if (bind) {
49-
return new BoundZmqEventBus(context, publish, subscribe, secret);
67+
return new BoundZmqEventBus(context, publish, subscribe, secret, heartbeatPeriod);
5068
}
51-
return new UnboundZmqEventBus(context, publish, subscribe, secret);
69+
return new UnboundZmqEventBus(context, publish, subscribe, secret, heartbeatPeriod);
5270
}
5371

5472
public static EventBus create(Config config) {
@@ -85,10 +103,25 @@ public static EventBus create(Config config) {
85103
});
86104

87105
boolean bind = config.getBool(EVENTS_SECTION, "bind").orElse(false);
106+
Duration heartbeatPeriod = getHeartbeatPeriod(config);
88107

89108
SecretOptions secretOptions = new SecretOptions(config);
90109

91-
return create(new ZContext(), publish, subscribe, bind, secretOptions.getRegistrationSecret());
110+
return create(
111+
new ZContext(),
112+
publish,
113+
subscribe,
114+
bind,
115+
secretOptions.getRegistrationSecret(),
116+
heartbeatPeriod);
117+
}
118+
119+
private static Duration getHeartbeatPeriod(Config config) {
120+
int periodSeconds =
121+
config
122+
.getInt(EVENTS_SECTION, "eventbus-heartbeat-period")
123+
.orElse(DEFAULT_HEARTBEAT_PERIOD_SECONDS);
124+
return Duration.ofSeconds(periodSeconds);
92125
}
93126

94127
private static String mungeUri(URI base, String scheme, int port) {
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Licensed to the Software Freedom Conservancy (SFC) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The SFC licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.openqa.selenium.events.zeromq;
19+
20+
import java.time.Duration;
21+
import java.util.logging.Level;
22+
import java.util.logging.Logger;
23+
import org.zeromq.ZMQ;
24+
25+
class ZmqUtils {
26+
27+
private static final Logger LOG = Logger.getLogger(ZmqUtils.class.getName());
28+
29+
// Minimum heartbeat interval: 1 second
30+
private static final long MIN_HEARTBEAT_MS = 1_000L;
31+
// Maximum heartbeat interval: ~24 days (to prevent overflow when multiplied by 6)
32+
private static final long MAX_HEARTBEAT_MS = Integer.MAX_VALUE / 6;
33+
34+
private ZmqUtils() {}
35+
36+
/**
37+
* Configures ZeroMQ heartbeat settings on a socket to prevent stale connections.
38+
*
39+
* <p>The heartbeat interval is clamped between 1 second and ~24 days to prevent integer overflow
40+
* and ensure reasonable values. If the provided duration is outside this range, it will be
41+
* adjusted and a warning will be logged.
42+
*
43+
* @param socket The ZMQ socket to configure
44+
* @param heartbeatPeriod The heartbeat interval duration
45+
* @param socketType The socket type name for logging (e.g., "SUB", "PUB", "XPUB", "XSUB")
46+
*/
47+
static void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) {
48+
if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) {
49+
long heartbeatMs = heartbeatPeriod.toMillis();
50+
long clampedHeartbeatMs = clampHeartbeatInterval(heartbeatMs, socketType);
51+
52+
// Safe to cast to int now
53+
int heartbeatIvl = (int) clampedHeartbeatMs;
54+
int heartbeatTimeout = heartbeatIvl * 3;
55+
int heartbeatTtl = heartbeatIvl * 6;
56+
57+
socket.setHeartbeatIvl(heartbeatIvl);
58+
socket.setHeartbeatTimeout(heartbeatTimeout);
59+
socket.setHeartbeatTtl(heartbeatTtl);
60+
61+
LOG.info(
62+
String.format(
63+
"ZMQ %s socket heartbeat configured: interval=%ds, timeout=%ds, ttl=%ds",
64+
socketType, heartbeatIvl / 1000, heartbeatTimeout / 1000, heartbeatTtl / 1000));
65+
}
66+
}
67+
68+
/**
69+
* Clamps the heartbeat interval to safe bounds and logs warnings if adjustments are made.
70+
*
71+
* @param heartbeatMs The heartbeat interval in milliseconds
72+
* @param socketType The socket type for logging
73+
* @return The clamped heartbeat interval
74+
*/
75+
private static long clampHeartbeatInterval(long heartbeatMs, String socketType) {
76+
if (heartbeatMs < MIN_HEARTBEAT_MS) {
77+
logHeartbeatClampWarning(socketType, heartbeatMs, MIN_HEARTBEAT_MS, "below minimum");
78+
return MIN_HEARTBEAT_MS;
79+
}
80+
if (heartbeatMs > MAX_HEARTBEAT_MS) {
81+
logHeartbeatClampWarning(socketType, heartbeatMs, MAX_HEARTBEAT_MS, "exceeds maximum");
82+
return MAX_HEARTBEAT_MS;
83+
}
84+
return heartbeatMs;
85+
}
86+
87+
/**
88+
* Logs a warning when the heartbeat interval is clamped.
89+
*
90+
* @param socketType The socket type
91+
* @param originalMs The original interval value in milliseconds
92+
* @param clampedMs The clamped interval value in milliseconds
93+
* @param reason The reason for clamping
94+
*/
95+
private static void logHeartbeatClampWarning(
96+
String socketType, long originalMs, long clampedMs, String reason) {
97+
LOG.log(
98+
Level.WARNING,
99+
String.format(
100+
"ZMQ %s socket heartbeat interval %ds is %s, clamping to %ds",
101+
socketType, originalMs / 1000, reason, clampedMs / 1000));
102+
}
103+
}

java/src/org/openqa/selenium/grid/server/EventBusFlags.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ public class EventBusFlags implements HasRoles {
6565
example = "\"org.openqa.selenium.events.zeromq.ZeroMqEventBus\"")
6666
private String implementation;
6767

68+
@Parameter(
69+
names = {"--eventbus-heartbeat-period"},
70+
description = "How often, in seconds, will the EventBus socket send heartbeats")
71+
@ConfigValue(section = EVENTS_SECTION, name = "eventbus-heartbeat-period", example = "30")
72+
private int eventbusHeartbeatPeriod;
73+
6874
@Override
6975
public Set<Role> getRoles() {
7076
return ImmutableSet.of(EVENT_BUS_ROLE);

java/src/org/openqa/selenium/grid/server/EventBusOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.openqa.selenium.grid.server;
1919

20+
import java.time.Duration;
2021
import org.openqa.selenium.events.EventBus;
2122
import org.openqa.selenium.grid.config.Config;
2223
import org.openqa.selenium.internal.Require;
@@ -25,6 +26,7 @@ public class EventBusOptions {
2526

2627
static final String EVENTS_SECTION = "events";
2728
private static final String DEFAULT_CLASS = "org.openqa.selenium.events.zeromq.ZeroMqEventBus";
29+
private static final int DEFAULT_HEARTBEAT_PERIOD = 60;
2830
private final Config config;
2931
private volatile EventBus bus;
3032

@@ -47,6 +49,12 @@ public EventBus getEventBus() {
4749
return localBus;
4850
}
4951

52+
public Duration getHeartbeatPeriod() {
53+
int period =
54+
config.getInt(EVENTS_SECTION, "eventbus-heartbeat-period").orElse(DEFAULT_HEARTBEAT_PERIOD);
55+
return Duration.ofSeconds(period);
56+
}
57+
5058
private EventBus createBus() {
5159
return config.getClass(EVENTS_SECTION, "implementation", EventBus.class, DEFAULT_CLASS);
5260
}

0 commit comments

Comments
 (0)