Skip to content

Commit

Permalink
Allow reuse of MongoClient instances for the same collection (#127)
Browse files Browse the repository at this point in the history
Spark pushes down aggregation pipelines in the configuration when acquiring MongoClient
instances. This change modifies the MongoClientFactory to take only the connection string
into account for lookups in MongoClientCache.

Without this change, different queries against the same MongoDB database result in
multiple MongoClient instances, preventing caching in those scenarios.

SPARK-441
Co-authored-by: Ross Lawley <ross@mongodb.com>
  • Loading branch information
spingel authored Feb 3, 2025
1 parent 2fed09a commit f47d451
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ public boolean equals(final Object o) {
return false;
}
final DefaultMongoClientFactory that = (DefaultMongoClientFactory) o;
return config.equals(that.config);
return config.getConnectionString().equals(that.config.getConnectionString());
}

@Override
public int hashCode() {
return Objects.hash(config);
return Objects.hash(config.getConnectionString());
}

private static MongoDriverInformation generateMongoDriverInformation(final String configType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@
import static com.mongodb.spark.sql.connector.config.MongoConfig.DATABASE_NAME_CONFIG;
import static com.mongodb.spark.sql.connector.config.MongoConfig.PREFIX;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.mongodb.client.MongoClient;
import com.mongodb.spark.sql.connector.config.MongoConfig;
import com.mongodb.spark.sql.connector.config.ReadConfig;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -113,7 +112,7 @@ void factoriesWithEqualWriteConfigsCreateNotSameClientsThroughCache() {
}

@Test
void factoriesWithEqualReadWriteConfigsCreateNotSameClientsThroughCache() {
void factoriesWithEqualReadWriteConfigsCreateSameClientsThroughCache() {
MongoConfig config1 = MongoConfig.readConfig(CONFIG_MAP);
MongoConfig config2 = MongoConfig.writeConfig(CONFIG_MAP);
DefaultMongoClientFactory factory1 = new DefaultMongoClientFactory(config1);
Expand All @@ -123,11 +122,10 @@ void factoriesWithEqualReadWriteConfigsCreateNotSameClientsThroughCache() {
MongoClient client1 = mongoClientCache.acquire(factory1);
MongoClient client2 = mongoClientCache.acquire(factory2);

assertNotSame(client1, client2);
assertSame(client1, client2);
}

@Test
@Disabled("fixed in https://github.com/mongodb/mongo-spark/pull/112")
void factoriesWithSameSimpleConfigCreateSameClientsThroughCache() {
MongoConfig config = MongoConfig.createConfig(CONFIG_MAP);
DefaultMongoClientFactory factory1 = new DefaultMongoClientFactory(config);
Expand All @@ -141,7 +139,6 @@ void factoriesWithSameSimpleConfigCreateSameClientsThroughCache() {
}

@Test
@Disabled("fixed in https://github.com/mongodb/mongo-spark/pull/112")
void factoriesWithEqualSimpleConfigsCreateSameClientsThroughCache() {
MongoConfig config1 = MongoConfig.createConfig(CONFIG_MAP);
MongoConfig config2 = MongoConfig.createConfig(CONFIG_MAP);
Expand All @@ -155,6 +152,22 @@ void factoriesWithEqualSimpleConfigsCreateSameClientsThroughCache() {
assertSame(client1, client2);
}

/**
 * Verifies that two factories whose configurations differ only by an aggregation pipeline
 * setting are handed the very same {@code MongoClient} instance by a single
 * {@code MongoClientCache}.
 */
@Test
void factoriesWithDifferentAggregationPipelinesCreateSameClientsThroughCache() {
// Baseline configuration built from the shared test settings.
MongoConfig config1 = MongoConfig.createConfig(CONFIG_MAP);
// Copy the shared settings and add an aggregation pipeline; every other entry is unchanged.
Map<String, String> configMap2 = new HashMap<>(CONFIG_MAP);
configMap2.put(ReadConfig.AGGREGATION_PIPELINE_CONFIG, "{ foo: \"bar\" }");
MongoConfig config2 = MongoConfig.createConfig(configMap2);
DefaultMongoClientFactory factory1 = new DefaultMongoClientFactory(config1);
DefaultMongoClientFactory factory2 = new DefaultMongoClientFactory(config2);
// NOTE(review): the meaning of the three constructor arguments is not visible here;
// presumably timing settings for the cache - confirm against MongoClientCache.
MongoClientCache mongoClientCache = new MongoClientCache(0, 0, 100);

// Both clients are acquired through the same cache instance, one per factory.
MongoClient client1 = mongoClientCache.acquire(factory1);
MongoClient client2 = mongoClientCache.acquire(factory2);

// Identity check (not equality): the cache must return the same client object for both.
assertSame(client1, client2);
}

@Test
void testKeepAliveReuseOfClient() {
MongoClientCache mongoClientCache = new MongoClientCache(500, 0, 200);
Expand Down

0 comments on commit f47d451

Please sign in to comment.