Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source MongoDb: added support via TLS/SSL #6364

Merged
merged 9 commits into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ jobs:
DESTINATION_DATABRICKS_CREDS: ${{ secrets.DESTINATION_DATABRICKS_CREDS }}
MSSQL_SSH_KEY_TEST_CREDS: ${{ secrets.MSSQL_SSH_KEY_TEST_CREDS }}
MSSQL_SSH_PWD_TEST_CREDS: ${{ secrets.MSSQL_SSH_PWD_TEST_CREDS }}
MONGODB_TEST_CREDS: ${{ secrets.MONGODB_TEST_CREDS }}
- run: |
echo "$SPEC_CACHE_SERVICE_ACCOUNT_KEY" > spec_cache_key_file.json && docker login -u airbytebot -p ${DOCKER_PASSWORD}
./tools/integrations/manage.sh publish airbyte-integrations/${{ github.event.inputs.connector }} ${{ github.event.inputs.run-tests }} --publish_spec_to_cache
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ jobs:
DESTINATION_DATABRICKS_CREDS: ${{ secrets.DESTINATION_DATABRICKS_CREDS }}
MSSQL_SSH_KEY_TEST_CREDS: ${{ secrets.MSSQL_SSH_KEY_TEST_CREDS }}
MSSQL_SSH_PWD_TEST_CREDS: ${{ secrets.MSSQL_SSH_PWD_TEST_CREDS }}
MONGODB_TEST_CREDS: ${{ secrets.MONGODB_TEST_CREDS }}
- run: |
./tools/bin/ci_integration_test.sh ${{ github.event.inputs.connector }}
name: test ${{ github.event.inputs.connector }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e",
"name": "MongoDb",
"dockerRepository": "airbyte/source-mongodb-v2",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mongodb-v2",
"icon": "mongodb.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@
- sourceDefinitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
name: MongoDb
dockerRepository: airbyte/source-mongodb-v2
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/mongodb-v2
icon: mongodb.svg
sourceType: database
Expand Down
25 changes: 25 additions & 0 deletions airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.api.client.util.DateTime;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.airbyte.commons.json.Jsons;
Expand All @@ -36,13 +37,19 @@
import java.util.List;
import java.util.Map;
import org.bson.BsonBinary;
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
import org.bson.BsonDocumentReader;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonReader;
import org.bson.BsonTimestamp;
import org.bson.BsonType;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,6 +77,24 @@ public static JsonNode toJsonNode(final Document document, final List<String> co
return objectNode;
}

public static Object getBsonValue(BsonType type, String value) {
try {
return switch (type) {
case INT32 -> new BsonInt32(Integer.parseInt(value));
case INT64 -> new BsonInt64(Long.parseLong(value));
case DOUBLE -> new BsonDouble(Double.parseDouble(value));
case DECIMAL128 -> Decimal128.parse(value);
case TIMESTAMP -> new BsonTimestamp(Long.parseLong(value));
case DATE_TIME -> new BsonDateTime(new DateTime(value).getValue());
case OBJECT_ID -> new ObjectId(value);
default -> null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of default null why don't we return string?

Also I don't see a mapping from string or object, how come?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed to String, thanks! Also added String and Symbol.
This mapping needed only for cursor field, since it couldn't be object or array, it isn't present here.

};
} catch (Exception e) {
LOGGER.error("Failed to get BsonValue for field type " + type, e.getMessage());
return null;
}
}

private static void readBson(final Document document, final ObjectNode o, final List<String> columnNames) {
final BsonDocument bsonDocument = toBsonDocument(document);
try (BsonReader reader = new BsonDocumentReader(bsonDocument)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-mongodb-v2
24 changes: 23 additions & 1 deletion airbyte-integrations/connectors/source-mongodb-v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,29 @@ the Dockerfile.
We use `JUnit` for Java tests.

### Test Configuration
No specific configuration needed for testing, MongoDb Test Container is used.

In order to test the MongoDb source, you need to have a standalone instance, a replica set or Atlas cluster.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't work with a docker image anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works, but I've also added test for Atlas Cluster with ssl, I've corrected doc.


## Community Contributor

As a community contributor, you will need to have an Atlas cluster to test MongoDb source.

1. Create `secrets/credentials.json` file
1. Insert below json to the file with your configuration
```
{
"database": "database_name",
"user": "user",
"password": "password",
"cluster_url": "cluster_url"
}
```

## Airbyte Employee

1. Access the `MONGODB_TEST_CREDS` secret on LastPass
1. Create a file with the contents at `secrets/credentials.json`


#### Acceptance Tests
To run acceptance and custom integration tests:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public class MongoDbSource extends AbstractDbSource<BsonType, MongoDatabase> {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);

private static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/?";
private static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?retryWrites=true&w=majority&";
private static final String MONGODB_REPLICA_URL = "mongodb://%s%s/?replicaSet=%s&";
private static final String MONGODB_SERVER_URL = "mongodb://%s%s:%s/%s?authSource=%s&ssl=%s";
private static final String MONGODB_CLUSTER_URL = "mongodb+srv://%s%s/%s?authSource=%s&retryWrites=true&w=majority&tls=true";
private static final String MONGODB_REPLICA_URL = "mongodb://%s%s/%s?authSource=%s&directConnection=false&ssl=true";
private static final String USER = "user";
private static final String PASSWORD = "password";
private static final String INSTANCE_TYPE = "instance_type";
Expand All @@ -90,26 +90,29 @@ public JsonNode toDatabaseConfig(JsonNode config) {
? String.format("%s:%s@", config.get(USER).asText(), config.get(PASSWORD).asText())
: StringUtils.EMPTY;

StringBuilder connectionStrBuilder = new StringBuilder();
JsonNode instanceConfig = config.get(INSTANCE_TYPE);
String instanceConnectUrl;
if (instanceConfig.has(HOST) && instanceConfig.has(PORT)) {
instanceConnectUrl = String.format(MONGODB_SERVER_URL,
credentials, instanceConfig.get(HOST).asText(), instanceConfig.get(PORT).asText());
// Standalone MongoDb Instance
connectionStrBuilder.append(String.format(MONGODB_SERVER_URL, credentials, instanceConfig.get(HOST).asText(), instanceConfig.get(PORT).asText(),
config.get(DATABASE).asText(), config.get(AUTH_SOURCE).asText(), instanceConfig.get(TLS).asBoolean()));
} else if (instanceConfig.has(CLUSTER_URL)) {
instanceConnectUrl = String.format(MONGODB_CLUSTER_URL,
credentials, instanceConfig.get(CLUSTER_URL).asText(), config.get(DATABASE).asText());
// MongoDB Atlas
connectionStrBuilder.append(
String.format(MONGODB_CLUSTER_URL, credentials, instanceConfig.get(CLUSTER_URL).asText(), config.get(DATABASE).asText(),
config.get(AUTH_SOURCE).asText()));
} else {
instanceConnectUrl = String.format(MONGODB_REPLICA_URL,
credentials, instanceConfig.get(SERVER_ADDRESSES).asText(), config.get(REPLICA_SET).asText());
}

String options = "authSource=".concat(config.get(AUTH_SOURCE).asText());
if (config.get(TLS).asBoolean()) {
options.concat("&tls=true");
// Replica Set & Shard
connectionStrBuilder.append(
String.format(MONGODB_REPLICA_URL, credentials, instanceConfig.get(SERVER_ADDRESSES).asText(), config.get(DATABASE).asText(),
config.get(AUTH_SOURCE).asText()));
if (instanceConfig.has(REPLICA_SET)) {
connectionStrBuilder.append(String.format("&replicaSet=%s", instanceConfig.get(REPLICA_SET).asText()));
}
}

return Jsons.jsonNode(ImmutableMap.builder()
.put("connectionString", instanceConnectUrl + options)
.put("connectionString", connectionStrBuilder.toString())
.put("database", config.get(DATABASE).asText())
.build());
}
Expand Down Expand Up @@ -207,7 +210,7 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(MongoDatabase datab
String cursorField,
BsonType cursorFieldType,
String cursor) {
Bson greaterComparison = gt(cursorField, cursor);
Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursor));
return queryTable(database, columnNames, tableName, greaterComparison);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"instance_type": {
"type": "object",
"title": "MongoDb instance type",
"description": "MongoDb instance to connect to.",
"description": "MongoDb instance to connect to. For MongoDB Atlas and Replica Set TLS connection is used by default.",
"order": 0,
"oneOf": [
{
Expand All @@ -34,13 +34,20 @@
"default": 27017,
"examples": ["27017"],
"order": 1
},
"tls": {
"title": "TLS connection",
"type": "boolean",
"description": "Indicates whether TLS encryption protocol will be used to connect to MongoDB. It is recommended to use TLS connection if possible. For more information see <a href=\"https://docs.airbyte.io/integrations/sources/mongodb-v2\">documentation</a>.",
"default": false,
"order": 2
}
}
},
{
"title": "Replica Set",
"additionalProperties": false,
"required": ["server_addresses", "replica_set"],
"required": ["server_addresses"],
"properties": {
"server_addresses": {
"title": "Server addresses",
Expand Down Expand Up @@ -98,13 +105,6 @@
"default": "admin",
"examples": ["admin"],
"order": 4
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you will need to make additionalProperties=true for this to be backwards compatible, and potentially handle it manually in the connector code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot, I've added.

"tls": {
"title": "TLS connection",
"type": "boolean",
"description": "If this switch is enabled, TLS connections will be used to connect to MongoDB.",
"default": false,
"order": 5
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,11 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.mongodb.client.MongoCollection;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -44,15 +41,14 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.bson.Document;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.utility.DockerImageName;

public class MongoDbSourceAcceptanceTest extends SourceAcceptanceTest {
public abstract class MongoDbSourceAbstractAcceptanceTest extends SourceAcceptanceTest {

private MongoDBContainer mongoDBContainer;
private JsonNode config;
private MongoDatabase database;
protected static final String DATABASE_NAME = "test";
protected static final String COLLECTION_NAME = "acceptance_test";

protected JsonNode config;
protected MongoDatabase database;

@Override
protected String getImageName() {
Expand All @@ -64,43 +60,6 @@ protected JsonNode getConfig() throws Exception {
return config;
}

@Override
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
mongoDBContainer = new MongoDBContainer(DockerImageName.parse("mongo:4.0.10"));
mongoDBContainer.start();

final JsonNode instanceConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("host", mongoDBContainer.getHost())
.put("port", mongoDBContainer.getFirstMappedPort())
.build());

config = Jsons.jsonNode(ImmutableMap.builder()
.put("instance_type", instanceConfig)
.put("database", "test")
.put("auth_source", "admin")
.put("tls", false)
.build());

String connectionString = String.format("mongodb://%s:%s/",
mongoDBContainer.getHost(),
mongoDBContainer.getFirstMappedPort());

database = new MongoDatabase(connectionString, "test");

MongoCollection<Document> collection = database.createCollection("acceptance_test");
var doc1 = new Document("id", "0001").append("name", "Test");
var doc2 = new Document("id", "0002").append("name", "Mongo");
var doc3 = new Document("id", "0003").append("name", "Source");

collection.insertMany(List.of(doc1, doc2, doc3));
}

@Override
protected void tearDown(TestDestinationEnv testEnv) throws Exception {
database.close();
mongoDBContainer.close();
}

@Override
protected ConnectorSpecification getSpec() throws Exception {
return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
Expand All @@ -115,7 +74,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withCursorField(List.of("_id"))
.withStream(CatalogHelpers.createAirbyteStream(
"test.acceptance_test",
DATABASE_NAME + "." + COLLECTION_NAME,
Field.of("_id", JsonSchemaPrimitive.STRING),
Field.of("id", JsonSchemaPrimitive.STRING),
Field.of("name", JsonSchemaPrimitive.STRING))
Expand Down
Loading