Skip to content

Commit

Permalink
16250 Destination Redis: Add SSH support (airbytehq#17951)
Browse files Browse the repository at this point in the history
* 16250 Destination Redis: Add SSH support

* 16250 Resolve port issue

* 11679 Bump version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
2 people authored and jhammarstedt committed Oct 31, 2022
1 parent 224ace1 commit 8aa8506
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@
- name: Redis
destinationDefinitionId: d4d3fef9-e319-45c2-881a-bd02ce44cc9f
dockerRepository: airbyte/destination-redis
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.com/integrations/destinations/redis
icon: redis.svg
releaseStage: alpha
Expand Down
103 changes: 102 additions & 1 deletion airbyte-config/init/src/main/resources/seed/destination_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4238,7 +4238,7 @@
supportsDBT: false
supported_destination_sync_modes:
- "append"
- dockerImage: "airbyte/destination-redis:0.1.2"
- dockerImage: "airbyte/destination-redis:0.1.3"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redis"
connectionSpecification:
Expand Down Expand Up @@ -4287,6 +4287,107 @@
enum:
- "hash"
order: 5
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
description: "Whether to initiate an SSH tunnel before connecting to the\
\ database, and if so, which kind of authentication to use."
oneOf:
- title: "No Tunnel"
required:
- "tunnel_method"
properties:
tunnel_method:
description: "No ssh tunnel needed to connect to database"
type: "string"
const: "NO_TUNNEL"
order: 0
- title: "SSH Key Authentication"
required:
- "tunnel_method"
- "tunnel_host"
- "tunnel_port"
- "tunnel_user"
- "ssh_key"
properties:
tunnel_method:
description: "Connect through a jump server tunnel host using username\
\ and ssh key"
type: "string"
const: "SSH_KEY_AUTH"
order: 0
tunnel_host:
title: "SSH Tunnel Jump Server Host"
description: "Hostname of the jump server host that allows inbound\
\ ssh tunnel."
type: "string"
order: 1
tunnel_port:
title: "SSH Connection Port"
description: "Port on the proxy/jump server that accepts inbound ssh\
\ connections."
type: "integer"
minimum: 0
maximum: 65536
default: 22
examples:
- "22"
order: 2
tunnel_user:
title: "SSH Login Username"
description: "OS-level username for logging into the jump server host."
type: "string"
order: 3
ssh_key:
title: "SSH Private Key"
description: "OS-level user account ssh key credentials in RSA PEM\
\ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )"
type: "string"
airbyte_secret: true
multiline: true
order: 4
- title: "Password Authentication"
required:
- "tunnel_method"
- "tunnel_host"
- "tunnel_port"
- "tunnel_user"
- "tunnel_user_password"
properties:
tunnel_method:
description: "Connect through a jump server tunnel host using username\
\ and password authentication"
type: "string"
const: "SSH_PASSWORD_AUTH"
order: 0
tunnel_host:
title: "SSH Tunnel Jump Server Host"
description: "Hostname of the jump server host that allows inbound\
\ ssh tunnel."
type: "string"
order: 1
tunnel_port:
title: "SSH Connection Port"
description: "Port on the proxy/jump server that accepts inbound ssh\
\ connections."
type: "integer"
minimum: 0
maximum: 65536
default: 22
examples:
- "22"
order: 2
tunnel_user:
title: "SSH Login Username"
description: "OS-level username for logging into the jump server host"
type: "string"
order: 3
tunnel_user_password:
title: "Password"
description: "OS-level password for logging into the jump server host"
type: "string"
airbyte_secret: true
order: 4
supportsIncremental: true
supportsNormalization: false
supportsDBT: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public void stopAndCloseContainers(final JdbcDatabaseContainer<?> db) {
db.close();
}

public void stopAndClose() {
bastion.close();
}

public GenericContainer getContainer() {
return bastion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-redis

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/destination-redis
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -21,7 +23,14 @@ class RedisDestination extends BaseConnector implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisDestination.class);

public static void main(String[] args) throws Exception {
new IntegrationRunner(new RedisDestination()).run(args);
LOGGER.info("starting destination: {}", RedisDestination.class);
final Destination destination = RedisDestination.sshWrappedDestination();
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", RedisDestination.class);
}

public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new RedisDestination(), List.of("host"), List.of("port"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redis;

import io.airbyte.integrations.base.ssh.SshTunnel;

public class SshKeyRedisDestinationAcceptanceTest extends SshRedisDestinationAcceptanceTest {

@Override
public SshTunnel.TunnelMethod getTunnelMethod() {
return SshTunnel.TunnelMethod.SSH_KEY_AUTH;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redis;

import io.airbyte.integrations.base.ssh.SshTunnel;

public class SshPasswordRedisDestinationAcceptanceTest extends SshRedisDestinationAcceptanceTest {

@Override
public SshTunnel.TunnelMethod getTunnelMethod() {
return SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redis;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.ssh.SshBastionContainer;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.redis.RedisContainerInitializr.RedisContainer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.integrations.util.HostPortResolver;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.Network;

public abstract class SshRedisDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final SshBastionContainer bastion = new SshBastionContainer();
private static final Network network = Network.newNetwork();
private static RedisContainerInitializr.RedisContainer redisContainer;
private JsonNode configJson;
private RedisCache redisCache;
private RedisNameTransformer redisNameTransformer;

@BeforeAll
static void initContainers() {
redisContainer = new RedisContainer()
.withExposedPorts(6379)
.withNetwork(network);
redisContainer.start();
bastion.initAndStartBastion(network);
}

@AfterAll
static void stop() {
redisContainer.close();
bastion.stopAndClose();
}

public abstract SshTunnel.TunnelMethod getTunnelMethod();

@Override
protected void setup(TestDestinationEnv testEnv) {
configJson = RedisDataFactory.jsonConfig(
redisContainer.getHost(),
redisContainer.getFirstMappedPort());
var redisConfig = new RedisConfig(configJson);
redisCache = new RedisHCache(redisConfig);
redisNameTransformer = new RedisNameTransformer();
}

@Override
protected void tearDown(TestDestinationEnv testEnv) {
redisCache.flushAll();
}

@Override
protected String getImageName() {
return "airbyte/destination-redis:dev";
}

@Override
protected JsonNode getConfig() throws Exception {
return bastion.getTunnelConfig(getTunnelMethod(), ImmutableMap.builder()
.put("host", HostPortResolver.resolveIpAddress(redisContainer))
.put("port", redisContainer.getExposedPorts().get(0))
.put("username", configJson.get("username"))
.put("password", configJson.get("password"))
.put("cache_type", configJson.get("cache_type")));
}

@Override
protected JsonNode getFailCheckConfig() {
return RedisDataFactory.jsonConfig(
"127.0.0.9",
8080);
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
String streamName,
String namespace,
JsonNode streamSchema) {
var key = redisNameTransformer.keyName(namespace, streamName);
return redisCache.getAll(key).stream()
.sorted(Comparator.comparing(RedisRecord::getTimestamp))
.map(RedisRecord::getData)
.map(Jsons::deserialize)
.collect(Collectors.toList());
}


@Override
protected boolean implementsNamespaces() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new AdvancedTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

}
8 changes: 7 additions & 1 deletion docs/integrations/destinations/redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ save snapshots periodically on disk.

### Setup guide

######TODO: more info, screenshots?, etc...
######TODO: more info, screenshots?, etc...

## Changelog

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------|
| 0.1.3 | 2022-10-18 | [\#17951](https://github.com/airbytehq/airbyte/pull/17951) | Add SSH support |

0 comments on commit 8aa8506

Please sign in to comment.