diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 71c2d7305fd10..95292e8f19498 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -100,7 +100,7 @@ - destinationDefinitionId: 68f351a7-2745-4bef-ad7f-996b8e51bb8c name: ElasticSearch dockerRepository: airbyte/destination-elasticsearch - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://docs.airbyte.com/integrations/destinations/elasticsearch icon: elasticsearch.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 0c3794a58c192..88ae685a454cd 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -1654,7 +1654,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-elasticsearch:0.1.3" +- dockerImage: "airbyte/destination-elasticsearch:0.1.4" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/elasticsearch" connectionSpecification: @@ -1733,6 +1733,107 @@ \ server" type: "string" airbyte_secret: true + tunnel_method: + type: "object" + title: "SSH Tunnel Method" + description: "Whether to initiate an SSH tunnel before connecting to the\ + \ database, and if so, which kind of authentication to use." + oneOf: + - title: "No Tunnel" + required: + - "tunnel_method" + properties: + tunnel_method: + description: "No ssh tunnel needed to connect to database" + type: "string" + const: "NO_TUNNEL" + order: 0 + - title: "SSH Key Authentication" + required: + - "tunnel_method" + - "tunnel_host" + - "tunnel_port" + - "tunnel_user" + - "ssh_key" + properties: + tunnel_method: + description: "Connect through a jump server tunnel host using username\ + \ and ssh key" + type: "string" + const: "SSH_KEY_AUTH" + order: 0 + tunnel_host: + title: "SSH Tunnel Jump Server Host" + description: "Hostname of the jump server host that allows inbound\ + \ ssh tunnel." + type: "string" + order: 1 + tunnel_port: + title: "SSH Connection Port" + description: "Port on the proxy/jump server that accepts inbound ssh\ + \ connections." + type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host." + type: "string" + order: 3 + ssh_key: + title: "SSH Private Key" + description: "OS-level user account ssh key credentials in RSA PEM\ + \ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )" + type: "string" + airbyte_secret: true + multiline: true + order: 4 + - title: "Password Authentication" + required: + - "tunnel_method" + - "tunnel_host" + - "tunnel_port" + - "tunnel_user" + - "tunnel_user_password" + properties: + tunnel_method: + description: "Connect through a jump server tunnel host using username\ + \ and password authentication" + type: "string" + const: "SSH_PASSWORD_AUTH" + order: 0 + tunnel_host: + title: "SSH Tunnel Jump Server Host" + description: "Hostname of the jump server host that allows inbound\ + \ ssh tunnel." + type: "string" + order: 1 + tunnel_port: + title: "SSH Connection Port" + description: "Port on the proxy/jump server that accepts inbound ssh\ + \ connections." + type: "integer" + minimum: 0 + maximum: 65536 + default: 22 + examples: + - "22" + order: 2 + tunnel_user: + title: "SSH Login Username" + description: "OS-level username for logging into the jump server host" + type: "string" + order: 3 + tunnel_user_password: + title: "Password" + description: "OS-level password for logging into the jump server host" + type: "string" + airbyte_secret: true + order: 4 supportsIncremental: true supportsNormalization: false supportsDBT: false diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java index 8a4ba2cb02a06..1c694def8e9aa 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java @@ -10,12 +10,16 @@ import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import java.io.IOException; import java.io.StringReader; import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; import java.security.GeneralSecurityException; import java.security.KeyPair; import java.time.Duration; +import java.util.Arrays; import java.util.List; import org.apache.sshd.client.SshClient; import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier; @@ -54,10 +58,13 @@ public enum TunnelMethod { private final int tunnelPort; private final String tunnelUser; private final String sshKey; + private final String endPointKey; + private final String remoteServiceProtocol; + private final String remoteServicePath; private final String tunnelUserPassword; - private final String remoteDatabaseHost; - private final int remoteDatabasePort; - private int tunnelDatabasePort; + private final String remoteServiceHost; + private final int remoteServicePort; + protected int tunnelLocalPort; private SshClient sshclient; private ClientSession tunnelSession; @@ -69,6 +76,10 @@ public enum TunnelMethod { * in the config remoteDatabaseHost is found. * @param portKey - a list of keys that point to the database port. should be pointing to where in * the config remoteDatabasePort is found. + * @param endPointKey - key that points to the endpoint URL (this is commonly used for REST-based + * services such as Elastic and MongoDB) + * @param remoteServiceUrl - URL of the remote endpoint (this is commonly used for REST-based * + * services such as Elastic and MongoDB) * @param tunnelMethod - the type of ssh method that should be used (includes not using SSH at all). * @param tunnelHost - host name of the machine to which we will establish an ssh connection (e.g. * hostname of the bastion). @@ -79,25 +90,28 @@ public enum TunnelMethod { * using tunnelUserPassword instead. * @param tunnelUserPassword - the password for the tunnelUser. can be null if we are using sshKey * instead. - * @param remoteDatabaseHost - the actual host name of the database (as it is known to the tunnel + * @param remoteServiceHost - the actual host name of the remote service (as it is known to the + * tunnel host). + * @param remoteServicePort - the actual port of the remote service (as it is known to the tunnel * host). - * @param remoteDatabasePort - the actual port of the database (as it is known to the tunnel host). */ public SshTunnel(final JsonNode config, final List hostKey, final List portKey, + final String endPointKey, + final String remoteServiceUrl, final TunnelMethod tunnelMethod, final String tunnelHost, final int tunnelPort, final String tunnelUser, final String sshKey, final String tunnelUserPassword, - final String remoteDatabaseHost, - final int remoteDatabasePort) { + final String remoteServiceHost, + final int remoteServicePort) { this.config = config; this.hostKey = hostKey; this.portKey = portKey; - + this.endPointKey = endPointKey; Preconditions.checkNotNull(tunnelMethod); this.tunnelMethod = tunnelMethod; @@ -107,8 +121,10 @@ public SshTunnel(final JsonNode config, this.tunnelUser = null; this.sshKey = null; this.tunnelUserPassword = null; - this.remoteDatabaseHost = null; - this.remoteDatabasePort = 0; + this.remoteServiceHost = null; + this.remoteServicePort = 0; + this.remoteServiceProtocol = null; + this.remoteServicePath = null; } else { Preconditions.checkNotNull(tunnelHost); Preconditions.checkArgument(tunnelPort > 0); @@ -119,17 +135,34 @@ public SshTunnel(final JsonNode config, if (tunnelMethod.equals(TunnelMethod.SSH_PASSWORD_AUTH)) { Preconditions.checkNotNull(tunnelUserPassword); } - Preconditions.checkNotNull(remoteDatabaseHost); - Preconditions.checkArgument(remoteDatabasePort > 0); + // must provide either host/port or endpoint + Preconditions.checkArgument((hostKey != null && portKey != null) || endPointKey != null); + Preconditions.checkArgument((remoteServiceHost != null && remoteServicePort > 0) || remoteServiceUrl != null); + if (remoteServiceUrl != null) { + URL urlObject = null; + try { + urlObject = new URL(remoteServiceUrl); + } catch (MalformedURLException e) { + AirbyteTraceMessageUtility.emitConfigErrorTrace(e, + String.format("Provided value for remote service URL is not valid: %s", remoteServiceUrl)); + } + Preconditions.checkNotNull(urlObject, "Failed to parse URL of remote service"); + this.remoteServiceHost = urlObject.getHost(); + this.remoteServicePort = urlObject.getPort(); + this.remoteServiceProtocol = urlObject.getProtocol(); + this.remoteServicePath = urlObject.getPath(); + } else { + this.remoteServiceProtocol = null; + this.remoteServicePath = null; + this.remoteServiceHost = remoteServiceHost; + this.remoteServicePort = remoteServicePort; + } this.tunnelHost = tunnelHost; this.tunnelPort = tunnelPort; this.tunnelUser = tunnelUser; this.sshKey = sshKey; this.tunnelUserPassword = tunnelUserPassword; - this.remoteDatabaseHost = remoteDatabaseHost; - this.remoteDatabasePort = remoteDatabasePort; - this.sshclient = createClient(); this.tunnelSession = openTunnel(sshclient); } @@ -139,42 +172,37 @@ public JsonNode getOriginalConfig() { return config; } - public JsonNode getConfigInTunnel() { + public JsonNode getConfigInTunnel() throws Exception { if (tunnelMethod.equals(TunnelMethod.NO_TUNNEL)) { return getOriginalConfig(); } else { final JsonNode clone = Jsons.clone(config); - Jsons.replaceNestedString(clone, hostKey, SshdSocketAddress.LOCALHOST_ADDRESS.getHostName()); - Jsons.replaceNestedInt(clone, portKey, tunnelDatabasePort); + if (hostKey != null) { + Jsons.replaceNestedString(clone, hostKey, SshdSocketAddress.LOCALHOST_ADDRESS.getHostName()); + } + if (portKey != null) { + Jsons.replaceNestedInt(clone, portKey, tunnelLocalPort); + } + if (endPointKey != null) { + URL tunnelEndPointURL = new URL(remoteServiceProtocol, SshdSocketAddress.LOCALHOST_ADDRESS.getHostName(), tunnelLocalPort, remoteServicePath); + Jsons.replaceNestedString(clone, Arrays.asList(endPointKey), tunnelEndPointURL.toString()); + } return clone; } } - // /** - // * Finds a free port on the machine. As soon as this method returns, it is possible for process to - // bind to this port. Thus it only gives a guarantee that at the time - // */ - // private static int findFreePort() { - // // finds an available port. - // try (final var socket = new ServerSocket(0)) { - // return socket.getLocalPort(); - // } catch (final IOException e) { - // throw new RuntimeException(e); - // } - // } - public static SshTunnel getInstance(final JsonNode config, final List hostKey, final List portKey) { final TunnelMethod tunnelMethod = Jsons.getOptional(config, "tunnel_method", "tunnel_method") .map(method -> TunnelMethod.valueOf(method.asText().trim())) .orElse(TunnelMethod.NO_TUNNEL); LOGGER.info("Starting connection with method: {}", tunnelMethod); - // final int localPort = findFreePort(); - return new SshTunnel( config, hostKey, portKey, + null, + null, tunnelMethod, Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "tunnel_host")), Jsons.getIntOrZero(config, "tunnel_method", "tunnel_port"), @@ -185,6 +213,27 @@ public static SshTunnel getInstance(final JsonNode config, final List ho Jsons.getIntOrZero(config, portKey)); } + public static SshTunnel getInstance(final JsonNode config, final String endPointKey) throws Exception { + final TunnelMethod tunnelMethod = Jsons.getOptional(config, "tunnel_method", "tunnel_method") + .map(method -> TunnelMethod.valueOf(method.asText().trim())) + .orElse(TunnelMethod.NO_TUNNEL); + LOGGER.info("Starting connection with method: {}", tunnelMethod); + + return new SshTunnel( + config, + null, + null, + endPointKey, + Jsons.getStringOrNull(config, endPointKey), + tunnelMethod, + Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "tunnel_host")), + Jsons.getIntOrZero(config, "tunnel_method", "tunnel_port"), + Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "tunnel_user")), + Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "ssh_key")), + Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "tunnel_user_password")), + null, 0); + } + public static void sshWrap(final JsonNode config, final List hostKey, final List portKey, @@ -196,6 +245,16 @@ public static void sshWrap(final JsonNode config, }); } + public static void sshWrap(final JsonNode config, + final String endPointKey, + final CheckedConsumer wrapped) + throws Exception { + sshWrap(config, endPointKey, (configInTunnel) -> { + wrapped.accept(configInTunnel); + return null; + }); + } + public static T sshWrap(final JsonNode config, final List hostKey, final List portKey, @@ -206,6 +265,15 @@ public static T sshWrap(final JsonNode config, } } + public static T sshWrap(final JsonNode config, + final String endPointKey, + final CheckedFunction wrapped) + throws Exception { + try (final SshTunnel sshTunnel = SshTunnel.getInstance(config, endPointKey)) { + return wrapped.apply(sshTunnel.getConfigInTunnel()); + } + } + /** * Closes a tunnel if one was open, and otherwise doesn't do anything (safe to run). */ @@ -281,13 +349,13 @@ ClientSession openTunnel(final SshClient client) { final SshdSocketAddress address = session.startLocalPortForwarding( // entering 0 lets the OS pick a free port for us. new SshdSocketAddress(InetSocketAddress.createUnresolved(SshdSocketAddress.LOCALHOST_ADDRESS.getHostName(), 0)), - new SshdSocketAddress(remoteDatabaseHost, remoteDatabasePort)); + new SshdSocketAddress(remoteServiceHost, remoteServicePort)); // discover the port that the OS picked and remember it so that we can use it when we try to connect - // later. - tunnelDatabasePort = address.getPort(); + tunnelLocalPort = address.getPort(); - LOGGER.info("Established tunneling session. Port forwarding started on " + address.toInetSocketAddress()); + LOGGER.info(String.format("Established tunneling session to %s:%d. Port forwarding started on %s ", + remoteServiceHost, remoteServicePort, address.toInetSocketAddress())); return session; } catch (final IOException | GeneralSecurityException e) { throw new RuntimeException(e); @@ -303,9 +371,9 @@ public String toString() { ", tunnelHost='" + tunnelHost + '\'' + ", tunnelPort=" + tunnelPort + ", tunnelUser='" + tunnelUser + '\'' + - ", remoteDatabaseHost='" + remoteDatabaseHost + '\'' + - ", remoteDatabasePort=" + remoteDatabasePort + - ", tunnelDatabasePort=" + tunnelDatabasePort + + ", remoteServiceHost='" + remoteServiceHost + '\'' + + ", remoteServicePort=" + remoteServicePort + + ", tunnelLocalPort=" + tunnelLocalPort + '}'; } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedDestination.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedDestination.java index b7397b3389d74..9dfea5c377291 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedDestination.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedDestination.java @@ -30,6 +30,7 @@ public class SshWrappedDestination implements Destination { private final Destination delegate; private final List hostKey; private final List portKey; + private final String endPointKey; public SshWrappedDestination(final Destination delegate, final List hostKey, @@ -37,6 +38,15 @@ public SshWrappedDestination(final Destination delegate, this.delegate = delegate; this.hostKey = hostKey; this.portKey = portKey; + this.endPointKey = null; + } + + public SshWrappedDestination(final Destination delegate, + String endPointKey) { + this.delegate = delegate; + this.endPointKey = endPointKey; + this.portKey = null; + this.hostKey = null; } @Override @@ -50,7 +60,8 @@ public ConnectorSpecification spec() throws Exception { @Override public AirbyteConnectionStatus check(final JsonNode config) throws Exception { - return SshTunnel.sshWrap(config, hostKey, portKey, delegate::check); + return (endPointKey != null) ? SshTunnel.sshWrap(config, endPointKey, delegate::check) + : SshTunnel.sshWrap(config, hostKey, portKey, delegate::check); } @Override @@ -58,7 +69,8 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) throws Exception { - final SshTunnel tunnel = SshTunnel.getInstance(config, hostKey, portKey); + final SshTunnel tunnel = (endPointKey != null) ? SshTunnel.getInstance(config, endPointKey) : SshTunnel.getInstance(config, hostKey, portKey); + final AirbyteMessageConsumer delegateConsumer; try { delegateConsumer = delegate.getConsumer(tunnel.getConfigInTunnel(), catalog, outputRecordCollector); diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/ssh/SshTunnelTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/ssh/SshTunnelTest.java index 59bb1ff39d789..314acd532e179 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/ssh/SshTunnelTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/ssh/SshTunnelTest.java @@ -4,11 +4,14 @@ package io.airbyte.integrations.base.ssh; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.ssh.SshTunnel.TunnelMethod; import java.nio.charset.StandardCharsets; import java.security.KeyPair; @@ -70,19 +73,88 @@ class SshTunnelTest { + "5WlcOxfV1wZuaM0fOd+PBmIlFEE7Uf6AY/UahBAxaFV2+twgK9GCDcu1t4Ye9wZ9kZ4Nal\\n" + "0fkKD4uN4DRO8hAAAAFm10dWhhaUBrYnAxLWxocC1hMTQ1MzMBAgME\\n" + "-----END OPENSSH PRIVATE KEY-----"; - private static final String CONFIG = + private static final String HOST_PORT_CONFIG = "{\"ssl\":true,\"host\":\"fakehost.com\",\"port\":5432,\"schema\":\"public\",\"database\":\"postgres\",\"password\":\"\",\"username\":\"postgres\",\"tunnel_method\":{\"ssh_key\":\"" + "%s" + "\",\"tunnel_host\":\"faketunnel.com\",\"tunnel_port\":22,\"tunnel_user\":\"ec2-user\",\"tunnel_method\":\"SSH_KEY_AUTH\"}}"; + private static final String URL_CONFIG_WITH_PORT = + "{\"ssl\":true,\"endpoint\":\"http://fakehost.com:9090/service\",\"password\":\"\",\"username\":\"restuser\",\"tunnel_method\":{\"ssh_key\":\"" + + "%s" + + "\",\"tunnel_host\":\"faketunnel.com\",\"tunnel_port\":22,\"tunnel_user\":\"ec2-user\",\"tunnel_method\":\"SSH_KEY_AUTH\"}}"; + + private static final String URL_CONFIG_NO_PORT = + "{\"ssl\":true,\"endpoint\":\"http://fakehost.com/service\",\"password\":\"\",\"username\":\"restuser\",\"tunnel_method\":{\"ssh_key\":\"" + + "%s" + + "\",\"tunnel_host\":\"faketunnel.com\",\"tunnel_port\":22,\"tunnel_user\":\"ec2-user\",\"tunnel_method\":\"SSH_KEY_AUTH\"}}"; + + /** + * This test verifies that OpenSsh correctly replaces values in connector configuration in a spec + * with host/port config and in a spec with endpoint URL config + * + * @param configString + * @throws Exception + */ + @ParameterizedTest + @ValueSource(strings = {HOST_PORT_CONFIG, URL_CONFIG_WITH_PORT, URL_CONFIG_NO_PORT}) + public void testConfigInTunnel(final String configString) throws Exception { + final JsonNode config = (new ObjectMapper()).readTree(String.format(configString, SSH_RSA_PRIVATE_KEY)); + String endPointURL = Jsons.getStringOrNull(config, "endpoint"); + final SshTunnel sshTunnel = new SshTunnel( + config, + endPointURL == null ? Arrays.asList(new String[] {"host"}) : null, + endPointURL == null ? Arrays.asList(new String[] {"port"}) : null, + endPointURL == null ? null : "endpoint", + endPointURL, + TunnelMethod.SSH_KEY_AUTH, + "faketunnel.com", + 22, + "tunnelUser", + SSH_RSA_PRIVATE_KEY, + "tunnelUserPassword", + endPointURL == null ? "fakeHost.com" : null, + endPointURL == null ? 5432 : 0) { + + @Override + ClientSession openTunnel(final SshClient client) { + tunnelLocalPort = 8080; + return null; // Prevent tunnel from attempting to connect + } + + }; + + final JsonNode configInTunnel = sshTunnel.getConfigInTunnel(); + if (endPointURL == null) { + assertTrue(configInTunnel.has("port")); + assertTrue(configInTunnel.has("host")); + assertFalse(configInTunnel.has("endpoint")); + assertEquals(8080, configInTunnel.get("port").asInt()); + assertEquals("127.0.0.1", configInTunnel.get("host").asText()); + } else { + assertFalse(configInTunnel.has("port")); + assertFalse(configInTunnel.has("host")); + assertTrue(configInTunnel.has("endpoint")); + assertEquals("http://127.0.0.1:8080/service", configInTunnel.get("endpoint").asText()); + } + } + + /** + * This test verifies that SshTunnel correctly extracts private key pairs from keys formatted as + * EdDSA and OpenSSH + * + * @param privateKey + * @throws Exception + */ @ParameterizedTest @ValueSource(strings = {SSH_ED25519_PRIVATE_KEY, SSH_RSA_PRIVATE_KEY}) public void getKeyPair(final String privateKey) throws Exception { - final JsonNode config = (new ObjectMapper()).readTree(String.format(CONFIG, privateKey)); + final JsonNode config = (new ObjectMapper()).readTree(String.format(HOST_PORT_CONFIG, privateKey)); final SshTunnel sshTunnel = new SshTunnel( config, Arrays.asList(new String[] {"host"}), Arrays.asList(new String[] {"port"}), + null, + null, TunnelMethod.SSH_KEY_AUTH, "faketunnel.com", 22, diff --git a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile index 4f4ea189e0657..830bab62e8f2e 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-elasticsearch-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/destination-elasticsearch-strict-encrypt diff --git a/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile b/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile index 6f8d3f8d7ba9e..362a4df7beaf6 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile +++ b/airbyte-integrations/connectors/destination-elasticsearch/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-elasticsearch COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/destination-elasticsearch diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestination.java b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestination.java index 33382e9f299fc..5b5533e7d8b08 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestination.java +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestination.java @@ -10,6 +10,7 @@ import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.ssh.SshWrappedDestination; import io.airbyte.protocol.models.*; import java.io.IOException; import java.util.ArrayList; @@ -25,12 +26,16 @@ public class ElasticsearchDestination extends BaseConnector implements Destinati private final ObjectMapper mapper = new ObjectMapper(); public static void main(String[] args) throws Exception { - final var destination = new ElasticsearchDestination(); + final var destination = sshWrappedDestination(); LOGGER.info("starting destination: {}", ElasticsearchDestination.class); new IntegrationRunner(destination).run(args); LOGGER.info("completed destination: {}", ElasticsearchDestination.class); } + public static Destination sshWrappedDestination() { + return new SshWrappedDestination(new ElasticsearchDestination(), "endpoint"); + } + @Override public AirbyteConnectionStatus check(JsonNode config) { final ConnectorConfiguration configObject = convertConfig(config); diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java index ca133e64a54be..ce37a6fee19a4 100644 --- a/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchDestinationAcceptanceTest.java @@ -9,19 +9,14 @@ import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; -import java.io.IOException; import java.time.Duration; import java.util.List; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testcontainers.elasticsearch.ElasticsearchContainer; public class ElasticsearchDestinationAcceptanceTest extends DestinationAcceptanceTest { - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchDestinationAcceptanceTest.class); - private ObjectMapper mapper = new ObjectMapper(); private static ElasticsearchContainer container; @@ -87,16 +82,18 @@ protected TestDataComparator getTestDataComparator() { } @Override - protected JsonNode getConfig() { + protected JsonNode getConfig() throws Exception { var configJson = mapper.createObjectNode(); configJson.put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200))); return configJson; } @Override - protected JsonNode getFailCheckConfig() { + protected JsonNode getFailCheckConfig() throws Exception { // should result in a failed connection check - return mapper.createObjectNode(); + var configJson = mapper.createObjectNode(); + configJson.put("endpoint", String.format("htp::/%s:-%s", container.getHost(), container.getMappedPort(9200))); + return configJson; } @Override @@ -104,7 +101,7 @@ protected List retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace, JsonNode streamSchema) - throws IOException { + throws Exception { // Records returned from this method will be compared against records provided to the connector // to verify they were written correctly final String indexName = new ElasticsearchWriteConfig() @@ -117,12 +114,13 @@ protected List retrieveRecords(TestDestinationEnv testEnv, } @Override - protected void setup(TestDestinationEnv testEnv) {} + protected void setup(TestDestinationEnv testEnv) throws Exception {} @Override - protected void tearDown(TestDestinationEnv testEnv) { + protected void tearDown(TestDestinationEnv testEnv) throws Exception { ElasticsearchConnection connection = new ElasticsearchConnection(mapper.convertValue(getConfig(), ConnectorConfiguration.class)); connection.allIndices().forEach(connection::deleteIndexIfPresent); + connection.close(); } } diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/SshElasticsearchDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/SshElasticsearchDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..b3d19e4b6a48e --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/SshElasticsearchDestinationAcceptanceTest.java @@ -0,0 +1,64 @@ +package io.airbyte.integrations.destination.elasticsearch; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.ssh.SshBastionContainer; +import io.airbyte.integrations.base.ssh.SshTunnel; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.Network; +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +public abstract class SshElasticsearchDestinationAcceptanceTest extends ElasticsearchDestinationAcceptanceTest { + private static final Network network = Network.newNetwork(); + private static final SshBastionContainer bastion = new SshBastionContainer(); + private static ElasticsearchContainer container; + private ObjectMapper mapper = new ObjectMapper(); + private final static String ELASTIC_PASSWORD = "MagicWord"; + + public abstract SshTunnel.TunnelMethod getTunnelMethod(); + + private String getEndPoint() { + return String.format("http://%s:%d", + container.getContainerInfo().getNetworkSettings() + .getNetworks() + .entrySet().stream().findFirst().get().getValue().getIpAddress(), + container.getExposedPorts().get(0)); + } + + @Override + protected JsonNode getConfig() throws Exception { + return bastion.getTunnelConfig(getTunnelMethod(), ImmutableMap.builder().put("endpoint", getEndPoint()) + .put("upsert", false) + .put("authenticationMethod", Jsons.jsonNode(ImmutableMap.builder().put("method", "basic") + .put("username", "elastic") + .put("password", ELASTIC_PASSWORD).build()))); + } + + @Override + protected JsonNode getFailCheckConfig() throws Exception { + // should result in a failed connection check + return bastion.getTunnelConfig(getTunnelMethod(), ImmutableMap.builder().put("endpoint", getEndPoint()) + .put("upsert", true) + .put("authenticationMethod", Jsons.jsonNode(ImmutableMap.builder().put("method", "basic") + .put("username", "elastic") + .put("password", "wrongpassword").build()))); + } + + @BeforeAll + public static void beforeAll() { + bastion.initAndStartBastion(network); + container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1") + .withNetwork(network) + .withPassword(ELASTIC_PASSWORD); + container.start(); + } + + @AfterAll + public static void afterAll() { + container.close(); + bastion.getContainer().close(); + } +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/SshKeyElasticsearchDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/SshKeyElasticsearchDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..e9c94405ea32f --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/SshKeyElasticsearchDestinationAcceptanceTest.java @@ -0,0 +1,10 @@ +package io.airbyte.integrations.destination.elasticsearch; + +import io.airbyte.integrations.base.ssh.SshTunnel; + +public class SshKeyElasticsearchDestinationAcceptanceTest extends SshElasticsearchDestinationAcceptanceTest { + + public SshTunnel.TunnelMethod getTunnelMethod() { + return SshTunnel.TunnelMethod.SSH_KEY_AUTH; + } +} diff --git a/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/SshPasswordElasticsearchDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/SshPasswordElasticsearchDestinationAcceptanceTest.java new file mode 100644 index 0000000000000..62c83133b5bea --- /dev/null +++ b/airbyte-integrations/connectors/destination-elasticsearch/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/SshPasswordElasticsearchDestinationAcceptanceTest.java @@ -0,0 +1,11 @@ +package io.airbyte.integrations.destination.elasticsearch; + +import io.airbyte.integrations.base.ssh.SshTunnel; + +public class SshPasswordElasticsearchDestinationAcceptanceTest extends SshElasticsearchDestinationAcceptanceTest { + @Override + public SshTunnel.TunnelMethod getTunnelMethod() { + return SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; + } + +} diff --git a/docs/integrations/destinations/elasticsearch.md b/docs/integrations/destinations/elasticsearch.md index 2964b67cd403c..ccae1d4a4d571 100644 --- a/docs/integrations/destinations/elasticsearch.md +++ b/docs/integrations/destinations/elasticsearch.md @@ -36,12 +36,12 @@ This section should contain a table mapping each of the connector's data types t This section should contain a table with the following format: | Feature | Supported?(Yes/No) | Notes | -| :--- | :--- | :--- | -| Full Refresh Sync | yes | | -| Incremental Sync | yes | | -| Replicate Incremental Deletes | no | | -| SSL connection | yes | | -| SSH Tunnel Support | ?? | | +| :--- |:-------------------| :--- | +| Full Refresh Sync | yes | | +| Incremental Sync | yes | | +| Replicate Incremental Deletes | no | | +| SSL connection | yes | | +| SSH Tunnel Support | yes | | ### Performance considerations @@ -63,12 +63,33 @@ The connector should be enhanced to support variable batch sizes. ### Setup guide +Enter the endpoint URL, select authentication method, and whether to use 'upsert' method when indexing new documents. + +### Connection via SSH Tunnel + +Airbyte has the ability to connect to an Elastic instance via an SSH Tunnel. +The reason you might want to do this because it is not possible \(or against security policy\) to connect to your Elastic instance directly \(e.g. it does not have a public IP address\). + +When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server \(a.k.a. a bastion sever\) that _does_ have direct access to the Elastic instance. +Airbyte connects to the bastion and then asks the bastion to connect directly to the server. + +Using this feature requires additional configuration, when creating the source. We will talk through what each piece of configuration means. + +1. Configure all fields for the source as you normally would, except `SSH Tunnel Method`. +2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. + 1. Choose `Key Authentication` if you will be using an RSA private key as your secret for establishing the SSH Tunnel \(see below for more information on generating this key\). + 2. Choose `Password Authentication` if you will be using a password as your secret for establishing the SSH Tunnel. +3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will connect to. This should be a hostname or an IP Address. +4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. The default port for SSH connections is `22`, so unless you have explicitly changed something, go with the default. +5. `SSH Login Username` is the username that Airbyte should use when connection to the bastion server. This is NOT the TiDB username. +6. If you are using `Password Authentication`, then `SSH Login Username` should be set to the password of the User from the previous step. If you are using `SSH Key Authentication` TiDB password, but the password for the OS-user that Airbyte is using to perform commands on the bastion. +7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA Private Key that you are using to create the SSH connection. This should be the full contents of the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending with `-----END RSA PRIVATE KEY-----`. -Enter the hostname and/or other configuration information ... ## CHANGELOG | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.4 | 2022-10-14 | [17805](https://github.com/airbytehq/airbyte/pull/17805) | add SSH Tunneling | | 0.1.3 | 2022-05-30 | [14640](https://github.com/airbytehq/airbyte/pull/14640) | Include lifecycle management | | 0.1.2 | 2022-04-19 | [11752](https://github.com/airbytehq/airbyte/pull/11752) | Reduce batch size to 32Mb | | 0.1.1 | 2022-02-10 | [10256](https://github.com/airbytehq/airbyte/pull/1256) | Add ExitOnOutOfMemoryError connectors |