diff --git a/airbyte-integrations/bases/base-java/build.gradle b/airbyte-integrations/bases/base-java/build.gradle index 34f8fca02d6d7..a429d6ddb61a7 100644 --- a/airbyte-integrations/bases/base-java/build.gradle +++ b/airbyte-integrations/bases/base-java/build.gradle @@ -5,6 +5,12 @@ plugins { dependencies { implementation 'commons-cli:commons-cli:1.4' + implementation group: 'org.apache.sshd', name: 'sshd-mina', version: '2.7.0' + // bouncycastle is pinned to version-match the transitive dependency from kubernetes client-java + // because a version conflict causes "parameter object not a ECParameterSpec" on ssh tunnel initiation + implementation group: 'org.bouncycastle', name: 'bcprov-jdk15on', version: '1.66' + implementation group: 'org.bouncycastle', name: 'bcpkix-jdk15on', version: '1.66' + implementation group: 'org.bouncycastle', name: 'bctls-jdk15on', version: '1.66' implementation project(':airbyte-protocol:models') implementation project(":airbyte-json-validation") diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/BaseConnector.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/BaseConnector.java index 2f4d3b0383bdb..c9dba9ab2a017 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/BaseConnector.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/BaseConnector.java @@ -24,10 +24,12 @@ package io.airbyte.integrations; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.integrations.base.Integration; import io.airbyte.protocol.models.ConnectorSpecification; +import java.io.IOException; public abstract class BaseConnector implements Integration { @@ -42,7 +44,18 @@ public abstract class BaseConnector implements Integration { public ConnectorSpecification spec() throws Exception { // return a JsonSchema representation 
of the spec for the integration. final String resourceString = MoreResources.readResource("spec.json"); - return Jsons.deserialize(resourceString, ConnectorSpecification.class); + return Jsons.object(addToSpec(Jsons.deserialize(resourceString)), ConnectorSpecification.class); + } + + /** + * Extension point for child classes to add things to the spec json tree before it's deserialized + * into a ConnectorSpecification object. + * + * @param root + * @return root + */ + public JsonNode addToSpec(JsonNode root) throws IOException { + return root; } } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java new file mode 100644 index 0000000000000..0a5fde77de42b --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java @@ -0,0 +1,276 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.base; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.io.StringReader; +import java.net.URISyntaxException; +import java.security.KeyPair; +import java.security.NoSuchAlgorithmException; +import java.security.interfaces.RSAPrivateKey; +import java.security.interfaces.RSAPublicKey; +import java.security.spec.InvalidKeySpecException; +import org.apache.sshd.client.SshClient; +import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier; +import org.apache.sshd.client.session.ClientSession; +import org.apache.sshd.common.util.net.SshdSocketAddress; +import org.apache.sshd.server.forward.AcceptAllForwardingFilter; +import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; +import org.bouncycastle.openssl.PEMKeyPair; +import org.bouncycastle.openssl.PEMParser; +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the connection configuration for an ssh tunnel port forward through a proxy/bastion + * host plus the remote host and remote port to forward to a specified local port. 
+ */ +public class SSHTunnel { + + private static final Logger LOGGER = LoggerFactory.getLogger(SSHTunnel.class); + + public static final int TIMEOUT_MILLIS = 15000; // 15 seconds + private final String method; + private final String host; + private final String tunnelSshPort; + private final String user; + private final String sshkey; + private final String password; + private final String remoteDatabaseHost; + private final String remoteDatabasePort; + private final String tunnelDatabasePort; + + private SSHTunnel tunnelConfig = null; + private SshClient sshclient = null; + private ClientSession tunnelSession = null; + + public SSHTunnel(String method, + String host, + String tunnelSshPort, + String user, + String sshkey, + String password, + String remoteDatabaseHost, + String remoteDatabasePort, + String tunnelDatabasePort) { + if (method == null) { + this.method = "NO_TUNNEL"; + } else { + this.method = method; + } + this.host = host; + this.tunnelSshPort = tunnelSshPort; + this.user = user; + this.sshkey = sshkey; + this.password = password; + this.remoteDatabaseHost = remoteDatabaseHost; + this.remoteDatabasePort = remoteDatabasePort; + this.tunnelDatabasePort = tunnelDatabasePort; + } + + public static SSHTunnel getInstance(JsonNode config) { + JsonNode ourConfig = config.get("tunnel_method"); + SSHTunnel sshconfig = new SSHTunnel( + getConfigValueOrNull(ourConfig, "tunnel_method"), + getConfigValueOrNull(ourConfig, "tunnel_host"), + getConfigValueOrNull(ourConfig, "tunnel_ssh_port"), + getConfigValueOrNull(ourConfig, "tunnel_username"), + getConfigValueOrNull(ourConfig, "tunnel_usersshkey"), + getConfigValueOrNull(ourConfig, "tunnel_userpass"), + getConfigValueOrNull(ourConfig, "tunnel_db_remote_host"), + getConfigValueOrNull(ourConfig, "tunnel_db_remote_port"), + getConfigValueOrNull(ourConfig, "tunnel_localport")); + return sshconfig; + } + + static String getConfigValueOrNull(JsonNode config, String key) { + return config != null && config.has(key) ? 
config.get(key).asText() : null; + } + + /** + * Starts an ssh session; wrap this in a try-finally and use closeTunnel() to close it. + * + * @throws IOException + */ + public void openTunnelIfRequested() { + if (shouldTunnel()) { + if (tunnelSession != null || sshclient != null) { + throw new RuntimeException("SSH Tunnel was requested to be opened while it was already open. This is a coding error."); + } + sshclient = createClient(); + tunnelSession = openTunnel(sshclient); + } + } + + /** + * Closes a tunnel if one was open, and otherwise doesn't do anything (safe to run). + */ + public void closeTunnel() { + try { + if (shouldTunnel()) { + if (tunnelSession != null) { + tunnelSession.close(); + } + if (sshclient != null) { + sshclient.stop(); + } + tunnelSession = null; + sshclient = null; + } + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + public boolean shouldTunnel() { + return method != null && !"NO_TUNNEL".equals(method); + } + + public String getMethod() { + return method; + } + + public String getHost() { + return host; + } + + public String getTunnelSshPort() { + return tunnelSshPort; + } + + public String getUser() { + return user; + } + + private String getSSHKey() { + return sshkey; + } + + private String getPassword() { + return password; + } + + public String getRemoteDatabaseHost() { + return remoteDatabaseHost; + } + + public String getRemoteDatabasePort() { + return remoteDatabasePort; + } + + public String getTunnelDatabasePort() { + return tunnelDatabasePort; + } + + /** + * From the RSA format private key string, use bouncycastle to deserialize the key pair, reconstruct + * the keys from the key info, and return the key pair for use in authentication. 
+ * + * @return + * @throws IOException + */ + private KeyPair getPrivateKeyPair() throws IOException { + PEMParser pemParser = new PEMParser(new StringReader(getSSHKey())); + PEMKeyPair keypair = (PEMKeyPair) pemParser.readObject(); + JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); + return new KeyPair( + (RSAPublicKey) converter.getPublicKey(SubjectPublicKeyInfo.getInstance(keypair.getPublicKeyInfo())), + (RSAPrivateKey) converter.getPrivateKey(keypair.getPrivateKeyInfo())); + } + + /** + * Generates a new ssh client and returns it, with forwarding set to accept all types; use this + * before opening a tunnel. + * + * @return + */ + private SshClient createClient() { + java.security.Security.addProvider( + new org.bouncycastle.jce.provider.BouncyCastleProvider()); + SshClient client = SshClient.setUpDefaultClient(); + client.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE); + client.setServerKeyVerifier(AcceptAllServerKeyVerifier.INSTANCE); + return client; + } + + private void validate() { + if (getHost() == null) { + throw new RuntimeException("SSH Tunnel host is null - verify configuration before starting tunnel!"); + } + } + + /** + * Starts an ssh session; wrap this in a try-finally and use closeTunnel() to close it. 
+ * + * @return + * @throws IOException + * @throws InvalidKeySpecException + * @throws NoSuchAlgorithmException + * @throws URISyntaxException + */ + private ClientSession openTunnel(SshClient client) { + try { + validate(); + client.start(); + ClientSession session = client.connect( + getUser().trim(), + getHost().trim(), + Integer.parseInt( + getTunnelSshPort().trim())) + .verify(TIMEOUT_MILLIS) + .getSession(); + if (getMethod().equals("SSH_KEY_AUTH")) { + session.addPublicKeyIdentity(getPrivateKeyPair()); + } + if (getMethod().equals("SSH_PASSWORD_AUTH")) { + session.addPasswordIdentity(getPassword()); + } + session.auth().verify(TIMEOUT_MILLIS); + SshdSocketAddress address = session.startLocalPortForwarding( + new SshdSocketAddress(SshdSocketAddress.LOCALHOST_ADDRESS.getHostName(), Integer.parseInt(getTunnelDatabasePort().trim())), + new SshdSocketAddress(getRemoteDatabaseHost().trim(), Integer.parseInt(getRemoteDatabasePort().trim()))); + LOGGER.info("Established tunneling session. 
Port forwarding started on " + address.toInetSocketAddress()); + return session; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String toString() { + return "SSHTunnel{" + + "method='" + method + '\'' + + ", host='" + host + '\'' + + ", tunnelSshPort='" + tunnelSshPort + '\'' + + ", user='" + user + '\'' + + ", remoteDatabaseHost='" + remoteDatabaseHost + '\'' + + ", remoteDatabasePort='" + remoteDatabasePort + '\'' + + ", tunnelDatabasePort='" + tunnelDatabasePort + '\'' + + '}'; + } + +} diff --git a/airbyte-integrations/connectors/destination-jdbc/build.gradle b/airbyte-integrations/connectors/destination-jdbc/build.gradle index cec590fe388cc..5ce268da4cc10 100644 --- a/airbyte-integrations/connectors/destination-jdbc/build.gradle +++ b/airbyte-integrations/connectors/destination-jdbc/build.gradle @@ -11,6 +11,7 @@ application { dependencies { implementation 'com.google.cloud:google-cloud-storage:1.113.16' implementation 'com.google.auth:google-auth-library-oauth2-http:0.25.5' + implementation group: 'org.apache.sshd', name: 'sshd-mina', version: '2.7.0' implementation project(':airbyte-db') implementation project(':airbyte-integrations:bases:base-java') diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java index 61922be562d9d..201e098cb046a 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -25,17 +25,22 @@ package io.airbyte.integrations.destination.jdbc; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; 
+import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.SSHTunnel; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.io.IOException; import java.util.UUID; import java.util.function.Consumer; import org.slf4j.Logger; @@ -57,6 +62,13 @@ protected NamingConventionTransformer getNamingResolver() { return namingResolver; } + @Override + public JsonNode addToSpec(JsonNode spec) throws IOException { + ObjectNode propNode = (ObjectNode) spec.get("connectionSpecification").get("properties"); + propNode.set("tunnel_method", Jsons.deserialize(MoreResources.readResource("ssh-tunnel-spec.json"))); + return spec; + } + protected SqlOperations getSqlOperations() { return sqlOperations; } @@ -71,8 +83,9 @@ public AbstractJdbcDestination(final String driverClass, @Override public AirbyteConnectionStatus check(JsonNode config) { - + SSHTunnel sshTunnel = SSHTunnel.getInstance(config); try (final JdbcDatabase database = getDatabase(config)) { + sshTunnel.openTunnelIfRequested(); String outputSchema = namingResolver.getIdentifier(config.get("schema").asText()); attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, sqlOperations); return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); @@ -81,6 +94,8 @@ public AirbyteConnectionStatus check(JsonNode config) { return new AirbyteConnectionStatus() .withStatus(Status.FAILED) .withMessage("Could not connect 
with provided configuration. \n" + e.getMessage()); + } finally { + sshTunnel.closeTunnel(); } } @@ -114,7 +129,8 @@ protected JdbcDatabase getDatabase(JsonNode config) { @Override public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer outputRecordCollector) { - return JdbcBufferedConsumerFactory.create(outputRecordCollector, getDatabase(config), sqlOperations, namingResolver, config, catalog); + return JdbcBufferedConsumerFactory + .create(outputRecordCollector, getDatabase(config), SSHTunnel.getInstance(config), sqlOperations, namingResolver, config, catalog); } } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index e1f3fcef08b40..f80ecb1c6b040 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -30,6 +30,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.SSHTunnel; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction; @@ -67,6 +68,7 @@ public class JdbcBufferedConsumerFactory { public static AirbyteMessageConsumer create(Consumer outputRecordCollector, JdbcDatabase database, + SSHTunnel sshTunnel, SqlOperations sqlOperations, NamingConventionTransformer namingResolver, JsonNode config, @@ 
-75,9 +77,9 @@ public static AirbyteMessageConsumer create(Consumer outputRecor return new BufferedStreamConsumer( outputRecordCollector, - onStartFunction(database, sqlOperations, writeConfigs), + onStartFunction(database, sshTunnel, sqlOperations, writeConfigs), recordWriterFunction(database, sqlOperations, writeConfigs, catalog), - onCloseFunction(database, sqlOperations, writeConfigs), + onCloseFunction(database, sshTunnel, sqlOperations, writeConfigs), catalog, sqlOperations::isValidData, MAX_BATCH_SIZE); @@ -134,8 +136,12 @@ private static String getOutputSchema(AirbyteStream stream, String defaultDestSc return defaultDestSchema; } - private static OnStartFunction onStartFunction(JdbcDatabase database, SqlOperations sqlOperations, List writeConfigs) { + private static OnStartFunction onStartFunction(JdbcDatabase database, + SSHTunnel sshTunnel, + SqlOperations sqlOperations, + List writeConfigs) { return () -> { + sshTunnel.openTunnelIfRequested(); LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size()); for (final WriteConfig writeConfig : writeConfigs) { final String schemaName = writeConfig.getOutputSchemaName(); @@ -168,7 +174,10 @@ private static RecordWriter recordWriterFunction(JdbcDatabase database, }; } - private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperations sqlOperations, List writeConfigs) { + private static OnCloseFunction onCloseFunction(JdbcDatabase database, + SSHTunnel sshTunnel, + SqlOperations sqlOperations, + List writeConfigs) { return (hasFailed) -> { // copy data if (!hasFailed) { @@ -206,6 +215,7 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati sqlOperations.dropTableIfExists(database, schemaName, tmpTableName); } LOGGER.info("Cleaning tmp tables in destination completed."); + sshTunnel.closeTunnel(); }; } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/resources/ssh-tunnel-spec.json 
b/airbyte-integrations/connectors/destination-jdbc/src/main/resources/ssh-tunnel-spec.json new file mode 100644 index 0000000000000..0b8521f1a07a5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/resources/ssh-tunnel-spec.json @@ -0,0 +1,167 @@ +{ + "type": "object", + "title": "SSH Tunnel Method", + "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.", + "oneOf": [ + { + "title": "No Tunnel", + "required": ["tunnel_method"], + "properties": { + "tunnel_method": { + "description": "No ssh tunnel needed to connect to database", + "type": "string", + "enum": ["NO_TUNNEL"], + "default": "NO_TUNNEL", + "order": 0 + } + } + }, + { + "title": "SSH Key Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_ssh_port", + "tunnel_username", + "tunnel_localport", + "tunnel_usersshkey" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and ssh key", + "type": "string", + "enum": ["SSH_KEY_AUTH"], + "default": "SSH_KEY_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_ssh_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": ["22"], + "order": 2 + }, + "tunnel_username": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host.", + "type": "string", + "order": 3 + }, + "tunnel_usersshkey": { + "title": "SSH Private Key", + "description": "OS-level user account ssh key credentials for logging into the jump server host.", + "type": "string", + "airbyte_secret": true, + "multiline": true, + "order": 4 + 
}, + "tunnel_db_remote_host": { + "title": "Remote Database Host", + "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", + "type": "string", + "order": 5 + }, + "tunnel_db_remote_port": { + "title": "Remote Database Port", + "description": "Port on the database to port-forward, typically that database's usual default port.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": ["5432"], + "order": 6 + }, + "tunnel_localport": { + "title": "SSH Tunnel Database Port", + "description": "Port on the jump server host for the database's port to forward to. Do not share a port between database instances.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": ["5000"], + "order": 7 + } + } + }, + { + "title": "Password Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_ssh_port", + "tunnel_username", + "tunnel_localport", + "tunnel_userpass" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and password authentication", + "type": "string", + "enum": ["SSH_PASSWORD_AUTH"], + "default": "SSH_PASSWORD_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_ssh_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": ["22"], + "order": 2 + }, + "tunnel_username": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host", + "type": "string", + "order": 3 + }, + "tunnel_userpass": { + "title": "Password", + "description": "OS-level password for logging into the jump server host", + "type": "string", + 
"airbyte_secret": true, + "order": 4 + }, + "tunnel_db_remote_host": { + "title": "Remote Database Host", + "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", + "type": "string", + "order": 5 + }, + "tunnel_db_remote_port": { + "title": "Remote Database Port", + "description": "Port on the database to port-forward, typically that database's usual default port.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": ["5432"], + "order": 6 + }, + "tunnel_localport": { + "title": "SSH Tunnel Database Port", + "description": "Port on the jump server host for the database's port to forward to. Do not share a port between database instances.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": ["5000"], + "order": 7 + } + } + } + ] +} diff --git a/airbyte-integrations/connectors/destination-postgres/build.gradle b/airbyte-integrations/connectors/destination-postgres/build.gradle index 0d63c4007ab28..0f0a0561e0147 100644 --- a/airbyte-integrations/connectors/destination-postgres/build.gradle +++ b/airbyte-integrations/connectors/destination-postgres/build.gradle @@ -16,6 +16,9 @@ dependencies { testImplementation project(':airbyte-test-utils') + implementation project(':airbyte-workers') + implementation project(':airbyte-config:models') + testImplementation "org.testcontainers:postgresql:1.15.1" integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSHTunnelKeyAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSHTunnelKeyAcceptanceTest.java new file mode 100644 index 0000000000000..51b621eb0e887 --- /dev/null +++ 
b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSHTunnelKeyAcceptanceTest.java @@ -0,0 +1,331 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.destination.postgres; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.lang.Exceptions; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.util.MoreLists; +import io.airbyte.config.JobGetSpecConfig; +import io.airbyte.config.WorkerDestinationConfig; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.base.SSHTunnel; +import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest.TestDestinationEnv; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.workers.DefaultCheckConnectionWorker; +import io.airbyte.workers.DefaultGetSpecWorker; +import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.normalization.NormalizationRunner; +import io.airbyte.workers.normalization.NormalizationRunnerFactory; +import io.airbyte.workers.process.AirbyteIntegrationLauncher; +import io.airbyte.workers.process.DockerProcessFactory; +import io.airbyte.workers.process.ProcessFactory; 
+import io.airbyte.workers.protocols.airbyte.AirbyteDestination; +import io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.config.StandardCheckConnectionOutput.Status; +import io.airbyte.workers.WorkerException; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +/** + * Tests that we can connect to a postgres destination that's tucked away behind an ssh tunnel. Presumes a bastion host and postgres database are + * already set up and configured for this scenario. Uses a generated hash for schema name (and presumes that the service account user has schema + * create-delete privs) so that the test can run concurrently without clobbering its other run. + */ +public class PostgresDestinationSSHTunnelKeyAcceptanceTest { + + private static final String UNIQUE_RUN_HASH = UUID.randomUUID().toString().replace('-','x').substring(0,10); + private static final String JOB_ID = "0"; + private static final int JOB_ATTEMPT = 0; + + private TestDestinationEnv testEnv; + private Path jobRoot; + protected Path localRoot; + private ProcessFactory processFactory; + + /** + * Verify that the worker returns a valid spec. 
+ */ + @Test + public void testGetSpec() throws WorkerException { + ConnectorSpecification spec = new DefaultGetSpecWorker( + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory)) + .run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot); + assertNotNull(spec); + } + + + /** + * Configured to use an ssh tunnel with key authentication. + */ + protected JsonNode getConfig() { + return Jsons.deserialize(IOs.readFile(Path.of("secrets/postgres_destination_ssh_tunnel_key_config.json"))); + } + + /** + * Configured to fail, similar to the standard test suite. + */ + protected JsonNode getFailCheckConfig() { + JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/postgres_destination_ssh_tunnel_key_config.json"))); + ((ObjectNode) config).put("password", "invalidvalue"); + return config; + } + + /** + * Verify that when given valid credentials, that check connection returns a success response. Assume that the getConfig() is valid. + */ + @Test + public void testCheckConnection() throws Exception { + assertEquals(Status.SUCCEEDED, runCheck(getConfig()).getStatus()); + } + + /** + * Verify that when given invalid credentials, that check connection returns a failed response. Assume that the getFailCheckConfig() is invalid. + */ + @Test + public void testCheckConnectionInvalidCredentials() throws Exception { + assertEquals(Status.FAILED, runCheck(getFailCheckConfig()).getStatus()); + } + + private StandardCheckConnectionOutput runCheck(JsonNode config) throws WorkerException { + return + new DefaultCheckConnectionWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, + getImageName(), processFactory)).run(new + StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot); + } + + /** + * Prepares the ability to launch a docker worker process for the connector worker. 
+ */
+  @BeforeEach
+  void setUpInternal() throws Exception {
+    Files.createDirectories(Path.of("/tmp/airbyte_tests/"));
+    final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp/airbyte_tests/"), "test");
+    jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job"));
+    localRoot = Files.createTempDirectory(Path.of("/tmp/airbyte_tests/"), "output");
+    processFactory = new DockerProcessFactory(workspaceRoot, workspaceRoot.toString(), localRoot.toString(), "host");
+  }
+
+  @BeforeEach
+  void setUp() throws Exception {
+    createUniqueSchema();
+  }
+
+  @AfterEach
+  void tearDown() throws Exception {
+    dropUniqueSchema();
+  }
+
+  /**
+   * Ideally, creating and dropping the transient schema used for testing should be accomplished
+   * in the connector itself (since it is encapsulating the knowledge of the database internals)
+   * but for now that API spec doesn't support that, so we'll do it here.
+   * @throws Exception
+   */
+  void dropUniqueSchema() throws Exception {
+    final JsonNode config = getConfig();
+    // Let's go clean up the schema we used for this test run, with a hashcode in its name.
+    String uniqueRunSchema = config.get("schema").asText() + UNIQUE_RUN_HASH;
+    ((ObjectNode) config).put("schema",uniqueRunSchema);
+
+    SSHTunnel tunnel = null;
+    try {
+      tunnel = SSHTunnel.getInstance(config);
+      tunnel.openTunnelIfRequested();
+
+      String jdbcUrl = String.format("jdbc:postgresql://%s:%s/", config.get("host").asText(), config.get("port").asInt());
+      Database database = Databases.createPostgresDatabase(config.get("username").asText(),
+          config.get("password").asText(), jdbcUrl);
+      database.query(ctx -> {
+        ctx.fetch(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", uniqueRunSchema));
+        ctx.fetch(String.format("DROP SCHEMA IF EXISTS _%s CASCADE;", uniqueRunSchema));
+        return null;
+      });
+    } finally {
+      if (tunnel != null) tunnel.closeTunnel();
+    }
+  }
+
+  /**
+   * Ideally, creating and dropping the transient schema used for testing should be accomplished
+   * in the connector itself (since it is encapsulating the knowledge of the database internals)
+   * but for now that API spec doesn't support that, so we'll do it here.
+   * @throws Exception
+   */
+  void createUniqueSchema() throws Exception {
+    final JsonNode config = getConfig();
+    // Create the schema for this test run, with a hashcode in its name.
+    String uniqueRunSchema = config.get("schema").asText() + UNIQUE_RUN_HASH;
+    ((ObjectNode) config).put("schema",uniqueRunSchema);
+
+    SSHTunnel tunnel = null;
+    try {
+      tunnel = SSHTunnel.getInstance(config);
+      tunnel.openTunnelIfRequested();
+
+      String jdbcUrl = String.format("jdbc:postgresql://%s:%s/", config.get("host").asText(), config.get("port").asInt());
+      Database database = Databases.createPostgresDatabase(config.get("username").asText(),
+          config.get("password").asText(), jdbcUrl);
+      database.query(ctx -> {
+        ctx.fetch(String.format("CREATE SCHEMA IF NOT EXISTS %s;", uniqueRunSchema));
+        return null;
+      });
+    } finally {
+      if (tunnel != null) tunnel.closeTunnel();
+    }
+  }
+
+  protected String getImageName() {
+    return "airbyte/destination-postgres:dev";
+  }
+
+  /**
+   * Verify that the integration successfully writes records.
+   * Based on DestinationAcceptanceTest but adapted to support concurrency across a persistent external db.
+   */
+  @ParameterizedTest
+  @ArgumentsSource(DataArgumentsProvider.class)
+  public void testSync(String messagesFilename, String catalogFilename) throws Exception {
+    final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
+    final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
+    final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
+        .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
+
+    final JsonNode config = getConfig();
+    ((ObjectNode) config).put("schema",config.get("schema").asText() + UNIQUE_RUN_HASH);
+    runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
+
+    // The retrieveRaw line goes into a section of code that needs to be refactored. It
+    // incorrectly assumes that it can create arbitrary connections to the database from anywhere
+    // based on internal assumptions about the jdbc connection configuration, and does not leave
+    // room for an ssh tunnel.
It should be passing in connection config instead and allowing top level control. + // TODO: We should come back and clean that up later - this section is the primary reason + // I created a separate acceptance test instead of reusing the existing hierarchy. + // The other reason is the unique has on a run. + //retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema); + } + + protected void runSyncAndVerifyStateOutput(JsonNode config, + List messages, + ConfiguredAirbyteCatalog catalog, + boolean runNormalization) + throws Exception { + final List destinationOutput = runSync(config, messages, catalog, runNormalization); + final AirbyteMessage expectedStateMessage = MoreLists.reversed(messages) + .stream() + .filter(m -> m.getType() == Type.STATE) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("All message sets used for testing should include a state record")); + + final AirbyteMessage actualStateMessage = MoreLists.reversed(destinationOutput) + .stream() + .filter(m -> m.getType() == Type.STATE) + .findFirst() + .orElseGet(() -> { + fail("Destination failed to output state"); + return null; + }); + + assertEquals(expectedStateMessage, actualStateMessage); + } + + protected AirbyteDestination getDestination() { + return new DefaultAirbyteDestination(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory)); + } + + + private List runSync(JsonNode config, List messages, ConfiguredAirbyteCatalog catalog, boolean runNormalization) + throws Exception { + + final WorkerDestinationConfig destinationConfig = new WorkerDestinationConfig() + .withConnectionId(UUID.randomUUID()) + .withCatalog(catalog) + .withDestinationConnectionConfiguration(config); + + final AirbyteDestination destination = getDestination(); + + destination.start(destinationConfig, jobRoot); + messages.forEach(message -> Exceptions.toRuntime(() -> destination.accept(message))); + destination.notifyEndOfStream(); + + List destinationOutput = new 
ArrayList<>(); + while (!destination.isFinished()) { + destination.attemptRead().ifPresent(destinationOutput::add); + } + + destination.close(); + + if (!runNormalization) { + return destinationOutput; + } + + final NormalizationRunner runner = NormalizationRunnerFactory.create( + getImageName(), + processFactory); + runner.start(); + final Path normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize")); + if (!runner.normalize(JOB_ID, JOB_ATTEMPT, normalizationRoot, destinationConfig.getDestinationConnectionConfiguration(), + destinationConfig.getCatalog(), WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS)) { + throw new WorkerException("Normalization Failed."); + } + runner.close(); + return destinationOutput; + } + + +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSHTunnelPasswordAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSHTunnelPasswordAcceptanceTest.java new file mode 100644 index 0000000000000..466e9c39a4d52 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSHTunnelPasswordAcceptanceTest.java @@ -0,0 +1,29 @@ +package io.airbyte.integrations.destination.postgres; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import java.nio.file.Path; + +public class PostgresDestinationSSHTunnelPasswordAcceptanceTest extends PostgresDestinationSSHTunnelKeyAcceptanceTest { + + /** + * Configured to use an ssh tunnel with password authentication. 
+ */ + @Override + protected JsonNode getConfig() { + return Jsons.deserialize(IOs.readFile(Path.of("secrets/postgres_destination_ssh_tunnel_password_config.json"))); + } + + /** + * Configured to fail, similar to the standard test suite. + */ + protected JsonNode getFailCheckConfig() { + JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/postgres_destination_ssh_tunnel_password_config.json"))); + ((ObjectNode) config).put("password", "invalidvalue"); + return config; + } + + +} diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 18e179e441f94..0d1015b0d03b3 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -31,6 +31,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.integrations.base.SSHTunnel; import io.airbyte.db.Databases; import io.airbyte.db.SqlDatabase; import io.airbyte.db.jdbc.JdbcDatabase; @@ -39,7 +40,11 @@ import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource; import io.airbyte.integrations.source.relationaldb.TableInfo; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.CommonField; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.JsonSchemaPrimitive; import java.sql.JDBCType; import java.sql.PreparedStatement; @@ -153,7 +158,8 @@ public List>> 
discoverInternal(final JdbcDatabas f.get(INTERNAL_COLUMN_TYPE))); jdbcType = JDBCType.VARCHAR; } - return new CommonField(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType) {}; + return new CommonField(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType) { + }; }) .collect(Collectors.toList())) .build()) @@ -167,7 +173,7 @@ protected JsonSchemaPrimitive getType(JDBCType columnType) { @Override protected Map> discoverPrimaryKeys(JdbcDatabase database, - List>> tableInfos) { + List>> tableInfos) { LOGGER.info("Discover primary keys for tables: " + tableInfos.stream().map(tab -> tab.getName()).collect( Collectors.toSet())); try { @@ -214,12 +220,12 @@ protected String getQuoteString() { @Override public AutoCloseableIterator queryTableIncremental(JdbcDatabase database, - List columnNames, - String schemaName, - String tableName, - String cursorField, - JDBCType cursorFieldType, - String cursor) { + List columnNames, + String schemaName, + String tableName, + String cursorField, + JDBCType cursorFieldType, + String cursor) { LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { @@ -262,4 +268,39 @@ public JdbcDatabase createDatabase(JsonNode config) throws SQLException { return database; } + @Override + public AirbyteConnectionStatus check(JsonNode config) { + SSHTunnel tunnel = null; + try { + tunnel = SSHTunnel.getInstance(config); + tunnel.openTunnelIfRequested(); + return super.check(config); + } finally { + tunnel.closeTunnel(); + } + } + + @Override + public AirbyteCatalog discover(JsonNode config) throws Exception { + SSHTunnel tunnel = null; + try { + tunnel = SSHTunnel.getInstance(config); + tunnel.openTunnelIfRequested(); + return super.discover(config); + } finally { + tunnel.closeTunnel(); + } + } + + @Override + public AutoCloseableIterator read(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception { + SSHTunnel tunnel = null; + try { + tunnel = SSHTunnel.getInstance(config); 
+ tunnel.openTunnelIfRequested(); + return super.read(config, catalog, state); + } finally { + tunnel.closeTunnel(); + } + } } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index f8c3590baf2b3..b9e56a46f221d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -39,6 +39,7 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.SSHTunnel; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.debezium.AirbyteDebeziumHandler; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; @@ -173,11 +174,9 @@ public AutoCloseableIterator read(JsonNode config, ConfiguredAir // this check is used to ensure that have the pgoutput slot available so Debezium won't attempt to // create it. 
final AirbyteConnectionStatus check = check(config); - if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) { throw new RuntimeException("Unable establish a connection: " + check.getMessage()); } - return super.read(config, catalog, state); } @@ -260,11 +259,16 @@ private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) { return stream; } - public static void main(String[] args) throws Exception { - final Source source = new PostgresSource(); - LOGGER.info("starting source: {}", PostgresSource.class); - new IntegrationRunner(source).run(args); - LOGGER.info("completed source: {}", PostgresSource.class); + public static void main(String[] args) { + try { + final Source source = new PostgresSource(); + LOGGER.info("starting source: {}", PostgresSource.class); + new IntegrationRunner(source).run(args); + LOGGER.info("completed source: {}", PostgresSource.class); + } catch (Throwable t) { + LOGGER.error("Error in postgres source: ", t); + System.exit(1); + } } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java index 5378debd52e6c..608fadcbfa5d2 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java @@ -25,6 +25,7 @@ package io.airbyte.integrations.io.airbyte.integration_tests.sources; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import 
com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; @@ -41,6 +42,7 @@ import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaPrimitive; import io.airbyte.protocol.models.SyncMode; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -105,7 +107,9 @@ protected String getImageName() { @Override protected ConnectorSpecification getSpec() throws Exception { - return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); + JsonNode spec = Jsons.deserialize(MoreResources.readResource("spec.json")); + ((ObjectNode) spec.get("connectionSpecification").get("properties")).set("tunnel_method", Jsons.deserialize(MoreResources.readResource("ssh-tunnel-spec.json"))); + return Jsons.object(spec, ConnectorSpecification.class); } @Override diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceSSHTunnelKeyAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceSSHTunnelKeyAcceptanceTest.java new file mode 100644 index 0000000000000..6073f4489ff00 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceSSHTunnelKeyAcceptanceTest.java @@ -0,0 +1,27 @@ +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import java.nio.file.Path; + +/** + * Tests an ssh tunnel wrapped around a postgres source. 
Uses cloud postgres instance to + * test this, not a local docker container database. + */ +public class PostgresSourceSSHTunnelKeyAcceptanceTest extends PostgresSourceAcceptanceTest { + + @Override + protected void setupEnvironment(TestDestinationEnv environment) throws Exception { + } + @Override + protected void tearDown(TestDestinationEnv testEnv) { + } + + @Override + protected JsonNode getConfig() { + return Jsons.deserialize(IOs.readFile(Path.of("secrets/postgres_source_ssh_tunnel_key_config.json"))); + } +} diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceSSHTunnelPasswordAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceSSHTunnelPasswordAcceptanceTest.java new file mode 100644 index 0000000000000..81c0493cb1d30 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceSSHTunnelPasswordAcceptanceTest.java @@ -0,0 +1,23 @@ +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import java.nio.file.Path; + +public class PostgresSourceSSHTunnelPasswordAcceptanceTest extends PostgresSourceAcceptanceTest { + + @Override + protected void setupEnvironment(TestDestinationEnv environment) throws Exception { + } + @Override + protected void tearDown(TestDestinationEnv testEnv) { + } + + @Override + protected JsonNode getConfig() { + return Jsons.deserialize(IOs.readFile(Path.of("secrets/postgres_source_ssh_tunnel_password_config.json"))); + } + +} diff --git 
a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java index 2078bc12db1cc..f6ef268b9827e 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java @@ -25,11 +25,13 @@ package io.airbyte.integrations.source.relationaldb; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; +import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.type.Types; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; @@ -53,6 +55,7 @@ import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaPrimitive; import io.airbyte.protocol.models.SyncMode; +import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; @@ -144,11 +147,11 @@ protected abstract Map> discoverPrimaryKeys(Database databa @Override public AirbyteConnectionStatus check(JsonNode config) { - try (final Database database = createDatabaseInternal(config)) { + try { + final Database database = createDatabaseInternal(config); for (CheckedConsumer checkOperation : getCheckOperations(config)) { checkOperation.accept(database); } - return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); } catch (Exception e) { LOGGER.info("Exception 
while checking connection: ", e); @@ -203,6 +206,21 @@ public AutoCloseableIterator read(JsonNode config, ConfiguredAir }); } + /** + * Override the default spec.json to insert ssh-tunnel configuration as one of the properties of all + * relational database connection specs. + * + * @param spec + * @return + * @throws IOException + */ + @Override + public JsonNode addToSpec(JsonNode spec) throws IOException { + ObjectNode propNode = (ObjectNode) spec.get("connectionSpecification").get("properties"); + propNode.set("tunnel_method", Jsons.deserialize(MoreResources.readResource("ssh-tunnel-spec.json"))); + return spec; + } + public List> getIncrementalIterators(Database database, ConfiguredAirbyteCatalog catalog, Map>> tableNameToTable, diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/resources/ssh-tunnel-spec.json b/airbyte-integrations/connectors/source-relational-db/src/main/resources/ssh-tunnel-spec.json new file mode 100644 index 0000000000000..0b8521f1a07a5 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/main/resources/ssh-tunnel-spec.json @@ -0,0 +1,167 @@ +{ + "type": "object", + "title": "SSH Tunnel Method", + "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.", + "oneOf": [ + { + "title": "No Tunnel", + "required": ["tunnel_method"], + "properties": { + "tunnel_method": { + "description": "No ssh tunnel needed to connect to database", + "type": "string", + "enum": ["NO_TUNNEL"], + "default": "NO_TUNNEL", + "order": 0 + } + } + }, + { + "title": "SSH Key Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_ssh_port", + "tunnel_username", + "tunnel_localport", + "tunnel_usersshkey" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and ssh key", + "type": "string", + "enum": ["SSH_KEY_AUTH"], + "default": "SSH_KEY_AUTH", + "order": 0 
+ }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_ssh_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": ["22"], + "order": 2 + }, + "tunnel_username": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host.", + "type": "string", + "order": 3 + }, + "tunnel_usersshkey": { + "title": "SSH Private Key", + "description": "OS-level user account ssh key credentials for logging into the jump server host.", + "type": "string", + "airbyte_secret": true, + "multiline": true, + "order": 4 + }, + "tunnel_db_remote_host": { + "title": "Remote Database Host", + "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", + "type": "string", + "order": 5 + }, + "tunnel_db_remote_port": { + "title": "Remote Database Port", + "description": "Port on the database to port-forward, typically that database's usual default port.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": ["5432"], + "order": 6 + }, + "tunnel_localport": { + "title": "SSH Tunnel Database Port", + "description": "Port on the jump server host for the database's port to forward to. 
Do not share a port between database instances.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": ["5000"], + "order": 7 + } + } + }, + { + "title": "Password Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_ssh_port", + "tunnel_username", + "tunnel_localport", + "tunnel_userpass" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and password authentication", + "type": "string", + "enum": ["SSH_PASSWORD_AUTH"], + "default": "SSH_PASSWORD_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_ssh_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": ["22"], + "order": 2 + }, + "tunnel_username": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host", + "type": "string", + "order": 3 + }, + "tunnel_userpass": { + "title": "Password", + "description": "OS-level password for logging into the jump server host", + "type": "string", + "airbyte_secret": true, + "order": 4 + }, + "tunnel_db_remote_host": { + "title": "Remote Database Host", + "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", + "type": "string", + "order": 5 + }, + "tunnel_db_remote_port": { + "title": "Remote Database Port", + "description": "Port on the database to port-forward, typically that database's usual default port.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": ["5432"], + "order": 6 + }, + "tunnel_localport": { + "title": "SSH Tunnel Database Port", + "description": "Port on the jump server host for the 
database's port to forward to. Do not share a port between database instances.",
+          "type": "integer",
+          "minimum": 0,
+          "maximum": 65536,
+          "examples": ["5000"],
+          "order": 7
+        }
+      }
+    }
+  ]
+}
diff --git a/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-01-dbcreate.sql b/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-01-dbcreate.sql
index d82073636d74d..3bd06dc01c127 100644
--- a/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-01-dbcreate.sql
+++ b/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-01-dbcreate.sql
@@ -53,3 +53,13 @@
 REVOKE ALL ON
 database postgres
 FROM
 public;
+
+-- Test data used by the postgres source test classes
+set schema 'public';
+CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));
+INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');
+CREATE TABLE starships(id INTEGER, name VARCHAR(200));
+INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');
+
+
+
diff --git a/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql b/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql
index cbc7759aca958..84db86142d659 100644
--- a/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql
+++ b/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql
@@ -2,4 +2,14 @@
 CREATE USER testcaseuser WITH password 'ThisIsNotTheRealPassword.PleaseSetThisByHand';
+GRANT CONNECT ON
+DATABASE test TO testcaseuser;
+
+GRANT USAGE ON
+SCHEMA public TO testcaseuser;
+
 GRANT integrationtest_rw TO testcaseuser;
+
+-- picking a default schema means it can log in without a schema named to connect to, helping test case setup.
+ALTER ROLE testcaseuser SET search_path TO public;
+