diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index f8f8bd47cfbc..78bcaa2017bd 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -639,7 +639,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 0.6.2 + dockerImageTag: 0.6.3 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 6bce86b8145b..ac7d180aed53 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5942,7 +5942,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:0.6.2" +- dockerImage: "airbyte/source-mysql:0.6.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql" connectionSpecification: @@ -6000,6 +6000,135 @@ type: "boolean" default: true order: 6 + ssl_mode: + title: "SSL modes" + description: "SSL connection modes.
  • preferred - Automatically\ + \ attempt SSL connection. If the MySQL server does not support SSL, continue\ + \ with a regular connection.
  • required - Always connect\ + \ with SSL. If the MySQL server doesn’t support SSL, the connection will\ + \ not be established. Certificate Authority (CA) and Hostname are not\ + \ verified.
  • verify-ca - Always connect with SSL. Verifies\ + \ CA, but allows connection even if Hostname does not match.
  • Verify\ + \ Identity - Always connect with SSL. Verify both CA and Hostname.
  • Read\ + \ more in the docs." + type: "object" + order: 7 + oneOf: + - title: "preferred" + description: "Preferred SSL mode." + required: + - "mode" + properties: + mode: + type: "string" + const: "preferred" + enum: + - "preferred" + default: "preferred" + order: 0 + - title: "required" + description: "Require SSL mode." + required: + - "mode" + properties: + mode: + type: "string" + const: "required" + enum: + - "required" + default: "required" + order: 0 + - title: "Verify CA" + description: "Verify CA SSL mode." + required: + - "mode" + - "ca_certificate" + properties: + mode: + type: "string" + const: "verify_ca" + enum: + - "verify_ca" + default: "verify_ca" + order: 0 + ca_certificate: + type: "string" + title: "CA certificate" + description: "CA certificate" + airbyte_secret: true + multiline: true + order: 1 + client_certificate: + type: "string" + title: "Client certificate" + description: "Client certificate (this is not a required field, but\ + \ if you want to use it, you will need to add the Client key\ + \ as well)" + airbyte_secret: true + multiline: true + order: 2 + client_key: + type: "string" + title: "Client key" + description: "Client key (this is not a required field, but if you\ + \ want to use it, you will need to add the Client certificate\ + \ as well)" + airbyte_secret: true + multiline: true + order: 3 + client_key_password: + type: "string" + title: "Client key password (Optional)" + description: "Password for keystorage. This field is optional. If\ + \ you do not add it - the password will be generated automatically." + airbyte_secret: true + order: 4 + - title: "Verify Identity" + description: "Verify-full SSL mode." + required: + - "mode" + - "ca_certificate" + properties: + mode: + type: "string" + const: "verify_identity" + enum: + - "verify_identity" + default: "verify_identity" + order: 0 + ca_certificate: + type: "string" + title: "CA certificate" + description: "CA certificate" + airbyte_secret: true + multiline: true + order: 1 + client_certificate: + type: "string" + title: "Client certificate" + description: "Client certificate (this is not a required field, but\ + \ if you want to use it, you will need to add the Client key\ + \ as well)" + airbyte_secret: true + multiline: true + order: 2 + client_key: + type: "string" + title: "Client key" + description: "Client key (this is not a required field, but if you\ + \ want to use it, you will need to add the Client certificate\ + \ as well)" + airbyte_secret: true + multiline: true + order: 3 + client_key_password: + type: "string" + title: "Client key password (Optional)" + description: "Password for keystorage. This field is optional. If\ + \ you do not add it - the password will be generated automatically." + airbyte_secret: true + order: 4 replication_method: type: "string" title: "Replication Method" @@ -6008,7 +6137,7 @@ \ but will not be able to represent deletions incrementally. CDC uses\ \ the Binlog to detect inserts, updates, and deletes. This needs to be\ \ configured on the source database itself." - order: 7 + order: 8 default: "STANDARD" enum: - "STANDARD" diff --git a/airbyte-db/db-lib/build.gradle b/airbyte-db/db-lib/build.gradle index 80b1fc5bca94..26828b8c8932 100644 --- a/airbyte-db/db-lib/build.gradle +++ b/airbyte-db/db-lib/build.gradle @@ -15,6 +15,7 @@ dependencies { // Mark as compile only to avoid leaking transitively to connectors compileOnly libs.platform.testcontainers.postgresql + compileOnly libs.connectors.testcontainers.mysql // These are required because gradle might be using lower version of Jna from other // library transitive dependency. Can be removed if we can figure out which library is the cause. @@ -25,6 +26,7 @@ dependencies { testImplementation project(':airbyte-test-utils') testImplementation 'org.apache.commons:commons-lang3:3.11' testImplementation libs.platform.testcontainers.postgresql + testImplementation libs.connectors.testcontainers.mysql // Big Query implementation('com.google.cloud:google-cloud-bigquery:1.133.1') diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/MySqlUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/MySqlUtils.java new file mode 100644 index 000000000000..5f637c2f72fa --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/MySqlUtils.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import org.testcontainers.containers.MySQLContainer; + +public class MySqlUtils { + + @VisibleForTesting + public static Certificate getCertificate(final MySQLContainer container, + final boolean useAllCertificates) + throws IOException, InterruptedException { + // add root and server certificates to config file + container.execInContainer("sh", "-c", "sed -i '31 a ssl' /etc/my.cnf"); + container.execInContainer("sh", "-c", "sed -i '32 a ssl-ca=/var/lib/mysql/ca.pem' /etc/my.cnf"); + container.execInContainer("sh", "-c", "sed -i '33 a ssl-cert=/var/lib/mysql/server-cert.pem' /etc/my.cnf"); + container.execInContainer("sh", "-c", "sed -i '34 a ssl-key=/var/lib/mysql/server-key.pem' /etc/my.cnf"); + container.execInContainer("sh", "-c", "sed -i '35 a require_secure_transport=ON' /etc/my.cnf"); + // add client certificates to config file + if (useAllCertificates) { + container.execInContainer("sh", "-c", "sed -i '39 a [client]' /etc/mysql/my.cnf"); + container.execInContainer("sh", "-c", "sed -i '40 a ssl-ca=/var/lib/mysql/ca.pem' /etc/my.cnf"); + container.execInContainer("sh", "-c", "sed -i '41 a ssl-cert=/var/lib/mysql/client-cert.pem' /etc/my.cnf"); + container.execInContainer("sh", "-c", "sed -i '42 a ssl-key=/var/lib/mysql/client-key.pem' /etc/my.cnf"); + } + // copy root certificate and client certificates + var caCert = container.execInContainer("sh", "-c", "cat /var/lib/mysql/ca.pem").getStdout().trim(); + + if (useAllCertificates) { + var clientKey = container.execInContainer("sh", "-c", "cat /var/lib/mysql/client-key.pem").getStdout().trim(); + var clientCert = container.execInContainer("sh", "-c", "cat /var/lib/mysql/client-cert.pem").getStdout().trim(); + return new Certificate(caCert, clientCert, clientKey); + } else { + return new Certificate(caCert); + } + } + + public static class Certificate { + + private final String caCertificate; + private final String clientCertificate; + private final String clientKey; + + public Certificate(final String caCertificate) { + this.caCertificate = caCertificate; + this.clientCertificate = null; + this.clientKey = null; + } + + public Certificate(final String caCertificate, final String clientCertificate, final String clientKey) { + this.caCertificate = caCertificate; + this.clientCertificate = clientCertificate; + this.clientKey = clientKey; + } + + public String getCaCertificate() { + return caCertificate; + } + + public String getClientCertificate() { + return clientCertificate; + } + + public String getClientKey() { + return clientKey; + } + + } + +} diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java index 3ed570a194e8..7c8f57e47980 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java @@ -32,6 +32,8 @@ public class JdbcUtils { public static final String SSL_MODE_KEY = "ssl_mode"; public static final String TLS_KEY = "tls"; public static final String USERNAME_KEY = "username"; + public static final String MODE_KEY = "mode"; + public static final String AMPERSAND = "&"; private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations(); private static final JSONFormat defaultJSONFormat = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT); diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/MySqlSslConnectionUtils.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/MySqlSslConnectionUtils.java new file mode 100644 index 000000000000..f66da5436dc4 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/MySqlSslConnectionUtils.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.util; + +import com.fasterxml.jackson.databind.JsonNode; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MySqlSslConnectionUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSslConnectionUtils.class); + + public static final String PARAM_MODE = "mode"; + public static final String PARAM_CLIENT_KEY_PASSWORD = "client_key_password"; + public static final String PARAM_CA_CERTIFICATE = "ca_certificate"; + public static final String PARAM_CLIENT_CERTIFICATE = "client_certificate"; + public static final String PARAM_CLIENT_KEY = "client_key"; + public static final String TRUST_KEY_STORE_URL = "trustCertificateKeyStoreUrl"; + public static final String TRUST_KEY_STORE_PASS = "trustCertificateKeyStorePassword"; + public static final String CLIENT_KEY_STORE_URL = "clientCertificateKeyStoreUrl"; + public static final String CLIENT_KEY_STORE_PASS = "clientCertificateKeyStorePassword"; + public static final String CUSTOM_TRUST_STORE = "customtruststore.jks"; + public static final String CUSTOM_KEY_STORE = "customkeystore.jks"; + public static final String SSL_MODE = "sslMode"; + public static final String VERIFY_CA = "VERIFY_CA"; + public static final String VERIFY_IDENTITY = "VERIFY_IDENTITY"; + public static final String ROOT_CERTIFICARE_NAME = "ca-cert.pem"; + public static final String ROOT_CERTIFICARE_DER_NAME = "ca-cert.der"; + public static final String CLIENT_CERTIFICARE_NAME = "client-cert.pem"; + public static final String CLIENT_KEY_NAME = "client-key.pem"; + public static final String CLIENT_CERT_P12 = "certificate.p12"; + public static final String ENCRYPT_FILE_NAME = "encrypt"; + + public static Map obtainConnection(final JsonNode encryption) { + Map additionalParameters = new HashMap<>(); + if (!encryption.isNull()) { + final var method = encryption.get(PARAM_MODE).asText().toUpperCase(); + var keyStorePassword = checkOrCreatePassword(encryption); + if (method.equals(VERIFY_CA) || method.equals(VERIFY_IDENTITY)) { + additionalParameters.putAll(checkCertificatesAndObtainConnection(encryption, method, keyStorePassword)); + } + } + return additionalParameters; + } + + public static String checkOrCreatePassword(final JsonNode encryption) { + String sslPassword = encryption.has(PARAM_CLIENT_KEY_PASSWORD) ? encryption.get(PARAM_CLIENT_KEY_PASSWORD).asText() : ""; + var keyStorePassword = RandomStringUtils.randomAlphanumeric(10); + if (sslPassword.isEmpty()) { + var file = new File(ENCRYPT_FILE_NAME); + if (file.exists()) { + keyStorePassword = readFile(file); + } else { + try { + createCertificateFile(ENCRYPT_FILE_NAME, keyStorePassword); + } catch (final IOException e) { + throw new RuntimeException("Failed to create encryption file "); + } + } + + } else { + keyStorePassword = sslPassword; + } + return keyStorePassword; + } + + private static String readFile(final File file) { + try { + BufferedReader reader = new BufferedReader(new FileReader(file)); + String currentLine = reader.readLine(); + reader.close(); + return currentLine; + } catch (final IOException e) { + throw new RuntimeException("Failed to read file with encryption"); + } + } + + private static Map checkCertificatesAndObtainConnection(final JsonNode encryption, + final String mode, + final String clientKeyPassword) { + var clientCert = encryption.has(PARAM_CLIENT_CERTIFICATE) && + !encryption.get(PARAM_CLIENT_CERTIFICATE).asText().isEmpty() ? encryption.get(PARAM_CLIENT_CERTIFICATE).asText() : null; + var clientKey = encryption.has(PARAM_CLIENT_KEY) && + !encryption.get(PARAM_CLIENT_KEY).asText().isEmpty() ? encryption.get(PARAM_CLIENT_KEY).asText() : null; + if (Objects.nonNull(clientCert) && Objects.nonNull(clientKey)) { + return obtainConnectionWithFullCertificatesOptions(encryption, mode, clientKeyPassword); + } else if (Objects.isNull(clientCert) && Objects.isNull(clientKey)) { + return obtainConnectionWithCaCertificateOptions(encryption, mode, clientKeyPassword); + } else { + throw new RuntimeException("Both fields \"Client certificate\" and \"Client key\" must be added to connect with client certificates."); + } + } + + private static Map obtainConnectionWithFullCertificatesOptions(final JsonNode encryption, + final String mode, + final String clientKeyPassword) { + Map additionalParameters = new HashMap<>(); + try { + convertAndImportFullCertificate(encryption.get(PARAM_CA_CERTIFICATE).asText(), + encryption.get(PARAM_CLIENT_CERTIFICATE).asText(), + encryption.get(PARAM_CLIENT_KEY).asText(), clientKeyPassword); + } catch (final IOException | InterruptedException e) { + throw new RuntimeException("Failed to import certificate into Java Keystore"); + } + additionalParameters.put(TRUST_KEY_STORE_URL, "file:" + CUSTOM_TRUST_STORE); + additionalParameters.put(TRUST_KEY_STORE_PASS, clientKeyPassword); + additionalParameters.put(CLIENT_KEY_STORE_URL, "file:" + CUSTOM_KEY_STORE); + additionalParameters.put(CLIENT_KEY_STORE_PASS, clientKeyPassword); + additionalParameters.put(SSL_MODE, mode); + + updateTrustStoreSystemProperty(clientKeyPassword); + System.setProperty("javax.net.ssl.keyStore", CUSTOM_KEY_STORE); + System.setProperty("javax.net.ssl.keyStorePassword", clientKeyPassword); + + return additionalParameters; + } + + private static Map obtainConnectionWithCaCertificateOptions(final JsonNode encryption, + final String mode, + final String clientKeyPassword) { + Map additionalParameters = new HashMap<>(); + try { + convertAndImportCaCertificate(encryption.get(PARAM_CA_CERTIFICATE).asText(), clientKeyPassword); + } catch (final IOException | InterruptedException e) { + throw new RuntimeException("Failed to import certificate into Java Keystore"); + } + additionalParameters.put(TRUST_KEY_STORE_URL, "file:" + CUSTOM_TRUST_STORE); + additionalParameters.put(TRUST_KEY_STORE_PASS, clientKeyPassword); + additionalParameters.put(SSL_MODE, mode); + + updateTrustStoreSystemProperty(clientKeyPassword); + + return additionalParameters; + } + + private static void convertAndImportFullCertificate(final String caCertificate, + final String clientCertificate, + final String clientKey, + final String clientKeyPassword) + throws IOException, InterruptedException { + final Runtime run = Runtime.getRuntime(); + convertAndImportCaCertificate(caCertificate, clientKeyPassword); + createCertificateFile(CLIENT_CERTIFICARE_NAME, clientCertificate); + createCertificateFile(CLIENT_KEY_NAME, clientKey); + // add client certificate to the custom keystore + runProcess("openssl pkcs12 -export -in " + CLIENT_CERTIFICARE_NAME + " -inkey " + CLIENT_KEY_NAME + + " -out " + CLIENT_CERT_P12 + " -name \"certificate\" -passout pass:" + clientKeyPassword, run); + // add client key to the custom keystore + runProcess("keytool -importkeystore -srckeystore " + CLIENT_CERT_P12 + + " -srcstoretype pkcs12 -destkeystore " + CUSTOM_KEY_STORE + " -srcstorepass " + clientKeyPassword + + " -deststoretype JKS -deststorepass " + clientKeyPassword + " -noprompt", run); + } + + private static void convertAndImportCaCertificate(final String caCertificate, + final String clientKeyPassword) + throws IOException, InterruptedException { + final Runtime run = Runtime.getRuntime(); + createCaCertificate(caCertificate, clientKeyPassword, run); + } + + private static void createCaCertificate(final String caCertificate, + final String clientKeyPassword, + final Runtime run) + throws IOException, InterruptedException { + createCertificateFile(ROOT_CERTIFICARE_NAME, caCertificate); + // add CA certificate to the custom keystore + runProcess("openssl x509 -outform der -in " + ROOT_CERTIFICARE_NAME + " -out " + ROOT_CERTIFICARE_DER_NAME, run); + runProcess("keytool -importcert -alias root-certificate -keystore " + CUSTOM_TRUST_STORE + + " -file " + ROOT_CERTIFICARE_DER_NAME + " -storepass " + clientKeyPassword + " -noprompt", run); + } + + private static void updateTrustStoreSystemProperty(final String clientKeyPassword) { + System.setProperty("javax.net.ssl.trustStore", CUSTOM_TRUST_STORE); + System.setProperty("javax.net.ssl.trustStorePassword", clientKeyPassword); + } + + private static void createCertificateFile(String fileName, String fileValue) throws IOException { + try (final PrintWriter out = new PrintWriter(fileName, StandardCharsets.UTF_8)) { + out.print(fileValue); + } + } + + private static void runProcess(final String cmd, final Runtime run) throws IOException, InterruptedException { + final Process pr = run.exec(cmd); + if (!pr.waitFor(30, TimeUnit.SECONDS)) { + pr.destroy(); + throw new RuntimeException("Timeout while executing: " + cmd); + } + } + +} diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index 370187835e0b..e1b0f49ecf89 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.2 +LABEL io.airbyte.version=0.6.3 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSource.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSource.java index 5c5320f33b0d..6bac4cccf876 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSource.java +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSource.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.mysql_strict_encrypt; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcUtils; @@ -23,6 +24,10 @@ public class MySqlStrictEncryptSource extends SpecModifyingSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStrictEncryptSource.class); + private static final String SSL_MODE_DESCRIPTION = "SSL connection modes. " + + "
  • required - Always connect with SSL. If the MySQL server doesn’t support SSL, the connection will not be established. Certificate Authority (CA) and Hostname are not verified.
  • " + + "
  • verify-ca - Always connect with SSL. Verifies CA, but allows connection even if Hostname does not match.
  • " + + "
  • Verify Identity - Always connect with SSL. Verify both CA and Hostname.
  • Read more in the docs."; MySqlStrictEncryptSource() { super(MySqlSource.sshWrappedSource()); @@ -34,6 +39,18 @@ public ConnectorSpecification modifySpec(final ConnectorSpecification originalSp // SSL property should be enabled by default for secure versions of connectors // that can be used in the Airbyte cloud. User should not be able to change this property. ((ObjectNode) spec.getConnectionSpecification().get("properties")).remove(JdbcUtils.SSL_KEY); + final ArrayNode modifiedSslModes = spec.getConnectionSpecification().get("properties").get("ssl_mode").get("oneOf").deepCopy(); + // update description for ssl_mode property + ((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).remove("description"); + ((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).remove("type"); + ((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).remove("order"); + ((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).put("description", SSL_MODE_DESCRIPTION); + ((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).put("type", "object"); + ((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).put("order", 7); + // Assume that the first items is the "preferred" option; remove this option + modifiedSslModes.remove(0); + ((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).remove("oneOf"); + ((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).put("oneOf", modifiedSslModes); return spec; } diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTest.java new file mode 100644 index 000000000000..ecfa9e92953a --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTest.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql_strict_encrypt; + +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; +import io.airbyte.db.MySqlUtils; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DatabaseDriver; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.testcontainers.containers.MySQLContainer; + +public abstract class AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTest extends MySqlStrictEncryptSourceAcceptanceTest { + + protected static MySqlUtils.Certificate certs; + protected static final String PASSWORD = "Passw0rd"; + + @Override + protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { + + container = new MySQLContainer<>("mysql:8.0"); + container.start(); + addTestData(container); + certs = MySqlUtils.getCertificate(container, true); + + var sslMode = getSslConfig(); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put(JdbcUtils.HOST_KEY, container.getHost()) + .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) + .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) + .put(JdbcUtils.USERNAME_KEY, container.getUsername()) + .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) + .put(JdbcUtils.SSL_KEY, true) + .put(JdbcUtils.SSL_MODE_KEY, sslMode) + .put("replication_method", ReplicationMethod.STANDARD) + .build()); + } + + public abstract ImmutableMap getSslConfig(); + + private void addTestData(MySQLContainer container) throws Exception { + try (final DSLContext dslContext = DSLContextFactory.create( + container.getUsername(), + container.getPassword(), + DatabaseDriver.MYSQL.getDriverClassName(), + String.format("jdbc:mysql://%s:%s/%s", + container.getHost(), + container.getFirstMappedPort(), + container.getDatabaseName()), + SQLDialect.MYSQL)) { + final Database database = new Database(dslContext); + + database.query(ctx -> { + ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.fetch( + "INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); + ctx.fetch( + "INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + return null; + }); + } + } + +} diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslCaCertificateStrictEncryptSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslCaCertificateStrictEncryptSourceAcceptanceTest.java new file mode 100644 index 000000000000..22925d4a06bc --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslCaCertificateStrictEncryptSourceAcceptanceTest.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql_strict_encrypt; + +import com.google.common.collect.ImmutableMap; +import io.airbyte.db.jdbc.JdbcUtils; + +public class MySqlSslCaCertificateStrictEncryptSourceAcceptanceTest extends AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTest { + + @Override + public ImmutableMap getSslConfig() { + return ImmutableMap.builder() + .put(JdbcUtils.MODE_KEY, "verify_ca") + .put("ca_certificate", certs.getCaCertificate()) + .put("client_key_password", PASSWORD) + .build(); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslFullCertificateStrictEncryptSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslFullCertificateStrictEncryptSourceAcceptanceTest.java new file mode 100644 index 000000000000..f86ff0d88c69 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlSslFullCertificateStrictEncryptSourceAcceptanceTest.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql_strict_encrypt; + +import com.google.common.collect.ImmutableMap; +import io.airbyte.db.jdbc.JdbcUtils; + +public class MySqlSslFullCertificateStrictEncryptSourceAcceptanceTest extends AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTest { + + @Override + public ImmutableMap getSslConfig() { + return ImmutableMap.builder() + .put(JdbcUtils.MODE_KEY, "verify_identity") + .put("ca_certificate", certs.getCaCertificate()) + .put("client_certificate", certs.getClientCertificate()) + .put("client_key", certs.getClientKey()) + .put("client_key_password", PASSWORD) + .build(); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java index ef8e449fd6af..43493837f006 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java @@ -45,12 +45,17 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc container = new MySQLContainer<>("mysql:8.0"); container.start(); + var sslMode = ImmutableMap.builder() + .put(JdbcUtils.MODE_KEY, "required") + .build(); + config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) + .put(JdbcUtils.SSL_MODE_KEY, sslMode) .put("replication_method", MySqlSource.ReplicationMethod.STANDARD) .build()); diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java index 96b656995ea9..fe59d601fd92 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptJdbcSourceAcceptanceTest.java @@ -20,10 +20,23 @@ import io.airbyte.integrations.base.ssh.SshHelpers; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.mysql.MySqlSource; +import io.airbyte.integrations.source.relationaldb.models.DbStreamState; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConnectorSpecification; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.SyncMode; import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterAll; @@ -128,4 +141,91 @@ void testSpec() throws Exception { assertEquals(expected, actual); } + @Override + protected AirbyteCatalog getCatalog(final String defaultNamespace) { + return new AirbyteCatalog().withStreams(List.of( + CatalogHelpers.createAirbyteStream( + TABLE_NAME, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.INTEGER), + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), + CatalogHelpers.createAirbyteStream( + TABLE_NAME_WITHOUT_PK, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.INTEGER), + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(Collections.emptyList()), + CatalogHelpers.createAirbyteStream( + TABLE_NAME_COMPOSITE_PK, + defaultNamespace, + Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), + Field.of(COL_LAST_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey( + List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); + } + + @Override + protected void incrementalDateCheck() throws Exception { + incrementalCursorCheck( + COL_UPDATED_AT, + "2005-10-18", + "2006-10-19", + List.of(getTestMessages().get(1), getTestMessages().get(2))); + } + + @Override + protected List getTestMessages() { + return List.of( + new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(Map + .of(COL_ID, ID_VALUE_1, + COL_NAME, "picard", + COL_UPDATED_AT, "2004-10-19")))), + new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(Map + .of(COL_ID, ID_VALUE_2, + COL_NAME, "crusher", + COL_UPDATED_AT, + "2005-10-19")))), + new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(Map + .of(COL_ID, ID_VALUE_3, + COL_NAME, "vash", + COL_UPDATED_AT, "2006-10-19"))))); + } + + @Override + protected List getExpectedAirbyteMessagesSecondSync(String namespace) { + final List expectedMessages = new ArrayList<>(); + expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) + .withData(Jsons.jsonNode(Map + .of(COL_ID, ID_VALUE_4, + COL_NAME, "riker", + COL_UPDATED_AT, "2006-10-19"))))); + expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) + .withData(Jsons.jsonNode(Map + .of(COL_ID, ID_VALUE_5, + COL_NAME, "data", + COL_UPDATED_AT, "2006-10-19"))))); + final DbStreamState state = new DbStreamState() + .withStreamName(streamName) + .withStreamNamespace(namespace) + .withCursorField(List.of(COL_ID)) + .withCursor("5"); + expectedMessages.addAll(createExpectedTestMessages(List.of(state))); + return expectedMessages; + } + } diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json index 1bd91e3f0f91..51bdd7793f73 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json @@ -47,11 +47,129 @@ "type": "string", "order": 5 }, + "ssl_mode": { + "title": "SSL modes", + "description": "SSL connection modes.
  • required - Always connect with SSL. If the MySQL server doesn’t support SSL, the connection will not be established. Certificate Authority (CA) and Hostname are not verified.
  • verify-ca - Always connect with SSL. Verifies CA, but allows connection even if Hostname does not match.
  • Verify Identity - Always connect with SSL. Verify both CA and Hostname.
  • Read more in the docs.", + "type": "object", + "order": 7, + "oneOf": [ + { + "title": "required", + "description": "Require SSL mode.", + "required": ["mode"], + "properties": { + "mode": { + "type": "string", + "const": "required", + "enum": ["required"], + "default": "required", + "order": 0 + } + } + }, + { + "title": "Verify CA", + "description": "Verify CA SSL mode.", + "required": [ + "mode", + "ca_certificate" + ], + "properties": { + "mode": { + "type": "string", + "const": "verify_ca", + "enum": ["verify_ca"], + "default": "verify_ca", + "order": 0 + }, + "ca_certificate": { + "type": "string", + "title": "CA certificate", + "description": "CA certificate", + "airbyte_secret": true, + "multiline": true, + "order": 1 + }, + "client_certificate": { + "type": "string", + "title": "Client certificate", + "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", + "airbyte_secret": true, + "multiline": true, + "order": 2 + }, + "client_key": { + "type": "string", + "title": "Client key", + "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", + "airbyte_secret": true, + "multiline": true, + "order": 3 + }, + "client_key_password": { + "type": "string", + "title": "Client key password (Optional)", + "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", + "airbyte_secret": true, + "order": 4 + } + } + }, + { + "title": "Verify Identity", + "description": "Verify-full SSL mode.", + "required": [ + "mode", + "ca_certificate" + ], + "properties": { + "mode": { + "type": "string", + "const": "verify_identity", + "enum": ["verify_identity"], + "default": "verify_identity", + "order": 0 + }, + "ca_certificate": { + "type": "string", + "title": "CA certificate", + "description": "CA certificate", + "airbyte_secret": true, + "multiline": true, + "order": 1 + }, + "client_certificate": { + "type": "string", + "title": "Client certificate", + "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", + "airbyte_secret": true, + "multiline": true, + "order": 2 + }, + "client_key": { + "type": "string", + "title": "Client key", + "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", + "airbyte_secret": true, + "multiline": true, + "order": 3 + }, + "client_key_password": { + "type": "string", + "title": "Client key password (Optional)", + "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", + "airbyte_secret": true, + "order": 4 + } + } + } + ] + }, "replication_method": { "type": "string", "title": "Replication Method", "description": "Replication method which is used for data extraction from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", - "order": 7, + "order": 8, "default": "STANDARD", "enum": ["STANDARD", "CDC"] } diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 54ce574ace98..f686153111b4 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.2 +LABEL io.airbyte.version=0.6.3 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java index cf30b04329a3..a19100946707 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java @@ -5,8 +5,14 @@ package io.airbyte.integrations.source.mysql; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.db.jdbc.JdbcUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Properties; +import static io.airbyte.integrations.util.MySqlSslConnectionUtils.checkOrCreatePassword; + public class MySqlCdcProperties { static Properties getDebeziumProperties(final JsonNode config) { @@ -44,7 +50,35 @@ static Properties getDebeziumProperties(final JsonNode config) { // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-binary-handling-mode props.setProperty("binary.handling.mode", "base64"); props.setProperty("database.include.list", config.get("database").asText()); + // Check params for SSL connection in config and add properties for CDC SSL connection + // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-ssl-mode + if (!config.has(JdbcUtils.SSL_KEY) || config.get(JdbcUtils.SSL_KEY).asBoolean()) { + if (config.has(JdbcUtils.SSL_MODE_KEY) && config.get(JdbcUtils.SSL_MODE_KEY).has(JdbcUtils.MODE_KEY)) { + props.setProperty("database.ssl.mode", config.get(JdbcUtils.SSL_MODE_KEY).get(JdbcUtils.MODE_KEY).asText()); + final var method = config.get(JdbcUtils.SSL_MODE_KEY).get(JdbcUtils.MODE_KEY).asText(); + if (method.equals("verify_ca") || method.equals("verify_identity")) { + var sslPassword = checkOrCreatePassword(config.get(JdbcUtils.SSL_MODE_KEY)); + props.setProperty("database.history.producer.security.protocol", "SSL"); + props.setProperty("database.history.producer.ssl.truststore.location", "customtruststore.jks"); + props.setProperty("database.history.producer.ssl.truststore.password", sslPassword); + props.setProperty("database.history.producer.ssl.key.password", sslPassword); + props.setProperty("database.history.consumer.security.protocol", "SSL"); + props.setProperty("database.history.consumer.ssl.truststore.location", "customtruststore.jks"); + props.setProperty("database.history.consumer.ssl.truststore.password", sslPassword); + props.setProperty("database.history.consumer.ssl.key.password", sslPassword); + if (method.equals("verify_identity")) { + props.setProperty("database.history.producer.ssl.keystore.location", "customkeystore.jks"); + props.setProperty("database.history.producer.ssl.keystore.password", sslPassword); + + props.setProperty("database.history.consumer.ssl.keystore.location", "customkeystore.jks"); + props.setProperty("database.history.consumer.ssl.keystore.password", sslPassword); + } + } + } else { + props.setProperty("database.ssl.mode", "required"); + } + } return props; } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 10ced47226ca..03a4cab465dc 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -4,11 +4,6 @@ package io.airbyte.integrations.source.mysql; -import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC; -import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; -import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; -import static java.util.stream.Collectors.toList; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; @@ -35,16 +30,24 @@ import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.SyncMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC; +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; +import static io.airbyte.integrations.util.MySqlSslConnectionUtils.obtainConnection; +import static java.util.stream.Collectors.toList; public class MySqlSource extends AbstractJdbcSource implements Source { @@ -57,8 +60,10 @@ public class MySqlSource extends AbstractJdbcSource implements Source public static final String CDC_LOG_POS = "_ab_cdc_log_pos"; public static final List SSL_PARAMETERS = List.of( "useSSL=true", - "requireSSL=true", - "verifyServerCertificate=false"); + "requireSSL=true"); + + public static final String SSL_PARAMETERS_WITH_CERTIFICATE_VALIDATION = "verifyServerCertificate=true"; + public static final String SSL_PARAMETERS_WITHOUT_CERTIFICATE_VALIDATION = "verifyServerCertificate=false"; public static Source sshWrappedSource() { return new SshWrappedSource(new MySqlSource(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY); @@ -145,22 +150,35 @@ public JsonNode toDatabaseConfig(final JsonNode config) { // in the setJsonField method in MySqlSourceOperations.java jdbcUrl.append("&yearIsDateType=true"); if (config.get(JdbcUtils.JDBC_URL_PARAMS_KEY) != null && !config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText().isEmpty()) { - jdbcUrl.append("&").append(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText()); + jdbcUrl.append(JdbcUtils.AMPERSAND).append(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText()); } - + Map additionalParameters = new HashMap<>(); // assume ssl if not explicitly mentioned. if (!config.has(JdbcUtils.SSL_KEY) || config.get(JdbcUtils.SSL_KEY).asBoolean()) { - jdbcUrl.append("&").append(String.join("&", SSL_PARAMETERS)); + if (config.has(JdbcUtils.SSL_MODE_KEY)) { + additionalParameters.putAll(obtainConnection(config.get(JdbcUtils.SSL_MODE_KEY))); + jdbcUrl.append(JdbcUtils.AMPERSAND).append(String.join(JdbcUtils.AMPERSAND, SSL_PARAMETERS)) + .append(JdbcUtils.AMPERSAND); + if (additionalParameters.isEmpty()) { + jdbcUrl.append(SSL_PARAMETERS_WITHOUT_CERTIFICATE_VALIDATION); + } else { + jdbcUrl.append(SSL_PARAMETERS_WITH_CERTIFICATE_VALIDATION); + } + } else { + jdbcUrl.append(JdbcUtils.AMPERSAND).append(String.join(JdbcUtils.AMPERSAND, SSL_PARAMETERS)) + .append(JdbcUtils.AMPERSAND).append(SSL_PARAMETERS_WITHOUT_CERTIFICATE_VALIDATION); + } } final ImmutableMap.Builder configBuilder = ImmutableMap.builder() .put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText()) .put(JdbcUtils.JDBC_URL_KEY, jdbcUrl.toString()); + configBuilder.putAll(additionalParameters); + if (config.has(JdbcUtils.PASSWORD_KEY)) { configBuilder.put(JdbcUtils.PASSWORD_KEY, config.get(JdbcUtils.PASSWORD_KEY).asText()); } - return Jsons.jsonNode(configBuilder.build()); } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json index 6021b6a02102..00cb937b6806 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json @@ -54,11 +54,143 @@ "default": true, "order": 6 }, + "ssl_mode": { + "title": "SSL modes", + "description": "SSL connection modes.
  • preferred - Automatically attempt SSL connection. If the MySQL server does not support SSL, continue with a regular connection.
  • required - Always connect with SSL. If the MySQL server doesn’t support SSL, the connection will not be established. Certificate Authority (CA) and Hostname are not verified.
  • verify-ca - Always connect with SSL. Verifies CA, but allows connection even if Hostname does not match.
  • Verify Identity - Always connect with SSL. Verify both CA and Hostname.
  • Read more in the docs.", + "type": "object", + "order": 7, + "oneOf": [ + { + "title": "preferred", + "description": "Preferred SSL mode.", + "required": ["mode"], + "properties": { + "mode": { + "type": "string", + "const": "preferred", + "enum": ["preferred"], + "default": "preferred", + "order": 0 + } + } + }, + { + "title": "required", + "description": "Require SSL mode.", + "required": ["mode"], + "properties": { + "mode": { + "type": "string", + "const": "required", + "enum": ["required"], + "default": "required", + "order": 0 + } + } + }, + { + "title": "Verify CA", + "description": "Verify CA SSL mode.", + "required": [ + "mode", + "ca_certificate" + ], + "properties": { + "mode": { + "type": "string", + "const": "verify_ca", + "enum": ["verify_ca"], + "default": "verify_ca", + "order": 0 + }, + "ca_certificate": { + "type": "string", + "title": "CA certificate", + "description": "CA certificate", + "airbyte_secret": true, + "multiline": true, + "order": 1 + }, + "client_certificate": { + "type": "string", + "title": "Client certificate", + "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", + "airbyte_secret": true, + "multiline": true, + "order": 2 + }, + "client_key": { + "type": "string", + "title": "Client key", + "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", + "airbyte_secret": true, + "multiline": true, + "order": 3 + }, + "client_key_password": { + "type": "string", + "title": "Client key password (Optional)", + "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", + "airbyte_secret": true, + "order": 4 + } + } + }, + { + "title": "Verify Identity", + "description": "Verify-full SSL mode.", + "required": [ + "mode", + "ca_certificate" + ], + "properties": { + "mode": { + "type": "string", + "const": "verify_identity", + "enum": ["verify_identity"], + "default": "verify_identity", + "order": 0 + }, + "ca_certificate": { + "type": "string", + "title": "CA certificate", + "description": "CA certificate", + "airbyte_secret": true, + "multiline": true, + "order": 1 + }, + "client_certificate": { + "type": "string", + "title": "Client certificate", + "description": "Client certificate (this is not a required field, but if you want to use it, you will need to add the Client key as well)", + "airbyte_secret": true, + "multiline": true, + "order": 2 + }, + "client_key": { + "type": "string", + "title": "Client key", + "description": "Client key (this is not a required field, but if you want to use it, you will need to add the Client certificate as well)", + "airbyte_secret": true, + "multiline": true, + "order": 3 + }, + "client_key_password": { + "type": "string", + "title": "Client key password (Optional)", + "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.", + "airbyte_secret": true, + "order": 4 + } + } + } + ] + }, "replication_method": { "type": "string", "title": "Replication Method", "description": "Replication method which is used for data extraction from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", - "order": 7, + "order": 8, "default": "STANDARD", "enum": ["STANDARD", "CDC"] } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSslCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSslCertificateSourceAcceptanceTest.java new file mode 100644 index 000000000000..4f4ad709d2ad --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSslCertificateSourceAcceptanceTest.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql; + +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; +import io.airbyte.db.MySqlUtils; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DatabaseDriver; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.testcontainers.containers.MySQLContainer; + +import java.io.IOException; + +public abstract class AbstractMySqlSslCertificateSourceAcceptanceTest extends MySqlSourceAcceptanceTest { + + protected static MySqlUtils.Certificate certs; + protected static final String PASSWORD = "Passw0rd"; + + @Override + protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { + + container = new MySQLContainer<>("mysql:8.0"); + container.start(); + addTestData(container); + certs = getCertificates(); + + var sslMode = getSslConfig(); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put(JdbcUtils.HOST_KEY, container.getHost()) + .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) + .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) + .put(JdbcUtils.USERNAME_KEY, container.getUsername()) + .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) + .put(JdbcUtils.SSL_KEY, true) + .put(JdbcUtils.SSL_MODE_KEY, sslMode) + .put("replication_method", ReplicationMethod.STANDARD) + .build()); + } + + public abstract MySqlUtils.Certificate getCertificates() throws IOException, InterruptedException; + + public abstract ImmutableMap getSslConfig(); + + private void addTestData(MySQLContainer container) throws Exception { + try (final DSLContext dslContext = DSLContextFactory.create( + container.getUsername(), + container.getPassword(), + DatabaseDriver.MYSQL.getDriverClassName(), + String.format("jdbc:mysql://%s:%s/%s", + container.getHost(), + container.getFirstMappedPort(), + container.getDatabaseName()), + SQLDialect.MYSQL)) { + final Database database = new Database(dslContext); + + database.query(ctx -> { + ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.fetch( + "INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); + ctx.fetch( + "INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + return null; + }); + } + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSslCaCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSslCaCertificateSourceAcceptanceTest.java new file mode 100644 index 000000000000..2b7b9fde1ff2 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSslCaCertificateSourceAcceptanceTest.java @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; +import io.airbyte.db.MySqlUtils; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DatabaseDriver; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.base.ssh.SshHelpers; +import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod; +import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.DestinationSyncMode; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.SyncMode; +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MySQLContainer; + +import java.util.List; +import java.util.stream.Collectors; + +import static io.airbyte.protocol.models.SyncMode.INCREMENTAL; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class CdcMySqlSslCaCertificateSourceAcceptanceTest extends SourceAcceptanceTest { + + private static final String STREAM_NAME = "id_and_name"; + private static final String STREAM_NAME2 = "starships"; + private MySQLContainer container; + private JsonNode config; + private static MySqlUtils.Certificate certs; + + @Override + protected String getImageName() { + return "airbyte/source-mysql:dev"; + } + + @Override + protected ConnectorSpecification getSpec() throws Exception { + return SshHelpers.getSpecAndInjectSsh(); + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected ConfiguredAirbyteCatalog getConfiguredCatalog() { + return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream() + .withSyncMode(INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + String.format("%s", STREAM_NAME), + String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()), + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING)) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withSupportedSyncModes( + Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))), + new ConfiguredAirbyteStream() + .withSyncMode(INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + String.format("%s", STREAM_NAME2), + String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()), + Field.of("id", JsonSchemaType.NUMBER), + Field.of("name", JsonSchemaType.STRING)) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withSupportedSyncModes( + Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))))); + } + + @Override + protected JsonNode getState() { + return null; + } + + @Override + protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { + container = new MySQLContainer<>("mysql:8.0"); + container.start(); + certs = MySqlUtils.getCertificate(container, true); + + var sslMode = ImmutableMap.builder() + .put(JdbcUtils.MODE_KEY, "verify_ca") + .put("ca_certificate", certs.getCaCertificate()) + .put("client_certificate", certs.getClientCertificate()) + .put("client_key", certs.getClientKey()) + .put("client_key_password", "Passw0rd") + .build(); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put(JdbcUtils.HOST_KEY, container.getHost()) + .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) + .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) + .put(JdbcUtils.USERNAME_KEY, container.getUsername()) + .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) + .put(JdbcUtils.SSL_KEY, true) + .put(JdbcUtils.SSL_MODE_KEY, sslMode) + .put("replication_method", ReplicationMethod.CDC) + .build()); + + revokeAllPermissions(); + grantCorrectPermissions(); + createAndPopulateTables(); + } + + private void createAndPopulateTables() { + executeQuery("CREATE TABLE id_and_name(id INTEGER PRIMARY KEY, name VARCHAR(200));"); + executeQuery( + "INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + executeQuery("CREATE TABLE starships(id INTEGER PRIMARY KEY, name VARCHAR(200));"); + executeQuery( + "INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + } + + private void revokeAllPermissions() { + executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';"); + } + + private void grantCorrectPermissions() { + executeQuery( + "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + + container.getUsername() + "@'%';"); + } + + private void executeQuery(final String query) { + try (final DSLContext dslContext = DSLContextFactory.create( + "root", + "test", + DatabaseDriver.MYSQL.getDriverClassName(), + String.format(DatabaseDriver.MYSQL.getUrlFormatString(), + container.getHost(), + container.getFirstMappedPort(), + container.getDatabaseName()), + SQLDialect.MYSQL)) { + final Database database = new Database(dslContext); + database.query( + ctx -> ctx + .execute(query)); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + container.close(); + } + + @Test + public void testIncrementalSyncShouldNotFailIfBinlogIsDeleted() throws Exception { + final ConfiguredAirbyteCatalog configuredCatalog = withSourceDefinedCursors(getConfiguredCatalog()); + // only sync incremental streams + configuredCatalog.setStreams( + configuredCatalog.getStreams().stream().filter(s -> s.getSyncMode() == INCREMENTAL).collect(Collectors.toList())); + + final List airbyteMessages = runRead(configuredCatalog, getState()); + final List recordMessages = filterRecords(airbyteMessages); + final List stateMessages = airbyteMessages + .stream() + .filter(m -> m.getType() == AirbyteMessage.Type.STATE) + .map(AirbyteMessage::getState) + .collect(Collectors.toList()); + assertFalse(recordMessages.isEmpty(), "Expected the first incremental sync to produce records"); + assertFalse(stateMessages.isEmpty(), "Expected incremental sync to produce STATE messages"); + + // when we run incremental sync again there should be no new records. Run a sync with the latest + // state message and assert no records were emitted. + final JsonNode latestState = Jsons.jsonNode(supportsPerStream() ? stateMessages : List.of(Iterables.getLast(stateMessages))); + // RESET MASTER removes all binary log files that are listed in the index file, + // leaving only a single, empty binary log file with a numeric suffix of .000001 + executeQuery("RESET MASTER;"); + + assertEquals(6, filterRecords(runRead(configuredCatalog, latestState)).size()); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslCaCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslCaCertificateSourceAcceptanceTest.java new file mode 100644 index 000000000000..6872a9a67051 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslCaCertificateSourceAcceptanceTest.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql; + +import com.google.common.collect.ImmutableMap; +import io.airbyte.db.MySqlUtils; +import io.airbyte.db.jdbc.JdbcUtils; + +import java.io.IOException; + +public class MySqlSslCaCertificateSourceAcceptanceTest extends AbstractMySqlSslCertificateSourceAcceptanceTest { + + @Override + public MySqlUtils.Certificate getCertificates() throws IOException, InterruptedException { + return MySqlUtils.getCertificate(container, false); + } + + @Override + public ImmutableMap getSslConfig() { + return ImmutableMap.builder() + .put(JdbcUtils.MODE_KEY, "verify_ca") + .put("ca_certificate", certs.getCaCertificate()) + .put("client_key_password", PASSWORD) + .build(); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslFullCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslFullCertificateSourceAcceptanceTest.java new file mode 100644 index 000000000000..4f3d29108198 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslFullCertificateSourceAcceptanceTest.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql; + +import com.google.common.collect.ImmutableMap; +import io.airbyte.db.MySqlUtils; +import io.airbyte.db.jdbc.JdbcUtils; + +import java.io.IOException; + +public class MySqlSslFullCertificateSourceAcceptanceTest extends AbstractMySqlSslCertificateSourceAcceptanceTest { + + @Override + public MySqlUtils.Certificate getCertificates() throws IOException, InterruptedException { + return MySqlUtils.getCertificate(container, true); + } + + @Override + public ImmutableMap getSslConfig() { + return ImmutableMap.builder() + .put(JdbcUtils.MODE_KEY, "verify_identity") + .put("ca_certificate", certs.getCaCertificate()) + .put("client_certificate", certs.getClientCertificate()) + .put("client_key", certs.getClientKey()) + .put("client_key_password", PASSWORD) + .build(); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java index 05104c470b11..9d8dfdb6bd86 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java @@ -4,8 +4,6 @@ package io.airbyte.integrations.source.mysql; -import static io.airbyte.integrations.source.mysql.MySqlSource.SSL_PARAMETERS; - import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; @@ -25,6 +23,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc container = new MySQLContainer<>("mysql:8.0"); container.start(); + var sslMode = ImmutableMap.builder() + .put(JdbcUtils.MODE_KEY, "required") + .build(); + config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) @@ -32,6 +34,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) .put(JdbcUtils.SSL_KEY, true) + .put(JdbcUtils.SSL_MODE_KEY, sslMode) .put("replication_method", ReplicationMethod.STANDARD) .build()); @@ -39,11 +42,10 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc config.get(JdbcUtils.USERNAME_KEY).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText(), DatabaseDriver.MYSQL.getDriverClassName(), - String.format("jdbc:mysql://%s:%s/%s?%s", + String.format("jdbc:mysql://%s:%s/%s", config.get(JdbcUtils.HOST_KEY).asText(), config.get(JdbcUtils.PORT_KEY).asText(), - config.get(JdbcUtils.DATABASE_KEY).asText(), - String.join("&", SSL_PARAMETERS)), + config.get(JdbcUtils.DATABASE_KEY).asText()), SQLDialect.MYSQL)) { final Database database = new Database(dslContext); diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index c54d5eed9ba5..2e5704f2b5bc 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -185,6 +185,7 @@ If you do not see a type in this list, assume that it is coerced into a string. | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------| +| 0.6.3 | 2022-08-12 | [15044](https://github.com/airbytehq/airbyte/pull/15044) | Added the ability to connect using different SSL modes and SSL certificates | | 0.6.2 | 2022-08-11 | [15538](https://github.com/airbytehq/airbyte/pull/15538) | Allow additional properties in db stream state | | 0.6.1 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings | | 0.6.0 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. |