Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Netty event loop resource leak on MongoClient close. #1646

Merged
merged 3 commits into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.mongodb.connection.AsyncTransportSettings;
import com.mongodb.connection.NettyTransportSettings;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;

import java.io.IOException;
Expand All @@ -34,16 +36,26 @@
*/
public final class StreamFactoryHelper {

public static StreamFactory getSyncStreamFactory(final MongoClientSettings settings,
final InetAddressResolver inetAddressResolver, final SocketSettings socketSettings) {
TransportSettings transportSettings = settings.getTransportSettings();
public static StreamFactoryFactory getSyncStreamFactoryFactory(
@Nullable final TransportSettings transportSettings,
final InetAddressResolver inetAddressResolver) {

if (transportSettings == null) {
return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings());
return new StreamFactoryFactory() {
@Override
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
return new SocketStreamFactory(inetAddressResolver, socketSettings, sslSettings);
}

@Override
public void close() {
//NOP
}
};
} else if (transportSettings instanceof AsyncTransportSettings) {
throw new MongoClientException("Unsupported transport settings in sync: " + transportSettings.getClass().getName());
} else if (transportSettings instanceof NettyTransportSettings) {
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings)
.create(socketSettings, settings.getSslSettings());
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings);
} else {
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
}
Expand Down
19 changes: 18 additions & 1 deletion driver-legacy/src/main/com/mongodb/MongoClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.mongodb.client.ListDatabasesIterable;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
import com.mongodb.client.internal.Clusters;
import com.mongodb.client.internal.MongoClientImpl;
import com.mongodb.client.internal.OperationExecutor;
import com.mongodb.connection.ClusterConnectionMode;
Expand All @@ -37,6 +38,7 @@
import com.mongodb.internal.connection.Connection;
import com.mongodb.internal.connection.NoOpSessionContext;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.StreamFactoryFactory;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.session.ServerSessionPool;
Expand All @@ -63,8 +65,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument;
import static com.mongodb.internal.connection.ServerAddressHelper.createServerAddress;
import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver;
import static com.mongodb.internal.connection.StreamFactoryHelper.getSyncStreamFactoryFactory;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -244,8 +249,20 @@ public MongoClient(final MongoClientSettings settings, @Nullable final MongoDriv
private MongoClient(final MongoClientSettings settings,
@Nullable final MongoClientOptions options,
@Nullable final MongoDriverInformation mongoDriverInformation) {
notNull("settings", settings);

MongoDriverInformation wrappedMongoDriverInformation = wrapMongoDriverInformation(mongoDriverInformation);
delegate = new MongoClientImpl(settings, wrappedMongoDriverInformation);

StreamFactoryFactory syncStreamFactoryFactory = getSyncStreamFactoryFactory(
settings.getTransportSettings(),
getInetAddressResolver(settings));

Cluster cluster = Clusters.createCluster(
settings,
wrappedMongoDriverInformation,
syncStreamFactoryFactory);

delegate = new MongoClientImpl(cluster, settings, wrappedMongoDriverInformation, syncStreamFactoryFactory);
this.options = options != null ? options : MongoClientOptions.builder(settings).build();
cursorCleaningService = this.options.isCursorFinalizerEnabled() ? createCursorCleaningService() : null;
this.closed = new AtomicBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class MongoClientSpecification extends Specification {
def 'should validate the ChangeStreamIterable pipeline data correctly'() {
given:
def executor = new TestOperationExecutor([])
def client = new MongoClientImpl(Stub(Cluster), null, MongoClientSettings.builder().build(), executor)
def client = new MongoClientImpl(Stub(Cluster), null, MongoClientSettings.builder().build(), null, executor)

when:
client.watch((Class) null)
Expand Down
23 changes: 22 additions & 1 deletion driver-sync/src/main/com/mongodb/client/MongoClients.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoDriverInformation;
import com.mongodb.client.internal.Clusters;
import com.mongodb.client.internal.MongoClientImpl;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.StreamFactoryFactory;
import com.mongodb.lang.Nullable;

import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver;
import static com.mongodb.internal.connection.StreamFactoryHelper.getSyncStreamFactoryFactory;


/**
* A factory for {@link MongoClient} instances. Use of this class is now the recommended way to connect to MongoDB via the Java driver.
Expand Down Expand Up @@ -103,9 +110,23 @@ public static MongoClient create(final ConnectionString connectionString,
* @return the client
*/
public static MongoClient create(final MongoClientSettings settings, @Nullable final MongoDriverInformation mongoDriverInformation) {
notNull("settings", settings);

MongoDriverInformation.Builder builder = mongoDriverInformation == null ? MongoDriverInformation.builder()
: MongoDriverInformation.builder(mongoDriverInformation);
return new MongoClientImpl(settings, builder.driverName("sync").build());

MongoDriverInformation driverInfo = builder.driverName("sync").build();

StreamFactoryFactory syncStreamFactoryFactory = getSyncStreamFactoryFactory(
settings.getTransportSettings(),
getInetAddressResolver(settings));

Cluster cluster = Clusters.createCluster(
settings,
driverInfo,
syncStreamFactoryFactory);

return new MongoClientImpl(cluster, settings, driverInfo, syncStreamFactoryFactory);
}

private MongoClients() {
Expand Down
63 changes: 63 additions & 0 deletions driver-sync/src/main/com/mongodb/client/internal/Clusters.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.client.internal;

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoDriverInformation;
import com.mongodb.connection.SocketSettings;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.DefaultClusterFactory;
import com.mongodb.internal.connection.InternalConnectionPoolSettings;
import com.mongodb.internal.connection.StreamFactory;
import com.mongodb.internal.connection.StreamFactoryFactory;
import com.mongodb.lang.Nullable;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.internal.event.EventListenerHelper.getCommandListener;

public final class Clusters {

private Clusters() {
//NOP
}

public static Cluster createCluster(final MongoClientSettings settings,
@Nullable final MongoDriverInformation mongoDriverInformation,
final StreamFactoryFactory streamFactoryFactory) {
assertNotNull(streamFactoryFactory);
assertNotNull(settings);

StreamFactory streamFactory = getStreamFactory(streamFactoryFactory, settings, false);
StreamFactory heartbeatStreamFactory = getStreamFactory(streamFactoryFactory, settings, true);

return new DefaultClusterFactory().createCluster(settings.getClusterSettings(), settings.getServerSettings(),
settings.getConnectionPoolSettings(), InternalConnectionPoolSettings.builder().build(),
TimeoutSettings.create(settings), streamFactory,
TimeoutSettings.createHeartbeatSettings(settings), heartbeatStreamFactory,
settings.getCredential(), settings.getLoggerSettings(), getCommandListener(settings.getCommandListeners()),
settings.getApplicationName(), mongoDriverInformation, settings.getCompressorList(), settings.getServerApi(),
settings.getDnsClient());
}

private static StreamFactory getStreamFactory(
final StreamFactoryFactory streamFactoryFactory,
final MongoClientSettings settings,
final boolean isHeartbeat) {
SocketSettings socketSettings = isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings();
return streamFactoryFactory.create(socketSettings, settings.getSslSettings());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@
import com.mongodb.client.MongoIterable;
import com.mongodb.client.SynchronousContextProvider;
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.SocketSettings;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.DefaultClusterFactory;
import com.mongodb.internal.connection.InternalConnectionPoolSettings;
import com.mongodb.internal.connection.StreamFactory;
import com.mongodb.internal.connection.StreamFactoryFactory;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.session.ServerSessionPool;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.codecs.configuration.CodecRegistry;
Expand All @@ -59,8 +59,6 @@
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.client.internal.Crypts.createCrypt;
import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument;
import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver;
import static com.mongodb.internal.connection.StreamFactoryHelper.getSyncStreamFactory;
import static com.mongodb.internal.event.EventListenerHelper.getCommandListener;
import static java.lang.String.format;
import static org.bson.codecs.configuration.CodecRegistries.withUuidRepresentation;
Expand All @@ -75,14 +73,22 @@ public final class MongoClientImpl implements MongoClient {
private final MongoDriverInformation mongoDriverInformation;
private final MongoClusterImpl delegate;
private final AtomicBoolean closed;
private final AutoCloseable externalResourceCloser;

public MongoClientImpl(final MongoClientSettings settings, final MongoDriverInformation mongoDriverInformation) {
this(createCluster(settings, mongoDriverInformation), mongoDriverInformation, settings, null);
public MongoClientImpl(final Cluster cluster,
final MongoClientSettings settings,
final MongoDriverInformation mongoDriverInformation,
@Nullable final AutoCloseable externalResourceCloser) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change aligns with the implementation in MongoClientImpl for the reactive-streams driver, as detailed here: MongoClientImpl Code.

this(cluster, mongoDriverInformation, settings, externalResourceCloser, null);
}

public MongoClientImpl(final Cluster cluster, final MongoDriverInformation mongoDriverInformation,
private MongoClientImpl(final Cluster cluster,
final MongoDriverInformation mongoDriverInformation,
final MongoClientSettings settings,
@Nullable final OperationExecutor operationExecutor) {
@Nullable final AutoCloseable externalResourceCloser,
@Nullable final OperationExecutor operationExecutor) {

this.externalResourceCloser = externalResourceCloser;
this.settings = notNull("settings", settings);
this.mongoDriverInformation = mongoDriverInformation;
AutoEncryptionSettings autoEncryptionSettings = settings.getAutoEncryptionSettings();
Expand Down Expand Up @@ -114,6 +120,13 @@ public void close() {
}
delegate.getServerSessionPool().close();
delegate.getCluster().close();
if (externalResourceCloser != null) {
try {
externalResourceCloser.close();
} catch (Exception e) {
LOGGER.warn("Exception closing resource", e);
}
}
}
}

Expand Down Expand Up @@ -287,21 +300,24 @@ public ClientBulkWriteResult bulkWrite(
}

private static Cluster createCluster(final MongoClientSettings settings,
@Nullable final MongoDriverInformation mongoDriverInformation) {
@Nullable final MongoDriverInformation mongoDriverInformation,
final StreamFactory streamFactory, final StreamFactory heartbeatStreamFactory) {
notNull("settings", settings);
return new DefaultClusterFactory().createCluster(settings.getClusterSettings(), settings.getServerSettings(),
settings.getConnectionPoolSettings(), InternalConnectionPoolSettings.builder().build(),
TimeoutSettings.create(settings), getStreamFactory(settings, false),
TimeoutSettings.createHeartbeatSettings(settings), getStreamFactory(settings, true),
TimeoutSettings.create(settings), streamFactory,
TimeoutSettings.createHeartbeatSettings(settings), heartbeatStreamFactory,
settings.getCredential(), settings.getLoggerSettings(), getCommandListener(settings.getCommandListeners()),
settings.getApplicationName(), mongoDriverInformation, settings.getCompressorList(), settings.getServerApi(),
settings.getDnsClient());
}

private static StreamFactory getStreamFactory(final MongoClientSettings settings, final boolean isHeartbeat) {
private static StreamFactory getStreamFactory(
final StreamFactoryFactory streamFactoryFactory,
final MongoClientSettings settings,
final boolean isHeartbeat) {
SocketSettings socketSettings = isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings();
InetAddressResolver inetAddressResolver = getInetAddressResolver(settings);
return getSyncStreamFactory(settings, inetAddressResolver, socketSettings);
return streamFactoryFactory.create(socketSettings, settings.getSslSettings());
}

public Cluster getCluster() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
package com.mongodb.client;

import com.mongodb.ClusterFixture;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoDriverInformation;
import com.mongodb.client.internal.MongoClientImpl;
import com.mongodb.connection.ClusterId;
import com.mongodb.event.ClusterListener;
import com.mongodb.event.ClusterOpeningEvent;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.mockito.MongoMockito;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -29,12 +35,13 @@

import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doNothing;

public class MongoClientTest {
class MongoClientTest {

@SuppressWarnings("try")
@Test
public void shouldIncludeApplicationNameInClusterId() throws InterruptedException,
void shouldIncludeApplicationNameInClusterId() throws InterruptedException,
ExecutionException, TimeoutException {
CompletableFuture<ClusterId> clusterIdFuture = new CompletableFuture<>();
ClusterListener clusterListener = new ClusterListener() {
Expand All @@ -52,4 +59,37 @@ public void clusterOpening(final ClusterOpeningEvent event) {
assertEquals(applicationName, clusterId.getDescription());
}
}

@Test
void shouldCloseExternalResources() throws Exception {

//given
Cluster cluster = MongoMockito.mock(
Cluster.class,
mockedCluster -> {
doNothing().when(mockedCluster).close();
});
AutoCloseable externalResource = MongoMockito.mock(
AutoCloseable.class,
mockedExternalResource -> {
try {
doNothing().when(mockedExternalResource).close();
} catch (Exception e) {
throw new RuntimeException(e);
}
});

MongoClientImpl mongoClient = new MongoClientImpl(
cluster,
MongoClientSettings.builder().build(),
MongoDriverInformation.builder().build(),
externalResource);

//when
mongoClient.close();

//then
Mockito.verify(externalResource).close();
Mockito.verify(cluster).close();
}
}
Loading