diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index 0fa910b2a198..e15f81b1015e 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -203,7 +203,7 @@
- name: Postgres
destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerRepository: airbyte/destination-postgres
- dockerImageTag: 0.3.21
+ dockerImageTag: 0.3.22
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
icon: postgresql.svg
releaseStage: alpha
diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
index ad360c84386f..de9469fc0280 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -3302,7 +3302,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
-- dockerImage: "airbyte/destination-postgres:0.3.21"
+- dockerImage: "airbyte/destination-postgres:0.3.22"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/postgres"
connectionSpecification:
@@ -3360,17 +3360,161 @@
order: 5
ssl:
title: "SSL Connection"
- description: "Encrypt data using SSL."
+ description: "Encrypt data using SSL. When activating SSL, please select\
+ \ one of the connection modes."
type: "boolean"
default: false
order: 6
+ ssl_mode:
+ title: "SSL modes"
+ description: "SSL connection modes. \n disable - Chose this mode\
+ \ to disable encryption of communication between Airbyte and destination\
+ \ database\n allow - Chose this mode to enable encryption only\
+ \ when required by the source database\n prefer - Chose this mode\
+ \ to allow unencrypted connection only if the source database does not\
+ \ support encryption\n require - Chose this mode to always require\
+ \ encryption. If the source database server does not support encryption,\
+ \ connection will fail\n verify-ca - Chose this mode to always\
+ \ require encryption and to verify that the source database server has\
+ \ a valid SSL certificate\n verify-full - This is the most secure\
+ \ mode. Chose this mode to always require encryption and to verify the\
+ \ identity of the source database server\n See more information - in the\
+ \ docs."
+ type: "object"
+ order: 7
+ oneOf:
+ - title: "disable"
+ additionalProperties: false
+ description: "Disable SSL."
+ required:
+ - "mode"
+ properties:
+ mode:
+ type: "string"
+ const: "disable"
+ enum:
+ - "disable"
+ default: "disable"
+ order: 0
+ - title: "allow"
+ additionalProperties: false
+ description: "Allow SSL mode."
+ required:
+ - "mode"
+ properties:
+ mode:
+ type: "string"
+ const: "allow"
+ enum:
+ - "allow"
+ default: "allow"
+ order: 0
+ - title: "prefer"
+ additionalProperties: false
+ description: "Prefer SSL mode."
+ required:
+ - "mode"
+ properties:
+ mode:
+ type: "string"
+ const: "prefer"
+ enum:
+ - "prefer"
+ default: "prefer"
+ order: 0
+ - title: "require"
+ additionalProperties: false
+ description: "Require SSL mode."
+ required:
+ - "mode"
+ properties:
+ mode:
+ type: "string"
+ const: "require"
+ enum:
+ - "require"
+ default: "require"
+ order: 0
+ - title: "verify-ca"
+ additionalProperties: false
+ description: "Verify-ca SSL mode."
+ required:
+ - "mode"
+ - "ca_certificate"
+ properties:
+ mode:
+ type: "string"
+ const: "verify-ca"
+ enum:
+ - "verify-ca"
+ default: "verify-ca"
+ order: 0
+ ca_certificate:
+ type: "string"
+ title: "CA certificate"
+ description: "CA certificate"
+ airbyte_secret: true
+ multiline: true
+ order: 1
+ client_key_password:
+ type: "string"
+ title: "Client key password (Optional)"
+ description: "Password for keystorage. This field is optional. If\
+ \ you do not add it - the password will be generated automatically."
+ airbyte_secret: true
+ order: 4
+ - title: "verify-full"
+ additionalProperties: false
+ description: "Verify-full SSL mode."
+ required:
+ - "mode"
+ - "ca_certificate"
+ - "client_certificate"
+ - "client_key"
+ properties:
+ mode:
+ type: "string"
+ const: "verify-full"
+ enum:
+ - "verify-full"
+ default: "verify-full"
+ order: 0
+ ca_certificate:
+ type: "string"
+ title: "CA certificate"
+ description: "CA certificate"
+ airbyte_secret: true
+ multiline: true
+ order: 1
+ client_certificate:
+ type: "string"
+ title: "Client certificate"
+ description: "Client certificate"
+ airbyte_secret: true
+ multiline: true
+ order: 2
+ client_key:
+ type: "string"
+ title: "Client key"
+ description: "Client key"
+ airbyte_secret: true
+ multiline: true
+ order: 3
+ client_key_password:
+ type: "string"
+ title: "Client key password (Optional)"
+ description: "Password for keystorage. This field is optional. If\
+ \ you do not add it - the password will be generated automatically."
+ airbyte_secret: true
+ order: 4
jdbc_url_params:
description: "Additional properties to pass to the JDBC URL string when\
\ connecting to the database formatted as 'key=value' pairs separated\
\ by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)."
title: "JDBC URL Params"
type: "string"
- order: 7
+ order: 8
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/PostgresUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/PostgresUtils.java
index e0b0e1de3f41..ba50b0bfc044 100644
--- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/PostgresUtils.java
+++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/PostgresUtils.java
@@ -33,7 +33,7 @@ public static Certificate getCertificate(final PostgreSQLContainer> container)
container.execInContainer("su", "-c", "psql -U test -c \"ALTER USER postgres WITH SUPERUSER;\"");
container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out ca.key");
- container.execInContainer("su", "-c", "openssl req -new -x509 -sha256 -key ca.key -out ca.crt -subj \"/CN=localhost\"");
+ container.execInContainer("su", "-c", "openssl req -new -x509 -sha256 -key ca.key -out ca.crt -subj \"/CN=127.0.0.1\"");
container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out server.key");
container.execInContainer("su", "-c", "openssl req -new -sha256 -key server.key -out server.csr -subj \"/CN=localhost\"");
container.execInContainer("su", "-c",
diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java
index 8e0182e41ab2..3ed570a194e8 100644
--- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java
+++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java
@@ -29,6 +29,7 @@ public class JdbcUtils {
// NOTE: this is the plural version of SCHEMA_KEY
public static final String SCHEMAS_KEY = "schemas";
public static final String SSL_KEY = "ssl";
+ public static final String SSL_MODE_KEY = "ssl_mode";
public static final String TLS_KEY = "tls";
public static final String USERNAME_KEY = "username";
private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations();
diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/PostgresSslConnectionUtils.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/PostgresSslConnectionUtils.java
index 2c66dc31cad7..2c326df0ac1d 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/PostgresSslConnectionUtils.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/PostgresSslConnectionUtils.java
@@ -5,6 +5,10 @@
package io.airbyte.integrations.util;
import com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
@@ -18,7 +22,6 @@
public class PostgresSslConnectionUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSslConnectionUtils.class);
- private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(10);
private static final String CA_CERTIFICATE = "ca.crt";
private static final String CLIENT_CERTIFICATE = "client.crt";
private static final String CLIENT_KEY = "client.key";
@@ -37,17 +40,14 @@ public class PostgresSslConnectionUtils {
public static final String VERIFY_FULL = "verify-full";
public static final String DISABLE = "disable";
public static final String TRUE_STRING_VALUE = "true";
+ public static final String ENCRYPT_FILE_NAME = "encrypt";
public static final String FACTORY_VALUE = "org.postgresql.ssl.DefaultJavaSSLFactory";
public static Map obtainConnectionOptions(final JsonNode encryption) {
final Map additionalParameters = new HashMap<>();
if (!encryption.isNull()) {
final var method = encryption.get(PARAM_MODE).asText();
- String sslPassword = encryption.has(PARAM_CLIENT_KEY_PASSWORD) ? encryption.get(PARAM_CLIENT_KEY_PASSWORD).asText() : "";
- var keyStorePassword = KEY_STORE_PASS;
- if (!sslPassword.isEmpty()) {
- keyStorePassword = sslPassword;
- }
+ var keyStorePassword = checkOrCreatePassword(encryption);
switch (method) {
case VERIFY_CA -> {
additionalParameters.putAll(obtainConnectionCaOptions(encryption, method, keyStorePassword));
@@ -64,6 +64,37 @@ public static Map obtainConnectionOptions(final JsonNode encrypt
return additionalParameters;
}
+ private static String checkOrCreatePassword(final JsonNode encryption) {
+ String sslPassword = encryption.has(PARAM_CLIENT_KEY_PASSWORD) ? encryption.get(PARAM_CLIENT_KEY_PASSWORD).asText() : "";
+ var keyStorePassword = RandomStringUtils.randomAlphanumeric(10);
+ if (sslPassword.isEmpty()) {
+ var file = new File(ENCRYPT_FILE_NAME);
+ if (file.exists()) {
+ keyStorePassword = readFile(file);
+ } else {
+ try {
+ createCertificateFile(ENCRYPT_FILE_NAME, keyStorePassword);
+ } catch (final IOException e) {
+ throw new RuntimeException("Failed to create encryption file ");
+ }
+ }
+ } else {
+ keyStorePassword = sslPassword;
+ }
+ return keyStorePassword;
+ }
+
+ private static String readFile(final File file) {
+ try {
+ BufferedReader reader = new BufferedReader(new FileReader(file));
+ String currentLine = reader.readLine();
+ reader.close();
+ return currentLine;
+ } catch (final IOException e) {
+ throw new RuntimeException("Failed to read file with encryption");
+ }
+ }
+
private static Map obtainConnectionFullOptions(final JsonNode encryption,
final String method,
final String clientKeyPassword) {
diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/Dockerfile
index 643721a7a75a..12173ee58a01 100644
--- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/Dockerfile
+++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-postgres-strict-encrypt
COPY --from=build /airbyte /airbyte
-LABEL io.airbyte.version=0.3.21
+LABEL io.airbyte.version=0.3.22
LABEL io.airbyte.name=airbyte/destination-postgres-strict-encrypt
diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncrypt.java b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncrypt.java
index 729898cd701a..b87e3e118016 100644
--- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncrypt.java
+++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncrypt.java
@@ -4,6 +4,7 @@
package io.airbyte.integrations.destination.postgres;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcUtils;
@@ -17,6 +18,8 @@
public class PostgresDestinationStrictEncrypt extends SpecModifyingDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDestinationStrictEncrypt.class);
+ private static final String PROPERTIES = "properties";
+ private static final String ONE_OF_PROPERTY = "oneOf";
public PostgresDestinationStrictEncrypt() {
super(PostgresDestination.sshWrappedDestination());
@@ -25,7 +28,14 @@ public PostgresDestinationStrictEncrypt() {
@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
- ((ObjectNode) spec.getConnectionSpecification().get("properties")).remove(JdbcUtils.SSL_KEY);
+ ((ObjectNode) spec.getConnectionSpecification().get(PROPERTIES)).remove(JdbcUtils.SSL_KEY);
+ ArrayNode modifiedSslModes = spec.getConnectionSpecification().get(PROPERTIES).get(JdbcUtils.SSL_MODE_KEY).get(ONE_OF_PROPERTY).deepCopy();
+ // Assume that the first item is the "allow" option; remove it
+ modifiedSslModes.remove(1);
+ // Assume that the first item is the "disable" option; remove it
+ modifiedSslModes.remove(0);
+ ((ObjectNode) spec.getConnectionSpecification().get(PROPERTIES).get(JdbcUtils.SSL_MODE_KEY)).remove(ONE_OF_PROPERTY);
+ ((ObjectNode) spec.getConnectionSpecification().get(PROPERTIES).get(JdbcUtils.SSL_MODE_KEY)).put(ONE_OF_PROPERTY, modifiedSslModes);
return spec;
}
diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java
index fcee51cf3a2e..82c20d4dfd15 100644
--- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationStrictEncryptAcceptanceTest.java
@@ -4,10 +4,13 @@
package io.airbyte.integrations.destination.postgres;
+import static io.airbyte.db.PostgresUtils.getCertificate;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
+import io.airbyte.db.PostgresUtils;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
@@ -29,6 +32,9 @@ public class PostgresDestinationStrictEncryptAcceptanceTest extends DestinationA
private PostgreSQLContainer> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
+ protected static final String PASSWORD = "Passw0rd";
+ protected static PostgresUtils.Certificate certs;
+
@Override
protected String getImageName() {
return "airbyte/destination-postgres-strict-encrypt:dev";
@@ -43,6 +49,13 @@ protected JsonNode getConfig() {
.put(JdbcUtils.SCHEMA_KEY, "public")
.put(JdbcUtils.PORT_KEY, db.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, db.getDatabaseName())
+ .put(JdbcUtils.SSL_MODE_KEY, ImmutableMap.builder()
+ .put("mode", "verify-full")
+ .put("ca_certificate", certs.getCaCertificate())
+ .put("client_certificate", certs.getClientCertificate())
+ .put("client_key", certs.getClientKey())
+ .put("client_key_password", PASSWORD)
+ .build())
.build());
}
@@ -131,10 +144,11 @@ private List retrieveRecordsFromTable(final String tableName, final St
}
@Override
- protected void setup(final TestDestinationEnv testEnv) {
- db = new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres"))
- .withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key");
+ protected void setup(final TestDestinationEnv testEnv) throws Exception {
+ db = new PostgreSQLContainer<>(DockerImageName.parse("postgres:bullseye")
+ .asCompatibleSubstituteFor("postgres"));
db.start();
+ certs = getCertificate(db);
}
@Override
diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json
index 8ba1678fcb55..ad07d3eccfe8 100644
--- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json
+++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/src/test/resources/expected_spec.json
@@ -54,11 +54,130 @@
"airbyte_secret": true,
"order": 5
},
+ "ssl_mode": {
+ "title": "SSL modes",
+ "description": "SSL connection modes. \n disable - Chose this mode to disable encryption of communication between Airbyte and destination database\n allow - Chose this mode to enable encryption only when required by the source database\n prefer - Chose this mode to allow unencrypted connection only if the source database does not support encryption\n require - Chose this mode to always require encryption. If the source database server does not support encryption, connection will fail\n verify-ca - Chose this mode to always require encryption and to verify that the source database server has a valid SSL certificate\n verify-full - This is the most secure mode. Chose this mode to always require encryption and to verify the identity of the source database server\n See more information - in the docs.",
+ "type": "object",
+ "order": 7,
+ "oneOf": [
+ {
+ "title": "prefer",
+ "additionalProperties": false,
+ "description": "Prefer SSL mode.",
+ "required": ["mode"],
+ "properties": {
+ "mode": {
+ "type": "string",
+ "const": "prefer",
+ "enum": ["prefer"],
+ "default": "prefer",
+ "order": 0
+ }
+ }
+ },
+ {
+ "title": "require",
+ "additionalProperties": false,
+ "description": "Require SSL mode.",
+ "required": ["mode"],
+ "properties": {
+ "mode": {
+ "type": "string",
+ "const": "require",
+ "enum": ["require"],
+ "default": "require",
+ "order": 0
+ }
+ }
+ },
+ {
+ "title": "verify-ca",
+ "additionalProperties": false,
+ "description": "Verify-ca SSL mode.",
+ "required": ["mode", "ca_certificate"],
+ "properties": {
+ "mode": {
+ "type": "string",
+ "const": "verify-ca",
+ "enum": ["verify-ca"],
+ "default": "verify-ca",
+ "order": 0
+ },
+ "ca_certificate": {
+ "type": "string",
+ "title": "CA certificate",
+ "description": "CA certificate",
+ "airbyte_secret": true,
+ "multiline": true,
+ "order": 1
+ },
+ "client_key_password": {
+ "type": "string",
+ "title": "Client key password (Optional)",
+ "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.",
+ "airbyte_secret": true,
+ "order": 4
+ }
+ }
+ },
+ {
+ "title": "verify-full",
+ "additionalProperties": false,
+ "description": "Verify-full SSL mode.",
+ "required": [
+ "mode",
+ "ca_certificate",
+ "client_certificate",
+ "client_key"
+ ],
+ "properties": {
+ "mode": {
+ "type": "string",
+ "const": "verify-full",
+ "enum": ["verify-full"],
+ "default": "verify-full",
+ "order": 0
+ },
+ "ca_certificate": {
+ "type": "string",
+ "title": "CA certificate",
+ "description": "CA certificate",
+ "airbyte_secret": true,
+ "multiline": true,
+ "order": 1
+ },
+ "client_certificate": {
+ "type": "string",
+ "title": "Client certificate",
+ "description": "Client certificate",
+ "airbyte_secret": true,
+ "multiline": true,
+ "order": 2
+ },
+ "client_key": {
+ "type": "string",
+ "title": "Client key",
+ "description": "Client key",
+ "airbyte_secret": true,
+ "multiline": true,
+ "order": 3
+ },
+ "client_key_password": {
+ "type": "string",
+ "title": "Client key password (Optional)",
+ "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.",
+ "airbyte_secret": true,
+ "order": 4
+ }
+ }
+ }
+ ]
+ },
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
- "order": 7
+ "order": 8
},
"tunnel_method": {
"type": "object",
diff --git a/airbyte-integrations/connectors/destination-postgres/Dockerfile b/airbyte-integrations/connectors/destination-postgres/Dockerfile
index 308263a7b563..41e7f5f649dd 100644
--- a/airbyte-integrations/connectors/destination-postgres/Dockerfile
+++ b/airbyte-integrations/connectors/destination-postgres/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-postgres
COPY --from=build /airbyte /airbyte
-LABEL io.airbyte.version=0.3.21
+LABEL io.airbyte.version=0.3.22
LABEL io.airbyte.name=airbyte/destination-postgres
diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
index 0d426f215cbd..1204f472c2c6 100644
--- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
+++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java
@@ -4,6 +4,12 @@
package io.airbyte.integrations.destination.postgres;
+import static io.airbyte.integrations.util.PostgresSslConnectionUtils.DISABLE;
+import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_MODE;
+import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL;
+import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE;
+import static io.airbyte.integrations.util.PostgresSslConnectionUtils.obtainConnectionOptions;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
@@ -13,7 +19,7 @@
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
@@ -25,10 +31,6 @@ public class PostgresDestination extends AbstractJdbcDestination implements Dest
public static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName();
- static final Map SSL_JDBC_PARAMETERS = ImmutableMap.of(
- JdbcUtils.SSL_KEY, "true",
- "sslmode", "require");
-
public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new PostgresDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}
@@ -39,12 +41,20 @@ public PostgresDestination() {
@Override
protected Map getDefaultConnectionProperties(final JsonNode config) {
- if (JdbcUtils.useSsl(config)) {
- return SSL_JDBC_PARAMETERS;
- } else {
- // No need for any parameters if the connection doesn't use SSL
- return Collections.emptyMap();
+ final Map additionalParameters = new HashMap<>();
+ if (!config.has(PARAM_SSL) || config.get(PARAM_SSL).asBoolean()) {
+ if (config.has(PARAM_SSL_MODE)) {
+ if (DISABLE.equals(config.get(PARAM_SSL_MODE).get(PARAM_MODE).asText())) {
+ additionalParameters.put("sslmode", DISABLE);
+ } else {
+ additionalParameters.putAll(obtainConnectionOptions(config.get(PARAM_SSL_MODE)));
+ }
+ } else {
+ additionalParameters.put(JdbcUtils.SSL_KEY, "true");
+ additionalParameters.put("sslmode", "require");
+ }
}
+ return additionalParameters;
}
@Override
diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json
index 1d475c9cf8b0..9117125c6553 100644
--- a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json
+++ b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json
@@ -56,16 +56,165 @@
},
"ssl": {
"title": "SSL Connection",
- "description": "Encrypt data using SSL.",
+ "description": "Encrypt data using SSL. When activating SSL, please select one of the connection modes.",
"type": "boolean",
"default": false,
"order": 6
},
+ "ssl_mode": {
+ "title": "SSL modes",
+ "description": "SSL connection modes. \n disable - Chose this mode to disable encryption of communication between Airbyte and destination database\n allow - Chose this mode to enable encryption only when required by the source database\n prefer - Chose this mode to allow unencrypted connection only if the source database does not support encryption\n require - Chose this mode to always require encryption. If the source database server does not support encryption, connection will fail\n verify-ca - Chose this mode to always require encryption and to verify that the source database server has a valid SSL certificate\n verify-full - This is the most secure mode. Chose this mode to always require encryption and to verify the identity of the source database server\n See more information - in the docs.",
+ "type": "object",
+ "order": 7,
+ "oneOf": [
+ {
+ "title": "disable",
+ "additionalProperties": false,
+ "description": "Disable SSL.",
+ "required": ["mode"],
+ "properties": {
+ "mode": {
+ "type": "string",
+ "const": "disable",
+ "enum": ["disable"],
+ "default": "disable",
+ "order": 0
+ }
+ }
+ },
+ {
+ "title": "allow",
+ "additionalProperties": false,
+ "description": "Allow SSL mode.",
+ "required": ["mode"],
+ "properties": {
+ "mode": {
+ "type": "string",
+ "const": "allow",
+ "enum": ["allow"],
+ "default": "allow",
+ "order": 0
+ }
+ }
+ },
+ {
+ "title": "prefer",
+ "additionalProperties": false,
+ "description": "Prefer SSL mode.",
+ "required": ["mode"],
+ "properties": {
+ "mode": {
+ "type": "string",
+ "const": "prefer",
+ "enum": ["prefer"],
+ "default": "prefer",
+ "order": 0
+ }
+ }
+ },
+ {
+ "title": "require",
+ "additionalProperties": false,
+ "description": "Require SSL mode.",
+ "required": ["mode"],
+ "properties": {
+ "mode": {
+ "type": "string",
+ "const": "require",
+ "enum": ["require"],
+ "default": "require",
+ "order": 0
+ }
+ }
+ },
+ {
+ "title": "verify-ca",
+ "additionalProperties": false,
+ "description": "Verify-ca SSL mode.",
+ "required": ["mode", "ca_certificate"],
+ "properties": {
+ "mode": {
+ "type": "string",
+ "const": "verify-ca",
+ "enum": ["verify-ca"],
+ "default": "verify-ca",
+ "order": 0
+ },
+ "ca_certificate": {
+ "type": "string",
+ "title": "CA certificate",
+ "description": "CA certificate",
+ "airbyte_secret": true,
+ "multiline": true,
+ "order": 1
+ },
+ "client_key_password": {
+ "type": "string",
+ "title": "Client key password (Optional)",
+ "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.",
+ "airbyte_secret": true,
+ "order": 4
+ }
+ }
+ },
+ {
+ "title": "verify-full",
+ "additionalProperties": false,
+ "description": "Verify-full SSL mode.",
+ "required": [
+ "mode",
+ "ca_certificate",
+ "client_certificate",
+ "client_key"
+ ],
+ "properties": {
+ "mode": {
+ "type": "string",
+ "const": "verify-full",
+ "enum": ["verify-full"],
+ "default": "verify-full",
+ "order": 0
+ },
+ "ca_certificate": {
+ "type": "string",
+ "title": "CA certificate",
+ "description": "CA certificate",
+ "airbyte_secret": true,
+ "multiline": true,
+ "order": 1
+ },
+ "client_certificate": {
+ "type": "string",
+ "title": "Client certificate",
+ "description": "Client certificate",
+ "airbyte_secret": true,
+ "multiline": true,
+ "order": 2
+ },
+ "client_key": {
+ "type": "string",
+ "title": "Client key",
+ "description": "Client key",
+ "airbyte_secret": true,
+ "multiline": true,
+ "order": 3
+ },
+ "client_key_password": {
+ "type": "string",
+ "title": "Client key password (Optional)",
+ "description": "Password for keystorage. This field is optional. If you do not add it - the password will be generated automatically.",
+ "airbyte_secret": true,
+ "order": 4
+ }
+ }
+ }
+ ]
+ },
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
- "order": 7
+ "order": 8
}
}
}
diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSLFullCertificateAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSLFullCertificateAcceptanceTest.java
new file mode 100644
index 000000000000..138f7735376b
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/PostgresDestinationSSLFullCertificateAcceptanceTest.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.postgres;
+
+import static io.airbyte.db.PostgresUtils.getCertificate;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.db.Database;
+import io.airbyte.db.PostgresUtils;
+import io.airbyte.db.factory.DSLContextFactory;
+import io.airbyte.db.factory.DatabaseDriver;
+import io.airbyte.integrations.base.JavaBaseConstants;
+import io.airbyte.integrations.destination.ExtendedNameTransformer;
+import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
+import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.jooq.DSLContext;
+import org.jooq.SQLDialect;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class PostgresDestinationSSLFullCertificateAcceptanceTest extends JdbcDestinationAcceptanceTest {
+
+ private PostgreSQLContainer> db;
+
+ protected static PostgresUtils.Certificate certs;
+ private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
+
+ @Override
+ protected String getImageName() {
+ return "airbyte/destination-postgres:dev";
+ }
+
+ @Override
+ protected JsonNode getConfig() {
+ return Jsons.jsonNode(ImmutableMap.builder()
+ .put("host", db.getHost())
+ .put("username", "postgres")
+ .put("password", "postgres")
+ .put("schema", "public")
+ .put("port", db.getFirstMappedPort())
+ .put("database", db.getDatabaseName())
+ .put("ssl", true)
+ .put("ssl_mode", ImmutableMap.builder()
+ .put("mode", "verify-full")
+ .put("ca_certificate", certs.getCaCertificate())
+ .put("client_certificate", certs.getClientCertificate())
+ .put("client_key", certs.getClientKey())
+ .put("client_key_password", "Passw0rd")
+ .build())
+ .build());
+ }
+
+ @Override
+ protected JsonNode getFailCheckConfig() {
+ return Jsons.jsonNode(ImmutableMap.builder()
+ .put("host", db.getHost())
+ .put("username", db.getUsername())
+ .put("password", "wrong password")
+ .put("schema", "public")
+ .put("port", db.getFirstMappedPort())
+ .put("database", db.getDatabaseName())
+ .put("ssl", false)
+ .build());
+ }
+
+ @Override
+ protected List retrieveRecords(final TestDestinationEnv env,
+ final String streamName,
+ final String namespace,
+ final JsonNode streamSchema)
+ throws Exception {
+ return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
+ .stream()
+ .map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ protected boolean supportsNormalization() {
+ return true;
+ }
+
+ @Override
+ protected boolean supportsDBT() {
+ return true;
+ }
+
+ @Override
+ protected boolean implementsNamespaces() {
+ return true;
+ }
+
+ @Override
+ protected TestDataComparator getTestDataComparator() {
+ return new PostgresTestDataComparator();
+ }
+
+ @Override
+ protected boolean supportBasicDataTypeTest() {
+ return true;
+ }
+
+ @Override
+ protected boolean supportArrayDataTypeTest() {
+ return true;
+ }
+
+ @Override
+ protected boolean supportObjectDataTypeTest() {
+ return true;
+ }
+
+ @Override
+ protected List retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace)
+ throws Exception {
+ final String tableName = namingResolver.getIdentifier(streamName);
+ return retrieveRecordsFromTable(tableName, namespace);
+ }
+
+ private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
+ try (final DSLContext dslContext = DSLContextFactory.create(
+ db.getUsername(),
+ db.getPassword(),
+ DatabaseDriver.POSTGRESQL.getDriverClassName(),
+ db.getJdbcUrl(),
+ SQLDialect.POSTGRES)) {
+ return new Database(dslContext)
+ .query(ctx -> {
+ ctx.execute("set time zone 'UTC';");
+ return ctx.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
+ .stream()
+ .map(this::getJsonFromRecord)
+ .collect(Collectors.toList());
+ });
+ }
+ }
+
+ @Override
+ protected void setup(final TestDestinationEnv testEnv) throws Exception {
+ db = new PostgreSQLContainer<>(DockerImageName.parse("postgres:bullseye")
+ .asCompatibleSubstituteFor("postgres"));
+ db.start();
+ certs = getCertificate(db);
+ }
+
+ @Override
+ protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
+ db.stop();
+ db.close();
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
index f7c2d6380b29..a8032f0d8efa 100644
--- a/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
+++ b/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java
@@ -40,6 +40,10 @@ public class PostgresDestinationTest {
private static final String SCHEMA_NAME = "public";
private static final String STREAM_NAME = "id_and_name";
+
+ static final Map SSL_JDBC_PARAMETERS = ImmutableMap.of(
+ "ssl", "true",
+ "sslmode", "require");
private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(List.of(
CatalogHelpers.createConfiguredAirbyteStream(
STREAM_NAME,
@@ -56,7 +60,9 @@ private JsonNode buildConfigNoJdbcParameters() {
JdbcUtils.HOST_KEY, "localhost",
JdbcUtils.PORT_KEY, 1337,
JdbcUtils.USERNAME_KEY, "user",
- JdbcUtils.DATABASE_KEY, "db"));
+ JdbcUtils.DATABASE_KEY, "db",
+ "ssl", true,
+ "ssl_mode", ImmutableMap.of("mode", "require")));
}
private JsonNode buildConfigWithExtraJdbcParameters(final String extraParam) {
@@ -123,7 +129,7 @@ void testDefaultParamsNoSSL() {
void testDefaultParamsWithSSL() {
final Map defaultProperties = new PostgresDestination().getDefaultConnectionProperties(
buildConfigNoJdbcParameters());
- assertEquals(PostgresDestination.SSL_JDBC_PARAMETERS, defaultProperties);
+ assertEquals(SSL_JDBC_PARAMETERS, defaultProperties);
}
// This test is a bit redundant with PostgresIntegrationTest. It makes it easy to run the
diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java
index 5cbe4600bf8a..6e0f2dbd157f 100644
--- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java
+++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java
@@ -16,7 +16,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Sets;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceSSLCertificateAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceSSLCertificateAcceptanceTest.java
index c0bf2c0b4275..2d14210e4c76 100644
--- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceSSLCertificateAcceptanceTest.java
+++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceSSLCertificateAcceptanceTest.java
@@ -57,8 +57,8 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put("port", container.getFirstMappedPort())
.put("database", container.getDatabaseName())
.put("schemas", Jsons.jsonNode(List.of("public")))
- .put("username", container.getUsername())
- .put("password", container.getPassword())
+ .put("username", "postgres")
+ .put("password", "postgres")
.put("ssl", true)
.put("replication_method", replicationMethod)
.put("ssl_mode", getCertificateConfiguration())
diff --git a/docs/integrations/destinations/postgres.md b/docs/integrations/destinations/postgres.md
index 52307ba81fb1..74d2504088dd 100644
--- a/docs/integrations/destinations/postgres.md
+++ b/docs/integrations/destinations/postgres.md
@@ -128,6 +128,7 @@ Now that you have set up the Postgres destination connector, check out the follo
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:----------------------------------------------------------------------------------------------------|
+| 0.3.22 | 2022-07-18 | [13840](https://github.com/airbytehq/airbyte/pull/13840) | Added the ability to connect using different SSL modes and SSL certificates. |
| 0.3.21 | 2022-07-06 | [14479](https://github.com/airbytehq/airbyte/pull/14479) | Publish amd64 and arm64 versions of the connector |
| 0.3.20 | 2022-05-17 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.3.19 | 2022-04-25 | [12195](https://github.com/airbytehq/airbyte/pull/12195) | Add support for additional JDBC URL Params input |