From cd33effa8229f4cd9bf970be4501455ac8166f78 Mon Sep 17 00:00:00 2001 From: Ryan Fu Date: Wed, 26 Oct 2022 15:45:26 -0700 Subject: [PATCH] Add SSH tunnel for MongoDB Destination (#18280) * Add SSH tunnel for MongoDB Destination * Fixed host/port resolution issues with Docker containers * Formatting changes * Bump version number and adds documentation for SSH tunnel * auto-bump connector version Co-authored-by: Octavia Squidington III --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 103 +++++++++++++++++- .../Dockerfile | 2 +- .../MongodbDestinationStrictEncrypt.java | 6 +- .../connectors/destination-mongodb/Dockerfile | 2 +- .../mongodb/MongodbDestination.java | 7 +- .../MongodbDestinationAcceptanceTest.java | 101 ++++++++++------- ...shKeyMongoDbDestinationAcceptanceTest.java | 16 +++ .../SshMongoDbDestinationAcceptanceTest.java | 91 ++++++++++++++++ ...swordMongoDbDestinationAcceptanceTest.java | 15 +++ docs/integrations/destinations/mongodb.md | 39 +++++-- 11 files changed, 326 insertions(+), 58 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshKeyMongoDbDestinationAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshPasswordMongoDbDestinationAcceptanceTest.java diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index bc3d9d05aba8..da5c37324eac 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -189,7 +189,7 @@ - name: MongoDB destinationDefinitionId: 8b746512-8c2e-6ac1-4adc-b59faafd473c dockerRepository: airbyte/destination-mongodb - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 documentationUrl: https://docs.airbyte.com/integrations/destinations/mongodb icon: mongodb.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index fd8f56a34447..b32e0fe2d3d6 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3196,7 +3196,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-mongodb:0.1.7" +- dockerImage: "airbyte/destination-mongodb:0.1.8" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/mongodb" connectionSpecification: @@ -3328,6 +3328,107 @@ type: "string" airbyte_secret: true order: 2 + tunnel_method: + type: "object" + title: "SSH Tunnel Method" + description: "Whether to initiate an SSH tunnel before connecting to the\ + \ database, and if so, which kind of authentication to use." + oneOf: + - title: "No Tunnel" + required: + - "tunnel_method" + properties: + tunnel_method: + description: "No ssh tunnel needed to connect to database" + type: "string" + const: "NO_TUNNEL" + order: 0 + - title: "SSH Key Authentication" + required: + - "tunnel_method" + - "tunnel_host" + - "tunnel_port" + - "tunnel_user" + - "ssh_key" + properties: + tunnel_method: + description: "Connect through a jump server tunnel host using username\ + \ and ssh key" + type: "string" + const: "SSH_KEY_AUTH" + order: 0 + tunnel_host: + title: "SSH Tunnel Jump Server Host" + description: "Hostname of the jump server host that allows inbound\ + \ ssh tunnel." + type: "string" + order: 1 + tunnel_port: + title: "SSH Connection Port" + description: "Port on the proxy/jump server that accepts inbound ssh\ + \ connections." + type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host." + type: "string" + order: 3 + ssh_key: + title: "SSH Private Key" + description: "OS-level user account ssh key credentials in RSA PEM\ + \ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )" + type: "string" + airbyte_secret: true + multiline: true + order: 4 + - title: "Password Authentication" + required: + - "tunnel_method" + - "tunnel_host" + - "tunnel_port" + - "tunnel_user" + - "tunnel_user_password" + properties: + tunnel_method: + description: "Connect through a jump server tunnel host using username\ + \ and password authentication" + type: "string" + const: "SSH_PASSWORD_AUTH" + order: 0 + tunnel_host: + title: "SSH Tunnel Jump Server Host" + description: "Hostname of the jump server host that allows inbound\ + \ ssh tunnel." + type: "string" + order: 1 + tunnel_port: + title: "SSH Connection Port" + description: "Port on the proxy/jump server that accepts inbound ssh\ + \ connections." + type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host" + type: "string" + order: 3 + tunnel_user_password: + title: "Password" + description: "OS-level password for logging into the jump server host" + type: "string" + airbyte_secret: true + order: 4 supportsIncremental: true supportsNormalization: false supportsDBT: false diff --git a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile index 6bffbe8f7041..6b4ecd23bc7b 100644 --- a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-mongodb-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/destination-mongodb-strict-encrypt diff --git a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java index fdf208f48c57..2cebbbd508bd 100644 --- a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java +++ b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java @@ -18,11 +18,11 @@ public class MongodbDestinationStrictEncrypt extends SpecModifyingDestination im private static final Logger LOGGER = LoggerFactory.getLogger(MongodbDestinationStrictEncrypt.class); public MongodbDestinationStrictEncrypt() { - super(new MongodbDestination()); + super(MongodbDestination.sshWrappedDestination()); } @Override - public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) throws Exception { + public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) throws Exception { final ConnectorSpecification spec = Jsons.clone(originalSpec); // removing tls property for a standalone instance to disable possibility to switch off a tls // connection @@ -30,7 +30,7 @@ public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) th return spec; } - public static void main(String[] args) throws Exception { + public static void main(final String[] args) throws Exception { final Destination destination = new MongodbDestinationStrictEncrypt(); LOGGER.info("starting destination: {}", MongodbDestinationStrictEncrypt.class); new IntegrationRunner(destination).run(args); diff --git a/airbyte-integrations/connectors/destination-mongodb/Dockerfile b/airbyte-integrations/connectors/destination-mongodb/Dockerfile index 00cb6114919d..cff1b88e848c 100644 --- a/airbyte-integrations/connectors/destination-mongodb/Dockerfile +++ b/airbyte-integrations/connectors/destination-mongodb/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-mongodb COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/destination-mongodb diff --git a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java index 88ac858e0507..0a46d2700d90 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java @@ -25,6 +25,7 @@ import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.ssh.SshWrappedDestination; import io.airbyte.integrations.destination.mongodb.exception.MongodbDatabaseException; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; @@ -61,12 +62,16 @@ public class MongodbDestination extends BaseConnector implements Destination { private final MongodbNameTransformer namingResolver; + public static Destination sshWrappedDestination() { + return new SshWrappedDestination(new MongodbDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY); + } + public MongodbDestination() { namingResolver = new MongodbNameTransformer(); } public static void main(final String[] args) throws Exception { - final Destination destination = new MongodbDestination(); + final Destination destination = sshWrappedDestination(); LOGGER.info("starting destination: {}", MongodbDestination.class); new IntegrationRunner(destination).run(args); LOGGER.info("completed destination: {}", MongodbDestination.class); diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java index e5642a090ed8..20e5d5f9fee5 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java @@ -19,6 +19,7 @@ import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.protocol.models.AirbyteConnectionStatus; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.bson.Document; @@ -27,14 +28,14 @@ public class MongodbDestinationAcceptanceTest extends DestinationAcceptanceTest { - private static final String DOCKER_IMAGE_NAME = "mongo:4.0.10"; - private static final String DATABASE_NAME = "admin"; + protected static final String DOCKER_IMAGE_NAME = "mongo:4.0.10"; + protected static final String DATABASE_NAME = "admin"; private static final String DATABASE_FAIL_NAME = "fail_db"; - private static final String AUTH_TYPE = "auth_type"; - private static final String AIRBYTE_DATA = "_airbyte_data"; + protected static final String AUTH_TYPE = "auth_type"; + protected static final String AIRBYTE_DATA = "_airbyte_data"; private MongoDBContainer container; - private final MongodbNameTransformer namingResolver = new MongodbNameTransformer(); + protected final MongodbNameTransformer namingResolver = new MongodbNameTransformer(); @Override protected String getImageName() { @@ -42,7 +43,7 @@ protected String getImageName() { } @Override - protected JsonNode getConfig() { + protected JsonNode getConfig() throws Exception { return Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) @@ -52,7 +53,7 @@ protected JsonNode getConfig() { } @Override - protected JsonNode getFailCheckConfig() { + protected JsonNode getFailCheckConfig() throws Exception { return Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) @@ -110,54 +111,72 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, */ @Test void testCheckIncorrectPasswordFailure() { - final JsonNode invalidConfig = getFailCheckConfig(); - ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME); - ((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.PASSWORD_KEY, "fake"); - final MongodbDestination destination = new MongodbDestination(); - final AirbyteConnectionStatus status = destination.check(invalidConfig); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 18")); + try { + final JsonNode invalidConfig = getFailCheckConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME); + ((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.PASSWORD_KEY, "fake"); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + } catch (final Exception e) { + assertTrue(e instanceof IOException); + } } @Test public void testCheckIncorrectUsernameFailure() { - final JsonNode invalidConfig = getFailCheckConfig(); - ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME); - ((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.USERNAME_KEY, "fakeusername"); - final MongodbDestination destination = new MongodbDestination(); - final AirbyteConnectionStatus status = destination.check(invalidConfig); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 18")); + try { + final JsonNode invalidConfig = getFailCheckConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME); + ((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.USERNAME_KEY, "fakeusername"); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + } catch (final Exception e) { + assertTrue(e instanceof IOException); + } + } @Test public void testCheckIncorrectDataBaseFailure() { - final JsonNode invalidConfig = getFailCheckConfig(); - ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_FAIL_NAME); - final MongodbDestination destination = new MongodbDestination(); - final AirbyteConnectionStatus status = destination.check(invalidConfig); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 18")); + try { + final JsonNode invalidConfig = getFailCheckConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_FAIL_NAME); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + } catch (final Exception e) { + assertTrue(e instanceof IOException); + } + } @Test public void testCheckIncorrectHost() { - final JsonNode invalidConfig = getConfig(); - ((ObjectNode) invalidConfig).put(JdbcUtils.HOST_KEY, "localhost2"); - final MongodbDestination destination = new MongodbDestination(); - final AirbyteConnectionStatus status = destination.check(invalidConfig); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: -3")); + try { + final JsonNode invalidConfig = getConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.HOST_KEY, "localhost2"); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + } catch (final Exception e) { + assertTrue(e instanceof IOException); + } + } @Test public void testCheckIncorrectPort() { - final JsonNode invalidConfig = getConfig(); - ((ObjectNode) invalidConfig).put(JdbcUtils.PORT_KEY, 1234); - final MongodbDestination destination = new MongodbDestination(); - final AirbyteConnectionStatus status = destination.check(invalidConfig); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: -3")); + try { + final JsonNode invalidConfig = getConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.PORT_KEY, 1234); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + } catch (final Exception e) { + assertTrue(e instanceof IOException); + } } @Override @@ -174,13 +193,13 @@ protected void tearDown(final TestDestinationEnv testEnv) { /* Helpers */ - private JsonNode getAuthTypeConfig() { + protected JsonNode getAuthTypeConfig() { return Jsons.deserialize("{\n" + " \"authorization\": \"none\"\n" + "}"); } - private MongoDatabase getMongoDatabase(final String host, final int port, final String databaseName) { + protected MongoDatabase getMongoDatabase(final String host, final int port, final String databaseName) { try { final String connectionString = String.format("mongodb://%s:%s/", host, port); return new MongoDatabase(connectionString, databaseName); diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshKeyMongoDbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshKeyMongoDbDestinationAcceptanceTest.java new file mode 100644 index 000000000000..9a9e5d9889ce --- /dev/null +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshKeyMongoDbDestinationAcceptanceTest.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mongodb; + +import io.airbyte.integrations.base.ssh.SshTunnel; +import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod; + +public class SshKeyMongoDbDestinationAcceptanceTest extends SshMongoDbDestinationAcceptanceTest { + + @Override + public SshTunnel.TunnelMethod getTunnelMethod() { + return TunnelMethod.SSH_KEY_AUTH; + } +} diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java new file mode 100644 index 000000000000..3acae92257cc --- /dev/null +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mongodb; + +import static com.mongodb.client.model.Projections.excludeId; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import com.mongodb.client.MongoCursor; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.mongodb.MongoDatabase; +import io.airbyte.integrations.base.ssh.SshBastionContainer; +import io.airbyte.integrations.base.ssh.SshTunnel; +import io.airbyte.integrations.util.HostPortResolver; +import java.util.ArrayList; +import java.util.List; +import org.bson.Document; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.containers.Network; + +public abstract class SshMongoDbDestinationAcceptanceTest extends MongodbDestinationAcceptanceTest { + + private static final Network network = Network.newNetwork(); + private static final SshBastionContainer bastion = new SshBastionContainer(); + private static MongoDBContainer container; + private static final int DEFAULT_PORT = 27017; + + public abstract SshTunnel.TunnelMethod getTunnelMethod(); + + @Override + protected void setup(final TestDestinationEnv testEnv) { + container = new MongoDBContainer(DOCKER_IMAGE_NAME) + .withNetwork(network) + .withExposedPorts(DEFAULT_PORT); + container.start(); + bastion.initAndStartBastion(network); + } + + @Override + protected JsonNode getConfig() throws Exception { + return bastion.getTunnelConfig(getTunnelMethod(), ImmutableMap.builder() + .put(JdbcUtils.HOST_KEY, HostPortResolver.resolveIpAddress(container)) + .put(JdbcUtils.PORT_KEY, container.getExposedPorts().get(0)) + .put(JdbcUtils.DATABASE_KEY, DATABASE_NAME) + .put(AUTH_TYPE, getAuthTypeConfig())); + } + + @Override + protected JsonNode getFailCheckConfig() throws Exception { + // should result in a failed connection check + return bastion.getTunnelConfig(getTunnelMethod(), ImmutableMap.builder() + .put(JdbcUtils.HOST_KEY, HostPortResolver.resolveIpAddress(container)) + .put(JdbcUtils.PORT_KEY, container.getExposedPorts().get(0)) + .put(JdbcUtils.DATABASE_KEY, DATABASE_NAME) + .put(AUTH_TYPE, Jsons.jsonNode(ImmutableMap.builder() + .put("authorization", "login/password") + .put(JdbcUtils.USERNAME_KEY, "user") + .put(JdbcUtils.PASSWORD_KEY, "invalid_pass") + .build()))); + } + + @Override + protected List retrieveRecords(final TestDestinationEnv testEnv, + final String streamName, + final String namespace, + final JsonNode streamSchema) { + final MongoDatabase database = getMongoDatabase(HostPortResolver.resolveIpAddress(container), + container.getExposedPorts().get(0), DATABASE_NAME); + final var collection = database.getOrCreateNewCollection(namingResolver.getRawTableName(streamName)); + final List result = new ArrayList<>(); + try (final MongoCursor cursor = collection.find().projection(excludeId()).iterator()) { + while (cursor.hasNext()) { + result.add(Jsons.jsonNode(cursor.next().get(AIRBYTE_DATA))); + } + } + return result; + } + + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + container.stop(); + container.close(); + bastion.getContainer().stop(); + bastion.getContainer().close(); + } + +} diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshPasswordMongoDbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshPasswordMongoDbDestinationAcceptanceTest.java new file mode 100644 index 000000000000..379afd47200b --- /dev/null +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshPasswordMongoDbDestinationAcceptanceTest.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mongodb; + +import io.airbyte.integrations.base.ssh.SshTunnel; + +public class SshPasswordMongoDbDestinationAcceptanceTest extends SshMongoDbDestinationAcceptanceTest { + + @Override + public SshTunnel.TunnelMethod getTunnelMethod() { + return SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; + } +} diff --git a/docs/integrations/destinations/mongodb.md b/docs/integrations/destinations/mongodb.md index de1ebe28b83b..6a357aa6215b 100644 --- a/docs/integrations/destinations/mongodb.md +++ b/docs/integrations/destinations/mongodb.md @@ -60,6 +60,26 @@ You should now have all the requirements needed to configure MongoDB as a destin For more information regarding configuration parameters, please see [MongoDb Documentation](https://docs.mongodb.com/drivers/java/sync/v4.3/fundamentals/connection/). +### Connection via SSH Tunnel + +Airbyte has the ability to connect to an MongoDB instance via an SSH Tunnel. +The reason you might want to do this because it is not possible \(or against security policy\) to connect to your MongoDB instance directly \(e.g. it does not have a public IP address\). + +When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server \(a.k.a. a bastion sever\) that _does_ have direct access to the MongoDB instance. +Airbyte connects to the bastion and then asks the bastion to connect directly to the server. + +Using this feature requires additional configuration, when creating the source. We will talk through what each piece of configuration means. + +1. Configure all fields for the source as you normally would, except `SSH Tunnel Method`. +2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. + 1. Choose `Key Authentication` if you will be using an RSA private key as your secret for establishing the SSH Tunnel \(see below for more information on generating this key\). + 2. Choose `Password Authentication` if you will be using a password as your secret for establishing the SSH Tunnel. +3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will connect to. This should be a hostname or an IP Address. +4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. The default port for SSH connections is `22`, so unless you have explicitly changed something, go with the default. +5. `SSH Login Username` is the username that Airbyte should use when connection to the bastion server. This is NOT the TiDB username. +6. If you are using `Password Authentication`, then `SSH Login Username` should be set to the password of the User from the previous step. If you are using `SSH Key Authentication` TiDB password, but the password for the OS-user that Airbyte is using to perform commands on the bastion. +7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA Private Key that you are using to create the SSH connection. This should be the full contents of the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending with `-----END RSA PRIVATE KEY-----`. + ## Naming Conventions The following information comes from the [MongoDB Limits and Thresholds](https://docs.mongodb.com/manual/reference/limits/) documentation. @@ -95,13 +115,14 @@ Collection names should begin with an underscore or a letter character, and cann ## Changelog -| Version | Date | Pull Request | Subject | -| :--- | :--- | :--- | :--- | -| 0.1.7 | 2022-09-02 | [16025](https://github.com/airbytehq/airbyte/pull/16025) | Remove additionalProperties:false from spec | -| 0.1.6 | 2022-08-02 | [15211](https://github.com/airbytehq/airbyte/pull/15211) | Fix standard mode | -| 0.1.5 | 2022-07-27 | [14561](https://github.com/airbytehq/airbyte/pull/14561) | Change Airbyte Id from MD5 to SHA256 | -| 0.1.4 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | (unpublished) Add `-XX:+ExitOnOutOfMemoryError` JVM option | -| 0.1.3 | 2021-12-30 | [8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description | -| 0.1.2 | 2021-10-18 | [6945](https://github.com/airbytehq/airbyte/pull/6945) | Create a secure-only MongoDb destination | -| 0.1.1 | 2021-09-29 | [6536](https://github.com/airbytehq/airbyte/pull/6536) | Destination MongoDb: added support via TLS/SSL | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------| +| 0.1.8 | 2022-10-26 | [18280](https://github.com/airbytehq/airbyte/pull/18280) | Adds SSH tunneling | +| 0.1.7 | 2022-09-02 | [16025](https://github.com/airbytehq/airbyte/pull/16025) | Remove additionalProperties:false from spec | +| 0.1.6 | 2022-08-02 | [15211](https://github.com/airbytehq/airbyte/pull/15211) | Fix standard mode | +| 0.1.5 | 2022-07-27 | [14561](https://github.com/airbytehq/airbyte/pull/14561) | Change Airbyte Id from MD5 to SHA256 | +| 0.1.4 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | (unpublished) Add `-XX:+ExitOnOutOfMemoryError` JVM option | +| 0.1.3 | 2021-12-30 | [8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description | +| 0.1.2 | 2021-10-18 | [6945](https://github.com/airbytehq/airbyte/pull/6945) | Create a secure-only MongoDb destination | +| 0.1.1 | 2021-09-29 | [6536](https://github.com/airbytehq/airbyte/pull/6536) | Destination MongoDb: added support via TLS/SSL |