From 33631e1fae0e37f0e1ced824323a30c641bf3b20 Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Wed, 14 Feb 2024 17:46:02 -0700
Subject: [PATCH 01/13] Support `ServerHeartbeatStartedEvent.awaited`
This change was supposed to be done as part of https://github.com/mongodb/mongo-java-driver/commit/c8b028a509c825c0de30c67fa1eb6a5871d8bea9
JAVA-4936
---
.../event/ServerHeartbeatStartedEvent.java | 29 ++++++++++++++++++-
.../connection/DefaultServerMonitor.java | 10 +++----
.../AsynchronousClusterEventListenerTest.java | 2 +-
3 files changed, 34 insertions(+), 7 deletions(-)
diff --git a/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java b/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java
index 4397c510d9..0e377ed7b3 100644
--- a/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java
+++ b/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java
@@ -27,14 +27,30 @@
*/
public final class ServerHeartbeatStartedEvent {
private final ConnectionId connectionId;
+ private final boolean awaited;
/**
* Construct an instance.
*
* @param connectionId the non-null connnectionId
+ * @param awaited {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
+ * @since 5.1
*/
- public ServerHeartbeatStartedEvent(final ConnectionId connectionId) {
+ public ServerHeartbeatStartedEvent(final ConnectionId connectionId, final boolean awaited) {
this.connectionId = notNull("connectionId", connectionId);
+ this.awaited = awaited;
+ }
+
+ /**
+ * Construct an instance.
+ *
+ * @param connectionId the non-null connnectionId
+ * @deprecated Prefer {@link #ServerHeartbeatStartedEvent(ConnectionId, boolean)}.
+ * If this constructor is used then {@link #isAwaited()} is {@code false}.
+ */
+ @Deprecated
+ public ServerHeartbeatStartedEvent(final ConnectionId connectionId) {
+ this(connectionId, false);
}
/**
@@ -46,12 +62,23 @@ public ConnectionId getConnectionId() {
return connectionId;
}
+ /**
+ * Gets whether the heartbeat is for an awaitable `hello` / legacy hello.
+ *
+ * @return {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
+ * @since 5.1
+ */
+ public boolean isAwaited() {
+ return awaited;
+ }
+
@Override
public String toString() {
return "ServerHeartbeatStartedEvent{"
+ "connectionId=" + connectionId
+ ", server=" + connectionId.getServerId().getAddress()
+ ", clusterId=" + connectionId.getServerId().getClusterId()
+ + ", awaited=" + awaited
+ "} " + super.toString();
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
index e4618fc31f..237a692dde 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
@@ -165,8 +165,7 @@ public void run() {
logStateChange(previousServerDescription, currentServerDescription);
sdamProvider.get().update(currentServerDescription);
- if (((connection == null || shouldStreamResponses(currentServerDescription))
- && currentServerDescription.getTopologyVersion() != null && currentServerDescription.getType() != UNKNOWN)
+ if ((shouldStreamResponses(currentServerDescription) && currentServerDescription.getType() != UNKNOWN)
|| (connection != null && connection.hasMoreToCome())
|| (currentServerDescription.getException() instanceof MongoSocketException
&& previousServerDescription.getType() != UNKNOWN)) {
@@ -199,7 +198,8 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Checking status of %s", serverId.getAddress()));
}
- serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(connection.getDescription().getConnectionId()));
+ serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(
+ connection.getDescription().getConnectionId(), shouldStreamResponses(currentServerDescription)));
long start = System.nanoTime();
try {
@@ -227,13 +227,13 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
long elapsedTimeNanos = System.nanoTime() - start;
serverMonitorListener.serverHeartbeatSucceeded(
new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult,
- elapsedTimeNanos, currentServerDescription.getTopologyVersion() != null));
+ elapsedTimeNanos, shouldStreamResponses(currentServerDescription)));
return createServerDescription(serverId.getAddress(), helloResult, averageRoundTripTime.getAverage());
} catch (Exception e) {
serverMonitorListener.serverHeartbeatFailed(
new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start,
- currentServerDescription.getTopologyVersion() != null, e));
+ shouldStreamResponses(currentServerDescription), e));
throw e;
}
} catch (Throwable t) {
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/AsynchronousClusterEventListenerTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/AsynchronousClusterEventListenerTest.java
index 2403f3f324..09d2fa864a 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/AsynchronousClusterEventListenerTest.java
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/AsynchronousClusterEventListenerTest.java
@@ -72,7 +72,7 @@ public void testEventsPublished() throws InterruptedException {
listener.clusterDescriptionChanged(clusterDescriptionChangedEvent);
assertEquals(clusterDescriptionChangedEvent, targetListener.take());
- ServerHeartbeatStartedEvent serverHeartbeatStartedEvent = new ServerHeartbeatStartedEvent(connectionId);
+ ServerHeartbeatStartedEvent serverHeartbeatStartedEvent = new ServerHeartbeatStartedEvent(connectionId, false);
listener.serverHearbeatStarted(serverHeartbeatStartedEvent);
assertEquals(serverHeartbeatStartedEvent, targetListener.take());
From d7aa01c2cc20d10ceb86c3567f5eea9bd62a66b3 Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Wed, 14 Feb 2024 17:55:04 -0700
Subject: [PATCH 02/13] Add support for the unified test schema version 1.17
Note that despite this commit marking 1.16 as supported,
it actually is not. The existing practice
is to mark as supported all the versions up to the one that is
either fully or partially supported, despite them
being likely not actually supported. When someone discovers
that a feature is used in a test, but is not implemented,
then he implements that feature.
JAVA-4936
---
.../event/TestServerMonitorListener.java | 149 ++++++++++++++++++
.../client/unified/ContextElement.java | 48 ++++++
.../com/mongodb/client/unified/Entities.java | 25 ++-
.../mongodb/client/unified/EventMatcher.java | 104 +++++++++++-
.../mongodb/client/unified/UnifiedTest.java | 25 ++-
5 files changed, 339 insertions(+), 12 deletions(-)
create mode 100644 driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java
diff --git a/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java b/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java
new file mode 100644
index 0000000000..b009b5094f
--- /dev/null
+++ b/driver-core/src/test/unit/com/mongodb/event/TestServerMonitorListener.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.event;
+
+import com.mongodb.annotations.ThreadSafe;
+import com.mongodb.lang.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.assertTrue;
+import static java.util.Collections.unmodifiableSet;
+import static java.util.stream.StreamSupport.stream;
+
+@ThreadSafe
+public final class TestServerMonitorListener implements ServerMonitorListener {
+ private final Set> listenableEventTypes;
+ private final Lock lock;
+ private final Condition condition;
+ private final List events;
+
+ public TestServerMonitorListener(final Iterable listenableEventTypes) {
+ this.listenableEventTypes = unmodifiableSet(stream(listenableEventTypes.spliterator(), false)
+ .map(TestServerMonitorListener::nullableEventType)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet()));
+ lock = new ReentrantLock();
+ condition = lock.newCondition();
+ events = new ArrayList<>();
+ }
+
+ public void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) {
+ register(event);
+ }
+
+ public void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) {
+ register(event);
+ }
+
+ public void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) {
+ register(event);
+ }
+
+ public void waitForEvents(final Class type, final Predicate super T> matcher, final int count, final Duration duration)
+ throws InterruptedException, TimeoutException {
+ assertTrue(listenable(type));
+ long remainingNanos = duration.toNanos();
+ lock.lock();
+ try {
+ long observedCount = countEvents(type, matcher);
+ while (observedCount < count) {
+ if (remainingNanos <= 0) {
+ throw new TimeoutException(String.format("Timed out waiting for %d %s events. The observed count is %d.",
+ count, type.getSimpleName(), observedCount));
+ }
+ remainingNanos = condition.awaitNanos(remainingNanos);
+ observedCount = countEvents(type, matcher);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public long countEvents(final Class type, final Predicate super T> matcher) {
+ assertTrue(listenable(type));
+ lock.lock();
+ try {
+ return events.stream()
+ .filter(type::isInstance)
+ .map(type::cast)
+ .filter(matcher)
+ .count();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public List getEvents() {
+ lock.lock();
+ try {
+ return new ArrayList<>(events);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public static Class> eventType(final String eventType) {
+ return assertNotNull(nullableEventType(eventType));
+ }
+
+ @Nullable
+ private static Class> nullableEventType(final String eventType) {
+ switch (eventType) {
+ case "serverHeartbeatStartedEvent": {
+ return ServerHeartbeatStartedEvent.class;
+ }
+ case "serverHeartbeatSucceededEvent": {
+ return ServerHeartbeatSucceededEvent.class;
+ }
+ case "serverHeartbeatFailedEvent": {
+ return ServerHeartbeatFailedEvent.class;
+ }
+ default: {
+ return null;
+ }
+ }
+ }
+
+ private boolean listenable(final Class> eventType) {
+ return listenableEventTypes.contains(eventType);
+ }
+
+ private void register(final Object event) {
+ if (!listenable(event.getClass())) {
+ return;
+ }
+ lock.lock();
+ try {
+ events.add(event);
+ condition.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java
index 110cf321f4..8ffa35388e 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java
@@ -23,6 +23,7 @@
import com.mongodb.event.CommandSucceededEvent;
import com.mongodb.internal.logging.LogMessage;
import org.bson.BsonArray;
+import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonValue;
@@ -127,6 +128,48 @@ public static ContextElement ofClusterDescriptionChangedEventCount(final String
return new EventCountContext("Cluster Description Changed Event Count", client, event, count);
}
+ public static ContextElement ofWaitForServerMonitorEvents(final String client, final BsonDocument event, final int count) {
+ return new EventCountContext("Wait For Server Monitor Events", client, event, count);
+ }
+
+ public static ContextElement ofServerMonitorEventCount(final String client, final BsonDocument event, final int count) {
+ return new EventCountContext("Server Monitor Event Count", client, event, count);
+ }
+
+ public static ContextElement ofServerMonitorEvents(final String client, final BsonArray expectedEvents, final List> actualEvents) {
+ return new ContextElement() {
+ @Override
+ public String toString() {
+ return "Events MatchingContext: \n"
+ + " client: '" + client + "'\n"
+ + " Expected events:\n"
+ + new BsonDocument("events", expectedEvents).toJson(JsonWriterSettings.builder().indent(true).build()) + "\n"
+ + " Actual events:\n"
+ + new BsonDocument("events",
+ new BsonArray(actualEvents.stream().map(ContextElement::serverMonitorEventToDocument).collect(Collectors.toList())))
+ .toJson(JsonWriterSettings.builder().indent(true).build())
+ + "\n";
+ }
+
+ private BsonDocument toDocument(final Object event) {
+ return new BsonDocument(EventMatcher.getEventType(event.getClass()),
+ new BsonDocument("awaited", BsonBoolean.valueOf(EventMatcher.getAwaitedFromServerMonitorEvent(event))));
+ }
+ };
+ }
+
+ public static ContextElement ofServerMonitorEvent(final BsonDocument expected, final Object actual, final int eventPosition) {
+ return new ContextElement() {
+ @Override
+ public String toString() {
+ return "Event Matching Context\n"
+ + " event position: " + eventPosition + "\n"
+ + " expected event: " + expected + "\n"
+ + " actual event: " + serverMonitorEventToDocument(actual) + "\n";
+ }
+ };
+ }
+
private static class EventCountContext extends ContextElement {
private final String name;
@@ -417,4 +460,9 @@ public String toString() {
private static BsonDocument connectionPoolEventToDocument(final Object event) {
return new BsonDocument(event.getClass().getSimpleName(), new BsonDocument());
}
+
+ private static BsonDocument serverMonitorEventToDocument(final Object event) {
+ return new BsonDocument(EventMatcher.getEventType(event.getClass()),
+ new BsonDocument("awaited", BsonBoolean.valueOf(EventMatcher.getAwaitedFromServerMonitorEvent(event))));
+ }
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java
index ba1ed53cd8..af95b919a5 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java
@@ -23,6 +23,8 @@
import com.mongodb.ReadConcernLevel;
import com.mongodb.ServerApi;
import com.mongodb.ServerApiVersion;
+import com.mongodb.connection.ServerMonitoringMode;
+import com.mongodb.event.TestServerMonitorListener;
import com.mongodb.internal.connection.TestClusterListener;
import com.mongodb.logging.TestLoggingInterceptor;
import com.mongodb.TransactionOptions;
@@ -120,6 +122,7 @@ public final class Entities {
private final Map clientConnectionPoolListeners = new HashMap<>();
private final Map clientServerListeners = new HashMap<>();
private final Map clientClusterListeners = new HashMap<>();
+ private final Map serverMonitorListeners = new HashMap<>();
private final Map> cursors = new HashMap<>();
private final Map topologyDescriptions = new HashMap<>();
private final Map successCounts = new HashMap<>();
@@ -284,6 +287,10 @@ public TestClusterListener getClusterListener(final String id) {
return getEntity(id + "-cluster-listener", clientClusterListeners, "cluster listener");
}
+ public TestServerMonitorListener getServerMonitorListener(final String id) {
+ return getEntity(id + "-server-monitor-listener", serverMonitorListeners, "server monitor listener");
+ }
+
private T getEntity(final String id, final Map entities, final String type) {
T entity = entities.get(id);
if (entity == null) {
@@ -376,23 +383,25 @@ private void initClient(final BsonDocument entity, final String id,
putEntity(id + "-cluster-listener", testClusterListener, clientClusterListeners);
if (entity.containsKey("observeEvents")) {
+ List observeEvents = entity.getArray("observeEvents").stream()
+ .map(type -> type.asString().getValue()).collect(Collectors.toList());
List ignoreCommandMonitoringEvents = entity
.getArray("ignoreCommandMonitoringEvents", new BsonArray()).stream()
.map(type -> type.asString().getValue()).collect(Collectors.toList());
ignoreCommandMonitoringEvents.add("configureFailPoint");
- TestCommandListener testCommandListener = new TestCommandListener(
- entity.getArray("observeEvents").stream()
- .map(type -> type.asString().getValue()).collect(Collectors.toList()),
+ TestCommandListener testCommandListener = new TestCommandListener(observeEvents,
ignoreCommandMonitoringEvents, entity.getBoolean("observeSensitiveCommands", BsonBoolean.FALSE).getValue());
clientSettingsBuilder.addCommandListener(testCommandListener);
putEntity(id + "-command-listener", testCommandListener, clientCommandListeners);
- TestConnectionPoolListener testConnectionPoolListener = new TestConnectionPoolListener(
- entity.getArray("observeEvents").stream()
- .map(type -> type.asString().getValue()).collect(Collectors.toList()));
+ TestConnectionPoolListener testConnectionPoolListener = new TestConnectionPoolListener(observeEvents);
clientSettingsBuilder.applyToConnectionPoolSettings(builder ->
builder.addConnectionPoolListener(testConnectionPoolListener));
putEntity(id + "-connection-pool-listener", testConnectionPoolListener, clientConnectionPoolListeners);
+
+ TestServerMonitorListener testServerMonitorListener = new TestServerMonitorListener(observeEvents);
+ clientSettingsBuilder.applyToServerSettings(builder -> builder.addServerMonitorListener(testServerMonitorListener));
+ putEntity(id + "-server-monitor-listener", testServerMonitorListener, serverMonitorListeners);
} else {
// Regardless of whether events are observed, we still need to track some info about the pool in order to implement
// the assertNumberConnectionsCheckedOut operation
@@ -498,6 +507,10 @@ private void initClient(final BsonDocument entity, final String id,
case "appName":
clientSettingsBuilder.applicationName(value.asString().getValue());
break;
+ case "serverMonitoringMode":
+ clientSettingsBuilder.applyToServerSettings(builder -> builder.serverMonitoringMode(
+ ServerMonitoringMode.fromString(value.asString().getValue())));
+ break;
default:
throw new UnsupportedOperationException("Unsupported uri option: " + key);
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
index e831082597..41ada275a6 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
@@ -16,6 +16,7 @@
package com.mongodb.client.unified;
+import com.mongodb.assertions.Assertions;
import com.mongodb.connection.ServerType;
import com.mongodb.event.ClusterDescriptionChangedEvent;
import com.mongodb.event.CommandEvent;
@@ -29,10 +30,15 @@
import com.mongodb.event.ConnectionPoolReadyEvent;
import com.mongodb.event.ConnectionReadyEvent;
import com.mongodb.event.ServerDescriptionChangedEvent;
+import com.mongodb.event.ServerHeartbeatFailedEvent;
+import com.mongodb.event.ServerHeartbeatStartedEvent;
+import com.mongodb.event.ServerHeartbeatSucceededEvent;
+import com.mongodb.event.TestServerMonitorListener;
import com.mongodb.internal.connection.TestClusterListener;
import com.mongodb.internal.connection.TestConnectionPoolListener;
import com.mongodb.internal.connection.TestServerListener;
import com.mongodb.lang.NonNull;
+import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.types.ObjectId;
@@ -284,9 +290,61 @@ public void assertClusterDescriptionChangeEventCount(final String client, final
context.pop();
}
+ public void waitForServerMonitorEvents(final String client, final Class expectedEventType, final BsonDocument expectedEvent,
+ final int count, final TestServerMonitorListener serverMonitorListener) {
+ context.push(ContextElement.ofWaitForServerMonitorEvents(client, expectedEvent, count));
+ BsonDocument expectedEventContents = getEventContents(expectedEvent);
+ try {
+ serverMonitorListener.waitForEvents(expectedEventType,
+ event -> serverMonitorEventMatches(expectedEventContents, event, null), count, Duration.ofSeconds(10));
+ context.pop();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (TimeoutException e) {
+ fail(context.getMessage("Timed out waiting for server monitor events"));
+ }
+ }
+
+ public void assertServerMonitorEventCount(final String client, final Class expectedEventType, final BsonDocument expectedEvent,
+ final int count, final TestServerMonitorListener serverMonitorListener) {
+ BsonDocument expectedEventContents = getEventContents(expectedEvent);
+ context.push(ContextElement.ofServerMonitorEventCount(client, expectedEvent, count));
+ long matchCount = serverMonitorListener.countEvents(expectedEventType, event ->
+ serverMonitorEventMatches(expectedEventContents, event, null));
+ assertEquals(context.getMessage("Expected server monitor event counts to match"), count, matchCount);
+ context.pop();
+ }
+
+ public void assertServerMonitorEventsEquality(
+ final String client,
+ final boolean ignoreExtraEvents,
+ final BsonArray expectedEventDocuments,
+ final List> events) {
+ context.push(ContextElement.ofServerMonitorEvents(client, expectedEventDocuments, events));
+ if (ignoreExtraEvents) {
+ assertTrue(context.getMessage("Number of events must be greater than or equal to the expected number of events"),
+ events.size() >= expectedEventDocuments.size());
+ } else {
+ assertEquals(context.getMessage("Number of events must be the same"), expectedEventDocuments.size(), events.size());
+ }
+ for (int i = 0; i < expectedEventDocuments.size(); i++) {
+ Object actualEvent = events.get(i);
+ BsonDocument expectedEventDocument = expectedEventDocuments.get(i).asDocument();
+ String expectedEventType = expectedEventDocument.getFirstKey();
+ context.push(ContextElement.ofServerMonitorEvent(expectedEventDocument, actualEvent, i));
+ assertEquals(context.getMessage("Expected event type to match"), expectedEventType, getEventType(actualEvent.getClass()));
+ BsonDocument expectedEventContents = expectedEventDocument.getDocument(expectedEventType);
+ serverMonitorEventMatches(expectedEventContents, actualEvent, context);
+ context.pop();
+ }
+ context.pop();
+ }
+
@NonNull
private BsonDocument getEventContents(final BsonDocument expectedEvent) {
- HashSet supportedEventTypes = new HashSet<>(asList("serverDescriptionChangedEvent", "topologyDescriptionChangedEvent"));
+ HashSet supportedEventTypes = new HashSet<>(asList(
+ "serverDescriptionChangedEvent", "topologyDescriptionChangedEvent",
+ "serverHeartbeatStartedEvent", "serverHeartbeatSucceededEvent", "serverHeartbeatFailedEvent"));
String expectedEventType = expectedEvent.getFirstKey();
if (!supportedEventTypes.contains(expectedEventType)) {
throw new UnsupportedOperationException("Unsupported event type " + expectedEventType);
@@ -333,12 +391,52 @@ private static boolean clusterDescriptionChangedEventMatches(final BsonDocument
return true;
}
- private static String getEventType(final Class> eventClass) {
+ /**
+ * @param context Not {@code null} iff mismatch must result in an error, that is, this method works as an assertion.
+ */
+ private static boolean serverMonitorEventMatches(
+ final BsonDocument expectedEventContents,
+ final T event,
+ @Nullable final AssertionContext context) {
+ if (expectedEventContents.size() > 1) {
+ throw new UnsupportedOperationException("Matching for the following event is not implemented " + expectedEventContents.toJson());
+ }
+ if (expectedEventContents.containsKey("awaited")) {
+ boolean expectedAwaited = expectedEventContents.getBoolean("awaited").getValue();
+ boolean actualAwaited = getAwaitedFromServerMonitorEvent(event);
+ boolean awaitedMatches = expectedAwaited == actualAwaited;
+ if (context != null) {
+ assertTrue(context.getMessage("Expected `awaited` to match"), awaitedMatches);
+ }
+ return awaitedMatches;
+ }
+ return true;
+ }
+
+ static boolean getAwaitedFromServerMonitorEvent(final Object event) {
+ if (event instanceof ServerHeartbeatStartedEvent) {
+ return ((ServerHeartbeatStartedEvent) event).isAwaited();
+ } else if (event instanceof ServerHeartbeatSucceededEvent) {
+ return ((ServerHeartbeatSucceededEvent) event).isAwaited();
+ } else if (event instanceof ServerHeartbeatFailedEvent) {
+ return ((ServerHeartbeatFailedEvent) event).isAwaited();
+ } else {
+ throw Assertions.fail(event.toString());
+ }
+ }
+
+ static String getEventType(final Class> eventClass) {
String eventClassName = eventClass.getSimpleName();
if (eventClassName.startsWith("ConnectionPool")) {
return eventClassName.replace("ConnectionPool", "pool");
- } else {
+ } else if (eventClassName.startsWith("Connection")) {
return eventClassName.replace("Connection", "connection");
+ } else if (eventClassName.startsWith("ServerHeartbeat")) {
+ StringBuilder eventTypeBuilder = new StringBuilder(eventClassName);
+ eventTypeBuilder.setCharAt(0, Character.toLowerCase(eventTypeBuilder.charAt(0)));
+ return eventTypeBuilder.toString();
+ } else {
+ throw new UnsupportedOperationException(eventClassName);
}
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
index 8b76f426db..ea2edd8f03 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
@@ -21,6 +21,7 @@
import com.mongodb.MongoNamespace;
import com.mongodb.ReadPreference;
import com.mongodb.UnixServerAddress;
+import com.mongodb.event.TestServerMonitorListener;
import com.mongodb.internal.logging.LogMessage;
import com.mongodb.logging.TestLoggingInterceptor;
import com.mongodb.WriteConcern;
@@ -199,7 +200,9 @@ public void setUp() {
|| schemaVersion.equals("1.12")
|| schemaVersion.equals("1.13")
|| schemaVersion.equals("1.14")
- || schemaVersion.equals("1.15"));
+ || schemaVersion.equals("1.15")
+ || schemaVersion.equals("1.16")
+ || schemaVersion.equals("1.17"));
if (runOnRequirements != null) {
assumeTrue("Run-on requirements not met",
runOnRequirementsMet(runOnRequirements, getMongoClientSettings(), getServerVersion()));
@@ -262,14 +265,18 @@ private void compareEvents(final UnifiedTestContext context, final BsonDocument
String client = curClientEvents.getString("client").getValue();
boolean ignoreExtraEvents = curClientEvents.getBoolean("ignoreExtraEvents", BsonBoolean.FALSE).getValue();
String eventType = curClientEvents.getString("eventType", new BsonString("command")).getValue();
+ BsonArray expectedEvents = curClientEvents.getArray("events");
if (eventType.equals("command")) {
TestCommandListener listener = entities.getClientCommandListener(client);
- context.getEventMatcher().assertCommandEventsEquality(client, ignoreExtraEvents, curClientEvents.getArray("events"),
+ context.getEventMatcher().assertCommandEventsEquality(client, ignoreExtraEvents, expectedEvents,
listener.getEvents());
} else if (eventType.equals("cmap")) {
TestConnectionPoolListener listener = entities.getConnectionPoolListener(client);
- context.getEventMatcher().assertConnectionPoolEventsEquality(client, ignoreExtraEvents, curClientEvents.getArray("events"),
+ context.getEventMatcher().assertConnectionPoolEventsEquality(client, ignoreExtraEvents, expectedEvents,
listener.getEvents());
+ } else if (eventType.equals("sdam")) {
+ TestServerMonitorListener listener = entities.getServerMonitorListener(client);
+ context.getEventMatcher().assertServerMonitorEventsEquality(client, ignoreExtraEvents, expectedEvents, listener.getEvents());
} else {
throw new UnsupportedOperationException("Unexpected event type: " + eventType);
}
@@ -599,6 +606,12 @@ private OperationResult executeWaitForEvent(final UnifiedTestContext context, fi
case "connectionReadyEvent":
context.getEventMatcher().waitForConnectionPoolEvents(clientId, event, count, entities.getConnectionPoolListener(clientId));
break;
+ case "serverHeartbeatStartedEvent":
+ case "serverHeartbeatSucceededEvent":
+ case "serverHeartbeatFailedEvent":
+ context.getEventMatcher().waitForServerMonitorEvents(clientId, TestServerMonitorListener.eventType(eventName), event, count,
+ entities.getServerMonitorListener(clientId));
+ break;
default:
throw new UnsupportedOperationException("Unsupported event: " + eventName);
}
@@ -627,6 +640,12 @@ private OperationResult executeAssertEventCount(final UnifiedTestContext context
context.getEventMatcher().assertConnectionPoolEventCount(clientId, event, count,
entities.getConnectionPoolListener(clientId).getEvents());
break;
+ case "serverHeartbeatStartedEvent":
+ case "serverHeartbeatSucceededEvent":
+ case "serverHeartbeatFailedEvent":
+ context.getEventMatcher().assertServerMonitorEventCount(clientId, TestServerMonitorListener.eventType(eventName), event, count,
+ entities.getServerMonitorListener(clientId));
+ break;
default:
throw new UnsupportedOperationException("Unsupported event: " + eventName);
}
From 63562c6fabdf9024da0ecb5040255b47a9eadc2d Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Fri, 16 Feb 2024 15:57:23 -0700
Subject: [PATCH 03/13] Update the FaaS prose test
This change is with accordance to source/faas-automated-testing/faas-automated-testing.rst
JAVA-4936
---
driver-lambda/build.gradle | 2 ++
.../com/mongodb/lambdatest/LambdaTestApp.java | 21 +++++++++++++++++++
2 files changed, 23 insertions(+)
diff --git a/driver-lambda/build.gradle b/driver-lambda/build.gradle
index 76e293c9c1..d7b9928e8f 100644
--- a/driver-lambda/build.gradle
+++ b/driver-lambda/build.gradle
@@ -55,6 +55,8 @@ dependencies {
implementation('com.amazonaws:aws-lambda-java-core:1.2.2')
implementation('com.amazonaws:aws-lambda-java-events:3.11.1')
+ implementation(platform("org.junit:junit-bom:$junitBomVersion"))
+ implementation('org.junit.jupiter:junit-jupiter-api')
}
diff --git a/driver-lambda/src/main/com/mongodb/lambdatest/LambdaTestApp.java b/driver-lambda/src/main/com/mongodb/lambdatest/LambdaTestApp.java
index c9a0d2ca99..c264337516 100644
--- a/driver-lambda/src/main/com/mongodb/lambdatest/LambdaTestApp.java
+++ b/driver-lambda/src/main/com/mongodb/lambdatest/LambdaTestApp.java
@@ -32,6 +32,7 @@
import com.mongodb.event.ConnectionCreatedEvent;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.event.ServerHeartbeatFailedEvent;
+import com.mongodb.event.ServerHeartbeatStartedEvent;
import com.mongodb.event.ServerHeartbeatSucceededEvent;
import com.mongodb.event.ServerMonitorListener;
import com.mongodb.lang.NonNull;
@@ -45,8 +46,11 @@
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test App for AWS lambda functions
@@ -58,6 +62,7 @@ public class LambdaTestApp implements RequestHandler failedAssertions = new CopyOnWriteArrayList<>();
public LambdaTestApp() {
String connectionString = System.getenv("MONGODB_URI");
@@ -77,13 +82,20 @@ public void commandFailed(@NonNull final CommandFailedEvent event) {
}
})
.applyToServerSettings(builder -> builder.addServerMonitorListener(new ServerMonitorListener() {
+ @Override
+ public void serverHearbeatStarted(@NonNull final ServerHeartbeatStartedEvent event) {
+ checkAssertion(() -> assertFalse(event.isAwaited(), event::toString));
+ }
+
@Override
public void serverHeartbeatSucceeded(@NonNull final ServerHeartbeatSucceededEvent event) {
+ checkAssertion(() -> assertFalse(event.isAwaited(), event::toString));
totalHeartbeatCount++;
totalHeartbeatDurationMs += event.getElapsedTime(MILLISECONDS);
}
@Override
public void serverHeartbeatFailed(@NonNull final ServerHeartbeatFailedEvent event) {
+ checkAssertion(() -> assertFalse(event.isAwaited(), event::toString));
totalHeartbeatCount++;
totalHeartbeatDurationMs += event.getElapsedTime(MILLISECONDS);
}
@@ -110,6 +122,7 @@ public APIGatewayProxyResponseEvent handleRequest(final APIGatewayProxyRequestEv
BsonValue id = collection.insertOne(new Document("n", 1)).getInsertedId();
collection.deleteOne(new Document("_id", id));
+ assertTrue(failedAssertions.isEmpty(), failedAssertions.toString());
BsonDocument responseBody = getBsonDocument();
return templateResponse()
@@ -151,4 +164,12 @@ private APIGatewayProxyResponseEvent templateResponse() {
return new APIGatewayProxyResponseEvent()
.withHeaders(headers);
}
+
+ private void checkAssertion(final Runnable assertion) {
+ try {
+ assertion.run();
+ } catch (Throwable t) {
+ failedAssertions.add(t);
+ }
+ }
}
From 1e74b01875e1a0ffeb0fbe5aab473efbb28810f1 Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Sat, 17 Feb 2024 15:44:04 -0700
Subject: [PATCH 04/13] Support the `serverMonitoringMode` connection string
option
This change is in accordance with source/uri-options/uri-options.rst.
JAVA-4936
---
.../main/com/mongodb/ConnectionString.java | 33 +++++-
.../connection/ServerMonitoringMode.java | 74 ++++++++++++
.../mongodb/connection/ServerSettings.java | 72 ++++++++----
.../resources/uri-options/sdam-options.json | 46 ++++++++
.../mongodb/AbstractConnectionStringTest.java | 3 +
.../ConnectionStringSpecification.groovy | 3 +
.../com/mongodb/ConnectionStringUnitTest.java | 62 ++++++++++
.../connection/ServerMonitoringModeTest.java | 43 +++++++
.../ServerSettingsSpecification.groovy | 3 +
.../connection/ServerSettingsTest.java | 106 ++++++++++++++++++
10 files changed, 420 insertions(+), 25 deletions(-)
create mode 100644 driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
create mode 100644 driver-core/src/test/resources/uri-options/sdam-options.json
create mode 100644 driver-core/src/test/unit/com/mongodb/ConnectionStringUnitTest.java
create mode 100644 driver-core/src/test/unit/com/mongodb/connection/ServerMonitoringModeTest.java
create mode 100644 driver-core/src/test/unit/com/mongodb/connection/ServerSettingsTest.java
diff --git a/driver-core/src/main/com/mongodb/ConnectionString.java b/driver-core/src/main/com/mongodb/ConnectionString.java
index 5e6a5b7d81..d9fa22fec4 100644
--- a/driver-core/src/main/com/mongodb/ConnectionString.java
+++ b/driver-core/src/main/com/mongodb/ConnectionString.java
@@ -18,6 +18,8 @@
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
+import com.mongodb.connection.ServerMonitoringMode;
+import com.mongodb.connection.ServerSettings;
import com.mongodb.connection.SocketSettings;
import com.mongodb.event.ConnectionCheckOutStartedEvent;
import com.mongodb.event.ConnectionCheckedInEvent;
@@ -111,6 +113,13 @@
*
* {@code heartbeatFrequencyMS=ms}: The frequency that the driver will attempt to determine the current state of each server in the
* cluster.
+ * {@code serverMonitoringMode=enum}: The server monitoring mode, which defines the monitoring protocol to use. Enumerated values:
+ *
+ * {@code stream};
+ * {@code poll};
+ * {@code auto} - the default.
+ *
+ *
*
* Replica set configuration:
*
@@ -307,6 +316,7 @@ public class ConnectionString {
private Integer serverSelectionTimeout;
private Integer localThreshold;
private Integer heartbeatFrequency;
+ private ServerMonitoringMode serverMonitoringMode;
private String applicationName;
private List compressorList;
private UuidRepresentation uuidRepresentation;
@@ -529,6 +539,7 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient
GENERAL_OPTIONS_KEYS.add("serverselectiontimeoutms");
GENERAL_OPTIONS_KEYS.add("localthresholdms");
GENERAL_OPTIONS_KEYS.add("heartbeatfrequencyms");
+ GENERAL_OPTIONS_KEYS.add("servermonitoringmode");
GENERAL_OPTIONS_KEYS.add("retrywrites");
GENERAL_OPTIONS_KEYS.add("retryreads");
@@ -665,6 +676,9 @@ private void translateOptions(final Map> optionsMap) {
case "heartbeatfrequencyms":
heartbeatFrequency = parseInteger(value, "heartbeatfrequencyms");
break;
+ case "servermonitoringmode":
+ serverMonitoringMode = ServerMonitoringMode.fromString(value);
+ break;
case "appname":
applicationName = value;
break;
@@ -1623,6 +1637,20 @@ public Integer getHeartbeatFrequency() {
return heartbeatFrequency;
}
+ /**
+ * The server monitoring mode, which defines the monitoring protocol to use.
+ *
+ * Default is {@link ServerMonitoringMode#AUTO}.
+ *
+ * @return The {@link ServerMonitoringMode}, or {@code null} if unset and the default is to be used.
+ * @see ServerSettings#getServerMonitoringMode()
+ * @since 5.1
+ */
+ @Nullable
+ public ServerMonitoringMode getServerMonitoringMode() {
+ return serverMonitoringMode;
+ }
+
/**
* Gets the logical name of the application. The application name may be used by the client to identify the application to the server,
* for use in server logs, slow query logs, and profile collection.
@@ -1704,6 +1732,7 @@ public boolean equals(final Object o) {
&& Objects.equals(serverSelectionTimeout, that.serverSelectionTimeout)
&& Objects.equals(localThreshold, that.localThreshold)
&& Objects.equals(heartbeatFrequency, that.heartbeatFrequency)
+ && Objects.equals(serverMonitoringMode, that.serverMonitoringMode)
&& Objects.equals(applicationName, that.applicationName)
&& Objects.equals(compressorList, that.compressorList)
&& Objects.equals(uuidRepresentation, that.uuidRepresentation)
@@ -1717,7 +1746,7 @@ public int hashCode() {
writeConcern, retryWrites, retryReads, readConcern, minConnectionPoolSize, maxConnectionPoolSize, maxWaitTime,
maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, socketTimeout, sslEnabled,
sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency,
- applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost, proxyPort,
- proxyUsername, proxyPassword);
+ serverMonitoringMode, applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost,
+ proxyPort, proxyUsername, proxyPassword);
}
}
diff --git a/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
new file mode 100644
index 0000000000..a88c4c3afb
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.connection;
+
+import static com.mongodb.assertions.Assertions.notNull;
+import static java.lang.String.format;
+
+/**
+ * The server monitoring mode, which defines the monitoring protocol to use.
+ *
+ * @since 5.1
+ */
+public enum ServerMonitoringMode {
+ /**
+ * Use the streaming protocol whe the server supports it or fall back to the polling protocol otherwise.
+ */
+ STREAM("stream"),
+ /**
+ * Use the polling protocol.
+ */
+ POLL("poll"),
+ /**
+ * Behave the same as {@link #POLL} if running in a FaaS environment, otherwise behave as {@link #STREAM}.
+ * This is the default.
+ */
+ AUTO("auto");
+
+ private final String value;
+
+ ServerMonitoringMode(final String value) {
+ this.value = value;
+ }
+
+ /**
+ * Parses a string into {@link ServerMonitoringMode}.
+ *
+ * @param serverMonitoringMode A server monitoring mode string.
+ * @return The corresponding {@link ServerMonitoringMode} value.
+ * @see #getValue()
+ */
+ public static ServerMonitoringMode fromString(final String serverMonitoringMode) {
+ notNull("serverMonitoringMode", serverMonitoringMode);
+ for (ServerMonitoringMode mode : ServerMonitoringMode.values()) {
+ if (serverMonitoringMode.equalsIgnoreCase(mode.value)) {
+ return mode;
+ }
+ }
+ throw new IllegalArgumentException(format("'%s' is not a valid %s",
+ serverMonitoringMode, ServerMonitoringMode.class.getSimpleName()));
+ }
+
+ /**
+ * The string value.
+ *
+ * @return The string value.
+ * @see #fromString(String)
+ */
+ public String getValue() {
+ return value;
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/connection/ServerSettings.java b/driver-core/src/main/com/mongodb/connection/ServerSettings.java
index d4ef398a04..b4394d0dc7 100644
--- a/driver-core/src/main/com/mongodb/connection/ServerSettings.java
+++ b/driver-core/src/main/com/mongodb/connection/ServerSettings.java
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static com.mongodb.assertions.Assertions.notNull;
@@ -38,6 +39,7 @@
public class ServerSettings {
private final long heartbeatFrequencyMS;
private final long minHeartbeatFrequencyMS;
+ private final ServerMonitoringMode serverMonitoringMode;
private final List serverListeners;
private final List serverMonitorListeners;
@@ -68,6 +70,7 @@ public static Builder builder(final ServerSettings serverSettings) {
public static final class Builder {
private long heartbeatFrequencyMS = 10000;
private long minHeartbeatFrequencyMS = 500;
+ private ServerMonitoringMode serverMonitoringMode = ServerMonitoringMode.AUTO;
private List serverListeners = new ArrayList<>();
private List serverMonitorListeners = new ArrayList<>();
@@ -87,6 +90,7 @@ public Builder applySettings(final ServerSettings serverSettings) {
notNull("serverSettings", serverSettings);
heartbeatFrequencyMS = serverSettings.heartbeatFrequencyMS;
minHeartbeatFrequencyMS = serverSettings.minHeartbeatFrequencyMS;
+ serverMonitoringMode = serverSettings.serverMonitoringMode;
serverListeners = new ArrayList<>(serverSettings.serverListeners);
serverMonitorListeners = new ArrayList<>(serverSettings.serverMonitorListeners);
return this;
@@ -117,6 +121,20 @@ public Builder minHeartbeatFrequency(final long minHeartbeatFrequency, final Tim
return this;
}
+ /**
+ * Sets the server monitoring mode, which defines the monitoring protocol to use.
+ * The default value is {@link ServerMonitoringMode#AUTO}.
+ *
+ * @param serverMonitoringMode The {@link ServerMonitoringMode}.
+ * @return {@code this}.
+ * @see #getServerMonitoringMode()
+ * @since 5.1
+ */
+ public Builder serverMonitoringMode(final ServerMonitoringMode serverMonitoringMode) {
+ this.serverMonitoringMode = notNull("serverMonitoringMode", serverMonitoringMode);
+ return this;
+ }
+
/**
* Add a server listener.
*
@@ -181,6 +199,10 @@ public Builder applyConnectionString(final ConnectionString connectionString) {
if (heartbeatFrequency != null) {
heartbeatFrequencyMS = heartbeatFrequency;
}
+ ServerMonitoringMode serverMonitoringMode = connectionString.getServerMonitoringMode();
+ if (serverMonitoringMode != null) {
+ this.serverMonitoringMode = serverMonitoringMode;
+ }
return this;
}
@@ -215,6 +237,19 @@ public long getMinHeartbeatFrequency(final TimeUnit timeUnit) {
return timeUnit.convert(minHeartbeatFrequencyMS, TimeUnit.MILLISECONDS);
}
+ /**
+ * Gets the server monitoring mode, which defines the monitoring protocol to use.
+ * The default value is {@link ServerMonitoringMode#AUTO}.
+ *
+ * @return The {@link ServerMonitoringMode}.
+ * @see Builder#serverMonitoringMode(ServerMonitoringMode)
+ * @see ConnectionString#getServerMonitoringMode()
+ * @since 5.1
+ */
+ public ServerMonitoringMode getServerMonitoringMode() {
+ return serverMonitoringMode;
+ }
+
/**
* Gets the server listeners. The default value is an empty list.
*
@@ -243,33 +278,22 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
-
- ServerSettings that = (ServerSettings) o;
-
- if (heartbeatFrequencyMS != that.heartbeatFrequencyMS) {
- return false;
- }
- if (minHeartbeatFrequencyMS != that.minHeartbeatFrequencyMS) {
- return false;
- }
-
- if (!serverListeners.equals(that.serverListeners)) {
- return false;
- }
- if (!serverMonitorListeners.equals(that.serverMonitorListeners)) {
- return false;
- }
-
- return true;
+ final ServerSettings that = (ServerSettings) o;
+ return heartbeatFrequencyMS == that.heartbeatFrequencyMS
+ && minHeartbeatFrequencyMS == that.minHeartbeatFrequencyMS
+ && serverMonitoringMode == that.serverMonitoringMode
+ && Objects.equals(serverListeners, that.serverListeners)
+ && Objects.equals(serverMonitorListeners, that.serverMonitorListeners);
}
@Override
public int hashCode() {
- int result = (int) (heartbeatFrequencyMS ^ (heartbeatFrequencyMS >>> 32));
- result = 31 * result + (int) (minHeartbeatFrequencyMS ^ (minHeartbeatFrequencyMS >>> 32));
- result = 31 * result + serverListeners.hashCode();
- result = 31 * result + serverMonitorListeners.hashCode();
- return result;
+ return Objects.hash(
+ heartbeatFrequencyMS,
+ minHeartbeatFrequencyMS,
+ serverMonitoringMode,
+ serverListeners,
+ serverMonitorListeners);
}
@Override
@@ -277,6 +301,7 @@ public String toString() {
return "ServerSettings{"
+ "heartbeatFrequencyMS=" + heartbeatFrequencyMS
+ ", minHeartbeatFrequencyMS=" + minHeartbeatFrequencyMS
+ + ", serverMonitoringMode=" + serverMonitoringMode
+ ", serverListeners='" + serverListeners + '\''
+ ", serverMonitorListeners='" + serverMonitorListeners + '\''
+ '}';
@@ -285,6 +310,7 @@ public String toString() {
ServerSettings(final Builder builder) {
heartbeatFrequencyMS = builder.heartbeatFrequencyMS;
minHeartbeatFrequencyMS = builder.minHeartbeatFrequencyMS;
+ serverMonitoringMode = builder.serverMonitoringMode;
serverListeners = unmodifiableList(builder.serverListeners);
serverMonitorListeners = unmodifiableList(builder.serverMonitorListeners);
}
diff --git a/driver-core/src/test/resources/uri-options/sdam-options.json b/driver-core/src/test/resources/uri-options/sdam-options.json
new file mode 100644
index 0000000000..673f5607ee
--- /dev/null
+++ b/driver-core/src/test/resources/uri-options/sdam-options.json
@@ -0,0 +1,46 @@
+{
+ "tests": [
+ {
+ "description": "serverMonitoringMode=auto",
+ "uri": "mongodb://example.com/?serverMonitoringMode=auto",
+ "valid": true,
+ "warning": false,
+ "hosts": null,
+ "auth": null,
+ "options": {
+ "serverMonitoringMode": "auto"
+ }
+ },
+ {
+ "description": "serverMonitoringMode=stream",
+ "uri": "mongodb://example.com/?serverMonitoringMode=stream",
+ "valid": true,
+ "warning": false,
+ "hosts": null,
+ "auth": null,
+ "options": {
+ "serverMonitoringMode": "stream"
+ }
+ },
+ {
+ "description": "serverMonitoringMode=poll",
+ "uri": "mongodb://example.com/?serverMonitoringMode=poll",
+ "valid": true,
+ "warning": false,
+ "hosts": null,
+ "auth": null,
+ "options": {
+ "serverMonitoringMode": "poll"
+ }
+ },
+ {
+ "description": "invalid serverMonitoringMode",
+ "uri": "mongodb://example.com/?serverMonitoringMode=invalid",
+ "valid": true,
+ "warning": true,
+ "hosts": null,
+ "auth": null,
+ "options": {}
+ }
+ ]
+}
diff --git a/driver-core/src/test/unit/com/mongodb/AbstractConnectionStringTest.java b/driver-core/src/test/unit/com/mongodb/AbstractConnectionStringTest.java
index c5b90aa95f..9e07b9758a 100644
--- a/driver-core/src/test/unit/com/mongodb/AbstractConnectionStringTest.java
+++ b/driver-core/src/test/unit/com/mongodb/AbstractConnectionStringTest.java
@@ -139,6 +139,9 @@ protected void testValidOptions() {
} else if (option.getKey().equalsIgnoreCase("heartbeatfrequencyms")) {
int expected = option.getValue().asInt32().getValue();
assertEquals(expected, connectionString.getHeartbeatFrequency().intValue());
+ } else if (option.getKey().equalsIgnoreCase("servermonitoringmode")) {
+ String expected = option.getValue().asString().getValue();
+ assertEquals(expected, connectionString.getServerMonitoringMode().getValue());
} else if (option.getKey().equalsIgnoreCase("localthresholdms")) {
int expected = option.getValue().asInt32().getValue();
assertEquals(expected, connectionString.getLocalThreshold().intValue());
diff --git a/driver-core/src/test/unit/com/mongodb/ConnectionStringSpecification.groovy b/driver-core/src/test/unit/com/mongodb/ConnectionStringSpecification.groovy
index e0245e8009..e8731439a8 100644
--- a/driver-core/src/test/unit/com/mongodb/ConnectionStringSpecification.groovy
+++ b/driver-core/src/test/unit/com/mongodb/ConnectionStringSpecification.groovy
@@ -35,6 +35,9 @@ import static com.mongodb.ReadPreference.secondaryPreferred
import static java.util.Arrays.asList
import static java.util.concurrent.TimeUnit.MILLISECONDS
+/**
+ * Update {@link ConnectionStringUnitTest} instead.
+ */
class ConnectionStringSpecification extends Specification {
static final LONG_STRING = new String((1..256).collect { (byte) 1 } as byte[])
diff --git a/driver-core/src/test/unit/com/mongodb/ConnectionStringUnitTest.java b/driver-core/src/test/unit/com/mongodb/ConnectionStringUnitTest.java
new file mode 100644
index 0000000000..d2e41ebeaf
--- /dev/null
+++ b/driver-core/src/test/unit/com/mongodb/ConnectionStringUnitTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb;
+
+import com.mongodb.connection.ServerMonitoringMode;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+final class ConnectionStringUnitTest {
+ private static final String DEFAULT_OPTIONS = "mongodb://localhost/?";
+ @Test
+ void defaults() {
+ ConnectionString connectionStringDefault = new ConnectionString(DEFAULT_OPTIONS);
+ assertAll(() -> assertNull(connectionStringDefault.getServerMonitoringMode()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {DEFAULT_OPTIONS + "serverMonitoringMode=stream"})
+ void equalAndHashCode(final String connectionString) {
+ ConnectionString default1 = new ConnectionString(DEFAULT_OPTIONS);
+ ConnectionString default2 = new ConnectionString(DEFAULT_OPTIONS);
+ ConnectionString actual1 = new ConnectionString(connectionString);
+ ConnectionString actual2 = new ConnectionString(connectionString);
+ assertAll(
+ () -> assertEquals(default1, default2),
+ () -> assertEquals(default1.hashCode(), default2.hashCode()),
+ () -> assertEquals(actual1, actual2),
+ () -> assertEquals(actual1.hashCode(), actual2.hashCode()),
+ () -> assertNotEquals(default1, actual1)
+ );
+ }
+
+ @Test
+ void serverMonitoringMode() {
+ assertAll(
+ () -> assertEquals(ServerMonitoringMode.POLL,
+ new ConnectionString(DEFAULT_OPTIONS + "serverMonitoringMode=poll").getServerMonitoringMode()),
+ () -> assertThrows(IllegalArgumentException.class,
+ () -> new ConnectionString(DEFAULT_OPTIONS + "serverMonitoringMode=invalid"))
+ );
+ }
+}
diff --git a/driver-core/src/test/unit/com/mongodb/connection/ServerMonitoringModeTest.java b/driver-core/src/test/unit/com/mongodb/connection/ServerMonitoringModeTest.java
new file mode 100644
index 0000000000..1aba44d476
--- /dev/null
+++ b/driver-core/src/test/unit/com/mongodb/connection/ServerMonitoringModeTest.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.connection;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+final class ServerMonitoringModeTest {
+ @Test
+ public void fromString() {
+ assertAll(
+ () -> assertEquals(ServerMonitoringMode.STREAM, ServerMonitoringMode.fromString("stream")),
+ () -> assertEquals(ServerMonitoringMode.POLL, ServerMonitoringMode.fromString("poll")),
+ () -> assertEquals(ServerMonitoringMode.AUTO, ServerMonitoringMode.fromString("auto")),
+ () -> assertThrows(IllegalArgumentException.class, () -> ServerMonitoringMode.fromString("invalid"))
+ );
+ }
+
+ @Test
+ public void getValue() {
+ assertAll(
+ () -> assertEquals("stream", ServerMonitoringMode.STREAM.getValue()),
+ () -> assertEquals("poll", ServerMonitoringMode.POLL.getValue()),
+ () -> assertEquals("auto", ServerMonitoringMode.AUTO.getValue())
+ );
+ }
+}
diff --git a/driver-core/src/test/unit/com/mongodb/connection/ServerSettingsSpecification.groovy b/driver-core/src/test/unit/com/mongodb/connection/ServerSettingsSpecification.groovy
index b92d8630f1..b11ed3a65a 100644
--- a/driver-core/src/test/unit/com/mongodb/connection/ServerSettingsSpecification.groovy
+++ b/driver-core/src/test/unit/com/mongodb/connection/ServerSettingsSpecification.groovy
@@ -24,6 +24,9 @@ import spock.lang.Specification
import static java.util.concurrent.TimeUnit.MILLISECONDS
import static java.util.concurrent.TimeUnit.SECONDS
+/**
+ * Update {@link ServerSettingsTest} instead.
+ */
class ServerSettingsSpecification extends Specification {
def 'should have correct defaults'() {
when:
diff --git a/driver-core/src/test/unit/com/mongodb/connection/ServerSettingsTest.java b/driver-core/src/test/unit/com/mongodb/connection/ServerSettingsTest.java
new file mode 100644
index 0000000000..e8868813b0
--- /dev/null
+++ b/driver-core/src/test/unit/com/mongodb/connection/ServerSettingsTest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.connection;
+
+import com.mongodb.ConnectionString;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+final class ServerSettingsTest {
+ private static final String DEFAULT_OPTIONS = "mongodb://localhost/?";
+
+ @Test
+ void defaults() {
+ ServerSettings defaultServerSettings = ServerSettings.builder().build();
+ assertAll(() -> assertEquals(ServerMonitoringMode.AUTO, defaultServerSettings.getServerMonitoringMode()));
+ }
+
+ @ParameterizedTest
+ @MethodSource("equalAndHashCodeArgs")
+ void equalAndHashCode(final ServerSettings.Builder serverSettingsBuilder) {
+ ServerSettings default1 = ServerSettings.builder().build();
+ ServerSettings default2 = ServerSettings.builder().build();
+ ServerSettings actual1 = serverSettingsBuilder.build();
+ ServerSettings actual2 = serverSettingsBuilder.build();
+ assertAll(
+ () -> assertEquals(default1, default2),
+ () -> assertEquals(default1.hashCode(), default2.hashCode()),
+ () -> assertEquals(actual1, actual2),
+ () -> assertEquals(actual1.hashCode(), actual2.hashCode()),
+ () -> assertNotEquals(default1, actual1)
+ );
+ }
+
+ private static Stream equalAndHashCodeArgs() {
+ return Stream.of(
+ Arguments.of(ServerSettings.builder().serverMonitoringMode(ServerMonitoringMode.POLL))
+ );
+ }
+
+ @Test
+ void serverMonitoringMode() {
+ assertAll(
+ () -> assertEquals(
+ ServerMonitoringMode.POLL,
+ ServerSettings.builder()
+ .serverMonitoringMode(ServerMonitoringMode.POLL)
+ .build()
+ .getServerMonitoringMode(),
+ "should set"),
+ () -> assertEquals(
+ ServerMonitoringMode.STREAM,
+ ServerSettings.builder()
+ .applySettings(ServerSettings.builder()
+ .serverMonitoringMode(ServerMonitoringMode.STREAM)
+ .build())
+ .build()
+ .getServerMonitoringMode(),
+ "should apply from settings"),
+ () -> assertEquals(
+ ServerMonitoringMode.AUTO,
+ ServerSettings.builder()
+ .serverMonitoringMode(ServerMonitoringMode.STREAM)
+ .applySettings(ServerSettings.builder()
+ .build())
+ .build()
+ .getServerMonitoringMode(),
+ "should apply unset from settings"),
+ () -> assertEquals(
+ ServerMonitoringMode.POLL,
+ ServerSettings.builder()
+ .applyConnectionString(new ConnectionString(DEFAULT_OPTIONS + "serverMonitoringMode=POLL"))
+ .build()
+ .getServerMonitoringMode(),
+ "should apply from connection string"),
+ () -> assertEquals(
+ ServerMonitoringMode.STREAM,
+ ServerSettings.builder()
+ .serverMonitoringMode(ServerMonitoringMode.STREAM)
+ .applyConnectionString(new ConnectionString(DEFAULT_OPTIONS))
+ .build()
+ .getServerMonitoringMode(),
+ "should not apply unset from connection string")
+ );
+ }
+}
From 640fd79d7761cff7fa82954bbd4f6d082479912e Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Wed, 21 Feb 2024 13:46:04 -0700
Subject: [PATCH 05/13] Implement the `serverMonitoringMode` logic
This change is in accordance with source/server-discovery-and-monitoring/server-monitoring.rst.
JAVA-4936
---
.../connection/DefaultClusterFactory.java | 2 +-
.../DefaultClusterableServerFactory.java | 6 +-
.../connection/DefaultServerMonitor.java | 20 +-
.../ServerMonitorSpecification.groovy | 2 +-
.../connection/SingleServerClusterTest.java | 2 +-
.../serverMonitoringMode-java-specific.json | 190 ++++++++
.../serverMonitoringMode.json | 449 ++++++++++++++++++
.../DefaultServerMonitorSpecification.groovy | 6 +-
...ifiedServerDiscoveryAndMonitoringTest.java | 6 +
...ifiedServerDiscoveryAndMonitoringTest.java | 17 +
.../mongodb/client/unified/UnifiedTest.java | 4 +
11 files changed, 695 insertions(+), 9 deletions(-)
create mode 100644 driver-core/src/test/resources/unified-test-format/server-discovery-and-monitoring/serverMonitoringMode-java-specific.json
create mode 100644 driver-core/src/test/resources/unified-test-format/server-discovery-and-monitoring/serverMonitoringMode.json
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java
index 38fa28d3b3..0375373c23 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java
@@ -110,7 +110,7 @@ public Cluster createCluster(final ClusterSettings originalClusterSettings, fina
connectionPoolSettings, internalConnectionPoolSettings,
streamFactory, heartbeatStreamFactory, credential, loggerSettings, commandListener, applicationName,
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList,
- serverApi);
+ serverApi, FaasEnvironment.getFaasEnvironment() != FaasEnvironment.UNKNOWN);
if (clusterSettings.getMode() == ClusterConnectionMode.SINGLE) {
return new SingleServerCluster(clusterId, clusterSettings, serverFactory);
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java
index 5f5b1e97b1..5eeb6f9d1c 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java
@@ -53,6 +53,7 @@ public class DefaultClusterableServerFactory implements ClusterableServerFactory
private final List compressorList;
@Nullable
private final ServerApi serverApi;
+ private final boolean faas;
public DefaultClusterableServerFactory(
final ServerSettings serverSettings, final ConnectionPoolSettings connectionPoolSettings,
@@ -62,7 +63,7 @@ public DefaultClusterableServerFactory(
final LoggerSettings loggerSettings,
@Nullable final CommandListener commandListener,
@Nullable final String applicationName, @Nullable final MongoDriverInformation mongoDriverInformation,
- final List compressorList, @Nullable final ServerApi serverApi) {
+ final List compressorList, @Nullable final ServerApi serverApi, final boolean faas) {
this.serverSettings = serverSettings;
this.connectionPoolSettings = connectionPoolSettings;
this.internalConnectionPoolSettings = internalConnectionPoolSettings;
@@ -75,6 +76,7 @@ public DefaultClusterableServerFactory(
this.mongoDriverInformation = mongoDriverInformation;
this.compressorList = compressorList;
this.serverApi = serverApi;
+ this.faas = faas;
}
@Override
@@ -86,7 +88,7 @@ public ClusterableServer create(final Cluster cluster, final ServerAddress serve
// no credentials, compressor list, or command listener for the server monitor factory
new InternalStreamConnectionFactory(clusterMode, true, heartbeatStreamFactory, null, applicationName,
mongoDriverInformation, emptyList(), loggerSettings, null, serverApi),
- clusterMode, serverApi, sdamProvider);
+ clusterMode, serverApi, faas, sdamProvider);
ConnectionPool connectionPool = new DefaultConnectionPool(serverId,
new InternalStreamConnectionFactory(clusterMode, streamFactory, credential, applicationName,
mongoDriverInformation, compressorList, loggerSettings, commandListener, serverApi),
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
index 237a692dde..ab6a5c69cc 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
@@ -50,6 +50,7 @@
import static com.mongodb.MongoNamespace.COMMAND_COLLECTION_NAME;
import static com.mongodb.ReadPreference.primary;
import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.fail;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.connection.ServerType.UNKNOWN;
import static com.mongodb.internal.Locks.checkedWithLock;
@@ -76,6 +77,7 @@ class DefaultServerMonitor implements ServerMonitor {
private final ClusterConnectionMode clusterConnectionMode;
@Nullable
private final ServerApi serverApi;
+ private final boolean faas;
private final ServerSettings serverSettings;
private final ServerMonitorRunnable monitor;
private final Thread monitorThread;
@@ -90,6 +92,7 @@ class DefaultServerMonitor implements ServerMonitor {
final InternalConnectionFactory internalConnectionFactory,
final ClusterConnectionMode clusterConnectionMode,
@Nullable final ServerApi serverApi,
+ final boolean faas,
final Provider sdamProvider) {
this.serverSettings = notNull("serverSettings", serverSettings);
this.serverId = notNull("serverId", serverId);
@@ -97,6 +100,7 @@ class DefaultServerMonitor implements ServerMonitor {
this.internalConnectionFactory = notNull("internalConnectionFactory", internalConnectionFactory);
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
this.serverApi = serverApi;
+ this.faas = faas;
this.sdamProvider = sdamProvider;
monitor = new ServerMonitorRunnable();
monitorThread = new Thread(monitor, "cluster-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress());
@@ -251,7 +255,21 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
}
private boolean shouldStreamResponses(final ServerDescription currentServerDescription) {
- return currentServerDescription.getTopologyVersion() != null;
+ boolean serverSupportsStreaming = currentServerDescription.getTopologyVersion() != null;
+ switch (serverSettings.getServerMonitoringMode()) {
+ case STREAM: {
+ return serverSupportsStreaming;
+ }
+ case POLL: {
+ return false;
+ }
+ case AUTO: {
+ return !faas && serverSupportsStreaming;
+ }
+ default: {
+ throw fail();
+ }
+ }
}
private CommandMessage createCommandMessage(final BsonDocument command, final InternalConnection connection,
diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/ServerMonitorSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/ServerMonitorSpecification.groovy
index 8e69c609c8..0f2ba70d4c 100644
--- a/driver-core/src/test/functional/com/mongodb/internal/connection/ServerMonitorSpecification.groovy
+++ b/driver-core/src/test/functional/com/mongodb/internal/connection/ServerMonitorSpecification.groovy
@@ -224,7 +224,7 @@ class ServerMonitorSpecification extends OperationFunctionalSpecification {
SocketSettings.builder().connectTimeout(500, TimeUnit.MILLISECONDS).build(), getSslSettings()),
getCredentialWithCache(), null, null, [], LoggerSettings.builder().build(), null,
getServerApi()),
- getClusterConnectionMode(), getServerApi(), SameObjectProvider.initialized(sdam))
+ getClusterConnectionMode(), getServerApi(), false, SameObjectProvider.initialized(sdam))
serverMonitor.start()
serverMonitor
}
diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java
index 55ba6875a1..e715bfb5cd 100644
--- a/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java
+++ b/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java
@@ -69,7 +69,7 @@ private void setUpCluster(final ServerAddress serverAddress) {
streamFactory, streamFactory, getCredential(),
LoggerSettings.builder().build(), null, null, null,
- Collections.emptyList(), getServerApi()));
+ Collections.emptyList(), getServerApi(), false));
}
@After
diff --git a/driver-core/src/test/resources/unified-test-format/server-discovery-and-monitoring/serverMonitoringMode-java-specific.json b/driver-core/src/test/resources/unified-test-format/server-discovery-and-monitoring/serverMonitoringMode-java-specific.json
new file mode 100644
index 0000000000..2d50d211c8
--- /dev/null
+++ b/driver-core/src/test/resources/unified-test-format/server-discovery-and-monitoring/serverMonitoringMode-java-specific.json
@@ -0,0 +1,190 @@
+{
+ "description": "serverMonitoringMode-Java-specific",
+ "schemaVersion": "1.17",
+ "runOnRequirements": [
+ {
+ "topologies": [
+ "single",
+ "sharded",
+ "sharded-replicaset"
+ ],
+ "serverless": "forbid"
+ }
+ ],
+ "tests": [
+ {
+ "description": "connect with serverMonitoringMode=auto >=4.4 Java-specific",
+ "runOnRequirements": [
+ {
+ "minServerVersion": "4.4.0"
+ }
+ ],
+ "operations": [
+ {
+ "name": "createEntities",
+ "object": "testRunner",
+ "arguments": {
+ "entities": [
+ {
+ "client": {
+ "id": "client",
+ "uriOptions": {
+ "serverMonitoringMode": "auto"
+ },
+ "useMultipleMongoses": false,
+ "observeEvents": [
+ "serverHeartbeatStartedEvent",
+ "serverHeartbeatSucceededEvent",
+ "serverHeartbeatFailedEvent"
+ ]
+ }
+ },
+ {
+ "database": {
+ "id": "db",
+ "client": "client",
+ "databaseName": "sdam-tests"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "runCommand",
+ "object": "db",
+ "arguments": {
+ "commandName": "ping",
+ "command": {
+ "ping": 1
+ }
+ },
+ "expectResult": {
+ "ok": 1
+ }
+ },
+ {
+ "name": "waitForEvent",
+ "object": "testRunner",
+ "arguments": {
+ "client": "client",
+ "event": {
+ "serverHeartbeatStartedEvent": {}
+ },
+ "count": 2
+ }
+ }
+ ],
+ "expectEvents": [
+ {
+ "client": "client",
+ "eventType": "sdam",
+ "ignoreExtraEvents": true,
+ "events": [
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": true
+ }
+ },
+ {
+ "serverHeartbeatSucceededEvent": {
+ "awaited": true
+ }
+ },
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": true
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "description": "connect with serverMonitoringMode=stream >=4.4 Java-specific",
+ "runOnRequirements": [
+ {
+ "minServerVersion": "4.4.0"
+ }
+ ],
+ "operations": [
+ {
+ "name": "createEntities",
+ "object": "testRunner",
+ "arguments": {
+ "entities": [
+ {
+ "client": {
+ "id": "client",
+ "uriOptions": {
+ "serverMonitoringMode": "stream"
+ },
+ "useMultipleMongoses": false,
+ "observeEvents": [
+ "serverHeartbeatStartedEvent",
+ "serverHeartbeatSucceededEvent",
+ "serverHeartbeatFailedEvent"
+ ]
+ }
+ },
+ {
+ "database": {
+ "id": "db",
+ "client": "client",
+ "databaseName": "sdam-tests"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "runCommand",
+ "object": "db",
+ "arguments": {
+ "commandName": "ping",
+ "command": {
+ "ping": 1
+ }
+ },
+ "expectResult": {
+ "ok": 1
+ }
+ },
+ {
+ "name": "waitForEvent",
+ "object": "testRunner",
+ "arguments": {
+ "client": "client",
+ "event": {
+ "serverHeartbeatStartedEvent": {}
+ },
+ "count": 2
+ }
+ }
+ ],
+ "expectEvents": [
+ {
+ "client": "client",
+ "eventType": "sdam",
+ "ignoreExtraEvents": true,
+ "events": [
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": true
+ }
+ },
+ {
+ "serverHeartbeatSucceededEvent": {
+ "awaited": true
+ }
+ },
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": true
+ }
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/driver-core/src/test/resources/unified-test-format/server-discovery-and-monitoring/serverMonitoringMode.json b/driver-core/src/test/resources/unified-test-format/server-discovery-and-monitoring/serverMonitoringMode.json
new file mode 100644
index 0000000000..7d681b4f9e
--- /dev/null
+++ b/driver-core/src/test/resources/unified-test-format/server-discovery-and-monitoring/serverMonitoringMode.json
@@ -0,0 +1,449 @@
+{
+ "description": "serverMonitoringMode",
+ "schemaVersion": "1.17",
+ "runOnRequirements": [
+ {
+ "topologies": [
+ "single",
+ "sharded",
+ "sharded-replicaset"
+ ],
+ "serverless": "forbid"
+ }
+ ],
+ "tests": [
+ {
+ "description": "connect with serverMonitoringMode=auto >=4.4",
+ "runOnRequirements": [
+ {
+ "minServerVersion": "4.4.0"
+ }
+ ],
+ "operations": [
+ {
+ "name": "createEntities",
+ "object": "testRunner",
+ "arguments": {
+ "entities": [
+ {
+ "client": {
+ "id": "client",
+ "uriOptions": {
+ "serverMonitoringMode": "auto"
+ },
+ "useMultipleMongoses": false,
+ "observeEvents": [
+ "serverHeartbeatStartedEvent",
+ "serverHeartbeatSucceededEvent",
+ "serverHeartbeatFailedEvent"
+ ]
+ }
+ },
+ {
+ "database": {
+ "id": "db",
+ "client": "client",
+ "databaseName": "sdam-tests"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "runCommand",
+ "object": "db",
+ "arguments": {
+ "commandName": "ping",
+ "command": {
+ "ping": 1
+ }
+ },
+ "expectResult": {
+ "ok": 1
+ }
+ },
+ {
+ "name": "waitForEvent",
+ "object": "testRunner",
+ "arguments": {
+ "client": "client",
+ "event": {
+ "serverHeartbeatStartedEvent": {}
+ },
+ "count": 2
+ }
+ }
+ ],
+ "expectEvents": [
+ {
+ "client": "client",
+ "eventType": "sdam",
+ "ignoreExtraEvents": true,
+ "events": [
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": false
+ }
+ },
+ {
+ "serverHeartbeatSucceededEvent": {
+ "awaited": false
+ }
+ },
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": true
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "description": "connect with serverMonitoringMode=auto <4.4",
+ "runOnRequirements": [
+ {
+ "maxServerVersion": "4.2.99"
+ }
+ ],
+ "operations": [
+ {
+ "name": "createEntities",
+ "object": "testRunner",
+ "arguments": {
+ "entities": [
+ {
+ "client": {
+ "id": "client",
+ "uriOptions": {
+ "serverMonitoringMode": "auto",
+ "heartbeatFrequencyMS": 500
+ },
+ "useMultipleMongoses": false,
+ "observeEvents": [
+ "serverHeartbeatStartedEvent",
+ "serverHeartbeatSucceededEvent",
+ "serverHeartbeatFailedEvent"
+ ]
+ }
+ },
+ {
+ "database": {
+ "id": "db",
+ "client": "client",
+ "databaseName": "sdam-tests"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "runCommand",
+ "object": "db",
+ "arguments": {
+ "commandName": "ping",
+ "command": {
+ "ping": 1
+ }
+ },
+ "expectResult": {
+ "ok": 1
+ }
+ },
+ {
+ "name": "waitForEvent",
+ "object": "testRunner",
+ "arguments": {
+ "client": "client",
+ "event": {
+ "serverHeartbeatStartedEvent": {}
+ },
+ "count": 2
+ }
+ }
+ ],
+ "expectEvents": [
+ {
+ "client": "client",
+ "eventType": "sdam",
+ "ignoreExtraEvents": true,
+ "events": [
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": false
+ }
+ },
+ {
+ "serverHeartbeatSucceededEvent": {
+ "awaited": false
+ }
+ },
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": false
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "description": "connect with serverMonitoringMode=stream >=4.4",
+ "runOnRequirements": [
+ {
+ "minServerVersion": "4.4.0"
+ }
+ ],
+ "operations": [
+ {
+ "name": "createEntities",
+ "object": "testRunner",
+ "arguments": {
+ "entities": [
+ {
+ "client": {
+ "id": "client",
+ "uriOptions": {
+ "serverMonitoringMode": "stream"
+ },
+ "useMultipleMongoses": false,
+ "observeEvents": [
+ "serverHeartbeatStartedEvent",
+ "serverHeartbeatSucceededEvent",
+ "serverHeartbeatFailedEvent"
+ ]
+ }
+ },
+ {
+ "database": {
+ "id": "db",
+ "client": "client",
+ "databaseName": "sdam-tests"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "runCommand",
+ "object": "db",
+ "arguments": {
+ "commandName": "ping",
+ "command": {
+ "ping": 1
+ }
+ },
+ "expectResult": {
+ "ok": 1
+ }
+ },
+ {
+ "name": "waitForEvent",
+ "object": "testRunner",
+ "arguments": {
+ "client": "client",
+ "event": {
+ "serverHeartbeatStartedEvent": {}
+ },
+ "count": 2
+ }
+ }
+ ],
+ "expectEvents": [
+ {
+ "client": "client",
+ "eventType": "sdam",
+ "ignoreExtraEvents": true,
+ "events": [
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": false
+ }
+ },
+ {
+ "serverHeartbeatSucceededEvent": {
+ "awaited": false
+ }
+ },
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": true
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "description": "connect with serverMonitoringMode=stream <4.4",
+ "runOnRequirements": [
+ {
+ "maxServerVersion": "4.2.99"
+ }
+ ],
+ "operations": [
+ {
+ "name": "createEntities",
+ "object": "testRunner",
+ "arguments": {
+ "entities": [
+ {
+ "client": {
+ "id": "client",
+ "uriOptions": {
+ "serverMonitoringMode": "stream",
+ "heartbeatFrequencyMS": 500
+ },
+ "useMultipleMongoses": false,
+ "observeEvents": [
+ "serverHeartbeatStartedEvent",
+ "serverHeartbeatSucceededEvent",
+ "serverHeartbeatFailedEvent"
+ ]
+ }
+ },
+ {
+ "database": {
+ "id": "db",
+ "client": "client",
+ "databaseName": "sdam-tests"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "runCommand",
+ "object": "db",
+ "arguments": {
+ "commandName": "ping",
+ "command": {
+ "ping": 1
+ }
+ },
+ "expectResult": {
+ "ok": 1
+ }
+ },
+ {
+ "name": "waitForEvent",
+ "object": "testRunner",
+ "arguments": {
+ "client": "client",
+ "event": {
+ "serverHeartbeatStartedEvent": {}
+ },
+ "count": 2
+ }
+ }
+ ],
+ "expectEvents": [
+ {
+ "client": "client",
+ "eventType": "sdam",
+ "ignoreExtraEvents": true,
+ "events": [
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": false
+ }
+ },
+ {
+ "serverHeartbeatSucceededEvent": {
+ "awaited": false
+ }
+ },
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": false
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "description": "connect with serverMonitoringMode=poll",
+ "operations": [
+ {
+ "name": "createEntities",
+ "object": "testRunner",
+ "arguments": {
+ "entities": [
+ {
+ "client": {
+ "id": "client",
+ "uriOptions": {
+ "serverMonitoringMode": "poll",
+ "heartbeatFrequencyMS": 500
+ },
+ "useMultipleMongoses": false,
+ "observeEvents": [
+ "serverHeartbeatStartedEvent",
+ "serverHeartbeatSucceededEvent",
+ "serverHeartbeatFailedEvent"
+ ]
+ }
+ },
+ {
+ "database": {
+ "id": "db",
+ "client": "client",
+ "databaseName": "sdam-tests"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "runCommand",
+ "object": "db",
+ "arguments": {
+ "commandName": "ping",
+ "command": {
+ "ping": 1
+ }
+ },
+ "expectResult": {
+ "ok": 1
+ }
+ },
+ {
+ "name": "waitForEvent",
+ "object": "testRunner",
+ "arguments": {
+ "client": "client",
+ "event": {
+ "serverHeartbeatStartedEvent": {}
+ },
+ "count": 2
+ }
+ }
+ ],
+ "expectEvents": [
+ {
+ "client": "client",
+ "eventType": "sdam",
+ "ignoreExtraEvents": true,
+ "events": [
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": false
+ }
+ },
+ {
+ "serverHeartbeatSucceededEvent": {
+ "awaited": false
+ }
+ },
+ {
+ "serverHeartbeatStartedEvent": {
+ "awaited": false
+ }
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy
index 1e77995c21..0143de71e9 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy
@@ -84,7 +84,7 @@ class DefaultServerMonitorSpecification extends Specification {
}
}
monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()), ServerSettings.builder().build(),
- internalConnectionFactory, ClusterConnectionMode.SINGLE, null, SameObjectProvider.initialized(sdam))
+ internalConnectionFactory, ClusterConnectionMode.SINGLE, null, false, SameObjectProvider.initialized(sdam))
monitor.start()
when:
@@ -167,7 +167,7 @@ class DefaultServerMonitorSpecification extends Specification {
}
monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()),
ServerSettings.builder().heartbeatFrequency(1, TimeUnit.SECONDS).addServerMonitorListener(serverMonitorListener).build(),
- internalConnectionFactory, ClusterConnectionMode.SINGLE, null, mockSdamProvider())
+ internalConnectionFactory, ClusterConnectionMode.SINGLE, null, false, mockSdamProvider())
when:
monitor.start()
@@ -246,7 +246,7 @@ class DefaultServerMonitorSpecification extends Specification {
}
monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()),
ServerSettings.builder().heartbeatFrequency(1, TimeUnit.SECONDS).addServerMonitorListener(serverMonitorListener).build(),
- internalConnectionFactory, ClusterConnectionMode.SINGLE, null, mockSdamProvider())
+ internalConnectionFactory, ClusterConnectionMode.SINGLE, null, false, mockSdamProvider())
when:
monitor.start()
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java
index 4ea7b43bb0..b32137abd4 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java
@@ -19,6 +19,7 @@
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
import org.bson.BsonDocument;
+import org.junit.Before;
import org.junit.runners.Parameterized;
import java.io.IOException;
@@ -38,4 +39,9 @@ public UnifiedServerDiscoveryAndMonitoringTest(@SuppressWarnings("unused") final
public static Collection data() throws URISyntaxException, IOException {
return getTestData("unified-test-format/server-discovery-and-monitoring");
}
+
+ @Before
+ public void before() {
+ com.mongodb.client.unified.UnifiedServerDiscoveryAndMonitoringTest.skipTests(getDefinition());
+ }
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java
index c01bdca845..7f2b4bce60 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java
@@ -19,12 +19,16 @@
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
import org.bson.BsonDocument;
+import org.bson.BsonString;
+import org.junit.Before;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collection;
+import static org.junit.Assume.assumeFalse;
+
public class UnifiedServerDiscoveryAndMonitoringTest extends UnifiedSyncTest {
public UnifiedServerDiscoveryAndMonitoringTest(@SuppressWarnings("unused") final String fileDescription,
@@ -38,4 +42,17 @@ public UnifiedServerDiscoveryAndMonitoringTest(@SuppressWarnings("unused") final
public static Collection data() throws URISyntaxException, IOException {
return getTestData("unified-test-format/server-discovery-and-monitoring");
}
+
+ @Before
+ public void before() {
+ skipTests(getDefinition());
+ }
+
+ public static void skipTests(final BsonDocument definition) {
+ String description = definition.getString("description", new BsonString("")).getValue();
+ assumeFalse("Skipping because our server monitoring events behave differently for now",
+ description.equals("connect with serverMonitoringMode=auto >=4.4"));
+ assumeFalse("Skipping because our server monitoring events behave differently for now",
+ description.equals("connect with serverMonitoringMode=stream >=4.4"));
+ }
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
index ea2edd8f03..ca34f52664 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
@@ -176,6 +176,10 @@ private static Object[] createTestData(final BsonDocument fileDocument, final Bs
testDocument};
}
+ protected BsonDocument getDefinition() {
+ return definition;
+ }
+
protected abstract MongoClient createMongoClient(MongoClientSettings settings);
protected abstract GridFSBucket createGridFSBucket(MongoDatabase database);
From b7e461f5f085595086071f99e10b1909b8b1b034 Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Thu, 22 Feb 2024 13:16:43 -0700
Subject: [PATCH 06/13] Start `roundTripTimeMonitor` only if the streaming
protocol is used
This change is in accordance with source/server-discovery-and-monitoring/server-monitoring.rst.
JAVA-4936
---
.../connection/DefaultServerMonitor.java | 88 +++++++++++++------
.../DefaultServerMonitorSpecification.groovy | 2 +-
2 files changed, 62 insertions(+), 28 deletions(-)
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
index ab6a5c69cc..0aaae1a38f 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
@@ -79,11 +79,13 @@ class DefaultServerMonitor implements ServerMonitor {
private final ServerApi serverApi;
private final boolean faas;
private final ServerSettings serverSettings;
- private final ServerMonitorRunnable monitor;
- private final Thread monitorThread;
- private final RoundTripTimeRunnable roundTripTimeMonitor;
+ private final ServerMonitor monitor;
+ /**
+ * Must be guarded by {@link #lock}.
+ */
+ @Nullable
+ private RoundTripTimeMonitor roundTripTimeMonitor;
private final ExponentiallyWeightedMovingAverage averageRoundTripTime = new ExponentiallyWeightedMovingAverage(0.2);
- private final Thread roundTripTimeMonitorThread;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile boolean isClosed;
@@ -102,20 +104,26 @@ class DefaultServerMonitor implements ServerMonitor {
this.serverApi = serverApi;
this.faas = faas;
this.sdamProvider = sdamProvider;
- monitor = new ServerMonitorRunnable();
- monitorThread = new Thread(monitor, "cluster-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress());
- monitorThread.setDaemon(true);
- roundTripTimeMonitor = new RoundTripTimeRunnable();
- roundTripTimeMonitorThread = new Thread(roundTripTimeMonitor,
- "cluster-rtt-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress());
- roundTripTimeMonitorThread.setDaemon(true);
+ monitor = new ServerMonitor();
+ roundTripTimeMonitor = null;
isClosed = false;
}
@Override
public void start() {
- monitorThread.start();
- roundTripTimeMonitorThread.start();
+ monitor.start();
+ }
+
+ private void ensureRoundTripTimeMonitorStarted() {
+ lock.lock();
+ try {
+ if (roundTripTimeMonitor == null) {
+ roundTripTimeMonitor = new RoundTripTimeMonitor();
+ roundTripTimeMonitor.start();
+ }
+ } finally {
+ lock.unlock();
+ }
}
@Override
@@ -124,12 +132,16 @@ public void connect() {
}
@Override
+ @SuppressWarnings("try")
public void close() {
isClosed = true;
- monitor.close();
- monitorThread.interrupt();
- roundTripTimeMonitor.close();
- roundTripTimeMonitorThread.interrupt();
+ withLock(lock, () -> {
+ //noinspection EmptyTryBlock
+ try (ServerMonitor ignoredAutoClosed = monitor;
+ RoundTripTimeMonitor ignoredAutoClose2 = roundTripTimeMonitor) {
+ // we are automatically closing resources here
+ }
+ });
}
@Override
@@ -137,11 +149,18 @@ public void cancelCurrentCheck() {
monitor.cancelCurrentCheck();
}
- class ServerMonitorRunnable implements Runnable {
+ class ServerMonitor extends Thread implements AutoCloseable {
private volatile InternalConnection connection = null;
private volatile boolean currentCheckCancelled;
- void close() {
+ ServerMonitor() {
+ super("cluster-" + serverId.getClusterId() + "-" + serverId.getAddress());
+ setDaemon(true);
+ }
+
+ @Override
+ public void close() {
+ interrupt();
InternalConnection connection = this.connection;
if (connection != null) {
connection.close();
@@ -155,6 +174,10 @@ public void run() {
while (!isClosed) {
ServerDescription previousServerDescription = currentServerDescription;
currentServerDescription = lookupServerDescription(currentServerDescription);
+ boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription);
+ if (shouldStreamResponses) {
+ ensureRoundTripTimeMonitorStarted();
+ }
if (isClosed) {
continue;
@@ -169,7 +192,7 @@ public void run() {
logStateChange(previousServerDescription, currentServerDescription);
sdamProvider.get().update(currentServerDescription);
- if ((shouldStreamResponses(currentServerDescription) && currentServerDescription.getType() != UNKNOWN)
+ if ((shouldStreamResponses && currentServerDescription.getType() != UNKNOWN)
|| (connection != null && connection.hasMoreToCome())
|| (currentServerDescription.getException() instanceof MongoSocketException
&& previousServerDescription.getType() != UNKNOWN)) {
@@ -202,8 +225,9 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Checking status of %s", serverId.getAddress()));
}
+ boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription);
serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(
- connection.getDescription().getConnectionId(), shouldStreamResponses(currentServerDescription)));
+ connection.getDescription().getConnectionId(), shouldStreamResponses));
long start = System.nanoTime();
try {
@@ -211,7 +235,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
if (!connection.hasMoreToCome()) {
BsonDocument helloDocument = new BsonDocument(getHandshakeCommandName(currentServerDescription), new BsonInt32(1))
.append("helloOk", BsonBoolean.TRUE);
- if (shouldStreamResponses(currentServerDescription)) {
+ if (shouldStreamResponses) {
helloDocument.append("topologyVersion", assertNotNull(currentServerDescription.getTopologyVersion()).asDocument());
helloDocument.append("maxAwaitTimeMS", new BsonInt64(serverSettings.getHeartbeatFrequency(MILLISECONDS)));
}
@@ -221,7 +245,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
}
BsonDocument helloResult;
- if (shouldStreamResponses(currentServerDescription)) {
+ if (shouldStreamResponses) {
helloResult = connection.receive(new BsonDocumentCodec(), sessionContext,
Math.toIntExact(serverSettings.getHeartbeatFrequency(MILLISECONDS)));
} else {
@@ -229,15 +253,18 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
}
long elapsedTimeNanos = System.nanoTime() - start;
+ if (!shouldStreamResponses) {
+ averageRoundTripTime.addSample(elapsedTimeNanos);
+ }
serverMonitorListener.serverHeartbeatSucceeded(
new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), helloResult,
- elapsedTimeNanos, shouldStreamResponses(currentServerDescription)));
+ elapsedTimeNanos, shouldStreamResponses));
return createServerDescription(serverId.getAddress(), helloResult, averageRoundTripTime.getAverage());
} catch (Exception e) {
serverMonitorListener.serverHeartbeatFailed(
new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start,
- shouldStreamResponses(currentServerDescription), e));
+ shouldStreamResponses, e));
throw e;
}
} catch (Throwable t) {
@@ -399,10 +426,17 @@ static boolean shouldLogStageChange(final ServerDescription previous, final Serv
}
- private class RoundTripTimeRunnable implements Runnable {
+ private class RoundTripTimeMonitor extends Thread implements AutoCloseable {
private volatile InternalConnection connection = null;
- void close() {
+ RoundTripTimeMonitor() {
+ super("cluster-rtt-" + serverId.getClusterId() + "-" + serverId.getAddress());
+ setDaemon(true);
+ }
+
+ @Override
+ public void close() {
+ interrupt();
InternalConnection connection = this.connection;
if (connection != null) {
connection.close();
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy
index 0143de71e9..42626a46d9 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy
@@ -89,7 +89,7 @@ class DefaultServerMonitorSpecification extends Specification {
when:
monitor.close()
- monitor.monitorThread.join()
+ monitor.monitor.join()
then:
!stateChanged
From 2f3cdcbfbfc3f530d61f44a7f259d80da63b97be Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Wed, 28 Feb 2024 10:51:08 -0700
Subject: [PATCH 07/13] Update
driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
Co-authored-by: Viacheslav Babanin
---
.../src/main/com/mongodb/connection/ServerMonitoringMode.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
index a88c4c3afb..8bccd2c453 100644
--- a/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
+++ b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
@@ -25,7 +25,7 @@
*/
public enum ServerMonitoringMode {
/**
- * Use the streaming protocol whe the server supports it or fall back to the polling protocol otherwise.
+ * Use the streaming protocol when the server supports it or fall back to the polling protocol otherwise.
*/
STREAM("stream"),
/**
From bc589fbec4b39dcbaecab68aae0bc38b41fee2f7 Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Wed, 28 Feb 2024 11:04:01 -0700
Subject: [PATCH 08/13] faas -> isFunctionAsAServiceEnvironment
JAVA-4936
---
.../connection/DefaultClusterableServerFactory.java | 8 ++++----
.../mongodb/internal/connection/DefaultServerMonitor.java | 8 ++++----
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java
index 5eeb6f9d1c..7d0f5b62e5 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java
@@ -53,7 +53,7 @@ public class DefaultClusterableServerFactory implements ClusterableServerFactory
private final List compressorList;
@Nullable
private final ServerApi serverApi;
- private final boolean faas;
+ private final boolean isFunctionAsAServiceEnvironment;
public DefaultClusterableServerFactory(
final ServerSettings serverSettings, final ConnectionPoolSettings connectionPoolSettings,
@@ -63,7 +63,7 @@ public DefaultClusterableServerFactory(
final LoggerSettings loggerSettings,
@Nullable final CommandListener commandListener,
@Nullable final String applicationName, @Nullable final MongoDriverInformation mongoDriverInformation,
- final List compressorList, @Nullable final ServerApi serverApi, final boolean faas) {
+ final List compressorList, @Nullable final ServerApi serverApi, final boolean isFunctionAsAServiceEnvironment) {
this.serverSettings = serverSettings;
this.connectionPoolSettings = connectionPoolSettings;
this.internalConnectionPoolSettings = internalConnectionPoolSettings;
@@ -76,7 +76,7 @@ public DefaultClusterableServerFactory(
this.mongoDriverInformation = mongoDriverInformation;
this.compressorList = compressorList;
this.serverApi = serverApi;
- this.faas = faas;
+ this.isFunctionAsAServiceEnvironment = isFunctionAsAServiceEnvironment;
}
@Override
@@ -88,7 +88,7 @@ public ClusterableServer create(final Cluster cluster, final ServerAddress serve
// no credentials, compressor list, or command listener for the server monitor factory
new InternalStreamConnectionFactory(clusterMode, true, heartbeatStreamFactory, null, applicationName,
mongoDriverInformation, emptyList(), loggerSettings, null, serverApi),
- clusterMode, serverApi, faas, sdamProvider);
+ clusterMode, serverApi, isFunctionAsAServiceEnvironment, sdamProvider);
ConnectionPool connectionPool = new DefaultConnectionPool(serverId,
new InternalStreamConnectionFactory(clusterMode, streamFactory, credential, applicationName,
mongoDriverInformation, compressorList, loggerSettings, commandListener, serverApi),
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
index 0aaae1a38f..9155f6e533 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
@@ -77,7 +77,7 @@ class DefaultServerMonitor implements ServerMonitor {
private final ClusterConnectionMode clusterConnectionMode;
@Nullable
private final ServerApi serverApi;
- private final boolean faas;
+ private final boolean isFunctionAsAServiceEnvironment;
private final ServerSettings serverSettings;
private final ServerMonitor monitor;
/**
@@ -94,7 +94,7 @@ class DefaultServerMonitor implements ServerMonitor {
final InternalConnectionFactory internalConnectionFactory,
final ClusterConnectionMode clusterConnectionMode,
@Nullable final ServerApi serverApi,
- final boolean faas,
+ final boolean isFunctionAsAServiceEnvironment,
final Provider sdamProvider) {
this.serverSettings = notNull("serverSettings", serverSettings);
this.serverId = notNull("serverId", serverId);
@@ -102,7 +102,7 @@ class DefaultServerMonitor implements ServerMonitor {
this.internalConnectionFactory = notNull("internalConnectionFactory", internalConnectionFactory);
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
this.serverApi = serverApi;
- this.faas = faas;
+ this.isFunctionAsAServiceEnvironment = isFunctionAsAServiceEnvironment;
this.sdamProvider = sdamProvider;
monitor = new ServerMonitor();
roundTripTimeMonitor = null;
@@ -291,7 +291,7 @@ private boolean shouldStreamResponses(final ServerDescription currentServerDescr
return false;
}
case AUTO: {
- return !faas && serverSupportsStreaming;
+ return !isFunctionAsAServiceEnvironment && serverSupportsStreaming;
}
default: {
throw fail();
From 315d66cf2d1df9b024328bd3b283c6a39193e491 Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Wed, 28 Feb 2024 12:37:13 -0700
Subject: [PATCH 09/13] Fix the race condition in `DefaultServerMonitor`
JAVA-4936
---
.../internal/connection/DefaultServerMonitor.java | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
index 9155f6e533..55030a6db3 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java
@@ -115,15 +115,12 @@ public void start() {
}
private void ensureRoundTripTimeMonitorStarted() {
- lock.lock();
- try {
- if (roundTripTimeMonitor == null) {
+ withLock(lock, () -> {
+ if (!isClosed && roundTripTimeMonitor == null) {
roundTripTimeMonitor = new RoundTripTimeMonitor();
roundTripTimeMonitor.start();
}
- } finally {
- lock.unlock();
- }
+ });
}
@Override
@@ -134,8 +131,8 @@ public void connect() {
@Override
@SuppressWarnings("try")
public void close() {
- isClosed = true;
withLock(lock, () -> {
+ isClosed = true;
//noinspection EmptyTryBlock
try (ServerMonitor ignoredAutoClosed = monitor;
RoundTripTimeMonitor ignoredAutoClose2 = roundTripTimeMonitor) {
From 00c05ffcabdf27ef193f91d397527374bf7b07d9 Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Wed, 28 Feb 2024 14:11:29 -0700
Subject: [PATCH 10/13] Add more docs for the streaming protocol
JAVA-4936
---
.../mongodb/connection/ServerMonitoringMode.java | 15 +++++++++++++++
.../mongodb/event/ServerHeartbeatFailedEvent.java | 2 ++
.../event/ServerHeartbeatStartedEvent.java | 2 ++
.../event/ServerHeartbeatSucceededEvent.java | 2 ++
4 files changed, 21 insertions(+)
diff --git a/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
index 8bccd2c453..bb04253c51 100644
--- a/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
+++ b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
@@ -15,17 +15,32 @@
*/
package com.mongodb.connection;
+import com.mongodb.event.ClusterListener;
+import com.mongodb.event.ServerHeartbeatFailedEvent;
+import com.mongodb.event.ServerHeartbeatStartedEvent;
+import com.mongodb.event.ServerHeartbeatSucceededEvent;
+import com.mongodb.event.ServerListener;
+
import static com.mongodb.assertions.Assertions.notNull;
import static java.lang.String.format;
/**
* The server monitoring mode, which defines the monitoring protocol to use.
*
+ * @see
+ * server discovery and monitoring (SDAM)
* @since 5.1
*/
public enum ServerMonitoringMode {
/**
* Use the streaming protocol when the server supports it or fall back to the polling protocol otherwise.
+ * When the streaming protocol comes into play,
+ * {@link ServerHeartbeatStartedEvent#isAwaited()}, {@link ServerHeartbeatSucceededEvent#isAwaited()},
+ * {@link ServerHeartbeatFailedEvent#isAwaited()} return {@code true} for new events.
+ *
+ * The streaming protocol uses long polling for SDAM, and is intended to reduce the delay between a server change
+ * that warrants a new event for {@link ServerListener}/{@link ClusterListener},
+ * and that event being emitted, as well as the related housekeeping work being done.
*/
STREAM("stream"),
/**
diff --git a/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java b/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java
index b324ddb84c..a8468effe4 100644
--- a/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java
+++ b/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java
@@ -17,6 +17,7 @@
package com.mongodb.event;
import com.mongodb.connection.ConnectionId;
+import com.mongodb.connection.ServerMonitoringMode;
import java.util.concurrent.TimeUnit;
@@ -77,6 +78,7 @@ public long getElapsedTime(final TimeUnit timeUnit) {
* to the server and the time that the server waited before sending a response.
*
* @return whether the response was awaited
+ * @see ServerMonitoringMode#STREAM
* @since 4.1
* @mongodb.server.release 4.4
*/
diff --git a/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java b/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java
index 0e377ed7b3..f83dc5ef00 100644
--- a/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java
+++ b/driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java
@@ -17,6 +17,7 @@
package com.mongodb.event;
import com.mongodb.connection.ConnectionId;
+import com.mongodb.connection.ServerMonitoringMode;
import static com.mongodb.assertions.Assertions.notNull;
@@ -66,6 +67,7 @@ public ConnectionId getConnectionId() {
* Gets whether the heartbeat is for an awaitable `hello` / legacy hello.
*
* @return {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
+ * @see ServerMonitoringMode#STREAM
* @since 5.1
*/
public boolean isAwaited() {
diff --git a/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java b/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java
index e6deb0bb7a..20e9741275 100644
--- a/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java
+++ b/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java
@@ -17,6 +17,7 @@
package com.mongodb.event;
import com.mongodb.connection.ConnectionId;
+import com.mongodb.connection.ServerMonitoringMode;
import org.bson.BsonDocument;
import java.util.concurrent.TimeUnit;
@@ -87,6 +88,7 @@ public long getElapsedTime(final TimeUnit timeUnit) {
* to the server and the time that the server waited before sending a response.
*
* @return whether the response was awaited
+ * @see ServerMonitoringMode#STREAM
* @since 4.1
* @mongodb.server.release 4.4
*/
From fa83aba2a10850434a09a01e7864def433c8f12f Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Wed, 28 Feb 2024 15:36:38 -0700
Subject: [PATCH 11/13] Remove `ServerMonitoringMode.getValue`/`fromString`
from the API
JAVA-4936
---
.../main/com/mongodb/ConnectionString.java | 3 +-
.../connection/ServerMonitoringMode.java | 43 ++-------------
.../connection/ServerMonitoringModeUtil.java | 52 +++++++++++++++++++
.../mongodb/AbstractConnectionStringTest.java | 3 +-
.../ServerMonitoringModeUtilTest.java} | 19 +++----
.../com/mongodb/client/unified/Entities.java | 4 +-
6 files changed, 71 insertions(+), 53 deletions(-)
create mode 100644 driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java
rename driver-core/src/test/unit/com/mongodb/{connection/ServerMonitoringModeTest.java => internal/connection/ServerMonitoringModeUtilTest.java} (68%)
diff --git a/driver-core/src/main/com/mongodb/ConnectionString.java b/driver-core/src/main/com/mongodb/ConnectionString.java
index d9fa22fec4..e715b8983f 100644
--- a/driver-core/src/main/com/mongodb/ConnectionString.java
+++ b/driver-core/src/main/com/mongodb/ConnectionString.java
@@ -26,6 +26,7 @@
import com.mongodb.event.ConnectionCheckedOutEvent;
import com.mongodb.event.ConnectionCreatedEvent;
import com.mongodb.event.ConnectionReadyEvent;
+import com.mongodb.internal.connection.ServerMonitoringModeUtil;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.dns.DefaultDnsResolver;
@@ -677,7 +678,7 @@ private void translateOptions(final Map> optionsMap) {
heartbeatFrequency = parseInteger(value, "heartbeatfrequencyms");
break;
case "servermonitoringmode":
- serverMonitoringMode = ServerMonitoringMode.fromString(value);
+ serverMonitoringMode = ServerMonitoringModeUtil.fromString(value);
break;
case "appname":
applicationName = value;
diff --git a/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
index bb04253c51..630eef2ff6 100644
--- a/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
+++ b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
@@ -21,9 +21,6 @@
import com.mongodb.event.ServerHeartbeatSucceededEvent;
import com.mongodb.event.ServerListener;
-import static com.mongodb.assertions.Assertions.notNull;
-import static java.lang.String.format;
-
/**
* The server monitoring mode, which defines the monitoring protocol to use.
*
@@ -42,48 +39,14 @@ public enum ServerMonitoringMode {
* that warrants a new event for {@link ServerListener}/{@link ClusterListener},
* and that event being emitted, as well as the related housekeeping work being done.
*/
- STREAM("stream"),
+ STREAM(),
/**
* Use the polling protocol.
*/
- POLL("poll"),
+ POLL(),
/**
* Behave the same as {@link #POLL} if running in a FaaS environment, otherwise behave as {@link #STREAM}.
* This is the default.
*/
- AUTO("auto");
-
- private final String value;
-
- ServerMonitoringMode(final String value) {
- this.value = value;
- }
-
- /**
- * Parses a string into {@link ServerMonitoringMode}.
- *
- * @param serverMonitoringMode A server monitoring mode string.
- * @return The corresponding {@link ServerMonitoringMode} value.
- * @see #getValue()
- */
- public static ServerMonitoringMode fromString(final String serverMonitoringMode) {
- notNull("serverMonitoringMode", serverMonitoringMode);
- for (ServerMonitoringMode mode : ServerMonitoringMode.values()) {
- if (serverMonitoringMode.equalsIgnoreCase(mode.value)) {
- return mode;
- }
- }
- throw new IllegalArgumentException(format("'%s' is not a valid %s",
- serverMonitoringMode, ServerMonitoringMode.class.getSimpleName()));
- }
-
- /**
- * The string value.
- *
- * @return The string value.
- * @see #fromString(String)
- */
- public String getValue() {
- return value;
- }
+ AUTO()
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java b/driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java
new file mode 100644
index 0000000000..807975652e
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.internal.connection;
+
+import com.mongodb.connection.ServerMonitoringMode;
+
+import static java.lang.String.format;
+
+public final class ServerMonitoringModeUtil {
+ /**
+ * Returns the string value of the provided {@code serverMonitoringMode}.
+ *
+ * @return The string value.
+ * @see #fromString(String)
+ */
+ public static String getValue(final ServerMonitoringMode serverMonitoringMode) {
+ return serverMonitoringMode.name().toLowerCase();
+ }
+
+ /**
+ * Parses a string into {@link ServerMonitoringMode}.
+ *
+ * @param serverMonitoringMode A server monitoring mode string.
+ * @return The corresponding {@link ServerMonitoringMode} value.
+ * @see #getValue(ServerMonitoringMode)
+ */
+ public static ServerMonitoringMode fromString(final String serverMonitoringMode) {
+ for (ServerMonitoringMode mode : ServerMonitoringMode.values()) {
+ if (serverMonitoringMode.equalsIgnoreCase(mode.name())) {
+ return mode;
+ }
+ }
+ throw new IllegalArgumentException(format("'%s' is not a valid %s",
+ serverMonitoringMode, ServerMonitoringMode.class.getSimpleName()));
+ }
+
+ private ServerMonitoringModeUtil() {
+ }
+}
diff --git a/driver-core/src/test/unit/com/mongodb/AbstractConnectionStringTest.java b/driver-core/src/test/unit/com/mongodb/AbstractConnectionStringTest.java
index 9e07b9758a..bb26a3edb5 100644
--- a/driver-core/src/test/unit/com/mongodb/AbstractConnectionStringTest.java
+++ b/driver-core/src/test/unit/com/mongodb/AbstractConnectionStringTest.java
@@ -16,6 +16,7 @@
package com.mongodb;
+import com.mongodb.internal.connection.ServerMonitoringModeUtil;
import com.mongodb.lang.Nullable;
import junit.framework.TestCase;
import org.bson.BsonArray;
@@ -141,7 +142,7 @@ protected void testValidOptions() {
assertEquals(expected, connectionString.getHeartbeatFrequency().intValue());
} else if (option.getKey().equalsIgnoreCase("servermonitoringmode")) {
String expected = option.getValue().asString().getValue();
- assertEquals(expected, connectionString.getServerMonitoringMode().getValue());
+ assertEquals(expected, ServerMonitoringModeUtil.getValue(connectionString.getServerMonitoringMode()));
} else if (option.getKey().equalsIgnoreCase("localthresholdms")) {
int expected = option.getValue().asInt32().getValue();
assertEquals(expected, connectionString.getLocalThreshold().intValue());
diff --git a/driver-core/src/test/unit/com/mongodb/connection/ServerMonitoringModeTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/ServerMonitoringModeUtilTest.java
similarity index 68%
rename from driver-core/src/test/unit/com/mongodb/connection/ServerMonitoringModeTest.java
rename to driver-core/src/test/unit/com/mongodb/internal/connection/ServerMonitoringModeUtilTest.java
index 1aba44d476..f549207b74 100644
--- a/driver-core/src/test/unit/com/mongodb/connection/ServerMonitoringModeTest.java
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ServerMonitoringModeUtilTest.java
@@ -13,31 +13,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.mongodb.connection;
+package com.mongodb.internal.connection;
+import com.mongodb.connection.ServerMonitoringMode;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-final class ServerMonitoringModeTest {
+final class ServerMonitoringModeUtilTest {
@Test
public void fromString() {
assertAll(
- () -> assertEquals(ServerMonitoringMode.STREAM, ServerMonitoringMode.fromString("stream")),
- () -> assertEquals(ServerMonitoringMode.POLL, ServerMonitoringMode.fromString("poll")),
- () -> assertEquals(ServerMonitoringMode.AUTO, ServerMonitoringMode.fromString("auto")),
- () -> assertThrows(IllegalArgumentException.class, () -> ServerMonitoringMode.fromString("invalid"))
+ () -> assertEquals(ServerMonitoringMode.STREAM, ServerMonitoringModeUtil.fromString("stream")),
+ () -> assertEquals(ServerMonitoringMode.POLL, ServerMonitoringModeUtil.fromString("poll")),
+ () -> assertEquals(ServerMonitoringMode.AUTO, ServerMonitoringModeUtil.fromString("auto")),
+ () -> assertThrows(IllegalArgumentException.class, () -> ServerMonitoringModeUtil.fromString("invalid"))
);
}
@Test
public void getValue() {
assertAll(
- () -> assertEquals("stream", ServerMonitoringMode.STREAM.getValue()),
- () -> assertEquals("poll", ServerMonitoringMode.POLL.getValue()),
- () -> assertEquals("auto", ServerMonitoringMode.AUTO.getValue())
+ () -> assertEquals("stream", ServerMonitoringModeUtil.getValue(ServerMonitoringMode.STREAM)),
+ () -> assertEquals("poll", ServerMonitoringModeUtil.getValue(ServerMonitoringMode.POLL)),
+ () -> assertEquals("auto", ServerMonitoringModeUtil.getValue(ServerMonitoringMode.AUTO))
);
}
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java
index fadc126daf..3071879e9a 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java
@@ -23,8 +23,8 @@
import com.mongodb.ReadConcernLevel;
import com.mongodb.ServerApi;
import com.mongodb.ServerApiVersion;
-import com.mongodb.connection.ServerMonitoringMode;
import com.mongodb.event.TestServerMonitorListener;
+import com.mongodb.internal.connection.ServerMonitoringModeUtil;
import com.mongodb.internal.connection.TestClusterListener;
import com.mongodb.logging.TestLoggingInterceptor;
import com.mongodb.TransactionOptions;
@@ -510,7 +510,7 @@ private void initClient(final BsonDocument entity, final String id,
break;
case "serverMonitoringMode":
clientSettingsBuilder.applyToServerSettings(builder -> builder.serverMonitoringMode(
- ServerMonitoringMode.fromString(value.asString().getValue())));
+ ServerMonitoringModeUtil.fromString(value.asString().getValue())));
break;
default:
throw new UnsupportedOperationException("Unsupported uri option: " + key);
From b372ca97a65a3671cc5a15d4f8d87b14f4724e1e Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Wed, 28 Feb 2024 17:49:27 -0700
Subject: [PATCH 12/13] Update
driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
Co-authored-by: Jeff Yemin
---
.../src/main/com/mongodb/connection/ServerMonitoringMode.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
index 630eef2ff6..cf54afef4b 100644
--- a/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
+++ b/driver-core/src/main/com/mongodb/connection/ServerMonitoringMode.java
@@ -35,7 +35,7 @@ public enum ServerMonitoringMode {
* {@link ServerHeartbeatStartedEvent#isAwaited()}, {@link ServerHeartbeatSucceededEvent#isAwaited()},
* {@link ServerHeartbeatFailedEvent#isAwaited()} return {@code true} for new events.
*
- * The streaming protocol uses long polling for SDAM, and is intended to reduce the delay between a server change
+ * The streaming protocol uses long polling for server monitoring, and is intended to reduce the delay between a server change
* that warrants a new event for {@link ServerListener}/{@link ClusterListener},
* and that event being emitted, as well as the related housekeeping work being done.
*/
From 128741d1bad4f840e7c7d7bf823b0821f51e0f0c Mon Sep 17 00:00:00 2001
From: Valentin Kovalenko
Date: Wed, 28 Feb 2024 17:50:47 -0700
Subject: [PATCH 13/13] Update
driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java
Co-authored-by: Jeff Yemin
---
.../mongodb/internal/connection/ServerMonitoringModeUtil.java | 3 +++
1 file changed, 3 insertions(+)
diff --git a/driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java b/driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java
index 807975652e..17629f38a5 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/ServerMonitoringModeUtil.java
@@ -19,6 +19,9 @@
import static java.lang.String.format;
+/**
+ * This class is not part of the public API and may be removed or changed at any time
+ */
public final class ServerMonitoringModeUtil {
/**
* Returns the string value of the provided {@code serverMonitoringMode}.