Skip to content

Commit

Permalink
✨ Source MongoDB Internal POC: Discover Operation (#28932)
Browse files Browse the repository at this point in the history
* Implement discover operation

* Formatting
  • Loading branch information
jdpgrailsdev authored and bnchrch committed Aug 3, 2023
1 parent e66ce47 commit 9372bc5
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb.internal;

import com.fasterxml.jackson.databind.JsonNode;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

/**
* Helper utility for building a {@link MongoClient}.
*/
public class MongoConnectionUtils {

/**
* Creates a new {@link MongoClient} from the source configuration.
*
* @param config The source's configuration.
* @return The configured {@link MongoClient}.
*/
public static MongoClient createMongoClient(final JsonNode config) {
final String authSource = config.get("auth_source").asText();
final String connectionString = config.get("connection_string").asText();
final String replicaSet = config.get("replica_set").asText();

final ConnectionString mongoConnectionString = new ConnectionString(connectionString + "?replicaSet=" +
replicaSet + "&retryWrites=false&provider=airbyte&tls=true");

final MongoClientSettings.Builder mongoClientSettingsBuilder = MongoClientSettings.builder()
.applyConnectionString(mongoConnectionString)
.readPreference(ReadPreference.secondaryPreferred());

if (config.has("user") && config.has("password")) {
final String user = config.get("user").asText();
final String password = config.get("password").asText();
mongoClientSettingsBuilder.credential(MongoCredential.createCredential(user, authSource, password.toCharArray()));
}

return MongoClients.create(mongoClientSettingsBuilder.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,47 @@
package io.airbyte.integrations.source.mongodb.internal;

import com.fasterxml.jackson.databind.JsonNode;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.MongoSecurityException;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbSource extends BaseConnector implements Source, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);

/**
* Set of collection prefixes that should be ignored when performing operations, such as discover to
* avoid access issues.
*/
private static final Set<String> IGNORED_COLLECTIONS = Set.of("system.", "replset.", "oplog.");

public static void main(final String[] args) throws Exception {
final Source source = new MongoDbSource();
LOGGER.info("starting source: {}", MongoDbSource.class);
Expand All @@ -33,13 +59,16 @@ public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
}

@Override
public AirbyteCatalog discover(final JsonNode config) throws Exception {
return null;
public AirbyteCatalog discover(final JsonNode config) {
final List<AirbyteStream> streams = discoverInternal(config);
return new AirbyteCatalog().withStreams(streams);
}

@Override
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final ConfiguredAirbyteCatalog catalog,
final JsonNode state) throws Exception {
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final JsonNode state)
throws Exception {
return null;
}

Expand All @@ -48,4 +77,87 @@ public void close() throws Exception {

}

private Set<String> getAuthorizedCollections(final MongoClient mongoClient, final String databaseName) {
/*
* db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command
* returns only those collections for which the user has privileges. For example, if a user has find
* action on specific collections, the command returns only those collections; or, if a user has
* find or any other action, on the database resource, the command lists all collections in the
* database.
*/
try {
final Document document = mongoClient.getDatabase(databaseName).runCommand(new Document("listCollections", 1)
.append("authorizedCollections", true)
.append("nameOnly", true))
.append("filter", "{ 'type': 'collection' }");
return document.toBsonDocument()
.get("cursor").asDocument()
.getArray("firstBatch")
.stream()
.map(bsonValue -> bsonValue.asDocument().getString("name").getValue())
.filter(this::isSupportedCollection)
.collect(Collectors.toSet());
} catch (final MongoSecurityException e) {
final MongoCommandException exception = (MongoCommandException) e.getCause();
throw new ConnectionErrorException(String.valueOf(exception.getCode()), e);
} catch (final MongoException e) {
throw new ConnectionErrorException(String.valueOf(e.getCode()), e);
}
}

private List<AirbyteStream> discoverInternal(final JsonNode config) {
final List<AirbyteStream> streams = new ArrayList<>();
try (final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config)) {
final Set<String> authorizedCollections = getAuthorizedCollections(mongoClient, config.get("database").asText());
authorizedCollections.parallelStream().forEach(collectionName -> {
final List<Field> fields = getFields(mongoClient.getDatabase(config.get("database").asText()).getCollection(collectionName));
streams.add(CatalogHelpers.createAirbyteStream(collectionName, "", fields));
});
return streams;
}
}

private List<Field> getFields(final MongoCollection collection) {
final Map<String, Object> fieldsMap = Map.of("input", Map.of("$objectToArray", "$$ROOT"),
"as", "each",
"in", Map.of("k", "$$each.k", "v", Map.of("$type", "$$each.v")));

final Document mapFunction = new Document("$map", fieldsMap);
final Document arrayToObjectAggregation = new Document("$arrayToObject", mapFunction);
final Document projection = new Document("$project", new Document("fields", arrayToObjectAggregation));

final Map<String, Object> groupMap = new HashMap<>();
groupMap.put("_id", null);
groupMap.put("fields", Map.of("$addToSet", "$fields"));

final AggregateIterable<Document> output = collection.aggregate(Arrays.asList(
projection,
new Document("$unwind", "$fields"),
new Document("$group", groupMap)));

final MongoCursor<Document> cursor = output.cursor();
if (cursor.hasNext()) {
final Map<String, String> fields = ((List<Map<String, String>>) output.cursor().next().get("fields")).get(0);
return fields.entrySet().stream()
.map(e -> new Field(e.getKey(), convertToSchemaType(e.getValue())))
.collect(Collectors.toList());
} else {
return List.of();
}
}

private JsonSchemaType convertToSchemaType(final String type) {
return switch (type) {
case "boolean" -> JsonSchemaType.BOOLEAN;
case "int", "long", "double", "decimal" -> JsonSchemaType.NUMBER;
case "array" -> JsonSchemaType.ARRAY;
case "object", "javascriptWithScope" -> JsonSchemaType.OBJECT;
default -> JsonSchemaType.STRING;
};
}

private boolean isSupportedCollection(final String collectionName) {
return !IGNORED_COLLECTIONS.stream().anyMatch(s -> collectionName.startsWith(s));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MongoDb Source Spec",
"type": "object",
"required": ["database"],
"required": ["connection_string","database","replica_set"],
"additionalProperties": true,
"properties": {
"connection_string": {
Expand All @@ -15,32 +15,38 @@
"examples": ["mongodb+srv://example.mongodb.net", "mongodb://example1.host.com:27017,example2.host.com:27017,example3.host.com:27017", "mongodb://example.host.com:27017"],
"order": 1
},
"database": {
"title": "Database Name",
"type": "string",
"description": "The database you want to replicate.",
"order": 2
},
"user": {
"title": "User",
"type": "string",
"description": "The username which is used to access the database.",
"order": 2
"order": 3
},
"password": {
"title": "Password",
"type": "string",
"description": "The password associated with this username.",
"airbyte_secret": true,
"order": 3
"order": 4
},
"auth_source": {
"title": "Authentication Source",
"type": "string",
"description": "The authentication source where the user information is stored.",
"default": "admin",
"examples": ["admin"],
"order": 4
"order": 5
},
"replica_set": {
"title": "Replica Set",
"type": "string",
"description": "The name of the replica set to be replicated.",
"order": 5
"order": 6
}
}
}
Expand Down
Loading

0 comments on commit 9372bc5

Please sign in to comment.