Skip to content

Commit

Permalink
Add SSH tunnel for MongoDB Destination (#18280)
Browse files Browse the repository at this point in the history
* Add SSH tunnel for MongoDB Destination

* Fixed host/port resolution issues with Docker containers

* Formatting changes

* Bump version number and adds documentation for SSH tunnel

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
ryankfu and octavia-squidington-iii authored Oct 26, 2022
1 parent 5d4b564 commit cd33eff
Show file tree
Hide file tree
Showing 11 changed files with 326 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@
- name: MongoDB
destinationDefinitionId: 8b746512-8c2e-6ac1-4adc-b59faafd473c
dockerRepository: airbyte/destination-mongodb
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.com/integrations/destinations/mongodb
icon: mongodb.svg
releaseStage: alpha
Expand Down
103 changes: 102 additions & 1 deletion airbyte-config/init/src/main/resources/seed/destination_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3196,7 +3196,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-mongodb:0.1.7"
- dockerImage: "airbyte/destination-mongodb:0.1.8"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/mongodb"
connectionSpecification:
Expand Down Expand Up @@ -3328,6 +3328,107 @@
type: "string"
airbyte_secret: true
order: 2
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
description: "Whether to initiate an SSH tunnel before connecting to the\
\ database, and if so, which kind of authentication to use."
oneOf:
- title: "No Tunnel"
required:
- "tunnel_method"
properties:
tunnel_method:
description: "No ssh tunnel needed to connect to database"
type: "string"
const: "NO_TUNNEL"
order: 0
- title: "SSH Key Authentication"
required:
- "tunnel_method"
- "tunnel_host"
- "tunnel_port"
- "tunnel_user"
- "ssh_key"
properties:
tunnel_method:
description: "Connect through a jump server tunnel host using username\
\ and ssh key"
type: "string"
const: "SSH_KEY_AUTH"
order: 0
tunnel_host:
title: "SSH Tunnel Jump Server Host"
description: "Hostname of the jump server host that allows inbound\
\ ssh tunnel."
type: "string"
order: 1
tunnel_port:
title: "SSH Connection Port"
description: "Port on the proxy/jump server that accepts inbound ssh\
\ connections."
type: "integer"
minimum: 0
maximum: 65536
default: 22
examples:
- "22"
order: 2
tunnel_user:
title: "SSH Login Username"
description: "OS-level username for logging into the jump server host."
type: "string"
order: 3
ssh_key:
title: "SSH Private Key"
description: "OS-level user account ssh key credentials in RSA PEM\
\ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )"
type: "string"
airbyte_secret: true
multiline: true
order: 4
- title: "Password Authentication"
required:
- "tunnel_method"
- "tunnel_host"
- "tunnel_port"
- "tunnel_user"
- "tunnel_user_password"
properties:
tunnel_method:
description: "Connect through a jump server tunnel host using username\
\ and password authentication"
type: "string"
const: "SSH_PASSWORD_AUTH"
order: 0
tunnel_host:
title: "SSH Tunnel Jump Server Host"
description: "Hostname of the jump server host that allows inbound\
\ ssh tunnel."
type: "string"
order: 1
tunnel_port:
title: "SSH Connection Port"
description: "Port on the proxy/jump server that accepts inbound ssh\
\ connections."
type: "integer"
minimum: 0
maximum: 65536
default: 22
examples:
- "22"
order: 2
tunnel_user:
title: "SSH Login Username"
description: "OS-level username for logging into the jump server host"
type: "string"
order: 3
tunnel_user_password:
title: "Password"
description: "OS-level password for logging into the jump server host"
type: "string"
airbyte_secret: true
order: 4
supportsIncremental: true
supportsNormalization: false
supportsDBT: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-mongodb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/destination-mongodb-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ public class MongodbDestinationStrictEncrypt extends SpecModifyingDestination im
private static final Logger LOGGER = LoggerFactory.getLogger(MongodbDestinationStrictEncrypt.class);

public MongodbDestinationStrictEncrypt() {
super(new MongodbDestination());
super(MongodbDestination.sshWrappedDestination());
}

@Override
public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) throws Exception {
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) throws Exception {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
// removing tls property for a standalone instance to disable possibility to switch off a tls
// connection
((ObjectNode) spec.getConnectionSpecification().get("properties").get("instance_type").get("oneOf").get(0).get("properties")).remove("tls");
return spec;
}

public static void main(String[] args) throws Exception {
public static void main(final String[] args) throws Exception {
final Destination destination = new MongodbDestinationStrictEncrypt();
LOGGER.info("starting destination: {}", MongodbDestinationStrictEncrypt.class);
new IntegrationRunner(destination).run(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-mongodb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/destination-mongodb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.mongodb.exception.MongodbDatabaseException;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
Expand Down Expand Up @@ -61,12 +62,16 @@ public class MongodbDestination extends BaseConnector implements Destination {

private final MongodbNameTransformer namingResolver;

public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new MongodbDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}

public MongodbDestination() {
namingResolver = new MongodbNameTransformer();
}

public static void main(final String[] args) throws Exception {
final Destination destination = new MongodbDestination();
final Destination destination = sshWrappedDestination();
LOGGER.info("starting destination: {}", MongodbDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", MongodbDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.bson.Document;
Expand All @@ -27,22 +28,22 @@

public class MongodbDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final String DOCKER_IMAGE_NAME = "mongo:4.0.10";
private static final String DATABASE_NAME = "admin";
protected static final String DOCKER_IMAGE_NAME = "mongo:4.0.10";
protected static final String DATABASE_NAME = "admin";
private static final String DATABASE_FAIL_NAME = "fail_db";
private static final String AUTH_TYPE = "auth_type";
private static final String AIRBYTE_DATA = "_airbyte_data";
protected static final String AUTH_TYPE = "auth_type";
protected static final String AIRBYTE_DATA = "_airbyte_data";

private MongoDBContainer container;
private final MongodbNameTransformer namingResolver = new MongodbNameTransformer();
protected final MongodbNameTransformer namingResolver = new MongodbNameTransformer();

@Override
protected String getImageName() {
return "airbyte/destination-mongodb:dev";
}

@Override
protected JsonNode getConfig() {
protected JsonNode getConfig() throws Exception {
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
Expand All @@ -52,7 +53,7 @@ protected JsonNode getConfig() {
}

@Override
protected JsonNode getFailCheckConfig() {
protected JsonNode getFailCheckConfig() throws Exception {
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
Expand Down Expand Up @@ -110,54 +111,72 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
*/
@Test
void testCheckIncorrectPasswordFailure() {
final JsonNode invalidConfig = getFailCheckConfig();
((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME);
((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.PASSWORD_KEY, "fake");
final MongodbDestination destination = new MongodbDestination();
final AirbyteConnectionStatus status = destination.check(invalidConfig);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: 18"));
try {
final JsonNode invalidConfig = getFailCheckConfig();
((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME);
((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.PASSWORD_KEY, "fake");
final MongodbDestination destination = new MongodbDestination();
final AirbyteConnectionStatus status = destination.check(invalidConfig);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
} catch (final Exception e) {
assertTrue(e instanceof IOException);
}
}

@Test
public void testCheckIncorrectUsernameFailure() {
final JsonNode invalidConfig = getFailCheckConfig();
((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME);
((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.USERNAME_KEY, "fakeusername");
final MongodbDestination destination = new MongodbDestination();
final AirbyteConnectionStatus status = destination.check(invalidConfig);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: 18"));
try {
final JsonNode invalidConfig = getFailCheckConfig();
((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME);
((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.USERNAME_KEY, "fakeusername");
final MongodbDestination destination = new MongodbDestination();
final AirbyteConnectionStatus status = destination.check(invalidConfig);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
} catch (final Exception e) {
assertTrue(e instanceof IOException);
}

}

@Test
public void testCheckIncorrectDataBaseFailure() {
final JsonNode invalidConfig = getFailCheckConfig();
((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_FAIL_NAME);
final MongodbDestination destination = new MongodbDestination();
final AirbyteConnectionStatus status = destination.check(invalidConfig);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: 18"));
try {
final JsonNode invalidConfig = getFailCheckConfig();
((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_FAIL_NAME);
final MongodbDestination destination = new MongodbDestination();
final AirbyteConnectionStatus status = destination.check(invalidConfig);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
} catch (final Exception e) {
assertTrue(e instanceof IOException);
}

}

@Test
public void testCheckIncorrectHost() {
final JsonNode invalidConfig = getConfig();
((ObjectNode) invalidConfig).put(JdbcUtils.HOST_KEY, "localhost2");
final MongodbDestination destination = new MongodbDestination();
final AirbyteConnectionStatus status = destination.check(invalidConfig);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: -3"));
try {
final JsonNode invalidConfig = getConfig();
((ObjectNode) invalidConfig).put(JdbcUtils.HOST_KEY, "localhost2");
final MongodbDestination destination = new MongodbDestination();
final AirbyteConnectionStatus status = destination.check(invalidConfig);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
} catch (final Exception e) {
assertTrue(e instanceof IOException);
}

}

@Test
public void testCheckIncorrectPort() {
final JsonNode invalidConfig = getConfig();
((ObjectNode) invalidConfig).put(JdbcUtils.PORT_KEY, 1234);
final MongodbDestination destination = new MongodbDestination();
final AirbyteConnectionStatus status = destination.check(invalidConfig);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: -3"));
try {
final JsonNode invalidConfig = getConfig();
((ObjectNode) invalidConfig).put(JdbcUtils.PORT_KEY, 1234);
final MongodbDestination destination = new MongodbDestination();
final AirbyteConnectionStatus status = destination.check(invalidConfig);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
} catch (final Exception e) {
assertTrue(e instanceof IOException);
}
}

@Override
Expand All @@ -174,13 +193,13 @@ protected void tearDown(final TestDestinationEnv testEnv) {

/* Helpers */

private JsonNode getAuthTypeConfig() {
protected JsonNode getAuthTypeConfig() {
return Jsons.deserialize("{\n"
+ " \"authorization\": \"none\"\n"
+ "}");
}

private MongoDatabase getMongoDatabase(final String host, final int port, final String databaseName) {
protected MongoDatabase getMongoDatabase(final String host, final int port, final String databaseName) {
try {
final String connectionString = String.format("mongodb://%s:%s/", host, port);
return new MongoDatabase(connectionString, databaseName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mongodb;

import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod;

public class SshKeyMongoDbDestinationAcceptanceTest extends SshMongoDbDestinationAcceptanceTest {

@Override
public SshTunnel.TunnelMethod getTunnelMethod() {
return TunnelMethod.SSH_KEY_AUTH;
}
}
Loading

0 comments on commit cd33eff

Please sign in to comment.