{@code heartbeatFrequencyMS=ms}: The frequency that the driver will attempt to determine the current state of each server in the
* cluster.
+ *
{@code serverMonitoringMode=enum}: The server monitoring mode, which defines the monitoring protocol to use. Enumerated values:
+ *
+ *
{@code stream};
+ *
{@code poll};
+ *
{@code auto} - the default.
+ *
+ *
*
*
Replica set configuration:
*
@@ -307,6 +317,7 @@ public class ConnectionString {
private Integer serverSelectionTimeout;
private Integer localThreshold;
private Integer heartbeatFrequency;
+ private ServerMonitoringMode serverMonitoringMode;
private String applicationName;
private List compressorList;
private UuidRepresentation uuidRepresentation;
@@ -529,6 +540,7 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient
GENERAL_OPTIONS_KEYS.add("serverselectiontimeoutms");
GENERAL_OPTIONS_KEYS.add("localthresholdms");
GENERAL_OPTIONS_KEYS.add("heartbeatfrequencyms");
+ GENERAL_OPTIONS_KEYS.add("servermonitoringmode");
GENERAL_OPTIONS_KEYS.add("retrywrites");
GENERAL_OPTIONS_KEYS.add("retryreads");
@@ -665,6 +677,9 @@ private void translateOptions(final Map> optionsMap) {
case "heartbeatfrequencyms":
heartbeatFrequency = parseInteger(value, "heartbeatfrequencyms");
break;
+ case "servermonitoringmode":
+ serverMonitoringMode = ServerMonitoringModeUtil.fromString(value);
+ break;
case "appname":
applicationName = value;
break;
@@ -1623,6 +1638,20 @@ public Integer getHeartbeatFrequency() {
return heartbeatFrequency;
}
+ /**
+ * The server monitoring mode, which defines the monitoring protocol to use.
+ *
+ * Default is {@link ServerMonitoringMode#AUTO}.
+ *
+ * @return The {@link ServerMonitoringMode}, or {@code null} if unset and the default is to be used.
+ * @see ServerSettings#getServerMonitoringMode()
+ * @since 5.1
+ */
+ @Nullable
+ public ServerMonitoringMode getServerMonitoringMode() {
+ return serverMonitoringMode;
+ }
+
/**
* Gets the logical name of the application. The application name may be used by the client to identify the application to the server,
* for use in server logs, slow query logs, and profile collection.
@@ -1704,6 +1733,7 @@ public boolean equals(final Object o) {
&& Objects.equals(serverSelectionTimeout, that.serverSelectionTimeout)
&& Objects.equals(localThreshold, that.localThreshold)
&& Objects.equals(heartbeatFrequency, that.heartbeatFrequency)
+ && Objects.equals(serverMonitoringMode, that.serverMonitoringMode)
&& Objects.equals(applicationName, that.applicationName)
&& Objects.equals(compressorList, that.compressorList)
&& Objects.equals(uuidRepresentation, that.uuidRepresentation)
@@ -1717,7 +1747,7 @@ public int hashCode() {
writeConcern, retryWrites, retryReads, readConcern, minConnectionPoolSize, maxConnectionPoolSize, maxWaitTime,
maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, socketTimeout, sslEnabled,
sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency,
- applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost, proxyPort,
- proxyUsername, proxyPassword);
+ serverMonitoringMode, applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost,
+ proxyPort, proxyUsername, proxyPassword);
}
}
diff --git a/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
new file mode 100644
index 0000000000..cf54afef4b
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.connection;
+
+import com.mongodb.event.ClusterListener;
+import com.mongodb.event.ServerHeartbeatFailedEvent;
+import com.mongodb.event.ServerHeartbeatStartedEvent;
+import com.mongodb.event.ServerHeartbeatSucceededEvent;
+import com.mongodb.event.ServerListener;
+
+/**
+ * The server monitoring mode, which defines the monitoring protocol to use.
+ *
+ * @see
+ * server discovery and monitoring (SDAM)
+ * @since 5.1
+ */
+public enum ServerMonitoringMode {
+ /**
+ * Use the streaming protocol when the server supports it or fall back to the polling protocol otherwise.
+ * When the streaming protocol comes into play,
+ * {@link ServerHeartbeatStartedEvent#isAwaited()}, {@link ServerHeartbeatSucceededEvent#isAwaited()},
+ * {@link ServerHeartbeatFailedEvent#isAwaited()} return {@code true} for new events.
+ *
+ * The streaming protocol uses long polling for server monitoring, and is intended to reduce the delay between a server change
+ * that warrants a new event for {@link ServerListener}/{@link ClusterListener},
+ * and that event being emitted, as well as the related housekeeping work being done.
+ */
+ STREAM(),
+ /**
+ * Use the polling protocol.
+ */
+ POLL(),
+ /**
+ * Behave the same as {@link #POLL} if running in a FaaS environment, otherwise behave as {@link #STREAM}.
+ * This is the default.
+ */
+ AUTO()
+}
diff --git a/driver-core/src/main/com/mongodb/connection/ServerSettings.java b/driver-core/src/main/com/mongodb/connection/ServerSettings.java
index d4ef398a04..b4394d0dc7 100644
--- a/driver-core/src/main/com/mongodb/connection/ServerSettings.java
+++ b/driver-core/src/main/com/mongodb/connection/ServerSettings.java
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static com.mongodb.assertions.Assertions.notNull;
@@ -38,6 +39,7 @@
public class ServerSettings {
private final long heartbeatFrequencyMS;
private final long minHeartbeatFrequencyMS;
+ private final ServerMonitoringMode serverMonitoringMode;
private final List serverListeners;
private final List serverMonitorListeners;
@@ -68,6 +70,7 @@ public static Builder builder(final ServerSettings serverSettings) {
public static final class Builder {
private long heartbeatFrequencyMS = 10000;
private long minHeartbeatFrequencyMS = 500;
+ private ServerMonitoringMode serverMonitoringMode = ServerMonitoringMode.AUTO;
private List serverListeners = new ArrayList<>();
private List serverMonitorListeners = new ArrayList<>();
@@ -87,6 +90,7 @@ public Builder applySettings(final ServerSettings serverSettings) {
notNull("serverSettings", serverSettings);
heartbeatFrequencyMS = serverSettings.heartbeatFrequencyMS;
minHeartbeatFrequencyMS = serverSettings.minHeartbeatFrequencyMS;
+ serverMonitoringMode = serverSettings.serverMonitoringMode;
serverListeners = new ArrayList<>(serverSettings.serverListeners);
serverMonitorListeners = new ArrayList<>(serverSettings.serverMonitorListeners);
return this;
@@ -117,6 +121,20 @@ public Builder minHeartbeatFrequency(final long minHeartbeatFrequency, final Tim
return this;
}
+ /**
+ * Sets the server monitoring mode, which defines the monitoring protocol to use.
+ * The default value is {@link ServerMonitoringMode#AUTO}.
+ *
+ * @param serverMonitoringMode The {@link ServerMonitoringMode}.
+ * @return {@code this}.
+ * @see #getServerMonitoringMode()
+ * @since 5.1
+ */
+ public Builder serverMonitoringMode(final ServerMonitoringMode serverMonitoringMode) {
+ this.serverMonitoringMode = notNull("serverMonitoringMode", serverMonitoringMode);
+ return this;
+ }
+
/**
* Add a server listener.
*
@@ -181,6 +199,10 @@ public Builder applyConnectionString(final ConnectionString connectionString) {
if (heartbeatFrequency != null) {
heartbeatFrequencyMS = heartbeatFrequency;
}
+ ServerMonitoringMode serverMonitoringMode = connectionString.getServerMonitoringMode();
+ if (serverMonitoringMode != null) {
+ this.serverMonitoringMode = serverMonitoringMode;
+ }
return this;
}
@@ -215,6 +237,19 @@ public long getMinHeartbeatFrequency(final TimeUnit timeUnit) {
return timeUnit.convert(minHeartbeatFrequencyMS, TimeUnit.MILLISECONDS);
}
+ /**
+ * Gets the server monitoring mode, which defines the monitoring protocol to use.
+ * The default value is {@link ServerMonitoringMode#AUTO}.
+ *
+ * @return The {@link ServerMonitoringMode}.
+ * @see Builder#serverMonitoringMode(ServerMonitoringMode)
+ * @see ConnectionString#getServerMonitoringMode()
+ * @since 5.1
+ */
+ public ServerMonitoringMode getServerMonitoringMode() {
+ return serverMonitoringMode;
+ }
+
/**
* Gets the server listeners. The default value is an empty list.
*
@@ -243,33 +278,22 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
-
- ServerSettings that = (ServerSettings) o;
-
- if (heartbeatFrequencyMS != that.heartbeatFrequencyMS) {
- return false;
- }
- if (minHeartbeatFrequencyMS != that.minHeartbeatFrequencyMS) {
- return false;
- }
-
- if (!serverListeners.equals(that.serverListeners)) {
- return false;
- }
- if (!serverMonitorListeners.equals(that.serverMonitorListeners)) {
- return false;
- }
-
- return true;
+ final ServerSettings that = (ServerSettings) o;
+ return heartbeatFrequencyMS == that.heartbeatFrequencyMS
+ && minHeartbeatFrequencyMS == that.minHeartbeatFrequencyMS
+ && serverMonitoringMode == that.serverMonitoringMode
+ && Objects.equals(serverListeners, that.serverListeners)
+ && Objects.equals(serverMonitorListeners, that.serverMonitorListeners);
}
@Override
public int hashCode() {
- int result = (int) (heartbeatFrequencyMS ^ (heartbeatFrequencyMS >>> 32));
- result = 31 * result + (int) (minHeartbeatFrequencyMS ^ (minHeartbeatFrequencyMS >>> 32));
- result = 31 * result + serverListeners.hashCode();
- result = 31 * result + serverMonitorListeners.hashCode();
- return result;
+ return Objects.hash(
+ heartbeatFrequencyMS,
+ minHeartbeatFrequencyMS,
+ serverMonitoringMode,
+ serverListeners,
+ serverMonitorListeners);
}
@Override
@@ -277,6 +301,7 @@ public String toString() {
return "ServerSettings{"
+ "heartbeatFrequencyMS=" + heartbeatFrequencyMS
+ ", minHeartbeatFrequencyMS=" + minHeartbeatFrequencyMS
+ + ", serverMonitoringMode=" + serverMonitoringMode
+ ", serverListeners='" + serverListeners + '\''
+ ", serverMonitorListeners='" + serverMonitorListeners + '\''
+ '}';
@@ -285,6 +310,7 @@ public String toString() {
ServerSettings(final Builder builder) {
heartbeatFrequencyMS = builder.heartbeatFrequencyMS;
minHeartbeatFrequencyMS = builder.minHeartbeatFrequencyMS;
+ serverMonitoringMode = builder.serverMonitoringMode;
serverListeners = unmodifiableList(builder.serverListeners);
serverMonitorListeners = unmodifiableList(builder.serverMonitorListeners);
}
diff --git a/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java b/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java
index b324ddb84c..a8468effe4 100644
--- a/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java
+++ b/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java
@@ -17,6 +17,7 @@
package com.mongodb.event;
import com.mongodb.connection.ConnectionId;
+import com.mongodb.connection.ServerMonitoringMode;
import java.util.concurrent.TimeUnit;
@@ -77,6 +78,7 @@ public long getElapsedTime(final TimeUnit timeUnit) {
* to the server and the time that the server waited before sending a response.
*
* @return whether the response was awaited
+ * @see ServerMonitoringMode#STREAM
* @since 4.1
* @mongodb.server.release 4.4
*/
diff --git a/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java b/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java
index 4397c510d9..f83dc5ef00 100644
--- a/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java
+++ b/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java
@@ -17,6 +17,7 @@
package com.mongodb.event;
import com.mongodb.connection.ConnectionId;
+import com.mongodb.connection.ServerMonitoringMode;
import static com.mongodb.assertions.Assertions.notNull;
@@ -27,14 +28,30 @@
*/
public final class ServerHeartbeatStartedEvent {
private final ConnectionId connectionId;
+ private final boolean awaited;
/**
* Construct an instance.
*
* @param connectionId the non-null connnectionId
+ * @param awaited {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
+ * @since 5.1
*/
- public ServerHeartbeatStartedEvent(final ConnectionId connectionId) {
+ public ServerHeartbeatStartedEvent(final ConnectionId connectionId, final boolean awaited) {
this.connectionId = notNull("connectionId", connectionId);
+ this.awaited = awaited;
+ }
+
+ /**
+ * Construct an instance.
+ *
+ * @param connectionId the non-null connnectionId
+ * @deprecated Prefer {@link #ServerHeartbeatStartedEvent(ConnectionId, boolean)}.
+ * If this constructor is used then {@link #isAwaited()} is {@code false}.
+ */
+ @Deprecated
+ public ServerHeartbeatStartedEvent(final ConnectionId connectionId) {
+ this(connectionId, false);
}
/**
@@ -46,12 +63,24 @@ public ConnectionId getConnectionId() {
return connectionId;
}
+ /**
+ * Gets whether the heartbeat is for an awaitable `hello` / legacy hello.
+ *
+ * @return {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
+ * @see ServerMonitoringMode#STREAM
+ * @since 5.1
+ */
+ public boolean isAwaited() {
+ return awaited;
+ }
+
@Override
public String toString() {
return "ServerHeartbeatStartedEvent{"
+ "connectionId=" + connectionId
+ ", server=" + connectionId.getServerId().getAddress()
+ ", clusterId=" + connectionId.getServerId().getClusterId()
+ + ", awaited=" + awaited
+ "} " + super.toString();
}
}
diff --git a/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java b/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java
index e6deb0bb7a..20e9741275 100644
--- a/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java
+++ b/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java
@@ -17,6 +17,7 @@
package com.mongodb.event;
import com.mongodb.connection.ConnectionId;
+import com.mongodb.connection.ServerMonitoringMode;
import org.bson.BsonDocument;
import java.util.concurrent.TimeUnit;
@@ -87,6 +88,7 @@ public long getElapsedTime(final TimeUnit timeUnit) {
* to the server and the time that the server waited before sending a response.
*
* @return whether the response was awaited
+ * @see ServerMonitoringMode#STREAM
* @since 4.1
* @mongodb.server.release 4.4
*/
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java
index 38fa28d3b3..0375373c23 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java
@@ -110,7 +110,7 @@ public Cluster createCluster(final ClusterSettings originalClusterSettings, fina
connectionPoolSettings, internalConnectionPoolSettings,
streamFactory, heartbeatStreamFactory, credential, loggerSettings, commandListener, applicationName,
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList,
- serverApi);
+ serverApi, FaasEnvironment.getFaasEnvironment() != FaasEnvironment.UNKNOWN);
if (clusterSettings.getMode() == ClusterConnectionMode.SINGLE) {
return new SingleServerCluster(clusterId, clusterSettings, serverFactory);
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java
index 5f5b1e97b1..7d0f5b62e5 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java
@@ -53,6 +53,7 @@ public class DefaultClusterableServerFactory implements ClusterableServerFactory
private final List compressorList;
@Nullable
private final ServerApi serverApi;
+ private final boolean isFunctionAsAServiceEnvironment;
public DefaultClusterableServerFactory(
final ServerSettings serverSettings, final ConnectionPoolSettings connectionPoolSettings,
@@ -62,7 +63,7 @@ public DefaultClusterableServerFactory(
final LoggerSettings loggerSettings,
@Nullable final CommandListener commandListener,
@Nullable final String applicationName, @Nullable final MongoDriverInformation mongoDriverInformation,
- final List compressorList, @Nullable final ServerApi serverApi) {
+ final List compressorList, @Nullable final ServerApi serverApi, final boolean isFunctionAsAServiceEnvironment) {
this.serverSettings = serverSettings;
this.connectionPoolSettings = connectionPoolSettings;
this.internalConnectionPoolSettings = internalConnectionPoolSettings;
@@ -75,6 +76,7 @@ public DefaultClusterableServerFactory(
this.mongoDriverInformation = mongoDriverInformation;
this.compressorList = compressorList;
this.serverApi = serverApi;
+ this.isFunctionAsAServiceEnvironment = isFunctionAsAServiceEnvironment;
}
@Override
@@ -86,7 +88,7 @@ public ClusterableServer create(final Cluster cluster, final ServerAddress serve
// no credentials, compressor list, or command listener for the server monitor factory
new InternalStreamConnectionFactory(clusterMode, true, heartbeatStreamFactory, null, applicationName,
mongoDriverInformation, emptyList(), loggerSettings, null, serverApi),
- clusterMode, serverApi, sdamProvider);
+ clusterMode, serverApi, isFunctionAsAServiceEnvironment, sdamProvider);
ConnectionPool connectionPool = new DefaultConnectionPool(serverId,
new InternalStreamConnectionFactory(clusterMode, streamFactory, credential, applicationName,
mongoDriverInformation, compressorList, loggerSettings, commandListener, serverApi),
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
index e4618fc31f..55030a6db3 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
@@ -50,6 +50,7 @@
import static com.mongodb.MongoNamespace.COMMAND_COLLECTION_NAME;
import static com.mongodb.ReadPreference.primary;
import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.fail;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.connection.ServerType.UNKNOWN;
import static com.mongodb.internal.Locks.checkedWithLock;
@@ -76,12 +77,15 @@ class DefaultServerMonitor implements ServerMonitor {
private final ClusterConnectionMode clusterConnectionMode;
@Nullable
private final ServerApi serverApi;
+ private final boolean isFunctionAsAServiceEnvironment;
private final ServerSettings serverSettings;
- private final ServerMonitorRunnable monitor;
- private final Thread monitorThread;
- private final RoundTripTimeRunnable roundTripTimeMonitor;
+ private final ServerMonitor monitor;
+ /**
+ * Must be guarded by {@link #lock}.
+ */
+ @Nullable
+ private RoundTripTimeMonitor roundTripTimeMonitor;
private final ExponentiallyWeightedMovingAverage averageRoundTripTime = new ExponentiallyWeightedMovingAverage(0.2);
- private final Thread roundTripTimeMonitorThread;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile boolean isClosed;
@@ -90,6 +94,7 @@ class DefaultServerMonitor implements ServerMonitor {
final InternalConnectionFactory internalConnectionFactory,
final ClusterConnectionMode clusterConnectionMode,
@Nullable final ServerApi serverApi,
+ final boolean isFunctionAsAServiceEnvironment,
final Provider sdamProvider) {
this.serverSettings = notNull("serverSettings", serverSettings);
this.serverId = notNull("serverId", serverId);
@@ -97,21 +102,25 @@ class DefaultServerMonitor implements ServerMonitor {
this.internalConnectionFactory = notNull("internalConnectionFactory", internalConnectionFactory);
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
this.serverApi = serverApi;
+ this.isFunctionAsAServiceEnvironment = isFunctionAsAServiceEnvironment;
this.sdamProvider = sdamProvider;
- monitor = new ServerMonitorRunnable();
- monitorThread = new Thread(monitor, "cluster-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress());
- monitorThread.setDaemon(true);
- roundTripTimeMonitor = new RoundTripTimeRunnable();
- roundTripTimeMonitorThread = new Thread(roundTripTimeMonitor,
- "cluster-rtt-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress());
- roundTripTimeMonitorThread.setDaemon(true);
+ monitor = new ServerMonitor();
+ roundTripTimeMonitor = null;
isClosed = false;
}
@Override
public void start() {
- monitorThread.start();
- roundTripTimeMonitorThread.start();
+ monitor.start();
+ }
+
+ private void ensureRoundTripTimeMonitorStarted() {
+ withLock(lock, () -> {
+ if (!isClosed && roundTripTimeMonitor == null) {
+ roundTripTimeMonitor = new RoundTripTimeMonitor();
+ roundTripTimeMonitor.start();
+ }
+ });
}
@Override
@@ -120,12 +129,16 @@ public void connect() {
}
@Override
+ @SuppressWarnings("try")
public void close() {
- isClosed = true;
- monitor.close();
- monitorThread.interrupt();
- roundTripTimeMonitor.close();
- roundTripTimeMonitorThread.interrupt();
+ withLock(lock, () -> {
+ isClosed = true;
+ //noinspection EmptyTryBlock
+ try (ServerMonitor ignoredAutoClosed = monitor;
+ RoundTripTimeMonitor ignoredAutoClose2 = roundTripTimeMonitor) {
+ // we are automatically closing resources here
+ }
+ });
}
@Override
@@ -133,11 +146,18 @@ public void cancelCurrentCheck() {
monitor.cancelCurrentCheck();
}
- class ServerMonitorRunnable implements Runnable {
+ class ServerMonitor extends Thread implements AutoCloseable {
private volatile InternalConnection connection = null;
private volatile boolean currentCheckCancelled;
- void close() {
+ ServerMonitor() {
+ super("cluster-" + serverId.getClusterId() + "-" + serverId.getAddress());
+ setDaemon(true);
+ }
+
+ @Override
+ public void close() {
+ interrupt();
InternalConnection connection = this.connection;
if (connection != null) {
connection.close();
@@ -151,6 +171,10 @@ public void run() {
while (!isClosed) {
ServerDescription previousServerDescription = currentServerDescription;
currentServerDescription = lookupServerDescription(currentServerDescription);
+ boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription);
+ if (shouldStreamResponses) {
+ ensureRoundTripTimeMonitorStarted();
+ }
if (isClosed) {
continue;
@@ -165,8 +189,7 @@ public void run() {
logStateChange(previousServerDescription, currentServerDescription);
sdamProvider.get().update(currentServerDescription);
- if (((connection == null || shouldStreamResponses(currentServerDescription))
- && currentServerDescription.getTopologyVersion() != null && currentServerDescription.getType() != UNKNOWN)
+ if ((shouldStreamResponses && currentServerDescription.getType() != UNKNOWN)
|| (connection != null && connection.hasMoreToCome())
|| (currentServerDescription.getException() instanceof MongoSocketException
&& previousServerDescription.getType() != UNKNOWN)) {
@@ -199,7 +222,9 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Checking status of %s", serverId.getAddress()));
}
- serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(connection.getDescription().getConnectionId()));
+ boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription);
+ serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(
+ connection.getDescription().getConnectionId(), shouldStreamResponses));
long start = System.nanoTime();
try {
@@ -207,7 +232,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
if (!connection.hasMoreToCome()) {
BsonDocument helloDocument = new BsonDocument(getHandshakeCommandName(currentServerDescription), new BsonInt32(1))
.append("helloOk", BsonBoolean.TRUE);
- if (shouldStreamResponses(currentServerDescription)) {
+ if (shouldStreamResponses) {
helloDocument.append("topologyVersion", assertNotNull(currentServerDescription.getTopologyVersion()).asDocument());
helloDocument.append("maxAwaitTimeMS", new BsonInt64(serverSettings.getHeartbeatFrequency(MILLISECONDS)));
}
@@ -217,7 +242,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
}
BsonDocument helloResult;
- if (shouldStreamResponses(currentServerDescription)) {
+ if (shouldStreamResponses) {
helloResult = connection.receive(new BsonDocumentCodec(), sessionContext,
Math.toIntExact(serverSettings.getHeartbeatFrequency(MILLISECONDS)));
} else {
@@ -225,15 +250,18 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
}
long elapsedTimeNanos = System.nanoTime() - start;
+ if (!shouldStreamResponses) {
+ averageRoundTripTime.addSample(elapsedTimeNanos);
+ }
serverMonitorListener.serverHeartbeatSucceeded(
new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult,
- elapsedTimeNanos, currentServerDescription.getTopologyVersion() != null));
+ elapsedTimeNanos, shouldStreamResponses));
return createServerDescription(serverId.getAddress(), helloResult, averageRoundTripTime.getAverage());
} catch (Exception e) {
serverMonitorListener.serverHeartbeatFailed(
new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start,
- currentServerDescription.getTopologyVersion() != null, e));
+ shouldStreamResponses, e));
throw e;
}
} catch (Throwable t) {
@@ -251,7 +279,21 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
}
private boolean shouldStreamResponses(final ServerDescription currentServerDescription) {
- return currentServerDescription.getTopologyVersion() != null;
+ boolean serverSupportsStreaming = currentServerDescription.getTopologyVersion() != null;
+ switch (serverSettings.getServerMonitoringMode()) {
+ case STREAM: {
+ return serverSupportsStreaming;
+ }
+ case POLL: {
+ return false;
+ }
+ case AUTO: {
+ return !isFunctionAsAServiceEnvironment && serverSupportsStreaming;
+ }
+ default: {
+ throw fail();
+ }
+ }
}
private CommandMessage createCommandMessage(final BsonDocument command, final InternalConnection connection,
@@ -381,10 +423,17 @@ static boolean shouldLogStageChange(final ServerDescription previous, final Serv
}
- private class RoundTripTimeRunnable implements Runnable {
+ private class RoundTripTimeMonitor extends Thread implements AutoCloseable {
private volatile InternalConnection connection = null;
- void close() {
+ RoundTripTimeMonitor() {
+ super("cluster-rtt-" + serverId.getClusterId() + "-" + serverId.getAddress());
+ setDaemon(true);
+ }
+
+ @Override
+ public void close() {
+ interrupt();
InternalConnection connection = this.connection;
if (connection != null) {
connection.close();
diff --git a/driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java b/driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java
new file mode 100644
index 0000000000..17629f38a5
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.internal.connection;
+
+import com.mongodb.connection.ServerMonitoringMode;
+
+import static java.lang.String.format;
+
+/**
+ *
This class is not part of the public API and may be removed or changed at any time