From 57a2824101c922c6e634103b06d3a13828a799d3 Mon Sep 17 00:00:00 2001 From: ryankfu Date: Thu, 20 Oct 2022 17:58:04 -0700 Subject: [PATCH 1/5] Add SSH tunnel for MongoDB Destination --- .../resources/seed/destination_specs.yaml | 101 ++++++++++++++++++ .../MongodbDestinationStrictEncrypt.java | 6 +- .../mongodb/MongodbDestination.java | 7 +- .../MongodbDestinationAcceptanceTest.java | 100 ++++++++++------- ...shKeyMongoDbDestinationAcceptanceTest.java | 16 +++ .../SshMongoDbDestinationAcceptanceTest.java | 65 +++++++++++ ...swordMongoDbDestinationAcceptanceTest.java | 15 +++ 7 files changed, 268 insertions(+), 42 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshKeyMongoDbDestinationAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshPasswordMongoDbDestinationAcceptanceTest.java diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 1186afc086a8..8d21cf0174db 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3328,6 +3328,107 @@ type: "string" airbyte_secret: true order: 2 + tunnel_method: + type: "object" + title: "SSH Tunnel Method" + description: "Whether to initiate an SSH tunnel before connecting to the\ + \ database, and if so, which kind of authentication to use." + oneOf: + - title: "No Tunnel" + required: + - "tunnel_method" + properties: + tunnel_method: + description: "No ssh tunnel needed to connect to database" + type: "string" + const: "NO_TUNNEL" + order: 0 + - title: "SSH Key Authentication" + required: + - "tunnel_method" + - "tunnel_host" + - "tunnel_port" + - "tunnel_user" + - "ssh_key" + properties: + tunnel_method: + description: "Connect through a jump server tunnel host using username\ + \ and ssh key" + type: "string" + const: "SSH_KEY_AUTH" + order: 0 + tunnel_host: + title: "SSH Tunnel Jump Server Host" + description: "Hostname of the jump server host that allows inbound\ + \ ssh tunnel." + type: "string" + order: 1 + tunnel_port: + title: "SSH Connection Port" + description: "Port on the proxy/jump server that accepts inbound ssh\ + \ connections." + type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host." + type: "string" + order: 3 + ssh_key: + title: "SSH Private Key" + description: "OS-level user account ssh key credentials in RSA PEM\ + \ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )" + type: "string" + airbyte_secret: true + multiline: true + order: 4 + - title: "Password Authentication" + required: + - "tunnel_method" + - "tunnel_host" + - "tunnel_port" + - "tunnel_user" + - "tunnel_user_password" + properties: + tunnel_method: + description: "Connect through a jump server tunnel host using username\ + \ and password authentication" + type: "string" + const: "SSH_PASSWORD_AUTH" + order: 0 + tunnel_host: + title: "SSH Tunnel Jump Server Host" + description: "Hostname of the jump server host that allows inbound\ + \ ssh tunnel." + type: "string" + order: 1 + tunnel_port: + title: "SSH Connection Port" + description: "Port on the proxy/jump server that accepts inbound ssh\ + \ connections." + type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host" + type: "string" + order: 3 + tunnel_user_password: + title: "Password" + description: "OS-level password for logging into the jump server host" + type: "string" + airbyte_secret: true + order: 4 supportsIncremental: true supportsNormalization: false supportsDBT: false diff --git a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java index fdf208f48c57..2cebbbd508bd 100644 --- a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java +++ b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/src/main/java/io.airbyte.integrations.destination.mongodb/MongodbDestinationStrictEncrypt.java @@ -18,11 +18,11 @@ public class MongodbDestinationStrictEncrypt extends SpecModifyingDestination im private static final Logger LOGGER = LoggerFactory.getLogger(MongodbDestinationStrictEncrypt.class); public MongodbDestinationStrictEncrypt() { - super(new MongodbDestination()); + super(MongodbDestination.sshWrappedDestination()); } @Override - public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) throws Exception { + public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) throws Exception { final ConnectorSpecification spec = Jsons.clone(originalSpec); // removing tls property for a standalone instance to disable possibility to switch off a tls // connection @@ -30,7 +30,7 @@ public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) th return spec; } - public static void main(String[] args) throws Exception { + public static void main(final String[] args) throws Exception { final Destination destination = new MongodbDestinationStrictEncrypt(); LOGGER.info("starting destination: {}", MongodbDestinationStrictEncrypt.class); new IntegrationRunner(destination).run(args); diff --git a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java index 88ac858e0507..0a46d2700d90 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java @@ -25,6 +25,7 @@ import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.ssh.SshWrappedDestination; import io.airbyte.integrations.destination.mongodb.exception.MongodbDatabaseException; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; @@ -61,12 +62,16 @@ public class MongodbDestination extends BaseConnector implements Destination { private final MongodbNameTransformer namingResolver; + public static Destination sshWrappedDestination() { + return new SshWrappedDestination(new MongodbDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY); + } + public MongodbDestination() { namingResolver = new MongodbNameTransformer(); } public static void main(final String[] args) throws Exception { - final Destination destination = new MongodbDestination(); + final Destination destination = sshWrappedDestination(); LOGGER.info("starting destination: {}", MongodbDestination.class); new IntegrationRunner(destination).run(args); LOGGER.info("completed destination: {}", MongodbDestination.class); diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java index e5642a090ed8..0f749fe888e3 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java @@ -19,6 +19,7 @@ import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.protocol.models.AirbyteConnectionStatus; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.bson.Document; @@ -27,10 +28,10 @@ public class MongodbDestinationAcceptanceTest extends DestinationAcceptanceTest { - private static final String DOCKER_IMAGE_NAME = "mongo:4.0.10"; - private static final String DATABASE_NAME = "admin"; + protected static final String DOCKER_IMAGE_NAME = "mongo:4.0.10"; + protected static final String DATABASE_NAME = "admin"; private static final String DATABASE_FAIL_NAME = "fail_db"; - private static final String AUTH_TYPE = "auth_type"; + protected static final String AUTH_TYPE = "auth_type"; private static final String AIRBYTE_DATA = "_airbyte_data"; private MongoDBContainer container; @@ -42,7 +43,7 @@ protected String getImageName() { } @Override - protected JsonNode getConfig() { + protected JsonNode getConfig() throws Exception { return Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) @@ -52,7 +53,7 @@ protected JsonNode getConfig() { } @Override - protected JsonNode getFailCheckConfig() { + protected JsonNode getFailCheckConfig() throws Exception { return Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) @@ -110,54 +111,77 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, */ @Test void testCheckIncorrectPasswordFailure() { - final JsonNode invalidConfig = getFailCheckConfig(); - ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME); - ((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.PASSWORD_KEY, "fake"); - final MongodbDestination destination = new MongodbDestination(); - final AirbyteConnectionStatus status = destination.check(invalidConfig); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 18")); + try { + final JsonNode invalidConfig = getFailCheckConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME); + ((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.PASSWORD_KEY, "fake"); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 18")); + } catch (final Exception e) { + assertTrue(e instanceof IOException); + } } @Test public void testCheckIncorrectUsernameFailure() { - final JsonNode invalidConfig = getFailCheckConfig(); - ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME); - ((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.USERNAME_KEY, "fakeusername"); - final MongodbDestination destination = new MongodbDestination(); - final AirbyteConnectionStatus status = destination.check(invalidConfig); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 18")); + try { + final JsonNode invalidConfig = getFailCheckConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME); + ((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.USERNAME_KEY, "fakeusername"); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 18")); + } catch (final Exception e) { + assertTrue(e instanceof IOException); + } + } @Test public void testCheckIncorrectDataBaseFailure() { - final JsonNode invalidConfig = getFailCheckConfig(); - ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_FAIL_NAME); - final MongodbDestination destination = new MongodbDestination(); - final AirbyteConnectionStatus status = destination.check(invalidConfig); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 18")); + try { + final JsonNode invalidConfig = getFailCheckConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_FAIL_NAME); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 18")); + } catch (final Exception e) { + assertTrue(e instanceof IOException); + } + } @Test public void testCheckIncorrectHost() { - final JsonNode invalidConfig = getConfig(); - ((ObjectNode) invalidConfig).put(JdbcUtils.HOST_KEY, "localhost2"); - final MongodbDestination destination = new MongodbDestination(); - final AirbyteConnectionStatus status = destination.check(invalidConfig); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: -3")); + try { + final JsonNode invalidConfig = getConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.HOST_KEY, "localhost2"); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: -3")); + } catch (final Exception e) { + assertTrue(e instanceof IOException); + } + } @Test public void testCheckIncorrectPort() { - final JsonNode invalidConfig = getConfig(); - ((ObjectNode) invalidConfig).put(JdbcUtils.PORT_KEY, 1234); - final MongodbDestination destination = new MongodbDestination(); - final AirbyteConnectionStatus status = destination.check(invalidConfig); - assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: -3")); + try { + final JsonNode invalidConfig = getConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.PORT_KEY, 1234); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: -3")); + } catch (final Exception e) { + assertTrue(e instanceof IOException); + } } @Override @@ -174,7 +198,7 @@ protected void tearDown(final TestDestinationEnv testEnv) { /* Helpers */ - private JsonNode getAuthTypeConfig() { + protected JsonNode getAuthTypeConfig() { return Jsons.deserialize("{\n" + " \"authorization\": \"none\"\n" + "}"); diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshKeyMongoDbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshKeyMongoDbDestinationAcceptanceTest.java new file mode 100644 index 000000000000..9a9e5d9889ce --- /dev/null +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshKeyMongoDbDestinationAcceptanceTest.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mongodb; + +import io.airbyte.integrations.base.ssh.SshTunnel; +import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod; + +public class SshKeyMongoDbDestinationAcceptanceTest extends SshMongoDbDestinationAcceptanceTest { + + @Override + public SshTunnel.TunnelMethod getTunnelMethod() { + return TunnelMethod.SSH_KEY_AUTH; + } +} diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java new file mode 100644 index 000000000000..0338a2c8d34e --- /dev/null +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mongodb; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.base.ssh.SshBastionContainer; +import io.airbyte.integrations.base.ssh.SshTunnel; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.containers.Network; + +public abstract class SshMongoDbDestinationAcceptanceTest extends MongodbDestinationAcceptanceTest { + + private static final Network network = Network.newNetwork(); + private static final SshBastionContainer bastion = new SshBastionContainer(); + private static MongoDBContainer container; + private static final int DEFAULT_PORT = 27017; + + public abstract SshTunnel.TunnelMethod getTunnelMethod(); + + @BeforeAll + public static void beforeAll() { + container = new MongoDBContainer(DOCKER_IMAGE_NAME) + .withNetwork(network) + .withExposedPorts(DEFAULT_PORT); + container.start(); + bastion.initAndStartBastion(network); + } + + @Override + protected JsonNode getConfig() throws Exception { + return bastion.getTunnelConfig(getTunnelMethod(), ImmutableMap.builder() + .put(JdbcUtils.HOST_KEY, container.getHost()) + .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) + .put(JdbcUtils.DATABASE_KEY, DATABASE_NAME) + .put(AUTH_TYPE, getAuthTypeConfig())); + } + + @Override + protected JsonNode getFailCheckConfig() throws Exception { + // should result in a failed connection check + return bastion.getTunnelConfig(getTunnelMethod(), ImmutableMap.builder() + .put(JdbcUtils.HOST_KEY, container.getHost()) + .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) + .put(JdbcUtils.DATABASE_KEY, DATABASE_NAME) + .put(AUTH_TYPE, Jsons.jsonNode(ImmutableMap.builder() + .put("authorization", "login/password") + .put(JdbcUtils.USERNAME_KEY, "user") + .put(JdbcUtils.PASSWORD_KEY, "invalid_pass") + .build()))); + } + + @AfterAll + public static void afterAll() { + container.close(); + bastion.getContainer().close(); + } + +} diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshPasswordMongoDbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshPasswordMongoDbDestinationAcceptanceTest.java new file mode 100644 index 000000000000..379afd47200b --- /dev/null +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshPasswordMongoDbDestinationAcceptanceTest.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mongodb; + +import io.airbyte.integrations.base.ssh.SshTunnel; + +public class SshPasswordMongoDbDestinationAcceptanceTest extends SshMongoDbDestinationAcceptanceTest { + + @Override + public SshTunnel.TunnelMethod getTunnelMethod() { + return SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; + } +} From d0a2d3d0885b5f4551265521ce7bf17162244934 Mon Sep 17 00:00:00 2001 From: ryankfu Date: Tue, 25 Oct 2022 16:21:36 -0700 Subject: [PATCH 2/5] Fixed host/port resolution issues with Docker containers --- .../MongodbDestinationAcceptanceTest.java | 11 ++--- .../SshMongoDbDestinationAcceptanceTest.java | 46 +++++++++++++++---- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java index 0f749fe888e3..20e5d5f9fee5 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java @@ -32,10 +32,10 @@ public class MongodbDestinationAcceptanceTest extends DestinationAcceptanceTest protected static final String DATABASE_NAME = "admin"; private static final String DATABASE_FAIL_NAME = "fail_db"; protected static final String AUTH_TYPE = "auth_type"; - private static final String AIRBYTE_DATA = "_airbyte_data"; + protected static final String AIRBYTE_DATA = "_airbyte_data"; private MongoDBContainer container; - private final MongodbNameTransformer namingResolver = new MongodbNameTransformer(); + protected final MongodbNameTransformer namingResolver = new MongodbNameTransformer(); @Override protected String getImageName() { @@ -118,7 +118,6 @@ void testCheckIncorrectPasswordFailure() { final MongodbDestination destination = new MongodbDestination(); final AirbyteConnectionStatus status = destination.check(invalidConfig); assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 18")); } catch (final Exception e) { assertTrue(e instanceof IOException); } @@ -133,7 +132,6 @@ public void testCheckIncorrectUsernameFailure() { final MongodbDestination destination = new MongodbDestination(); final AirbyteConnectionStatus status = destination.check(invalidConfig); assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 18")); } catch (final Exception e) { assertTrue(e instanceof IOException); } @@ -148,7 +146,6 @@ public void testCheckIncorrectDataBaseFailure() { final MongodbDestination destination = new MongodbDestination(); final AirbyteConnectionStatus status = destination.check(invalidConfig); assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: 18")); } catch (final Exception e) { assertTrue(e instanceof IOException); } @@ -163,7 +160,6 @@ public void testCheckIncorrectHost() { final MongodbDestination destination = new MongodbDestination(); final AirbyteConnectionStatus status = destination.check(invalidConfig); assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: -3")); } catch (final Exception e) { assertTrue(e instanceof IOException); } @@ -178,7 +174,6 @@ public void testCheckIncorrectPort() { final MongodbDestination destination = new MongodbDestination(); final AirbyteConnectionStatus status = destination.check(invalidConfig); assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); - assertTrue(status.getMessage().contains("State code: -3")); } catch (final Exception e) { assertTrue(e instanceof IOException); } @@ -204,7 +199,7 @@ protected JsonNode getAuthTypeConfig() { + "}"); } - private MongoDatabase getMongoDatabase(final String host, final int port, final String databaseName) { + protected MongoDatabase getMongoDatabase(final String host, final int port, final String databaseName) { try { final String connectionString = String.format("mongodb://%s:%s/", host, port); return new MongoDatabase(connectionString, databaseName); diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java index 0338a2c8d34e..3acae92257cc 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/SshMongoDbDestinationAcceptanceTest.java @@ -4,14 +4,20 @@ package io.airbyte.integrations.destination.mongodb; +import static com.mongodb.client.model.Projections.excludeId; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import com.mongodb.client.MongoCursor; import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.mongodb.MongoDatabase; import io.airbyte.integrations.base.ssh.SshBastionContainer; import io.airbyte.integrations.base.ssh.SshTunnel; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import io.airbyte.integrations.util.HostPortResolver; +import java.util.ArrayList; +import java.util.List; +import org.bson.Document; import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.containers.Network; @@ -24,8 +30,8 @@ public abstract class SshMongoDbDestinationAcceptanceTest extends MongodbDestina public abstract SshTunnel.TunnelMethod getTunnelMethod(); - @BeforeAll - public static void beforeAll() { + @Override + protected void setup(final TestDestinationEnv testEnv) { container = new MongoDBContainer(DOCKER_IMAGE_NAME) .withNetwork(network) .withExposedPorts(DEFAULT_PORT); @@ -36,8 +42,8 @@ public static void beforeAll() { @Override protected JsonNode getConfig() throws Exception { return bastion.getTunnelConfig(getTunnelMethod(), ImmutableMap.builder() - .put(JdbcUtils.HOST_KEY, container.getHost()) - .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) + .put(JdbcUtils.HOST_KEY, HostPortResolver.resolveIpAddress(container)) + .put(JdbcUtils.PORT_KEY, container.getExposedPorts().get(0)) .put(JdbcUtils.DATABASE_KEY, DATABASE_NAME) .put(AUTH_TYPE, getAuthTypeConfig())); } @@ -46,8 +52,8 @@ protected JsonNode getConfig() throws Exception { protected JsonNode getFailCheckConfig() throws Exception { // should result in a failed connection check return bastion.getTunnelConfig(getTunnelMethod(), ImmutableMap.builder() - .put(JdbcUtils.HOST_KEY, container.getHost()) - .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) + .put(JdbcUtils.HOST_KEY, HostPortResolver.resolveIpAddress(container)) + .put(JdbcUtils.PORT_KEY, container.getExposedPorts().get(0)) .put(JdbcUtils.DATABASE_KEY, DATABASE_NAME) .put(AUTH_TYPE, Jsons.jsonNode(ImmutableMap.builder() .put("authorization", "login/password") @@ -56,9 +62,29 @@ protected JsonNode getFailCheckConfig() throws Exception { .build()))); } - @AfterAll - public static void afterAll() { + @Override + protected List retrieveRecords(final TestDestinationEnv testEnv, + final String streamName, + final String namespace, + final JsonNode streamSchema) { + final MongoDatabase database = getMongoDatabase(HostPortResolver.resolveIpAddress(container), + container.getExposedPorts().get(0), DATABASE_NAME); + final var collection = database.getOrCreateNewCollection(namingResolver.getRawTableName(streamName)); + final List result = new ArrayList<>(); + try (final MongoCursor cursor = collection.find().projection(excludeId()).iterator()) { + while (cursor.hasNext()) { + result.add(Jsons.jsonNode(cursor.next().get(AIRBYTE_DATA))); + } + } + return result; + } + + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + container.stop(); container.close(); + bastion.getContainer().stop(); bastion.getContainer().close(); } From 5dfe8aa594f7c87dd365feff8ddd6f53485c4eb2 Mon Sep 17 00:00:00 2001 From: ryankfu Date: Tue, 25 Oct 2022 17:47:48 -0700 Subject: [PATCH 3/5] Formatting changes --- .../resources/seed/destination_specs.yaml | 192 +++++++++--------- 1 file changed, 96 insertions(+), 96 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 8d21cf0174db..2076085dce38 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3332,103 +3332,103 @@ type: "object" title: "SSH Tunnel Method" description: "Whether to initiate an SSH tunnel before connecting to the\ - \ database, and if so, which kind of authentication to use." + \ database, and if so, which kind of authentication to use." oneOf: - - title: "No Tunnel" - required: - - "tunnel_method" - properties: - tunnel_method: - description: "No ssh tunnel needed to connect to database" - type: "string" - const: "NO_TUNNEL" - order: 0 - - title: "SSH Key Authentication" - required: - - "tunnel_method" - - "tunnel_host" - - "tunnel_port" - - "tunnel_user" - - "ssh_key" - properties: - tunnel_method: - description: "Connect through a jump server tunnel host using username\ - \ and ssh key" - type: "string" - const: "SSH_KEY_AUTH" - order: 0 - tunnel_host: - title: "SSH Tunnel Jump Server Host" - description: "Hostname of the jump server host that allows inbound\ - \ ssh tunnel." - type: "string" - order: 1 - tunnel_port: - title: "SSH Connection Port" - description: "Port on the proxy/jump server that accepts inbound ssh\ - \ connections." - type: "integer" - minimum: 0 - maximum: 65536 - default: 22 - examples: - - "22" - order: 2 - tunnel_user: - title: "SSH Login Username" - description: "OS-level username for logging into the jump server host." - type: "string" - order: 3 - ssh_key: - title: "SSH Private Key" - description: "OS-level user account ssh key credentials in RSA PEM\ - \ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )" - type: "string" - airbyte_secret: true - multiline: true - order: 4 - - title: "Password Authentication" - required: - - "tunnel_method" - - "tunnel_host" - - "tunnel_port" - - "tunnel_user" - - "tunnel_user_password" - properties: - tunnel_method: - description: "Connect through a jump server tunnel host using username\ - \ and password authentication" - type: "string" - const: "SSH_PASSWORD_AUTH" - order: 0 - tunnel_host: - title: "SSH Tunnel Jump Server Host" - description: "Hostname of the jump server host that allows inbound\ - \ ssh tunnel." - type: "string" - order: 1 - tunnel_port: - title: "SSH Connection Port" - description: "Port on the proxy/jump server that accepts inbound ssh\ - \ connections." - type: "integer" - minimum: 0 - maximum: 65536 - default: 22 - examples: - - "22" - order: 2 - tunnel_user: - title: "SSH Login Username" - description: "OS-level username for logging into the jump server host" - type: "string" - order: 3 - tunnel_user_password: - title: "Password" - description: "OS-level password for logging into the jump server host" - type: "string" - airbyte_secret: true - order: 4 + - title: "No Tunnel" + required: + - "tunnel_method" + properties: + tunnel_method: + description: "No ssh tunnel needed to connect to database" + type: "string" + const: "NO_TUNNEL" + order: 0 + - title: "SSH Key Authentication" + required: + - "tunnel_method" + - "tunnel_host" + - "tunnel_port" + - "tunnel_user" + - "ssh_key" + properties: + tunnel_method: + description: "Connect through a jump server tunnel host using username\ + \ and ssh key" + type: "string" + const: "SSH_KEY_AUTH" + order: 0 + tunnel_host: + title: "SSH Tunnel Jump Server Host" + description: "Hostname of the jump server host that allows inbound\ + \ ssh tunnel." + type: "string" + order: 1 + tunnel_port: + title: "SSH Connection Port" + description: "Port on the proxy/jump server that accepts inbound ssh\ + \ connections." + type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host." + type: "string" + order: 3 + ssh_key: + title: "SSH Private Key" + description: "OS-level user account ssh key credentials in RSA PEM\ + \ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )" + type: "string" + airbyte_secret: true + multiline: true + order: 4 + - title: "Password Authentication" + required: + - "tunnel_method" + - "tunnel_host" + - "tunnel_port" + - "tunnel_user" + - "tunnel_user_password" + properties: + tunnel_method: + description: "Connect through a jump server tunnel host using username\ + \ and password authentication" + type: "string" + const: "SSH_PASSWORD_AUTH" + order: 0 + tunnel_host: + title: "SSH Tunnel Jump Server Host" + description: "Hostname of the jump server host that allows inbound\ + \ ssh tunnel." + type: "string" + order: 1 + tunnel_port: + title: "SSH Connection Port" + description: "Port on the proxy/jump server that accepts inbound ssh\ + \ connections." + type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host" + type: "string" + order: 3 + tunnel_user_password: + title: "Password" + description: "OS-level password for logging into the jump server host" + type: "string" + airbyte_secret: true + order: 4 supportsIncremental: true supportsNormalization: false supportsDBT: false From 91577e9ae097b5bb847b482f9915b2ba860807ff Mon Sep 17 00:00:00 2001 From: ryankfu Date: Wed, 26 Oct 2022 13:01:57 -0700 Subject: [PATCH 4/5] Bump version number and adds documentation for SSH tunnel --- .../Dockerfile | 2 +- .../connectors/destination-mongodb/Dockerfile | 2 +- docs/integrations/destinations/mongodb.md | 39 ++++++++++++++----- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile index 6bffbe8f7041..6b4ecd23bc7b 100644 --- a/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/destination-mongodb-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-mongodb-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/destination-mongodb-strict-encrypt diff --git a/airbyte-integrations/connectors/destination-mongodb/Dockerfile b/airbyte-integrations/connectors/destination-mongodb/Dockerfile index 00cb6114919d..cff1b88e848c 100644 --- a/airbyte-integrations/connectors/destination-mongodb/Dockerfile +++ b/airbyte-integrations/connectors/destination-mongodb/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-mongodb COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/destination-mongodb diff --git a/docs/integrations/destinations/mongodb.md b/docs/integrations/destinations/mongodb.md index de1ebe28b83b..6a357aa6215b 100644 --- a/docs/integrations/destinations/mongodb.md +++ b/docs/integrations/destinations/mongodb.md @@ -60,6 +60,26 @@ You should now have all the requirements needed to configure MongoDB as a destin For more information regarding configuration parameters, please see [MongoDb Documentation](https://docs.mongodb.com/drivers/java/sync/v4.3/fundamentals/connection/). +### Connection via SSH Tunnel + +Airbyte has the ability to connect to an MongoDB instance via an SSH Tunnel. +The reason you might want to do this because it is not possible \(or against security policy\) to connect to your MongoDB instance directly \(e.g. it does not have a public IP address\). + +When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server \(a.k.a. a bastion sever\) that _does_ have direct access to the MongoDB instance. +Airbyte connects to the bastion and then asks the bastion to connect directly to the server. + +Using this feature requires additional configuration, when creating the source. We will talk through what each piece of configuration means. + +1. Configure all fields for the source as you normally would, except `SSH Tunnel Method`. +2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. + 1. Choose `Key Authentication` if you will be using an RSA private key as your secret for establishing the SSH Tunnel \(see below for more information on generating this key\). + 2. Choose `Password Authentication` if you will be using a password as your secret for establishing the SSH Tunnel. +3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will connect to. This should be a hostname or an IP Address. +4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. The default port for SSH connections is `22`, so unless you have explicitly changed something, go with the default. +5. `SSH Login Username` is the username that Airbyte should use when connection to the bastion server. This is NOT the TiDB username. +6. If you are using `Password Authentication`, then `SSH Login Username` should be set to the password of the User from the previous step. If you are using `SSH Key Authentication` TiDB password, but the password for the OS-user that Airbyte is using to perform commands on the bastion. +7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA Private Key that you are using to create the SSH connection. This should be the full contents of the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending with `-----END RSA PRIVATE KEY-----`. + ## Naming Conventions The following information comes from the [MongoDB Limits and Thresholds](https://docs.mongodb.com/manual/reference/limits/) documentation. @@ -95,13 +115,14 @@ Collection names should begin with an underscore or a letter character, and cann ## Changelog -| Version | Date | Pull Request | Subject | -| :--- | :--- | :--- | :--- | -| 0.1.7 | 2022-09-02 | [16025](https://github.com/airbytehq/airbyte/pull/16025) | Remove additionalProperties:false from spec | -| 0.1.6 | 2022-08-02 | [15211](https://github.com/airbytehq/airbyte/pull/15211) | Fix standard mode | -| 0.1.5 | 2022-07-27 | [14561](https://github.com/airbytehq/airbyte/pull/14561) | Change Airbyte Id from MD5 to SHA256 | -| 0.1.4 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | (unpublished) Add `-XX:+ExitOnOutOfMemoryError` JVM option | -| 0.1.3 | 2021-12-30 | [8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description | -| 0.1.2 | 2021-10-18 | [6945](https://github.com/airbytehq/airbyte/pull/6945) | Create a secure-only MongoDb destination | -| 0.1.1 | 2021-09-29 | [6536](https://github.com/airbytehq/airbyte/pull/6536) | Destination MongoDb: added support via TLS/SSL | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------| +| 0.1.8 | 2022-10-26 | [18280](https://github.com/airbytehq/airbyte/pull/18280) | Adds SSH tunneling | +| 0.1.7 | 2022-09-02 | [16025](https://github.com/airbytehq/airbyte/pull/16025) | Remove additionalProperties:false from spec | +| 0.1.6 | 2022-08-02 | [15211](https://github.com/airbytehq/airbyte/pull/15211) | Fix standard mode | +| 0.1.5 | 2022-07-27 | [14561](https://github.com/airbytehq/airbyte/pull/14561) | Change Airbyte Id from MD5 to SHA256 | +| 0.1.4 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | (unpublished) Add `-XX:+ExitOnOutOfMemoryError` JVM option | +| 0.1.3 | 2021-12-30 | [8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description | +| 0.1.2 | 2021-10-18 | [6945](https://github.com/airbytehq/airbyte/pull/6945) | Create a secure-only MongoDb destination | +| 0.1.1 | 2021-09-29 | [6536](https://github.com/airbytehq/airbyte/pull/6536) | Destination MongoDb: added support via TLS/SSL | From 4f75515997a05a2f8e54d27882b6f7e195574618 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Wed, 26 Oct 2022 21:55:05 +0000 Subject: [PATCH 5/5] auto-bump connector version --- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- .../init/src/main/resources/seed/destination_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index eafa3ec2441f..7169e758f118 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -189,7 +189,7 @@ - name: MongoDB destinationDefinitionId: 8b746512-8c2e-6ac1-4adc-b59faafd473c dockerRepository: airbyte/destination-mongodb - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 documentationUrl: https://docs.airbyte.com/integrations/destinations/mongodb icon: mongodb.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 2076085dce38..464fe0fd3e9e 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3196,7 +3196,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-mongodb:0.1.7" +- dockerImage: "airbyte/destination-mongodb:0.1.8" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/mongodb" connectionSpecification: