Skip to content

sunset Astrolabe #1769

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,41 +37,19 @@
import com.mongodb.client.vault.ClientEncryption;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ConnectionId;
import com.mongodb.connection.ServerId;
import com.mongodb.event.CommandEvent;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandListener;
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.CommandSucceededEvent;
import com.mongodb.event.ConnectionCheckOutFailedEvent;
import com.mongodb.event.ConnectionCheckOutStartedEvent;
import com.mongodb.event.ConnectionCheckedInEvent;
import com.mongodb.event.ConnectionCheckedOutEvent;
import com.mongodb.event.ConnectionClosedEvent;
import com.mongodb.event.ConnectionCreatedEvent;
import com.mongodb.event.ConnectionPoolClearedEvent;
import com.mongodb.event.ConnectionPoolClosedEvent;
import com.mongodb.event.ConnectionPoolCreatedEvent;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.event.ConnectionPoolReadyEvent;
import com.mongodb.event.ConnectionReadyEvent;
import com.mongodb.event.TestServerMonitorListener;
import com.mongodb.internal.connection.ServerMonitoringModeUtil;
import com.mongodb.internal.connection.TestClusterListener;
import com.mongodb.internal.connection.TestCommandListener;
import com.mongodb.internal.connection.TestConnectionPoolListener;
import com.mongodb.internal.connection.TestServerListener;
import com.mongodb.internal.logging.LogMessage;
import com.mongodb.lang.NonNull;
import com.mongodb.lang.Nullable;
import com.mongodb.logging.TestLoggingInterceptor;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonValue;

Expand All @@ -97,22 +75,20 @@
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
import static com.mongodb.client.Fixture.getMultiMongosMongoClientSettingsBuilder;
import static com.mongodb.client.unified.EventMatcher.getReasonString;
import static com.mongodb.client.unified.UnifiedClientEncryptionHelper.createKmsProvidersMap;
import static com.mongodb.client.unified.UnifiedCrudHelper.asReadConcern;
import static com.mongodb.client.unified.UnifiedCrudHelper.asReadPreference;
import static com.mongodb.client.unified.UnifiedCrudHelper.asWriteConcern;
import static com.mongodb.internal.connection.AbstractConnectionPoolTest.waitForPoolAsyncWorkManagerStart;
import static java.lang.System.getenv;
import static java.util.Arrays.asList;
import static java.util.Collections.synchronizedList;
import static org.junit.Assume.assumeTrue;

public final class Entities {
private static final Set<String> SUPPORTED_CLIENT_ENTITY_OPTIONS = new HashSet<>(
asList(
"id", "uriOptions", "serverApi", "useMultipleMongoses", "storeEventsAsEntities",
"observeEvents", "observeLogMessages", "observeSensitiveCommands", "ignoreCommandMonitoringEvents"));
"id", "uriOptions", "serverApi", "useMultipleMongoses", "observeEvents",
"observeLogMessages", "observeSensitiveCommands", "ignoreCommandMonitoringEvents"));
private final Set<String> entityNames = new HashSet<>();
private final Map<String, ExecutorService> threads = new HashMap<>();
private final Map<String, ArrayList<Future<?>>> tasks = new HashMap<>();
Expand All @@ -132,60 +108,8 @@ public final class Entities {
private final Map<String, TestServerMonitorListener> serverMonitorListeners = new HashMap<>();
private final Map<String, MongoCursor<BsonDocument>> cursors = new HashMap<>();
private final Map<String, ClusterDescription> topologyDescriptions = new HashMap<>();
private final Map<String, Long> successCounts = new HashMap<>();
private final Map<String, Long> iterationCounts = new HashMap<>();
private final Map<String, BsonArray> errorDocumentsMap = new HashMap<>();
private final Map<String, BsonArray> failureDocumentsMap = new HashMap<>();
private final Map<String, List<BsonDocument>> eventsMap = new HashMap<>();

public boolean hasSuccessCount(final String id) {
return successCounts.containsKey(id);
}

public void addSuccessCount(final String id, final long count) {
putEntity(id, count, successCounts);
}

public Long getSuccessCount(final String id) {
return getEntity(id, successCounts, "successCount");
}

public boolean hasIterationCount(final String id) {
return iterationCounts.containsKey(id);
}

public void addIterationCount(final String id, final long count) {
putEntity(id, count, iterationCounts);
}

public Long getIterationCount(final String id) {
return getEntity(id, iterationCounts, "successCount");
}

public boolean hasErrorDocuments(final String id) {
return errorDocumentsMap.containsKey(id);
}

public void addErrorDocuments(final String id, final BsonArray errorDocuments) {
putEntity(id, errorDocuments, errorDocumentsMap);
}

public BsonArray getErrorDocuments(final String id) {
return getEntity(id, errorDocumentsMap, "errorDocuments");
}

public boolean hasFailureDocuments(final String id) {
return failureDocumentsMap.containsKey(id);
}

public void addFailureDocuments(final String id, final BsonArray failureDocuments) {
putEntity(id, failureDocuments, failureDocumentsMap);
}

public BsonArray getFailureDocuments(final String id) {
return getEntity(id, failureDocumentsMap, "failureDocuments");
}

public boolean hasEvents(final String id) {
return eventsMap.containsKey(id);
}
Expand Down Expand Up @@ -433,35 +357,6 @@ private void initClient(final BsonDocument entity, final String id,
putEntity(id + "-connection-pool-listener", testConnectionPoolListener, clientConnectionPoolListeners);
}

if (entity.containsKey("storeEventsAsEntities")) {
BsonArray storeEventsAsEntitiesArray = entity.getArray("storeEventsAsEntities");
for (BsonValue eventValue : storeEventsAsEntitiesArray) {
BsonDocument eventDocument = eventValue.asDocument();
String key = eventDocument.getString("id").getValue();
BsonArray eventList = eventDocument.getArray("events");
List<BsonDocument> eventDocumentList = synchronizedList(new ArrayList<>());
putEntity(key, eventDocumentList, eventsMap);

if (eventList.stream()
.map(value -> value.asString().getValue())
.anyMatch(value -> value.startsWith("Command"))) {
clientSettingsBuilder.addCommandListener(new EntityCommandListener(eventList.stream()
.map(value -> value.asString().getValue())
.collect(Collectors.toSet()),
eventDocumentList));
}
if (eventList.stream()
.map(value -> value.asString().getValue())
.anyMatch(value -> value.startsWith("Pool") || value.startsWith("Connection"))) {
clientSettingsBuilder.
applyToConnectionPoolSettings(builder ->
builder.addConnectionPoolListener(new EntityConnectionPoolListener(eventList.stream()
.map(value -> value.asString().getValue())
.collect(Collectors.toSet()),
eventDocumentList)));
}
}
}
clientSettingsBuilder.applyToServerSettings(builder -> {
builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS);
builder.minHeartbeatFrequency(50, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -795,154 +690,4 @@ public void close() {
clientLoggingInterceptors.values().forEach(TestLoggingInterceptor::close);
threads.values().forEach(ExecutorService::shutdownNow);
}

private static class EntityCommandListener implements CommandListener {
private final List<BsonDocument> eventDocumentList;
private final Set<String> enabledEvents;

EntityCommandListener(final Set<String> enabledEvents, final List<BsonDocument> eventDocumentList) {
this.eventDocumentList = eventDocumentList;
this.enabledEvents = enabledEvents;
}

@Override
public void commandStarted(final CommandStartedEvent event) {
if (enabledEvents.contains("CommandStartedEvent")) {
eventDocumentList.add(createEventDocument(event, "CommandStartedEvent")
.append("databaseName", new BsonString(event.getDatabaseName())));
}
}

@Override
public void commandSucceeded(final CommandSucceededEvent event) {
if (enabledEvents.contains("CommandSucceededEvent")) {
eventDocumentList.add(createEventDocument(event, "CommandSucceededEvent")
.append("duration", new BsonInt64(event.getElapsedTime(TimeUnit.MILLISECONDS))));
}
}

@Override
public void commandFailed(final CommandFailedEvent event) {
if (enabledEvents.contains("CommandFailedEvent")) {
eventDocumentList.add(createEventDocument(event, "CommandFailedEvent")
.append("duration",
new BsonDouble(event.getElapsedTime(TimeUnit.NANOSECONDS) / 1_000_000_000.0))
.append("failure", new BsonString(event.getThrowable().toString())));
}
}

private BsonDocument createEventDocument(final CommandEvent event, final String name) {
return new BsonDocument()
.append("name", new BsonString(name))
.append("observedAt", new BsonDouble(System.currentTimeMillis() / 1000.0))
.append("commandName", new BsonString(event.getCommandName()))
.append("requestId", new BsonInt32(event.getRequestId()));
}
}

private static class EntityConnectionPoolListener implements ConnectionPoolListener {
private final List<BsonDocument> eventDocumentList;
private final Set<String> enabledEvents;

EntityConnectionPoolListener(final Set<String> enabledEvents, final List<BsonDocument> eventDocumentList) {
this.eventDocumentList = eventDocumentList;
this.enabledEvents = enabledEvents;
}

@Override
public void connectionPoolCreated(final ConnectionPoolCreatedEvent event) {
if (enabledEvents.contains("PoolCreatedEvent")) {
eventDocumentList.add(createEventDocument("PoolCreatedEvent", event.getServerId()));
}
}

@Override
public void connectionPoolCleared(final ConnectionPoolClearedEvent event) {
if (enabledEvents.contains("PoolClearedEvent")) {
eventDocumentList.add(createEventDocument("PoolClearedEvent", event.getServerId()));
}
}

@Override
public void connectionPoolReady(final ConnectionPoolReadyEvent event) {
if (enabledEvents.contains("PoolReadyEvent")) {
eventDocumentList.add(createEventDocument("PoolReadyEvent", event.getServerId()));
}
}

@Override
public void connectionPoolClosed(final ConnectionPoolClosedEvent event) {
if (enabledEvents.contains("PoolClosedEvent")) {
eventDocumentList.add(createEventDocument("PoolClosedEvent", event.getServerId()));
}
}

@Override
public void connectionCheckOutStarted(final ConnectionCheckOutStartedEvent event) {
if (enabledEvents.contains("ConnectionCheckOutStartedEvent")) {
eventDocumentList.add(createEventDocument("ConnectionCheckOutStartedEvent", event.getServerId()));
}
}

@Override
public void connectionCheckedOut(final ConnectionCheckedOutEvent event) {
if (enabledEvents.contains("ConnectionCheckedOutEvent")) {
eventDocumentList.add(createEventDocument("ConnectionCheckedOutEvent", event.getConnectionId()));
}
}

@Override
public void connectionCheckOutFailed(final ConnectionCheckOutFailedEvent event) {
if (enabledEvents.contains("ConnectionCheckOutFailedEvent")) {
eventDocumentList.add(createEventDocument("ConnectionCheckOutFailedEvent", event.getServerId())
.append("reason", new BsonString(getReasonString(event.getReason()))));
}
}

@Override
public void connectionCheckedIn(final ConnectionCheckedInEvent event) {
if (enabledEvents.contains("ConnectionCheckedInEvent")) {
eventDocumentList.add(createEventDocument("ConnectionCheckedInEvent", event.getConnectionId()));
}
}

@Override
public void connectionCreated(final ConnectionCreatedEvent event) {
if (enabledEvents.contains("ConnectionCreatedEvent")) {
eventDocumentList.add(createEventDocument("ConnectionCreatedEvent", event.getConnectionId()));
}
}

@Override
public void connectionReady(final ConnectionReadyEvent event) {
if (enabledEvents.contains("ConnectionReadyEvent")) {
eventDocumentList.add(createEventDocument("ConnectionReadyEvent", event.getConnectionId()));
}
}

@Override
public void connectionClosed(final ConnectionClosedEvent event) {
if (enabledEvents.contains("ConnectionClosedEvent")) {
eventDocumentList.add(createEventDocument("ConnectionClosedEvent", event.getConnectionId())
.append("reason", new BsonString(getReasonString(event.getReason()))));
}
}

private BsonDocument createEventDocument(final String name, final ConnectionId connectionId) {
return createEventDocument(name, connectionId.getServerId())
.append("connectionId", new BsonString(Long.toString(connectionId.getLocalValue())));
}

private BsonDocument createEventDocument(final String name, final ServerId serverId) {
return new BsonDocument()
.append("name", new BsonString(name))
.append("observedAt", new BsonDouble(System.currentTimeMillis() / 1000.0))
.append("address", new BsonString(getAddressAsString(serverId)));
}

@NonNull
private String getAddressAsString(final ServerId serverId) {
return serverId.getAddress().getHost() + ":" + serverId.getAddress().getPort();
}
}
}
Loading