diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index 918ee6ec32dc..61fd5fe72434 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -120,6 +120,8 @@ jobs: MIXPANEL_INTEGRATION_TEST_CREDS: ${{ secrets.MIXPANEL_INTEGRATION_TEST_CREDS }} MSSQL_RDS_TEST_CREDS: ${{ secrets.MSSQL_RDS_TEST_CREDS }} PAYPAL_TRANSACTION_CREDS: ${{ secrets.SOURCE_PAYPAL_TRANSACTION_CREDS }} + POSTGRES_SSH_KEY_TEST_CREDS: ${{ secrets.POSTGRES_SSH_KEY_TEST_CREDS }} + POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.POSTGRES_SSH_PWD_TEST_CREDS }} POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }} PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }} RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }} diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 7f88cd374997..c03e40b6c1ce 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -121,6 +121,8 @@ jobs: MIXPANEL_INTEGRATION_TEST_CREDS: ${{ secrets.MIXPANEL_INTEGRATION_TEST_CREDS }} MSSQL_RDS_TEST_CREDS: ${{ secrets.MSSQL_RDS_TEST_CREDS }} PAYPAL_TRANSACTION_CREDS: ${{ secrets.SOURCE_PAYPAL_TRANSACTION_CREDS }} + POSTGRES_SSH_KEY_TEST_CREDS: ${{ secrets.POSTGRES_SSH_KEY_TEST_CREDS }} + POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.POSTGRES_SSH_PWD_TEST_CREDS }} POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }} PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }} RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }} diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java index 978c23950347..ae8eaa971069 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java @@ -31,16 +31,20 @@ import 
com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.stream.MoreStreams; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.BiConsumer; import java.util.stream.Collectors; public class Jsons { @@ -49,10 +53,10 @@ public class Jsons { private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper(); private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writer(new JsonPrettyPrinter()); - public static String serialize(T object) { + public static String serialize(final T object) { try { return OBJECT_MAPPER.writeValueAsString(object); - } catch (JsonProcessingException e) { + } catch (final JsonProcessingException e) { throw new RuntimeException(e); } } @@ -60,7 +64,7 @@ public static String serialize(T object) { public static T deserialize(final String jsonString, final Class klass) { try { return OBJECT_MAPPER.readValue(jsonString, klass); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException(e); } } @@ -68,7 +72,7 @@ public static T deserialize(final String jsonString, final Class klass) { public static JsonNode deserialize(final String jsonString) { try { return OBJECT_MAPPER.readTree(jsonString); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException(e); } } @@ -76,7 +80,7 @@ public static JsonNode deserialize(final String jsonString) { public static Optional tryDeserialize(final String jsonString, final Class klass) { try { return Optional.of(OBJECT_MAPPER.readValue(jsonString, klass)); - } catch 
(IOException e) { + } catch (final IOException e) { return Optional.empty(); } } @@ -84,7 +88,7 @@ public static Optional tryDeserialize(final String jsonString, final Clas public static Optional tryDeserialize(final String jsonString) { try { return Optional.of(OBJECT_MAPPER.readTree(jsonString)); - } catch (IOException e) { + } catch (final IOException e) { return Optional.empty(); } } @@ -108,7 +112,7 @@ public static T object(final JsonNode jsonNode, final TypeReference typeR public static Optional tryObject(final JsonNode jsonNode, final Class klass) { try { return Optional.of(OBJECT_MAPPER.convertValue(jsonNode, klass)); - } catch (Exception e) { + } catch (final Exception e) { return Optional.empty(); } } @@ -116,7 +120,7 @@ public static Optional tryObject(final JsonNode jsonNode, final Class public static Optional tryObject(final JsonNode jsonNode, final TypeReference typeReference) { try { return Optional.of(OBJECT_MAPPER.convertValue(jsonNode, typeReference)); - } catch (Exception e) { + } catch (final Exception e) { return Optional.empty(); } } @@ -126,11 +130,11 @@ public static T clone(final T object) { return (T) deserialize(serialize(object), object.getClass()); } - public static byte[] toBytes(JsonNode jsonNode) { + public static byte[] toBytes(final JsonNode jsonNode) { return serialize(jsonNode).getBytes(Charsets.UTF_8); } - public static Set keys(JsonNode jsonNode) { + public static Set keys(final JsonNode jsonNode) { if (jsonNode.isObject()) { return Jsons.object(jsonNode, new TypeReference>() {}).keySet(); } else { @@ -138,18 +142,73 @@ public static Set keys(JsonNode jsonNode) { } } - public static List children(JsonNode jsonNode) { + public static List children(final JsonNode jsonNode) { return MoreStreams.toStream(jsonNode.elements()).collect(Collectors.toList()); } - public static String toPrettyString(JsonNode jsonNode) { + public static String toPrettyString(final JsonNode jsonNode) { try { return 
OBJECT_WRITER.writeValueAsString(jsonNode) + "\n"; - } catch (JsonProcessingException e) { + } catch (final JsonProcessingException e) { throw new RuntimeException(e); } } + public static JsonNode navigateTo(JsonNode node, final List keys) { + for (final String key : keys) { + node = node.get(key); + } + return node; + } + + public static void replaceNestedString(final JsonNode json, final List keys, final String replacement) { + replaceNested(json, keys, (node, finalKey) -> node.put(finalKey, replacement)); + } + + public static void replaceNestedInt(final JsonNode json, final List keys, final int replacement) { + replaceNested(json, keys, (node, finalKey) -> node.put(finalKey, replacement)); + } + + private static void replaceNested(final JsonNode json, final List keys, final BiConsumer typedReplacement) { + Preconditions.checkArgument(keys.size() > 0, "Must pass at least one key"); + final JsonNode nodeContainingFinalKey = navigateTo(json, keys.subList(0, keys.size() - 1)); + typedReplacement.accept((ObjectNode) nodeContainingFinalKey, keys.get(keys.size() - 1)); + } + + public static Optional getOptional(final JsonNode json, final String... keys) { + return getOptional(json, Arrays.asList(keys)); + } + + public static Optional getOptional(JsonNode json, final List keys) { + for (final String key : keys) { + if (json == null) { + return Optional.empty(); + } + + json = json.get(key); + } + + return Optional.ofNullable(json); + } + + public static String getStringOrNull(final JsonNode json, final String... keys) { + return getStringOrNull(json, Arrays.asList(keys)); + } + + public static String getStringOrNull(final JsonNode json, final List keys) { + final Optional optional = getOptional(json, keys); + return optional.map(JsonNode::asText).orElse(null); + } + + public static int getIntOrZero(final JsonNode json, final String... 
keys) { + return getIntOrZero(json, Arrays.asList(keys)); + } + + public static int getIntOrZero(final JsonNode json, final List keys) { + final Optional optional = getOptional(json, keys); + return optional.map(JsonNode::asInt).orElse(0); + } + /** * By the Jackson DefaultPrettyPrinter prints objects with an extra space as follows: {"name" : * "airbyte"}. We prefer {"name": "airbyte"}. @@ -165,7 +224,7 @@ public DefaultPrettyPrinter createInstance() { // override the method that inserts the extra space. @Override - public DefaultPrettyPrinter withSeparators(Separators separators) { + public DefaultPrettyPrinter withSeparators(final Separators separators) { _separators = separators; _objectFieldValueSeparatorWithSpaces = separators.getObjectFieldValueSeparator() + " "; return this; diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/string/Strings.java b/airbyte-commons/src/main/java/io/airbyte/commons/string/Strings.java index 9c6067b9d351..4eee58aa459e 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/string/Strings.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/string/Strings.java @@ -30,14 +30,18 @@ public class Strings { - public static String join(Iterable iterable, CharSequence separator) { + public static String join(final Iterable iterable, final CharSequence separator) { return Streams.stream(iterable) .map(Object::toString) .collect(Collectors.joining(separator)); } - public static String addRandomSuffix(String base, String separator, int suffixLength) { + public static String addRandomSuffix(final String base, final String separator, final int suffixLength) { return base + separator + RandomStringUtils.randomAlphabetic(suffixLength).toLowerCase(); } + public static String safeTrim(final String string) { + return string == null ? 
null : string.trim(); + } + } diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java index 50e041d394f5..866bd164b048 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java @@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertNull; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; @@ -241,6 +242,32 @@ void testToPrettyString() { assertEquals(expectedOutput, Jsons.toPrettyString(jsonNode)); } + @Test + void testGetOptional() { + final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": {}, \"mno\": \"pqr\", \"stu\": null }"); + + assertEquals(Optional.of(Jsons.jsonNode("ghi")), Jsons.getOptional(json, "abc", "def")); + assertEquals(Optional.of(Jsons.emptyObject()), Jsons.getOptional(json, "jkl")); + assertEquals(Optional.of(Jsons.jsonNode("pqr")), Jsons.getOptional(json, "mno")); + assertEquals(Optional.of(Jsons.jsonNode(null)), Jsons.getOptional(json, "stu")); + assertEquals(Optional.empty(), Jsons.getOptional(json, "xyz")); + assertEquals(Optional.empty(), Jsons.getOptional(json, "abc", "xyz")); + assertEquals(Optional.empty(), Jsons.getOptional(json, "abc", "def", "xyz")); + assertEquals(Optional.empty(), Jsons.getOptional(json, "abc", "jkl", "xyz")); + assertEquals(Optional.empty(), Jsons.getOptional(json, "stu", "xyz")); + } + + @Test + void testGetStringOrNull() { + final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": \"mno\", \"pqr\": 1 }"); + + assertEquals("ghi", Jsons.getStringOrNull(json, "abc", "def")); + assertEquals("mno", 
Jsons.getStringOrNull(json, "jkl")); + assertEquals("1", Jsons.getStringOrNull(json, "pqr")); + assertNull(Jsons.getStringOrNull(json, "abc", "def", "xyz")); + assertNull(Jsons.getStringOrNull(json, "xyz")); + } + private static class ToClass { @JsonProperty("str") @@ -254,21 +281,21 @@ private static class ToClass { public ToClass() {} - public ToClass(String str, Integer num, long numLong) { + public ToClass(final String str, final Integer num, final long numLong) { this.str = str; this.num = num; this.numLong = numLong; } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } - ToClass toClass = (ToClass) o; + final ToClass toClass = (ToClass) o; return numLong == toClass.numLong && Objects.equals(str, toClass.str) && Objects.equals(num, toClass.num); diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index ed617e088f9f..207d81fa34c0 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -56,7 +56,7 @@ - sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 name: Postgres dockerRepository: airbyte/source-postgres - dockerImageTag: 0.3.10 + dockerImageTag: 0.3.11 documentationUrl: https://docs.airbyte.io/integrations/sources/postgres icon: postgresql.svg - sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003 diff --git a/airbyte-integrations/bases/base-java/build.gradle b/airbyte-integrations/bases/base-java/build.gradle index 34f8fca02d6d..447e14d0db0b 100644 --- a/airbyte-integrations/bases/base-java/build.gradle +++ b/airbyte-integrations/bases/base-java/build.gradle @@ -5,6 +5,12 @@ plugins { dependencies { implementation 'commons-cli:commons-cli:1.4' + implementation 'org.apache.sshd:sshd-mina:2.7.0' + // bouncycastle is 
pinned to version-match the transitive dependency from kubernetes client-java + // because a version conflict causes "parameter object not a ECParameterSpec" on ssh tunnel initiation + implementation 'org.bouncycastle:bcprov-jdk15on:1.66' + implementation 'org.bouncycastle:bcpkix-jdk15on:1.66' + implementation 'org.bouncycastle:bctls-jdk15on:1.66' implementation project(':airbyte-protocol:models') implementation project(":airbyte-json-validation") diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshHelpers.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshHelpers.java new file mode 100644 index 000000000000..bb762d094fd3 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshHelpers.java @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.base.ssh; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.ConnectorSpecification; +import java.io.IOException; + +public class SshHelpers { + + public static ConnectorSpecification getSpecAndInjectSsh() throws IOException { + final ConnectorSpecification originalSpec = Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); + return injectSshIntoSpec(originalSpec); + } + + public static ConnectorSpecification injectSshIntoSpec(final ConnectorSpecification connectorSpecification) throws IOException { + final ConnectorSpecification originalSpec = Jsons.clone(connectorSpecification); + final ObjectNode propNode = (ObjectNode) originalSpec.getConnectionSpecification().get("properties"); + propNode.set("tunnel_method", Jsons.deserialize(MoreResources.readResource("ssh-tunnel-spec.json"))); + return originalSpec; + } + +} diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java new file mode 100644 index 000000000000..d104d0bf4760 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java @@ -0,0 +1,325 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without 
limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.base.ssh; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.functional.CheckedFunction; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import java.io.IOException; +import java.io.StringReader; +import java.net.InetSocketAddress; +import java.security.KeyPair; +import java.util.List; +import org.apache.sshd.client.SshClient; +import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier; +import org.apache.sshd.client.session.ClientSession; +import org.apache.sshd.common.util.net.SshdSocketAddress; +import org.apache.sshd.server.forward.AcceptAllForwardingFilter; +import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; +import org.bouncycastle.openssl.PEMKeyPair; +import org.bouncycastle.openssl.PEMParser; +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// todo (cgardens) - this needs unit tests. 
it is currently tested transitively via source postgres +// integration tests. +/** + * Encapsulates the connection configuration for an ssh tunnel port forward through a proxy/bastion + * host plus the remote host and remote port to forward to a specified local port. + */ +public class SshTunnel implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(SshTunnel.class); + + public enum TunnelMethod { + NO_TUNNEL, + SSH_PASSWORD_AUTH, + SSH_KEY_AUTH + } + + public static final int TIMEOUT_MILLIS = 15000; // 15 seconds + + private final JsonNode config; + private final List hostKey; + private final List portKey; + + private final TunnelMethod tunnelMethod; + private final String tunnelHost; + private final int tunnelPort; + private final String tunnelUser; + private final String sshKey; + private final String tunnelUserPassword; + private final String remoteDatabaseHost; + private final int remoteDatabasePort; + private int tunnelDatabasePort; + + private SshClient sshclient; + private ClientSession tunnelSession; + + /** + * + * @param config - the full config that was passed to the source. + * @param hostKey - a list of keys that point to the database host name. should be pointing to where + * in the config remoteDatabaseHost is found. + * @param portKey - a list of keys that point to the database port. should be pointing to where in + * the config remoteDatabasePort is found. + * @param tunnelMethod - the type of ssh method that should be used (includes not using SSH at all). + * @param tunnelHost - host name of the machine to which we will establish an ssh connection (e.g. + * hostname of the bastion). + * @param tunnelPort - port of the machine to which we will establish an ssh connection. (e.g. port + * of the bastion). + * @param tunnelUser - user that is allowed to access the tunnelHost. + * @param sshKey - the ssh key that will be used to make the ssh connection. can be null if we are + * using tunnelUserPassword instead. 
+ * @param tunnelUserPassword - the password for the tunnelUser. can be null if we are using sshKey + * instead. + * @param remoteDatabaseHost - the actual host name of the database (as it is known to the tunnel + * host). + * @param remoteDatabasePort - the actual port of the database (as it is known to the tunnel host). + */ + public SshTunnel(final JsonNode config, + final List hostKey, + final List portKey, + final TunnelMethod tunnelMethod, + final String tunnelHost, + final int tunnelPort, + final String tunnelUser, + final String sshKey, + final String tunnelUserPassword, + final String remoteDatabaseHost, + final int remoteDatabasePort) { + this.config = config; + this.hostKey = hostKey; + this.portKey = portKey; + + Preconditions.checkNotNull(tunnelMethod); + this.tunnelMethod = tunnelMethod; + + if (tunnelMethod.equals(TunnelMethod.NO_TUNNEL)) { + this.tunnelHost = null; + this.tunnelPort = 0; + this.tunnelUser = null; + this.sshKey = null; + this.tunnelUserPassword = null; + this.remoteDatabaseHost = null; + this.remoteDatabasePort = 0; + } else { + Preconditions.checkNotNull(tunnelHost); + Preconditions.checkArgument(tunnelPort > 0); + Preconditions.checkNotNull(tunnelUser); + if (tunnelMethod.equals(TunnelMethod.SSH_KEY_AUTH)) { + Preconditions.checkNotNull(sshKey); + } + if (tunnelMethod.equals(TunnelMethod.SSH_PASSWORD_AUTH)) { + Preconditions.checkNotNull(tunnelUserPassword); + } + Preconditions.checkNotNull(remoteDatabaseHost); + Preconditions.checkArgument(remoteDatabasePort > 0); + + this.tunnelHost = tunnelHost; + this.tunnelPort = tunnelPort; + this.tunnelUser = tunnelUser; + this.sshKey = sshKey; + this.tunnelUserPassword = tunnelUserPassword; + this.remoteDatabaseHost = remoteDatabaseHost; + this.remoteDatabasePort = remoteDatabasePort; + + this.sshclient = createClient(); + this.tunnelSession = openTunnel(sshclient); + } + } + + public JsonNode getOriginalConfig() { + return config; + } + + public JsonNode getConfigInTunnel() { + if 
(tunnelMethod.equals(TunnelMethod.NO_TUNNEL)) { + return getOriginalConfig(); + } else { + final JsonNode clone = Jsons.clone(config); + Jsons.replaceNestedString(clone, hostKey, SshdSocketAddress.LOCALHOST_ADDRESS.getHostName()); + Jsons.replaceNestedInt(clone, portKey, tunnelDatabasePort); + return clone; + } + } + + // /** + // * Finds a free port on the machine. As soon as this method returns, it is possible for another process to + // bind to this port. Thus it only guarantees that the port was free at the time of the check. + // */ + // private static int findFreePort() { + // // finds an available port. + // try (final var socket = new ServerSocket(0)) { + // return socket.getLocalPort(); + // } catch (final IOException e) { + // throw new RuntimeException(e); + // } + // } + + public static SshTunnel getInstance(final JsonNode config, final List hostKey, final List portKey) { + final TunnelMethod tunnelMethod = Jsons.getOptional(config, "tunnel_method", "tunnel_method") + .map(method -> TunnelMethod.valueOf(method.asText().trim())) + .orElse(TunnelMethod.NO_TUNNEL); + LOGGER.info("Starting connection with method: {}", tunnelMethod); + + // final int localPort = findFreePort(); + + return new SshTunnel( + config, + hostKey, + portKey, + tunnelMethod, + Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "tunnel_host")), + Jsons.getIntOrZero(config, "tunnel_method", "tunnel_port"), + Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "tunnel_user")), + Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "ssh_key")), + Strings.safeTrim(Jsons.getStringOrNull(config, "tunnel_method", "tunnel_user_password")), + Strings.safeTrim(Jsons.getStringOrNull(config, hostKey)), + Jsons.getIntOrZero(config, portKey)); + } + + public static void sshWrap(final JsonNode config, + final List hostKey, + final List portKey, + final CheckedConsumer wrapped) + throws Exception { + sshWrap(config, hostKey, portKey, (configInTunnel) -> { + wrapped.accept(configInTunnel); + 
return null; + }); + } + + public static T sshWrap(final JsonNode config, + final List hostKey, + final List portKey, + final CheckedFunction wrapped) + throws Exception { + try (final SshTunnel sshTunnel = SshTunnel.getInstance(config, hostKey, portKey)) { + return wrapped.apply(sshTunnel.getConfigInTunnel()); + } + } + + /** + * Closes a tunnel if one was open, and otherwise doesn't do anything (safe to run). + */ + @Override + public void close() { + try { + if (tunnelSession != null) { + tunnelSession.close(); + tunnelSession = null; + } + if (sshclient != null) { + sshclient.stop(); + sshclient = null; + } + } catch (final Throwable t) { + throw new RuntimeException(t); + } + } + + /** + * From the RSA format private key string, use bouncycastle to deserialize the key pair, reconstruct + * the keys from the key info, and return the key pair for use in authentication. + */ + private KeyPair getPrivateKeyPair() throws IOException { + final PEMParser pemParser = new PEMParser(new StringReader(sshKey)); + final PEMKeyPair keypair = (PEMKeyPair) pemParser.readObject(); + final JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); + return new KeyPair( + converter.getPublicKey(SubjectPublicKeyInfo.getInstance(keypair.getPublicKeyInfo())), + converter.getPrivateKey(keypair.getPrivateKeyInfo())); + } + + /** + * Generates a new ssh client and returns it, with forwarding set to accept all types; use this + * before opening a tunnel. + */ + private SshClient createClient() { + java.security.Security.addProvider( + new org.bouncycastle.jce.provider.BouncyCastleProvider()); + final SshClient client = SshClient.setUpDefaultClient(); + client.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE); + client.setServerKeyVerifier(AcceptAllServerKeyVerifier.INSTANCE); + return client; + } + + /** + * Starts an ssh session; wrap this in a try-finally and use closeTunnel() to close it. 
+ */ + private ClientSession openTunnel(final SshClient client) { + try { + client.start(); + final ClientSession session = client.connect( + tunnelUser.trim(), + tunnelHost.trim(), + tunnelPort) + .verify(TIMEOUT_MILLIS) + .getSession(); + if (tunnelMethod.equals(TunnelMethod.SSH_KEY_AUTH)) { + session.addPublicKeyIdentity(getPrivateKeyPair()); + } + if (tunnelMethod.equals(TunnelMethod.SSH_PASSWORD_AUTH)) { + session.addPasswordIdentity(tunnelUserPassword); + } + + session.auth().verify(TIMEOUT_MILLIS); + final SshdSocketAddress address = session.startLocalPortForwarding( + // entering 0 lets the OS pick a free port for us. + new SshdSocketAddress(InetSocketAddress.createUnresolved(SshdSocketAddress.LOCALHOST_ADDRESS.getHostName(), 0)), + new SshdSocketAddress(remoteDatabaseHost, remoteDatabasePort)); + + // discover the port that the OS picked and remember it so that we can use it when we try to connect + // later. + tunnelDatabasePort = address.getPort(); + + LOGGER.info("Established tunneling session. 
Port forwarding started on " + address.toInetSocketAddress()); + return session; + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String toString() { + return "SshTunnel{" + + "hostKey=" + hostKey + + ", portKey=" + portKey + + ", tunnelMethod=" + tunnelMethod + + ", tunnelHost='" + tunnelHost + '\'' + + ", tunnelPort=" + tunnelPort + + ", tunnelUser='" + tunnelUser + '\'' + + ", remoteDatabaseHost='" + remoteDatabaseHost + '\'' + + ", remoteDatabasePort=" + remoteDatabasePort + + ", tunnelDatabasePort=" + tunnelDatabasePort + + '}'; + } + +} diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java new file mode 100644 index 000000000000..7dc67deaec90 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java @@ -0,0 +1,72 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.base.ssh; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.integrations.base.Source; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConnectorSpecification; +import java.util.List; + +public class SshWrappedSource implements Source { + + private final Source delegate; + private final List hostKey; + private final List portKey; + + public SshWrappedSource(final Source delegate, final List hostKey, final List portKey) { + this.delegate = delegate; + this.hostKey = hostKey; + this.portKey = portKey; + } + + @Override + public ConnectorSpecification spec() throws Exception { + return SshHelpers.injectSshIntoSpec(delegate.spec()); + } + + @Override + public AirbyteConnectionStatus check(final JsonNode config) throws Exception { + return SshTunnel.sshWrap(config, hostKey, portKey, delegate::check); + } + + @Override + public AirbyteCatalog discover(final JsonNode config) throws Exception { + return SshTunnel.sshWrap(config, hostKey, portKey, delegate::discover); + } + + @Override + public AutoCloseableIterator read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state) + throws Exception { + final SshTunnel tunnel = SshTunnel.getInstance(config, hostKey, portKey); + return AutoCloseableIterators.appendOnClose(delegate.read(tunnel.getConfigInTunnel(), catalog, state), tunnel::close); + } + +} diff 
--git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/readme.md b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/readme.md new file mode 100644 index 000000000000..4f99198029ec --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/readme.md @@ -0,0 +1,44 @@ +# Developing an SSH Source + +## Goal +Easy development of any source that needs the ability to connect to a resource via SSH Tunnel. + +## Overview +Our SSH connector support is designed to be easy to plug into any existing connector. There are a few major pieces to consider: +1. Add SSH Configuration to the Spec - for SSH, we need to take in additional configuration, so we need to inject extra fields into the connector configuration. +2. Add SSH Logic to the Connector - before the connector code begins to execute we need to start an SSH tunnel. This library provides logic to create that tunnel (and clean it up). +3. Acceptance Testing - it is a good practice to include acceptance testing for the SSH version of a connector for at least one of the SSH types (password or ssh key). While unit testing for the SSH functionality exists in this package (coming soon), high-level acceptance testing to make sure this feature works with the individual connector belongs in the connector. + +## How To + +### Add SSH Configuration to the Spec +1. The `SshHelpers` class provides 2 helper functions that inject the SSH configuration objects into a spec JsonSchema for an existing connector. Usually the `spec()` method for a connector looks like `Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);`. These helpers are just injecting the ssh spec (`ssh-tunnel-spec.json`) into that spec. +2. You may need to update tests to reflect that new fields have been added to the spec. Usually updating the tests just requires using these helpers in the tests. 
+ +### Add SSH Logic to the Connector +1. This package provides a decorated Source class to make it easy to add SSH logic to an existing source. Simply pass the source you want to wrap into the constructor of the `SshWrappedSource`. That class also requires two other fields: `hostKey` and `portKey`. Both of these fields are pointers to fields in the connector specification. The `hostKey` is a pointer to the field that holds the host of the resource you want to connect to, and `portKey` is a pointer to the field that holds the port. In a simple case, where the host name for a connector is just defined in the top-level `host` field, then `hostKey` would simply be: `["host"]`. If that field is nested, however, then it might be: `["database", "configuration", "host"]`. + +### Acceptance Testing +1. The only difference between existing acceptance testing and acceptance testing with SSH is that the configuration that is used for testing needs to contain additional fields. You can see the `Postgres Source ssh key creds` in lastpass to see an example of what that might look like. Those credentials leverage an existing bastion host in our test infrastructure. (As future work, we want to get rid of the need to use a static bastion server and instead do it in docker so we can run it all locally.) + +## Misc + +### How to wrap the protocol in an SSH Tunnel +For `spec()`, `check()`, and `discover()` wrapping the connector in an SSH tunnel is easier to think about because when they return all work is done and the tunnel can be closed. Thus, each of these methods can simply be wrapped in a try-with-resource of the SSH Tunnel. + +For `read()` and `write()` they return an iterator and consumer respectively that perform work that must happen within the SSH Tunnel after the method has returned. Therefore, the `close` function on the iterator and consumer has to handle closing the SSH tunnel; the methods themselves cannot just be wrapped in a try-with-resource. 
+ This is handled for you by the `SshWrappedSource`, but if you need to implement any of this manually you must take it into account. + +### Name Mangling +One of the least intuitive pieces of the SSH setup to follow is the replacement of host names and ports. The reason `SshWrappedSource` needs to know how to get the hostname and port of the database you are trying to connect to is that when it builds the SSH tunnel that forwards to the database, it needs to know the hostname and port so that the tunnel forwards requests to the right place. After the SSH tunnel is established and forwarding to the database, the connector code itself runs. + +There's a trick here though! The connector should NOT try to connect to the hostname and port of the database. Instead, it should be trying to connect to `localhost` and whatever port we are forwarding to the database. The `SshTunnel#sshWrap` removes the original host and port from the configuration for the connector and replaces it with `localhost` and the correct port. So from the connector code's point of view it is just operating on localhost. + +There is a tradeoff here. +* (Good) The way we have structured this allows users to configure a connector in the UI in a way that is intuitive to the user. They put in the host and port they think about referring to the database as (they don't need to worry about any of the localhost version). +* (Good) The connector code does not need to know anything about SSH, it can just operate on the host and port it gets (and we let SSH Tunnel handle swapping the names for us) which makes writing a connector easier. +* (Bad) The downside is that the `SshTunnel` logic is more complicated because it is absorbing all of this name swapping so that neither the user nor the connector developer needs to worry about it. In our estimation, the good outweighs the extra complexity incurred here. + +## Future Work +* Add unit / integration testing for the `ssh` package. 
+* Restructure spec so that instead of having `SSH Key Authentication` or `Password Authentication` options for `tunnel_method`, just have an `SSH` option and then within that `SSH` option have a `oneOf` for password or key. This is blocked because we cannot use `oneOf`s nested in `oneOf`s. +* Improve the process of acceptance testing by allowing doing acceptance testing using a bastion running in a docker container instead of having to use dedicated infrastructure and a static database. diff --git a/airbyte-integrations/bases/base-java/src/main/resources/ssh-tunnel-spec.json b/airbyte-integrations/bases/base-java/src/main/resources/ssh-tunnel-spec.json new file mode 100644 index 000000000000..4eefff75970c --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/resources/ssh-tunnel-spec.json @@ -0,0 +1,114 @@ +{ + "type": "object", + "title": "SSH Tunnel Method", + "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.", + "oneOf": [ + { + "title": "No Tunnel", + "required": ["tunnel_method"], + "properties": { + "tunnel_method": { + "description": "No ssh tunnel needed to connect to database", + "type": "string", + "const": "NO_TUNNEL", + "order": 0 + } + } + }, + { + "title": "SSH Key Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_port", + "tunnel_user", + "ssh_key" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and ssh key", + "type": "string", + "const": "SSH_KEY_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + 
"default": 22, + "examples": ["22"], + "order": 2 + }, + "tunnel_user": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host.", + "type": "string", + "order": 3 + }, + "ssh_key": { + "title": "SSH Private Key", + "description": "OS-level user account ssh key credentials for logging into the jump server host.", + "type": "string", + "airbyte_secret": true, + "multiline": true, + "order": 4 + } + } + }, + { + "title": "Password Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_port", + "tunnel_user", + "tunnel_user_password" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and password authentication", + "type": "string", + "const": "SSH_PASSWORD_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": ["22"], + "order": 2 + }, + "tunnel_user": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host", + "type": "string", + "order": 3 + }, + "tunnel_user_password": { + "title": "Password", + "description": "OS-level password for logging into the jump server host", + "type": "string", + "airbyte_secret": true, + "order": 4 + } + } + } + ] +} diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 77b233ac6361..e671b877ce94 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -8,5 +8,5 @@ COPY 
build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.10 +LABEL io.airbyte.version=0.3.11 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 7c78e1521a0a..3830cb26e6ae 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -40,6 +40,7 @@ import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.base.ssh.SshWrappedSource; import io.airbyte.integrations.debezium.AirbyteDebeziumHandler; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.relationaldb.StateManager; @@ -73,9 +74,9 @@ public PostgresSource() { } @Override - public JsonNode toDatabaseConfig(JsonNode config) { + public JsonNode toDatabaseConfig(final JsonNode config) { - List additionalParameters = new ArrayList<>(); + final List additionalParameters = new ArrayList<>(); final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:postgresql://%s:%s/%s?", config.get("host").asText(), @@ -106,8 +107,8 @@ public Set getExcludedInternalNameSpaces() { } @Override - public AirbyteCatalog discover(JsonNode config) throws Exception { - AirbyteCatalog catalog = super.discover(config); + public AirbyteCatalog discover(final JsonNode config) throws Exception { + final AirbyteCatalog catalog = super.discover(config); if (isCdc(config)) { final List streams = 
catalog.getStreams().stream() @@ -123,14 +124,14 @@ public AirbyteCatalog discover(JsonNode config) throws Exception { } @Override - public List> getCheckOperations(JsonNode config) throws Exception { + public List> getCheckOperations(final JsonNode config) throws Exception { final List> checkOperations = new ArrayList<>(super.getCheckOperations(config)); if (isCdc(config)) { checkOperations.add(database -> { - List matchingSlots = database.query(connection -> { + final List matchingSlots = database.query(connection -> { final String sql = "SELECT * FROM pg_replication_slots WHERE slot_name = ? AND plugin = ? AND database = ?"; - PreparedStatement ps = connection.prepareStatement(sql); + final PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, config.get("replication_method").get("replication_slot").asText()); ps.setString(2, PostgresUtils.getPluginValue(config.get("replication_method"))); ps.setString(3, config.get("database").asText()); @@ -148,8 +149,8 @@ public List> getCheckOperations(JsonNod }); checkOperations.add(database -> { - List matchingPublications = database.query(connection -> { - PreparedStatement ps = connection.prepareStatement("SELECT * FROM pg_publication WHERE pubname = ?"); + final List matchingPublications = database.query(connection -> { + final PreparedStatement ps = connection.prepareStatement("SELECT * FROM pg_publication WHERE pubname = ?"); ps.setString(1, config.get("replication_method").get("publication").asText()); LOGGER.info("Attempting to find the publication using the query: " + ps.toString()); @@ -169,7 +170,8 @@ public List> getCheckOperations(JsonNod } @Override - public AutoCloseableIterator read(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception { + public AutoCloseableIterator read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state) + throws Exception { // this check is used to ensure that have the pgoutput slot available so Debezium 
won't attempt to // create it. final AirbyteConnectionStatus check = check(config); @@ -182,11 +184,11 @@ public AutoCloseableIterator read(JsonNode config, ConfiguredAir } @Override - public List> getIncrementalIterators(JdbcDatabase database, - ConfiguredAirbyteCatalog catalog, - Map>> tableNameToTable, - StateManager stateManager, - Instant emittedAt) { + public List> getIncrementalIterators(final JdbcDatabase database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager, + final Instant emittedAt) { /** * If a customer sets up a postgres source with cdc parameters (replication_slot and publication) * but selects all the tables in FULL_REFRESH mode then we would still end up going through this @@ -195,7 +197,7 @@ public List> getIncrementalIterators(JdbcD * have a check here as well to make sure that if no table is in INCREMENTAL mode then skip this * part */ - JsonNode sourceConfig = database.getSourceConfig(); + final JsonNode sourceConfig = database.getSourceConfig(); if (isCdc(sourceConfig)) { final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, PostgresCdcTargetPosition.targetPosition(database), PostgresCdcProperties.getDebeziumProperties(sourceConfig), catalog, false); @@ -208,7 +210,7 @@ public List> getIncrementalIterators(JdbcD } @VisibleForTesting - static boolean isCdc(JsonNode config) { + static boolean isCdc(final JsonNode config) { final boolean isCdc = config.hasNonNull("replication_method") && config.get("replication_method").hasNonNull("replication_slot") && config.get("replication_method").hasNonNull("publication"); @@ -224,7 +226,7 @@ static boolean isCdc(JsonNode config) { * * Note: in place mutation. 
*/ - private static AirbyteStream removeIncrementalWithoutPk(AirbyteStream stream) { + private static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stream) { if (stream.getSourceDefinedPrimaryKey().isEmpty()) { stream.getSupportedSyncModes().remove(SyncMode.INCREMENTAL); } @@ -238,7 +240,7 @@ private static AirbyteStream removeIncrementalWithoutPk(AirbyteStream stream) { * * Note: in place mutation. */ - private static AirbyteStream setIncrementalToSourceDefined(AirbyteStream stream) { + private static AirbyteStream setIncrementalToSourceDefined(final AirbyteStream stream) { if (stream.getSupportedSyncModes().contains(SyncMode.INCREMENTAL)) { stream.setSourceDefinedCursor(true); } @@ -247,9 +249,9 @@ private static AirbyteStream setIncrementalToSourceDefined(AirbyteStream stream) } // Note: in place mutation. - private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) { - ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); - ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); + private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) { + final ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); + final ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string")); final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number")); @@ -260,8 +262,8 @@ private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) { return stream; } - public static void main(String[] args) throws Exception { - final Source source = new PostgresSource(); + public static void main(final String[] args) throws Exception { + final Source source = new SshWrappedSource(new PostgresSource(), List.of("host"), List.of("port")); LOGGER.info("starting source: {}", PostgresSource.class); new IntegrationRunner(source).run(args); LOGGER.info("completed source: {}", PostgresSource.class); diff --git 
a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java new file mode 100644 index 000000000000..1b5148791ed4 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java @@ -0,0 +1,116 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.ssh.SshHelpers; +import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.DestinationSyncMode; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaPrimitive; +import io.airbyte.protocol.models.SyncMode; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +public abstract class AbstractSshPostgresSourceAcceptanceTest extends SourceAcceptanceTest { + + private static final String STREAM_NAME = "public.id_and_name"; + private static final String STREAM_NAME2 = "public.starships"; + + private JsonNode config; + + public abstract Path getConfigFilePath(); + + // todo (cgardens) - dynamically create data by generating a database with a random name instead of + // requiring data to already be in place. 
+ @Override + protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { + config = Jsons.deserialize(IOs.readFile(getConfigFilePath())); + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + + } + + @Override + protected String getImageName() { + return "airbyte/source-postgres:dev"; + } + + @Override + protected ConnectorSpecification getSpec() throws Exception { + return SshHelpers.getSpecAndInjectSsh(); + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected ConfiguredAirbyteCatalog getConfiguredCatalog() { + return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + STREAM_NAME, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + STREAM_NAME2, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); + } + + @Override + protected List getRegexTests() { + return Collections.emptyList(); + } + + @Override + protected JsonNode getState() { + return Jsons.jsonNode(new HashMap<>()); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java 
b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java index ee60d297be26..2580221d1f4d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java @@ -28,9 +28,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; import io.airbyte.db.Database; import io.airbyte.db.Databases; +import io.airbyte.integrations.base.ssh.SshHelpers; import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.CatalogHelpers; @@ -67,7 +67,7 @@ public class CdcPostgresSourceAcceptanceTest extends SourceAcceptanceTest { private JsonNode config; @Override - protected void setupEnvironment(TestDestinationEnv environment) throws Exception { + protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { container = new PostgreSQLContainer<>("postgres:13-alpine") .withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), "/etc/postgresql/postgresql.conf") .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); @@ -119,7 +119,7 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception } @Override - protected void tearDown(TestDestinationEnv testEnv) { + protected void tearDown(final TestDestinationEnv testEnv) { container.close(); } @@ -130,7 +130,7 @@ protected String getImageName() { @Override protected ConnectorSpecification getSpec() throws 
Exception { - return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); + return SshHelpers.getSpecAndInjectSsh(); } @Override diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java index 5378debd52e6..aed4ee31d3f8 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java @@ -28,9 +28,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; import io.airbyte.db.Database; import io.airbyte.db.Databases; +import io.airbyte.integrations.base.ssh.SshHelpers; import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.CatalogHelpers; @@ -56,7 +56,7 @@ public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest { private JsonNode config; @Override - protected void setupEnvironment(TestDestinationEnv environment) throws Exception { + protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { container = new PostgreSQLContainer<>("postgres:13-alpine"); container.start(); final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() @@ -94,7 +94,7 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception } @Override - protected void 
tearDown(TestDestinationEnv testEnv) { + protected void tearDown(final TestDestinationEnv testEnv) { container.close(); } @@ -105,7 +105,7 @@ protected String getImageName() { @Override protected ConnectorSpecification getSpec() throws Exception { - return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); + return SshHelpers.getSpecAndInjectSsh(); } @Override diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshKeyPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshKeyPostgresSourceAcceptanceTest.java new file mode 100644 index 000000000000..930f86da7c97 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshKeyPostgresSourceAcceptanceTest.java @@ -0,0 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import java.nio.file.Path; + +public class SshKeyPostgresSourceAcceptanceTest extends AbstractSshPostgresSourceAcceptanceTest { + + @Override + public Path getConfigFilePath() { + return Path.of("secrets/ssh-key-config.json"); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshPasswordPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshPasswordPostgresSourceAcceptanceTest.java new file mode 100644 index 000000000000..e92744e6df1d --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshPasswordPostgresSourceAcceptanceTest.java @@ -0,0 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import java.nio.file.Path; + +public class SshPasswordPostgresSourceAcceptanceTest extends AbstractSshPostgresSourceAcceptanceTest { + + @Override + public Path getConfigFilePath() { + return Path.of("secrets/ssh-pwd-config.json"); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 561443b2a346..26452fd14b24 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -82,7 +82,7 @@ void tearDown() throws Exception { @BeforeEach protected void setup() throws SQLException { - DockerImageName myImage = DockerImageName.parse("debezium/postgres:13-alpine").asCompatibleSubstituteFor("postgres"); + final DockerImageName myImage = DockerImageName.parse("debezium/postgres:13-alpine").asCompatibleSubstituteFor("postgres"); container = new PostgreSQLContainer<>(myImage) .withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), "/etc/postgresql/postgresql.conf") .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); @@ -107,7 
+107,7 @@ protected void setup() throws SQLException { super.setup(); } - private JsonNode getConfig(String dbName) { + private JsonNode getConfig(final String dbName) { final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() .put("replication_slot", SLOT_NAME_BASE + "_" + dbName) .put("publication", PUBLICATION) @@ -125,7 +125,7 @@ private JsonNode getConfig(String dbName) { .build()); } - private Database getDatabaseFromConfig(JsonNode config) { + private Database getDatabaseFromConfig(final JsonNode config) { return Databases.createDatabase( config.get("username").asText(), config.get("password").asText(), @@ -138,14 +138,14 @@ private Database getDatabaseFromConfig(JsonNode config) { } @Test - void testCheckWithoutPublication() throws SQLException { + void testCheckWithoutPublication() throws Exception { database.query(ctx -> ctx.execute("DROP PUBLICATION " + PUBLICATION + ";")); final AirbyteConnectionStatus status = source.check(config); assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); } @Test - void testCheckWithoutReplicationSlot() throws SQLException { + void testCheckWithoutReplicationSlot() throws Exception { final String fullReplicationSlot = SLOT_NAME_BASE + "_" + dbName; database.query(ctx -> ctx.execute("SELECT pg_drop_replication_slot('" + fullReplicationSlot + "');")); @@ -173,14 +173,14 @@ void testReadWithoutReplicationSlot() throws SQLException { } @Override - protected void assertExpectedStateMessages(List stateMessages) { + protected void assertExpectedStateMessages(final List stateMessages) { assertEquals(1, stateMessages.size()); assertNotNull(stateMessages.get(0).getData()); } @Override protected CdcTargetPosition cdcLatestTargetPosition() { - JdbcDatabase database = Databases.createJdbcDatabase( + final JdbcDatabase database = Databases.createJdbcDatabase( config.get("username").asText(), config.get("password").asText(), String.format("jdbc:postgresql://%s:%s/%s", @@ -192,19 +192,19 @@ protected 
CdcTargetPosition cdcLatestTargetPosition() { } @Override - protected CdcTargetPosition extractPosition(JsonNode record) { + protected CdcTargetPosition extractPosition(final JsonNode record) { return new PostgresCdcTargetPosition(PgLsn.fromLong(record.get(CDC_LSN).asLong())); } @Override - protected void assertNullCdcMetaData(JsonNode data) { + protected void assertNullCdcMetaData(final JsonNode data) { assertNull(data.get(CDC_LSN)); assertNull(data.get(CDC_UPDATED_AT)); assertNull(data.get(CDC_DELETED_AT)); } @Override - protected void assertCdcMetaData(JsonNode data, boolean deletedAtNull) { + protected void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull) { assertNotNull(data.get(CDC_LSN)); assertNotNull(data.get(CDC_UPDATED_AT)); if (deletedAtNull) { @@ -215,16 +215,16 @@ protected void assertCdcMetaData(JsonNode data, boolean deletedAtNull) { } @Override - protected void removeCDCColumns(ObjectNode data) { + protected void removeCDCColumns(final ObjectNode data) { data.remove(CDC_LSN); data.remove(CDC_UPDATED_AT); data.remove(CDC_DELETED_AT); } @Override - protected void addCdcMetadataColumns(AirbyteStream stream) { - ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); - ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); + protected void addCdcMetadataColumns(final AirbyteStream stream) { + final ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); + final ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string")); final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number")); @@ -250,7 +250,7 @@ protected Database getDatabase() { } @Override - public String createSchemaQuery(String schemaName) { + public String createSchemaQuery(final String schemaName) { return "CREATE SCHEMA " + schemaName + ";"; } diff --git 
a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index e38104b73c3a..13ca5059985a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -24,17 +24,22 @@ package io.airbyte.integrations.source.postgres; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.string.Strings; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; +import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.test.utils.PostgreSQLContainerHelper; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; @@ -95,4 +100,12 @@ static void cleanUp() { PSQL_DB.close(); } + @Test + void testSpec() throws Exception { + final ConnectorSpecification actual = source.spec(); + final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); + + assertEquals(expected, actual); + } + } diff --git 
a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java index 32503c6f0fd4..a57ac18b8981 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java @@ -161,14 +161,14 @@ protected abstract Map> discoverPrimaryKeys(Database databa protected abstract String getQuoteString(); @Override - public AirbyteConnectionStatus check(JsonNode config) { + public AirbyteConnectionStatus check(final JsonNode config) throws Exception { try (final Database database = createDatabaseInternal(config)) { - for (CheckedConsumer checkOperation : getCheckOperations(config)) { + for (final CheckedConsumer checkOperation : getCheckOperations(config)) { checkOperation.accept(database); } return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.info("Exception while checking connection: ", e); return new AirbyteConnectionStatus() .withStatus(Status.FAILED) @@ -177,9 +177,9 @@ public AirbyteConnectionStatus check(JsonNode config) { } @Override - public AirbyteCatalog discover(JsonNode config) throws Exception { + public AirbyteCatalog discover(final JsonNode config) throws Exception { try (final Database database = createDatabaseInternal(config)) { - List streams = getTables(database).stream() + final List streams = getTables(database).stream() .map(tableInfo -> CatalogHelpers .createAirbyteStream(tableInfo.getName(), tableInfo.getNameSpace(), tableInfo.getFields()) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, 
SyncMode.INCREMENTAL)) @@ -190,7 +190,8 @@ public AirbyteCatalog discover(JsonNode config) throws Exception { } @Override - public AutoCloseableIterator read(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception { + public AutoCloseableIterator read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state) + throws Exception { final StateManager stateManager = new StateManager( state == null ? StateManager.emptyState() : Jsons.object(state, DbState.class), catalog); @@ -221,11 +222,11 @@ public AutoCloseableIterator read(JsonNode config, ConfiguredAir }); } - public List> getIncrementalIterators(Database database, - ConfiguredAirbyteCatalog catalog, - Map>> tableNameToTable, - StateManager stateManager, - Instant emittedAt) { + public List> getIncrementalIterators(final Database database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager, + final Instant emittedAt) { return getSelectedIterators( database, catalog, @@ -235,11 +236,11 @@ public List> getIncrementalIterators(Datab configuredStream -> configuredStream.getSyncMode().equals(SyncMode.INCREMENTAL)); } - public List> getFullRefreshIterators(Database database, - ConfiguredAirbyteCatalog catalog, - Map>> tableNameToTable, - StateManager stateManager, - Instant emittedAt) { + public List> getFullRefreshIterators(final Database database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager, + final Instant emittedAt) { return getSelectedIterators( database, catalog, @@ -249,12 +250,12 @@ public List> getFullRefreshIterators(Datab configuredStream -> configuredStream.getSyncMode().equals(SyncMode.FULL_REFRESH)); } - protected List> getSelectedIterators(Database database, - ConfiguredAirbyteCatalog catalog, - Map>> tableNameToTable, - StateManager stateManager, - Instant emittedAt, - Predicate selector) { + protected List> 
getSelectedIterators(final Database database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager, + final Instant emittedAt, + final Predicate selector) { final List> iteratorList = new ArrayList<>(); for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { if (selector.test(airbyteStream)) { @@ -279,11 +280,11 @@ protected List> getSelectedIterators(Datab return iteratorList; } - protected AutoCloseableIterator createReadIterator(Database database, - ConfiguredAirbyteStream airbyteStream, - TableInfo> table, - StateManager stateManager, - Instant emittedAt) { + protected AutoCloseableIterator createReadIterator(final Database database, + final ConfiguredAirbyteStream airbyteStream, + final TableInfo> table, + final StateManager stateManager, + final Instant emittedAt) { final String streamName = airbyteStream.getStream().getName(); final String namespace = airbyteStream.getStream().getNamespace(); final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, namespace); @@ -336,12 +337,12 @@ protected AutoCloseableIterator createReadIterator(Database data }); } - protected AutoCloseableIterator getIncrementalStream(Database database, - ConfiguredAirbyteStream airbyteStream, - List selectedDatabaseFields, - TableInfo> table, - String cursor, - Instant emittedAt) { + protected AutoCloseableIterator getIncrementalStream(final Database database, + final ConfiguredAirbyteStream airbyteStream, + final List selectedDatabaseFields, + final TableInfo> table, + final String cursor, + final Instant emittedAt) { final String streamName = airbyteStream.getStream().getName(); final String namespace = airbyteStream.getStream().getNamespace(); final String cursorField = IncrementalUtils.getCursorField(airbyteStream); @@ -366,18 +367,18 @@ protected AutoCloseableIterator getIncrementalStream(Database da return getMessageIterator(queryIterator, streamName, namespace, 
emittedAt.toEpochMilli()); } - protected AutoCloseableIterator getFullRefreshStream(Database database, - String streamName, - String namespace, - List selectedDatabaseFields, - TableInfo> table, - Instant emittedAt) { + protected AutoCloseableIterator getFullRefreshStream(final Database database, + final String streamName, + final String namespace, + final List selectedDatabaseFields, + final TableInfo> table, + final Instant emittedAt) { final AutoCloseableIterator queryStream = queryTableFullRefresh(database, selectedDatabaseFields, table.getNameSpace(), table.getName()); return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli()); } - protected String getFullyQualifiedTableName(String nameSpace, String tableName) { + protected String getFullyQualifiedTableName(final String nameSpace, final String tableName) { return nameSpace != null ? nameSpace + "." + tableName : tableName; } @@ -406,7 +407,7 @@ protected List> getTables(final Database database) throws Excep .collect(Collectors.toList()); } - protected void assertColumnsWithSameNameAreSame(String nameSpace, String tableName, List> columns) { + protected void assertColumnsWithSameNameAreSame(final String nameSpace, final String tableName, final List> columns) { columns.stream() .collect(Collectors.groupingBy(CommonField::getName)) .values() @@ -423,34 +424,34 @@ protected void assertColumnsWithSameNameAreSame(String nameSpace, String tableNa } protected List>> discoverWithoutSystemTables(final Database database) throws Exception { - Set systemNameSpaces = getExcludedInternalNameSpaces(); - List>> discoveredTables = discoverInternal(database); + final Set systemNameSpaces = getExcludedInternalNameSpaces(); + final List>> discoveredTables = discoverInternal(database); return (systemNameSpaces == null || systemNameSpaces.isEmpty() ? 
discoveredTables : discoveredTables.stream().filter(table -> !systemNameSpaces.contains(table.getNameSpace())).collect( Collectors.toList())); } - protected String getIdentifierWithQuoting(String identifier) { + protected String getIdentifierWithQuoting(final String identifier) { return getQuoteString() + identifier + getQuoteString(); } - protected String enquoteIdentifierList(List identifiers) { + protected String enquoteIdentifierList(final List identifiers) { final StringJoiner joiner = new StringJoiner(","); - for (String identifier : identifiers) { + for (final String identifier : identifiers) { joiner.add(getIdentifierWithQuoting(identifier)); } return joiner.toString(); } - protected String getFullTableName(String nameSpace, String tableName) { + protected String getFullTableName(final String nameSpace, final String tableName) { return (nameSpace == null || nameSpace.isEmpty() ? getIdentifierWithQuoting(tableName) : getIdentifierWithQuoting(nameSpace) + "." + getIdentifierWithQuoting(tableName)); } - public AutoCloseableIterator getMessageIterator(AutoCloseableIterator recordIterator, - String streamName, - String namespace, - long emittedAt) { + public AutoCloseableIterator getMessageIterator(final AutoCloseableIterator recordIterator, + final String streamName, + final String namespace, + final long emittedAt) { return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessage() .withType(Type.RECORD) .withRecord(new AirbyteRecordMessage() @@ -460,21 +461,21 @@ public AutoCloseableIterator getMessageIterator(AutoCloseableIte .withData(r))); } - protected AutoCloseableIterator queryTable(Database database, String sqlQuery) { + protected AutoCloseableIterator queryTable(final Database database, final String sqlQuery) { return AutoCloseableIterators.lazyIterator(() -> { try { final Stream stream = database.query(sqlQuery); return AutoCloseableIterators.fromStream(stream); - } catch (Exception e) { + } catch (final Exception e) { throw new 
RuntimeException(e); } }); } - public AutoCloseableIterator queryTableFullRefresh(Database database, - List columnNames, - String schemaName, - String tableName) { + public AutoCloseableIterator queryTableFullRefresh(final Database database, + final List columnNames, + final String schemaName, + final String tableName) { LOGGER.info("Queueing query for table: {}", tableName); return queryTable(database, String.format("SELECT %s FROM %s", enquoteIdentifierList(columnNames), @@ -502,8 +503,8 @@ public abstract AutoCloseableIterator queryTableIncremental(Database d DataType cursorFieldType, String cursor); - private Database createDatabaseInternal(JsonNode sourceConfig) throws Exception { - Database database = createDatabase(sourceConfig); + private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exception { + final Database database = createDatabase(sourceConfig); database.setSourceConfig(sourceConfig); database.setDatabaseConfig(toDatabaseConfig(sourceConfig)); return database; diff --git a/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-01-dbcreate.sql b/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-01-dbcreate.sql index d82073636d74..b3baf0145fd5 100644 --- a/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-01-dbcreate.sql +++ b/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-01-dbcreate.sql @@ -53,3 +53,59 @@ REVOKE ALL ON database postgres FROM public; + +-- Test DATA used BY the postgres SOURCE test classes +SET +SCHEMA 'public'; + +CREATE + TABLE + id_and_name( + id INTEGER, + name VARCHAR(200) + ); + +INSERT + INTO + id_and_name( + id, + name + ) + VALUES( + 1, + 'picard' + ), + ( + 2, + 'crusher' + ), + ( + 3, + 'vash' + ); + +CREATE + TABLE + starships( + id INTEGER, + name VARCHAR(200) + ); + +INSERT + INTO + starships( + id, + name + ) + VALUES( + 1, + 'enterprise-d' + ), + ( + 2, + 'defiant' + ), + ( + 3, + 'yamato' + ); diff --git 
a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index c944f15d745f..5e6c576c5c80 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -246,6 +246,25 @@ Unfortunately, logical replication is not configurable for Google CloudSQL. You If you encounter one of those not listed below, please consider [contributing to our docs](https://github.com/airbytehq/airbyte/tree/master/docs) and providing setup instructions. +## Connection to Postgres via an SSH Tunnel + +Airbyte has the ability to connect to a Postgres instance via an SSH Tunnel. The reason you might want to do this is because it is not possible (or against security policy) to connect to the database directly (e.g. it does not have a public IP address). + +When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server (a.k.a. a bastion server) that _does_ have direct access to the database. Airbyte connects to the bastion and then asks the bastion to connect directly to the server. + +Using this feature requires additional configuration, when creating the source. We will talk through what each piece of configuration means. +1. Configure all fields for the source as you normally would, except `SSH Tunnel Method`. +2. `SSH Tunnel Method` defaults to `No Tunnel` (meaning a direct connection). If you want to use an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. + 1. Choose `Key Authentication` if you will be using an RSA Private Key as your secret for establishing the SSH Tunnel (see below for more information on generating this key). + 2. Choose `Password Authentication` if you will be using a password as your secret for establishing the SSH Tunnel. +3. `SSH Tunnel Jump Server Host` refers to the intermediate (bastion) server that Airbyte will connect to. This should be a hostname or an IP Address. +4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. 
The default port for SSH connections is `22`, so unless you have explicitly changed something, go with the default. +5. `SSH Login Username` is the username that Airbyte should use when connecting to the bastion server. This is NOT the Postgres username. +6. If you are using `Password Authentication`, then `SSH Login Password` should be set to the password of the User from the previous step. If you are using `SSH Key Authentication` leave this blank. Again, this is not the Postgres password, but the password for the OS-user that Airbyte is using to perform commands on the bastion. +7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA Private Key that you are using to create the SSH connection. This should be the full contents of the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending with `-----END RSA PRIVATE KEY-----`. + +### Generating an RSA Private Key +_Coming soon_ ## Changelog diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh index 7cc5a8309fb7..746c970ba9d3 100755 --- a/tools/bin/ci_credentials.sh +++ b/tools/bin/ci_credentials.sh @@ -93,6 +93,8 @@ write_standard_creds source-mssql "$MSSQL_RDS_TEST_CREDS" write_standard_creds source-okta "$SOURCE_OKTA_TEST_CREDS" write_standard_creds source-plaid "$PLAID_INTEGRATION_TEST_CREDS" write_standard_creds source-paypal-transaction "$PAYPAL_TRANSACTION_CREDS" +write_standard_creds source-postgres "$POSTGRES_SSH_KEY_TEST_CREDS" "ssh-key-config.json" +write_standard_creds source-postgres "$POSTGRES_SSH_PWD_TEST_CREDS" "ssh-pwd-config.json" write_standard_creds source-posthog "$POSTHOG_TEST_CREDS" write_standard_creds source-pipedrive "$PIPEDRIVE_INTEGRATION_TESTS_CREDS" write_standard_creds source-quickbooks-singer "$QUICKBOOKS_TEST_CREDS"