Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source postgres: Add SSL certificates to source postgres #13840

Merged
merged 42 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9fef1bb
added ssl certificates for postgres source
andriikorotkov Jun 15, 2022
3f2da64
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 15, 2022
9a48ac3
added command for remove client private key after transformation to e…
andriikorotkov Jun 16, 2022
7c5fe79
added connection with CA and client certificates for postgres destina…
andriikorotkov Jun 16, 2022
3c70c36
updated code style
andriikorotkov Jun 16, 2022
cdea269
moved common methods to the common class
andriikorotkov Jun 16, 2022
c4f623e
moved common methods to the common class
andriikorotkov Jun 16, 2022
a2266b5
fixed remarks
andriikorotkov Jun 17, 2022
a0c49ac
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 17, 2022
fa65a81
updated postgres source tests
andriikorotkov Jun 20, 2022
da7c8ae
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 20, 2022
4189a19
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 20, 2022
59e467f
added minor changes to spec and added fixes to password mechanism
andriikorotkov Jun 22, 2022
5d115da
updated postgres source tests
andriikorotkov Jun 22, 2022
9ae6e8c
updated strict-encrypt postgres source and destination and added test…
andriikorotkov Jun 29, 2022
6f8f726
fixed check style
andriikorotkov Jun 29, 2022
4c9cd85
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 29, 2022
d5a408a
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 30, 2022
1901f35
updated documentation and versions of connectors
andriikorotkov Jun 30, 2022
c7044ff
updated ordrs in test spec
andriikorotkov Jun 30, 2022
a48774f
fixed minor remarks in specs and expected_specs
andriikorotkov Jul 1, 2022
ccda43b
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 1, 2022
0d4c5a5
fixed minor remarks in specs and expected_specs
andriikorotkov Jul 1, 2022
9f50f19
fixed Dockerfile
andriikorotkov Jul 1, 2022
a83a378
fixed remarks
andriikorotkov Jul 1, 2022
c57ae51
fixed remarks
andriikorotkov Jul 3, 2022
38f41f6
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 3, 2022
8137628
fixed remarks
andriikorotkov Jul 4, 2022
d78ec36
fixed remarks
andriikorotkov Jul 6, 2022
db80b1d
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 6, 2022
a686fc6
fixed remarks
andriikorotkov Jul 7, 2022
7e45a24
rollback changes for postgres destination and move them to a new pull…
andriikorotkov Jul 7, 2022
eeaf28f
rollback changes for postgres destination and move them to a new pull…
andriikorotkov Jul 7, 2022
033dd94
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 7, 2022
2f8c581
fixed code style
andriikorotkov Jul 7, 2022
0216119
fixed code style
andriikorotkov Jul 7, 2022
4e86961
fixed code style
andriikorotkov Jul 7, 2022
df656ff
fixed code style
andriikorotkov Jul 8, 2022
e4e26c5
fixed code style
andriikorotkov Jul 8, 2022
4c2225b
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 17, 2022
4b6d3cd
auto-bump connector version
octavia-squidington-iii Jul 18, 2022
2b26d0f
updated version of postgres strict-encrypt source
andriikorotkov Jul 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.util;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;

public class PostgresSslConnectionUtils {

private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8);
private static final String CA_CERTIFICATE = "ca.crt";
private static final String CLIENT_CERTIFICATE = "client.crt";
private static final String CLIENT_KEY = "client.key";
private static final String CLIENT_ENCRYPTED_KEY = "client.pk8";

public static final String PARAM_MODE = "mode";
public static final String PARAM_SSL = "ssl";
public static final String PARAM_SSL_MODE = "ssl_mode";
public static final String PARAM_SSLMODE = "sslmode";
public static final String PARAM_CLIENT_KEY_PASSWORD = "client_key_password";
public static final String PARAM_CA_CERTIFICATE = "ca_certificate";
public static final String PARAM_CLIENT_CERTIFICATE = "client_certificate";
public static final String PARAM_CLIENT_KEY = "client_key";

public static final String VERIFY_CA = "verify-ca";
public static final String VERIFY_FULL = "verify-full";
public static final String DISABLE = "disable";
public static final String TRUE_STRING_VALUE = "true";
public static final String FACTORY_VALUE = "org.postgresql.ssl.DefaultJavaSSLFactory";

public static Map<String, String> obtainConnectionOptions(final JsonNode encryption) {
final Map<String, String> additionalParameters = new HashMap<>();
if (!encryption.isNull()) {
final var method = encryption.get(PARAM_MODE).asText();
switch (method) {
case VERIFY_CA -> {
final var clientKeyPassword = getKeyStorePassword(encryption.get(PARAM_CLIENT_KEY_PASSWORD));
additionalParameters.putAll(obtainConnectionCaOptions(encryption, method, clientKeyPassword));
}
case VERIFY_FULL -> {
final var clientKeyPassword = getKeyStorePassword(encryption.get(PARAM_CLIENT_KEY_PASSWORD));
additionalParameters.putAll(obtainConnectionFullOptions(encryption, method, clientKeyPassword));
}
default -> {
additionalParameters.put(PARAM_SSL, TRUE_STRING_VALUE);
additionalParameters.put(PARAM_SSLMODE, method);
}
}
}
return additionalParameters;
}

private static Map<String, String> obtainConnectionFullOptions(final JsonNode encryption,
final String method,
final String clientKeyPassword) {
final Map<String, String> additionalParameters = new HashMap<>();
try {
convertAndImportFullCertificate(encryption.get(PARAM_CA_CERTIFICATE).asText(),
encryption.get(PARAM_CLIENT_CERTIFICATE).asText(), encryption.get(PARAM_CLIENT_KEY).asText(), clientKeyPassword);
} catch (final IOException | InterruptedException e) {
throw new RuntimeException("Failed to import certificate into Java Keystore");
}
additionalParameters.put("ssl", TRUE_STRING_VALUE);
additionalParameters.put("sslmode", method);
additionalParameters.put("sslrootcert", CA_CERTIFICATE);
additionalParameters.put("sslcert", CLIENT_CERTIFICATE);
additionalParameters.put("sslkey", CLIENT_ENCRYPTED_KEY);
additionalParameters.put("sslfactory", FACTORY_VALUE);
return additionalParameters;
}

private static Map<String, String> obtainConnectionCaOptions(final JsonNode encryption,
final String method,
final String clientKeyPassword) {
final Map<String, String> additionalParameters = new HashMap<>();
try {
convertAndImportCaCertificate(encryption.get(PARAM_CA_CERTIFICATE).asText(), clientKeyPassword);
} catch (final IOException | InterruptedException e) {
throw new RuntimeException("Failed to import certificate into Java Keystore");
}
additionalParameters.put("ssl", TRUE_STRING_VALUE);
additionalParameters.put("sslmode", method);
additionalParameters.put("sslrootcert", CA_CERTIFICATE);
additionalParameters.put("sslfactory", FACTORY_VALUE);
return additionalParameters;
}

private static void convertAndImportFullCertificate(final String caCertificate,
final String clientCertificate,
final String clientKey,
final String clientKeyPassword)
throws IOException, InterruptedException {
final Runtime run = Runtime.getRuntime();
createCaCertificate(caCertificate, clientKeyPassword, run);
createCertificateFile(CLIENT_CERTIFICATE, clientCertificate);
createCertificateFile(CLIENT_KEY, clientKey);
// add client certificate to the custom keystore
runProcess("keytool -alias client-certificate -keystore customkeystore"
+ " -import -file " + CLIENT_CERTIFICATE + " -storepass " + clientKeyPassword + " -noprompt", run);
// convert client.key to client.pk8 based on the documentation
runProcess("openssl pkcs8 -topk8 -inform PEM -in " + CLIENT_KEY + " -outform DER -out "
+ CLIENT_ENCRYPTED_KEY + " -nocrypt", run);
runProcess("rm " + CLIENT_KEY, run);

updateTrustStoreSystemProperty(clientKeyPassword);
}

private static void convertAndImportCaCertificate(final String caCertificate,
final String clientKeyPassword)
throws IOException, InterruptedException {
final Runtime run = Runtime.getRuntime();
createCaCertificate(caCertificate, clientKeyPassword, run);
updateTrustStoreSystemProperty(clientKeyPassword);
}

private static void createCaCertificate(final String caCertificate,
final String clientKeyPassword,
final Runtime run) throws IOException, InterruptedException {
createCertificateFile(CA_CERTIFICATE, caCertificate);
// add CA certificate to the custom keystore
runProcess("keytool -import -alias rds-root -keystore customkeystore"
+ " -file " + CA_CERTIFICATE + " -storepass " + clientKeyPassword + " -noprompt", run);
}

private static void updateTrustStoreSystemProperty(final String clientKeyPassword) {
String result = System.getProperty("user.dir") + "/customkeystore";
System.setProperty("javax.net.ssl.trustStore", result);
System.setProperty("javax.net.ssl.trustStorePassword", clientKeyPassword);
}

private static void createCertificateFile(String fileName, String fileValue) throws IOException {
try (final PrintWriter out = new PrintWriter(fileName, StandardCharsets.UTF_8)) {
out.print(fileValue);
}
}

private static String getKeyStorePassword(final JsonNode sslMode) {
var keyStorePassword = KEY_STORE_PASS;
if (!sslMode.isNull() || !sslMode.isEmpty()) {
keyStorePassword = sslMode.asText();
}
return keyStorePassword;
}

private static void runProcess(final String cmd, final Runtime run) throws IOException, InterruptedException {
final Process pr = run.exec(cmd);
if (!pr.waitFor(30, TimeUnit.SECONDS)) {
pr.destroy();
throw new RuntimeException("Timeout while executing: " + cmd);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

package io.airbyte.integrations.destination.postgres;

import static io.airbyte.integrations.util.PostgresSslConnectionUtils.DISABLE;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_MODE;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.obtainConnectionOptions;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
Expand All @@ -12,7 +18,7 @@
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -30,14 +36,9 @@ public class PostgresDestination extends AbstractJdbcDestination implements Dest
public static final String JDBC_URL_KEY = "jdbc_url";
public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params";
public static final String PASSWORD_KEY = "password";
public static final String SSL_KEY = "ssl";
public static final String USERNAME_KEY = "username";
public static final String SCHEMA_KEY = "schema";

static final Map<String, String> SSL_JDBC_PARAMETERS = ImmutableMap.of(
"ssl", "true",
"sslmode", "require");

public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new PostgresDestination(), HOST_KEY, PORT_KEY);
}
Expand All @@ -48,12 +49,15 @@ public PostgresDestination() {

@Override
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
if (useSsl(config)) {
return SSL_JDBC_PARAMETERS;
} else {
// No need for any parameters if the connection doesn't use SSL
return Collections.emptyMap();
final Map<String, String> additionalParameters = new HashMap<>();
andriikorotkov marked this conversation as resolved.
Show resolved Hide resolved
if (!config.has(PARAM_SSL) || config.get(PARAM_SSL).asBoolean()) {
if (DISABLE.equals(config.get(PARAM_SSL_MODE).get(PARAM_MODE).asText())) {
additionalParameters.put("sslmode", DISABLE);
} else {
additionalParameters.putAll(obtainConnectionOptions(config.get(PARAM_SSL_MODE)));
}
}
return additionalParameters;
}

@Override
Expand Down Expand Up @@ -81,10 +85,6 @@ public JsonNode toJdbcConfig(final JsonNode config) {
return Jsons.jsonNode(configBuilder.build());
}

private boolean useSsl(final JsonNode config) {
return !config.has(SSL_KEY) || config.get(SSL_KEY).asBoolean();
}

public static void main(final String[] args) throws Exception {
final Destination destination = PostgresDestination.sshWrappedDestination();
LOGGER.info("starting destination: {}", PostgresDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,155 @@
"default": false,
"order": 6
},
"ssl_mode": {
"title": "SSL modes",
"description": "SSL modes.",
"type": "object",
"order": 7,
"oneOf": [
{
"title": "Disable",
andriikorotkov marked this conversation as resolved.
Show resolved Hide resolved
"additionalProperties": false,
"description": "Disable SSL.",
"required": ["mode"],
"properties": {
"mode": {
"type": "string",
"const": "disable",
"enum": ["disable"],
"default": "disable",
"order": 0
}
}
},
{
"title": "Allow",
"additionalProperties": false,
"description": "Allow SSL mode.",
"required": ["mode"],
"properties": {
"mode": {
"type": "string",
"const": "allow",
"enum": ["allow"],
"default": "allow",
"order": 0
}
}
},
{
"title": "Prefer",
"additionalProperties": false,
"description": "Prefer SSL mode.",
"required": ["mode"],
"properties": {
"mode": {
"type": "string",
"const": "prefer",
"enum": ["prefer"],
"default": "prefer",
"order": 0
}
}
},
{
"title": "Require",
"additionalProperties": false,
"description": "Require SSL mode.",
"required": ["mode"],
"properties": {
"mode": {
"type": "string",
"const": "require",
"enum": ["require"],
"default": "require",
"order": 0
}
}
},
{
"title": "Verify-ca",
"additionalProperties": false,
"description": "Verify-ca SSL mode.",
"required": ["mode"],
"properties": {
"mode": {
"type": "string",
"const": "verify-ca",
"enum": ["verify-ca"],
"default": "verify-ca",
"order": 0
},
"ca_certificate": {
"type": "string",
"title": "CA certificate",
"description": "CA certificate",
"airbyte_secret": true,
"multiline": true,
"order": 1
},
"client_key_password": {
"type": "string",
"title": "Client key password (Optional)",
"description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.",
"airbyte_secret": true,
"order": 4
}
}
},
{
"title": "Verify-full",
"additionalProperties": false,
"description": "Verify-full SSL mode.",
"required": ["mode"],
"properties": {
"mode": {
"type": "string",
"const": "verify-full",
"enum": ["verify-full"],
"default": "verify-full",
"order": 0
},
"ca_certificate": {
"type": "string",
"title": "CA certificate",
"description": "CA certificate",
"airbyte_secret": true,
"multiline": true,
"order": 1
},
"client_certificate": {
"type": "string",
"title": "Client certificate",
"description": "Client certificate",
"airbyte_secret": true,
"multiline": true,
"order": 2
},
"client_key": {
"type": "string",
"title": "Client key",
"description": "Client key",
"airbyte_secret": true,
"multiline": true,
"order": 3
},
"client_key_password": {
"type": "string",
"title": "Client key password (Optional)",
"description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.",
"airbyte_secret": true,
"order": 4
}
}
}
]
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 7
"order": 8
}
}
}
Expand Down
Loading