Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source MySql: Added SSL certificates to MySql Source #15044

Merged
merged 31 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
92b8b8f
updated mysql source specification and added field for root and clien…
andriikorotkov Jul 21, 2022
93f7913
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 25, 2022
7178243
added SSL mode for mysql source
andriikorotkov Jul 26, 2022
f5bd946
fixed code style
andriikorotkov Jul 26, 2022
cfe583d
updated run process timeout
andriikorotkov Jul 27, 2022
6e2dbb3
updated method for create keystore and updated tests
andriikorotkov Jul 28, 2022
b872f43
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 28, 2022
ea08e2b
updated normalization version for postgres destination
andriikorotkov Aug 9, 2022
ac0f651
updated normalization version for postgres destination
andriikorotkov Aug 9, 2022
a10374d
added tests for connection with certificates
andriikorotkov Aug 9, 2022
67d67ae
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Aug 9, 2022
9cc861d
updated tests for connection with full certificates and added tests f…
andriikorotkov Aug 9, 2022
2575317
updated tests
andriikorotkov Aug 10, 2022
bd1fcb5
updated source-mysql-strict-encrypt and updated versions
andriikorotkov Aug 10, 2022
95d6545
updated code style
andriikorotkov Aug 10, 2022
73bd508
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Aug 10, 2022
5ccdc64
updated doc
andriikorotkov Aug 10, 2022
15b31b4
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Aug 10, 2022
6c5a9a9
updated specs
andriikorotkov Aug 10, 2022
9109ec1
fixed minor remarks
andriikorotkov Aug 11, 2022
3c908c9
fixed minor remarks
andriikorotkov Aug 11, 2022
0e1c0cb
updated tests
andriikorotkov Aug 11, 2022
2c47c57
fixed remarks and updated specification
andriikorotkov Aug 14, 2022
135ae36
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Aug 14, 2022
2418e98
fixed mysql sources connectors version
andriikorotkov Aug 14, 2022
132c7ce
added CDC + SSL Certificates tests
andriikorotkov Aug 14, 2022
5503174
added property for CDC and added tests for test SSL with CDC together
andriikorotkov Aug 15, 2022
5f2dad7
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Aug 15, 2022
9e732d7
fixed MySqlStrictEncryptJdbcSourceAcceptanceTest for work with dateti…
andriikorotkov Aug 15, 2022
5c2d5aa
added property for CDC and added tests for test SSL with CDC together
andriikorotkov Aug 16, 2022
099f89d
auto-bump connector version [ci skip]
octavia-squidington-iii Aug 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airbyte-db/db-lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {

// Mark as compile only to avoid leaking transitively to connectors
compileOnly libs.platform.testcontainers.postgresql
compileOnly libs.connectors.testcontainers.mysql

// These are required because gradle might be using lower version of Jna from other
// library transitive dependency. Can be removed if we can figure out which library is the cause.
Expand All @@ -25,6 +26,7 @@ dependencies {
testImplementation project(':airbyte-test-utils')
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation libs.platform.testcontainers.postgresql
testImplementation libs.connectors.testcontainers.mysql

// Big Query
implementation('com.google.cloud:google-cloud-bigquery:1.133.1')
Expand Down
74 changes: 74 additions & 0 deletions airbyte-db/db-lib/src/main/java/io/airbyte/db/MySqlUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import org.testcontainers.containers.MySQLContainer;

public class MySqlUtils {

@VisibleForTesting
public static Certificate getCertificate(final MySQLContainer<?> container,
final boolean useAllCertificates)
throws IOException, InterruptedException {
// add root and server certificates to config file
container.execInContainer("sh", "-c", "sed -i '31 a ssl' /etc/my.cnf");
container.execInContainer("sh", "-c", "sed -i '32 a ssl-ca=/var/lib/mysql/ca.pem' /etc/my.cnf");
container.execInContainer("sh", "-c", "sed -i '33 a ssl-cert=/var/lib/mysql/server-cert.pem' /etc/my.cnf");
container.execInContainer("sh", "-c", "sed -i '34 a ssl-key=/var/lib/mysql/server-key.pem' /etc/my.cnf");
container.execInContainer("sh", "-c", "sed -i '35 a require_secure_transport=ON' /etc/my.cnf");
// add client certificates to config file
if (useAllCertificates) {
container.execInContainer("sh", "-c", "sed -i '39 a [client]' /etc/mysql/my.cnf");
container.execInContainer("sh", "-c", "sed -i '40 a ssl-ca=/var/lib/mysql/ca.pem' /etc/my.cnf");
container.execInContainer("sh", "-c", "sed -i '41 a ssl-cert=/var/lib/mysql/client-cert.pem' /etc/my.cnf");
container.execInContainer("sh", "-c", "sed -i '42 a ssl-key=/var/lib/mysql/client-key.pem' /etc/my.cnf");
}
// copy root certificate and client certificates
var caCert = container.execInContainer("sh", "-c", "cat /var/lib/mysql/ca.pem").getStdout().trim();

if (useAllCertificates) {
var clientKey = container.execInContainer("sh", "-c", "cat /var/lib/mysql/client-key.pem").getStdout().trim();
var clientCert = container.execInContainer("sh", "-c", "cat /var/lib/mysql/client-cert.pem").getStdout().trim();
return new Certificate(caCert, clientCert, clientKey);
} else {
return new Certificate(caCert);
}
}

public static class Certificate {

private final String caCertificate;
private final String clientCertificate;
private final String clientKey;

public Certificate(final String caCertificate) {
this.caCertificate = caCertificate;
this.clientCertificate = null;
this.clientKey = null;
}

public Certificate(final String caCertificate, final String clientCertificate, final String clientKey) {
this.caCertificate = caCertificate;
this.clientCertificate = clientCertificate;
this.clientKey = clientKey;
}

public String getCaCertificate() {
return caCertificate;
}

public String getClientCertificate() {
return clientCertificate;
}

public String getClientKey() {
return clientKey;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class JdbcUtils {
public static final String SSL_MODE_KEY = "ssl_mode";
public static final String TLS_KEY = "tls";
public static final String USERNAME_KEY = "username";
public static final String MODE_KEY = "mode";
public static final String AMPERSAND = "&";
private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations();

private static final JSONFormat defaultJSONFormat = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.util;

import com.fasterxml.jackson.databind.JsonNode;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSslConnectionUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSslConnectionUtils.class);

public static final String PARAM_MODE = "mode";
public static final String PARAM_CLIENT_KEY_PASSWORD = "client_key_password";
public static final String PARAM_CA_CERTIFICATE = "ca_certificate";
public static final String PARAM_CLIENT_CERTIFICATE = "client_certificate";
public static final String PARAM_CLIENT_KEY = "client_key";
public static final String TRUST_KEY_STORE_URL = "trustCertificateKeyStoreUrl";
public static final String TRUST_KEY_STORE_PASS = "trustCertificateKeyStorePassword";
public static final String CLIENT_KEY_STORE_URL = "clientCertificateKeyStoreUrl";
public static final String CLIENT_KEY_STORE_PASS = "clientCertificateKeyStorePassword";
public static final String CUSTOM_TRUST_STORE = "customtruststore.jks";
public static final String CUSTOM_KEY_STORE = "customkeystore.jks";
public static final String SSL_MODE = "sslMode";
public static final String VERIFY_CA = "VERIFY_CA";
public static final String VERIFY_IDENTITY = "VERIFY_IDENTITY";
public static final String ROOT_CERTIFICARE_NAME = "ca-cert.pem";
public static final String ROOT_CERTIFICARE_DER_NAME = "ca-cert.der";
public static final String CLIENT_CERTIFICARE_NAME = "client-cert.pem";
public static final String CLIENT_KEY_NAME = "client-key.pem";
public static final String CLIENT_CERT_P12 = "certificate.p12";
public static final String ENCRYPT_FILE_NAME = "encrypt";

public static Map<String, String> obtainConnection(final JsonNode encryption) {
Map<String, String> additionalParameters = new HashMap<>();
if (!encryption.isNull()) {
final var method = encryption.get(PARAM_MODE).asText().toUpperCase();
var keyStorePassword = checkOrCreatePassword(encryption);
if (method.equals(VERIFY_CA) || method.equals(VERIFY_IDENTITY)) {
additionalParameters.putAll(checkCertificatesAndObtainConnection(encryption, method, keyStorePassword));
}
}
return additionalParameters;
}

private static String checkOrCreatePassword(final JsonNode encryption) {
String sslPassword = encryption.has(PARAM_CLIENT_KEY_PASSWORD) ? encryption.get(PARAM_CLIENT_KEY_PASSWORD).asText() : "";
var keyStorePassword = RandomStringUtils.randomAlphanumeric(10);
if (sslPassword.isEmpty()) {
var file = new File(ENCRYPT_FILE_NAME);
if (file.exists()) {
keyStorePassword = readFile(file);
} else {
try {
createCertificateFile(ENCRYPT_FILE_NAME, keyStorePassword);
} catch (final IOException e) {
throw new RuntimeException("Failed to create encryption file ");
}
}

} else {
keyStorePassword = sslPassword;
}
return keyStorePassword;
}

private static String readFile(final File file) {
try {
BufferedReader reader = new BufferedReader(new FileReader(file));
String currentLine = reader.readLine();
reader.close();
return currentLine;
} catch (final IOException e) {
throw new RuntimeException("Failed to read file with encryption");
}
}

private static Map<String, String> checkCertificatesAndObtainConnection(final JsonNode encryption,
final String mode,
final String clientKeyPassword) {
var clientCert = encryption.has(PARAM_CLIENT_CERTIFICATE) &&
!encryption.get(PARAM_CLIENT_CERTIFICATE).asText().isEmpty() ? encryption.get(PARAM_CLIENT_CERTIFICATE).asText() : null;
var clientKey = encryption.has(PARAM_CLIENT_KEY) &&
!encryption.get(PARAM_CLIENT_KEY).asText().isEmpty() ? encryption.get(PARAM_CLIENT_KEY).asText() : null;
if (Objects.nonNull(clientCert) && Objects.nonNull(clientKey)) {
return obtainConnectionWithFullCertificatesOptions(encryption, mode, clientKeyPassword);
} else if (Objects.isNull(clientCert) && Objects.isNull(clientKey)) {
return obtainConnectionWithCaCertificateOptions(encryption, mode, clientKeyPassword);
} else {
throw new RuntimeException("Both fields \"Client certificate\" and \"Client key\" must be added to connect with client certificates.");
}
}

private static Map<String, String> obtainConnectionWithFullCertificatesOptions(final JsonNode encryption,
final String mode,
final String clientKeyPassword) {
Map<String, String> additionalParameters = new HashMap<>();
try {
convertAndImportFullCertificate(encryption.get(PARAM_CA_CERTIFICATE).asText(),
encryption.get(PARAM_CLIENT_CERTIFICATE).asText(),
encryption.get(PARAM_CLIENT_KEY).asText(), clientKeyPassword);
} catch (final IOException | InterruptedException e) {
throw new RuntimeException("Failed to import certificate into Java Keystore");
}
additionalParameters.put(TRUST_KEY_STORE_URL, "file:" + CUSTOM_TRUST_STORE);
additionalParameters.put(TRUST_KEY_STORE_PASS, clientKeyPassword);
andriikorotkov marked this conversation as resolved.
Show resolved Hide resolved
additionalParameters.put(CLIENT_KEY_STORE_URL, "file:" + CUSTOM_KEY_STORE);
additionalParameters.put(CLIENT_KEY_STORE_PASS, clientKeyPassword);
additionalParameters.put(SSL_MODE, mode);

updateTrustStoreSystemProperty(clientKeyPassword);
System.setProperty("javax.net.ssl.keyStore", CUSTOM_KEY_STORE);
System.setProperty("javax.net.ssl.keyStorePassword", clientKeyPassword);

return additionalParameters;
}

private static Map<String, String> obtainConnectionWithCaCertificateOptions(final JsonNode encryption,
final String mode,
final String clientKeyPassword) {
Map<String, String> additionalParameters = new HashMap<>();
try {
convertAndImportCaCertificate(encryption.get(PARAM_CA_CERTIFICATE).asText(), clientKeyPassword);
} catch (final IOException | InterruptedException e) {
throw new RuntimeException("Failed to import certificate into Java Keystore");
}
additionalParameters.put(TRUST_KEY_STORE_URL, "file:" + CUSTOM_TRUST_STORE);
additionalParameters.put(TRUST_KEY_STORE_PASS, clientKeyPassword);
additionalParameters.put(SSL_MODE, mode);

updateTrustStoreSystemProperty(clientKeyPassword);

return additionalParameters;
}

private static void convertAndImportFullCertificate(final String caCertificate,
final String clientCertificate,
final String clientKey,
final String clientKeyPassword)
throws IOException, InterruptedException {
final Runtime run = Runtime.getRuntime();
convertAndImportCaCertificate(caCertificate, clientKeyPassword);
createCertificateFile(CLIENT_CERTIFICARE_NAME, clientCertificate);
createCertificateFile(CLIENT_KEY_NAME, clientKey);
// add client certificate to the custom keystore
runProcess("openssl pkcs12 -export -in " + CLIENT_CERTIFICARE_NAME + " -inkey " + CLIENT_KEY_NAME +
" -out " + CLIENT_CERT_P12 + " -name \"certificate\" -passout pass:" + clientKeyPassword, run);
// add client key to the custom keystore
runProcess("keytool -importkeystore -srckeystore " + CLIENT_CERT_P12 +
" -srcstoretype pkcs12 -destkeystore " + CUSTOM_KEY_STORE + " -srcstorepass " + clientKeyPassword +
" -deststoretype JKS -deststorepass " + clientKeyPassword + " -noprompt", run);
}

private static void convertAndImportCaCertificate(final String caCertificate,
final String clientKeyPassword)
throws IOException, InterruptedException {
final Runtime run = Runtime.getRuntime();
createCaCertificate(caCertificate, clientKeyPassword, run);
}

private static void createCaCertificate(final String caCertificate,
final String clientKeyPassword,
final Runtime run)
throws IOException, InterruptedException {
createCertificateFile(ROOT_CERTIFICARE_NAME, caCertificate);
// add CA certificate to the custom keystore
runProcess("openssl x509 -outform der -in " + ROOT_CERTIFICARE_NAME + " -out " + ROOT_CERTIFICARE_DER_NAME, run);
runProcess("keytool -importcert -alias root-certificate -keystore " + CUSTOM_TRUST_STORE
+ " -file " + ROOT_CERTIFICARE_DER_NAME + " -storepass " + clientKeyPassword + " -noprompt", run);
}

private static void updateTrustStoreSystemProperty(final String clientKeyPassword) {
System.setProperty("javax.net.ssl.trustStore", CUSTOM_TRUST_STORE);
System.setProperty("javax.net.ssl.trustStorePassword", clientKeyPassword);
}

private static void createCertificateFile(String fileName, String fileValue) throws IOException {
try (final PrintWriter out = new PrintWriter(fileName, StandardCharsets.UTF_8)) {
out.print(fileValue);
}
}

private static void runProcess(final String cmd, final Runtime run) throws IOException, InterruptedException {
final Process pr = run.exec(cmd);
if (!pr.waitFor(30, TimeUnit.SECONDS)) {
pr.destroy();
throw new RuntimeException("Timeout while executing: " + cmd);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.2
LABEL io.airbyte.version=0.6.3
LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.source.mysql_strict_encrypt;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcUtils;
Expand All @@ -23,6 +24,10 @@
public class MySqlStrictEncryptSource extends SpecModifyingSource implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStrictEncryptSource.class);
private static final String SSL_MODE_DESCRIPTION = "SSL connection modes. " +
"<li><b>required</b> - Always connect with SSL. If the MySQL server doesn’t support SSL, the connection will not be established. Certificate Authority (CA) and Hostname are not verified.</li>" +
"<li><b>verify-ca</b> - Always connect with SSL. Verifies CA, but allows connection even if Hostname does not match.</li>" +
"<li><b>Verify Identity</b> - Always connect with SSL. Verify both CA and Hostname.</li></ul>Read more <a href=\"https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-using-ssl.html\"> in the docs</a>.";

MySqlStrictEncryptSource() {
super(MySqlSource.sshWrappedSource());
Expand All @@ -34,6 +39,18 @@ public ConnectorSpecification modifySpec(final ConnectorSpecification originalSp
// SSL property should be enabled by default for secure versions of connectors
// that can be used in the Airbyte cloud. User should not be able to change this property.
((ObjectNode) spec.getConnectionSpecification().get("properties")).remove(JdbcUtils.SSL_KEY);
final ArrayNode modifiedSslModes = spec.getConnectionSpecification().get("properties").get("ssl_mode").get("oneOf").deepCopy();
// update description for ssl_mode property
((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).remove("description");
((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).remove("type");
((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).remove("order");
((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).put("description", SSL_MODE_DESCRIPTION);
((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).put("type", "object");
((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).put("order", 7);
// Assume that the first items is the "preferred" option; remove this option
modifiedSslModes.remove(0);
((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).remove("oneOf");
((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).put("oneOf", modifiedSslModes);
return spec;
}

Expand Down
Loading