Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source postgres: Add SSL certificates to source postgres #13840

Merged
merged 42 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9fef1bb
added ssl certificates for postgres source
andriikorotkov Jun 15, 2022
3f2da64
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 15, 2022
9a48ac3
added command for remove client private key after transformation to e…
andriikorotkov Jun 16, 2022
7c5fe79
added connection with CA and client certificates for postgres destina…
andriikorotkov Jun 16, 2022
3c70c36
updated code style
andriikorotkov Jun 16, 2022
cdea269
moved common methods to the common class
andriikorotkov Jun 16, 2022
c4f623e
moved common methods to the common class
andriikorotkov Jun 16, 2022
a2266b5
fixed remarks
andriikorotkov Jun 17, 2022
a0c49ac
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 17, 2022
fa65a81
updated postgres source tests
andriikorotkov Jun 20, 2022
da7c8ae
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 20, 2022
4189a19
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 20, 2022
59e467f
added minor changes to spec and added fixes to password mechanism
andriikorotkov Jun 22, 2022
5d115da
updated postgres source tests
andriikorotkov Jun 22, 2022
9ae6e8c
updated strict-encrypt postgres source and destination and added test…
andriikorotkov Jun 29, 2022
6f8f726
fixed check style
andriikorotkov Jun 29, 2022
4c9cd85
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 29, 2022
d5a408a
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jun 30, 2022
1901f35
updated documentation and versions of connectors
andriikorotkov Jun 30, 2022
c7044ff
updated ordrs in test spec
andriikorotkov Jun 30, 2022
a48774f
fixed minor remarks in specs and expected_specs
andriikorotkov Jul 1, 2022
ccda43b
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 1, 2022
0d4c5a5
fixed minor remarks in specs and expected_specs
andriikorotkov Jul 1, 2022
9f50f19
fixed Dockerfile
andriikorotkov Jul 1, 2022
a83a378
fixed remarks
andriikorotkov Jul 1, 2022
c57ae51
fixed remarks
andriikorotkov Jul 3, 2022
38f41f6
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 3, 2022
8137628
fixed remarks
andriikorotkov Jul 4, 2022
d78ec36
fixed remarks
andriikorotkov Jul 6, 2022
db80b1d
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 6, 2022
a686fc6
fixed remarks
andriikorotkov Jul 7, 2022
7e45a24
rollback changes for postgres destination and move them to a new pull…
andriikorotkov Jul 7, 2022
eeaf28f
rollback changes for postgres destination and move them to a new pull…
andriikorotkov Jul 7, 2022
033dd94
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 7, 2022
2f8c581
fixed code style
andriikorotkov Jul 7, 2022
0216119
fixed code style
andriikorotkov Jul 7, 2022
4e86961
fixed code style
andriikorotkov Jul 7, 2022
df656ff
fixed code style
andriikorotkov Jul 8, 2022
e4e26c5
fixed code style
andriikorotkov Jul 8, 2022
4c2225b
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Jul 17, 2022
4b6d3cd
auto-bump connector version
octavia-squidington-iii Jul 18, 2022
2b26d0f
updated version of postgres strict-encrypt source
andriikorotkov Jul 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions airbyte-db/db-lib/src/main/java/io/airbyte/db/PostgresUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
package io.airbyte.db;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import org.testcontainers.containers.PostgreSQLContainer;

public class PostgresUtils {

Expand All @@ -23,4 +26,74 @@ public static PgLsn getLsn(final JdbcDatabase database) throws SQLException {
return PgLsn.fromPgString(jsonNodes.get(0).get("pg_current_wal_lsn").asText());
}

@VisibleForTesting
public static Certificate getCertificate(final PostgreSQLContainer<?> container) throws IOException, InterruptedException {
container.execInContainer("su", "-c", "psql -U test -c \"CREATE USER postgres WITH PASSWORD 'postgres';\"");
andriikorotkov marked this conversation as resolved.
Show resolved Hide resolved
container.execInContainer("su", "-c", "psql -U test -c \"GRANT CONNECT ON DATABASE \"test\" TO postgres;\"");
container.execInContainer("su", "-c", "psql -U test -c \"ALTER USER postgres WITH SUPERUSER;\"");

container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out ca.key");
container.execInContainer("su", "-c", "openssl req -new -x509 -sha256 -key ca.key -out ca.crt -subj \"/CN=localhost\"");
container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out server.key");
container.execInContainer("su", "-c", "openssl req -new -sha256 -key server.key -out server.csr -subj \"/CN=localhost\"");
container.execInContainer("su", "-c",
"openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 365 -sha256");
container.execInContainer("su", "-c", "cp server.key /etc/ssl/private/");
container.execInContainer("su", "-c", "cp server.crt /etc/ssl/private/");
container.execInContainer("su", "-c", "cp ca.crt /etc/ssl/private/");
container.execInContainer("su", "-c", "chmod og-rwx /etc/ssl/private/server.* /etc/ssl/private/ca.*");
container.execInContainer("su", "-c", "chown postgres:postgres /etc/ssl/private/server.crt /etc/ssl/private/server.key /etc/ssl/private/ca.crt");
container.execInContainer("su", "-c", "echo \"ssl = on\" >> /var/lib/postgresql/data/postgresql.conf");
container.execInContainer("su", "-c", "echo \"ssl_cert_file = '/etc/ssl/private/server.crt'\" >> /var/lib/postgresql/data/postgresql.conf");
container.execInContainer("su", "-c", "echo \"ssl_key_file = '/etc/ssl/private/server.key'\" >> /var/lib/postgresql/data/postgresql.conf");
container.execInContainer("su", "-c", "echo \"ssl_ca_file = '/etc/ssl/private/ca.crt'\" >> /var/lib/postgresql/data/postgresql.conf");
container.execInContainer("su", "-c", "mkdir root/.postgresql");
container.execInContainer("su", "-c",
"echo \"hostssl all all 127.0.0.1/32 cert clientcert=verify-full\" >> /var/lib/postgresql/data/pg_hba.conf");

var caCert = container.execInContainer("su", "-c", "cat ca.crt").getStdout().trim();

container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out client.key");
container.execInContainer("su", "-c", "openssl req -new -sha256 -key client.key -out client.csr -subj \"/CN=postgres\"");
container.execInContainer("su", "-c",
"openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 365 -sha256");
container.execInContainer("su", "-c", "cp client.crt ~/.postgresql/postgresql.crt");
container.execInContainer("su", "-c", "cp client.key ~/.postgresql/postgresql.key");
container.execInContainer("su", "-c", "chmod 0600 ~/.postgresql/postgresql.crt ~/.postgresql/postgresql.key");
container.execInContainer("su", "-c", "cp ca.crt root/.postgresql/ca.crt");
container.execInContainer("su", "-c", "chown postgres:postgres ~/.postgresql/ca.crt");

container.execInContainer("su", "-c", "psql -U test -c \"SELECT pg_reload_conf();\"");

var clientKey = container.execInContainer("su", "-c", "cat client.key").getStdout().trim();
var clientCert = container.execInContainer("su", "-c", "cat client.crt").getStdout().trim();
return new Certificate(caCert, clientCert, clientKey);
}

public static class Certificate {

private final String caCertificate;
private final String clientCertificate;
private final String clientKey;

public Certificate(final String caCertificate, final String clientCertificate, final String clientKey) {
this.caCertificate = caCertificate;
this.clientCertificate = clientCertificate;
this.clientKey = clientKey;
}

public String getCaCertificate() {
return caCertificate;
}

public String getClientCertificate() {
return clientCertificate;
}

public String getClientKey() {
return clientKey;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.util;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresSslConnectionUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSslConnectionUtils.class);
private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(10);
private static final String CA_CERTIFICATE = "ca.crt";
private static final String CLIENT_CERTIFICATE = "client.crt";
private static final String CLIENT_KEY = "client.key";
private static final String CLIENT_ENCRYPTED_KEY = "client.pk8";

public static final String PARAM_MODE = "mode";
public static final String PARAM_SSL = "ssl";
public static final String PARAM_SSL_MODE = "ssl_mode";
public static final String PARAM_SSLMODE = "sslmode";
public static final String PARAM_CLIENT_KEY_PASSWORD = "client_key_password";
public static final String PARAM_CA_CERTIFICATE = "ca_certificate";
public static final String PARAM_CLIENT_CERTIFICATE = "client_certificate";
public static final String PARAM_CLIENT_KEY = "client_key";

public static final String VERIFY_CA = "verify-ca";
public static final String VERIFY_FULL = "verify-full";
public static final String DISABLE = "disable";
public static final String TRUE_STRING_VALUE = "true";
public static final String FACTORY_VALUE = "org.postgresql.ssl.DefaultJavaSSLFactory";

public static Map<String, String> obtainConnectionOptions(final JsonNode encryption) {
final Map<String, String> additionalParameters = new HashMap<>();
if (!encryption.isNull()) {
final var method = encryption.get(PARAM_MODE).asText();
String sslPassword = encryption.has(PARAM_CLIENT_KEY_PASSWORD) ? encryption.get(PARAM_CLIENT_KEY_PASSWORD).asText() : "";
var keyStorePassword = KEY_STORE_PASS;
if (!sslPassword.isEmpty()) {
keyStorePassword = sslPassword;
}
switch (method) {
case VERIFY_CA -> {
additionalParameters.putAll(obtainConnectionCaOptions(encryption, method, keyStorePassword));
}
case VERIFY_FULL -> {
additionalParameters.putAll(obtainConnectionFullOptions(encryption, method, keyStorePassword));
}
default -> {
additionalParameters.put(PARAM_SSL, TRUE_STRING_VALUE);
additionalParameters.put(PARAM_SSLMODE, method);
}
}
}
return additionalParameters;
}

private static Map<String, String> obtainConnectionFullOptions(final JsonNode encryption,
final String method,
final String clientKeyPassword) {
final Map<String, String> additionalParameters = new HashMap<>();
try {
convertAndImportFullCertificate(encryption.get(PARAM_CA_CERTIFICATE).asText(),
encryption.get(PARAM_CLIENT_CERTIFICATE).asText(), encryption.get(PARAM_CLIENT_KEY).asText(), clientKeyPassword);
} catch (final IOException | InterruptedException e) {
throw new RuntimeException("Failed to import certificate into Java Keystore");
}
additionalParameters.put("ssl", TRUE_STRING_VALUE);
additionalParameters.put("sslmode", method);
additionalParameters.put("sslrootcert", CA_CERTIFICATE);
additionalParameters.put("sslcert", CLIENT_CERTIFICATE);
additionalParameters.put("sslkey", CLIENT_ENCRYPTED_KEY);
additionalParameters.put("sslfactory", FACTORY_VALUE);
return additionalParameters;
}

private static Map<String, String> obtainConnectionCaOptions(final JsonNode encryption,
final String method,
final String clientKeyPassword) {
final Map<String, String> additionalParameters = new HashMap<>();
try {
convertAndImportCaCertificate(encryption.get(PARAM_CA_CERTIFICATE).asText(), clientKeyPassword);
} catch (final IOException | InterruptedException e) {
throw new RuntimeException("Failed to import certificate into Java Keystore");
}
additionalParameters.put("ssl", TRUE_STRING_VALUE);
additionalParameters.put("sslmode", method);
additionalParameters.put("sslrootcert", CA_CERTIFICATE);
additionalParameters.put("sslfactory", FACTORY_VALUE);
return additionalParameters;
}

private static void convertAndImportFullCertificate(final String caCertificate,
final String clientCertificate,
final String clientKey,
final String clientKeyPassword)
throws IOException, InterruptedException {
final Runtime run = Runtime.getRuntime();
createCaCertificate(caCertificate, clientKeyPassword, run);
createCertificateFile(CLIENT_CERTIFICATE, clientCertificate);
createCertificateFile(CLIENT_KEY, clientKey);
// add client certificate to the custom keystore
runProcess("keytool -alias client-certificate -keystore customkeystore"
+ " -import -file " + CLIENT_CERTIFICATE + " -storepass " + clientKeyPassword + " -noprompt", run);
// convert client.key to client.pk8 based on the documentation
runProcess("openssl pkcs8 -topk8 -inform PEM -in " + CLIENT_KEY + " -outform DER -out "
+ CLIENT_ENCRYPTED_KEY + " -nocrypt", run);
runProcess("rm " + CLIENT_KEY, run);

updateTrustStoreSystemProperty(clientKeyPassword);
}

private static void convertAndImportCaCertificate(final String caCertificate,
final String clientKeyPassword)
throws IOException, InterruptedException {
final Runtime run = Runtime.getRuntime();
createCaCertificate(caCertificate, clientKeyPassword, run);
updateTrustStoreSystemProperty(clientKeyPassword);
}

private static void createCaCertificate(final String caCertificate,
final String clientKeyPassword,
final Runtime run)
throws IOException, InterruptedException {
createCertificateFile(CA_CERTIFICATE, caCertificate);
// add CA certificate to the custom keystore
runProcess("keytool -import -alias rds-root -keystore customkeystore"
+ " -file " + CA_CERTIFICATE + " -storepass " + clientKeyPassword + " -noprompt", run);
}

private static void updateTrustStoreSystemProperty(final String clientKeyPassword) {
String result = System.getProperty("user.dir") + "/customkeystore";
System.setProperty("javax.net.ssl.trustStore", result);
System.setProperty("javax.net.ssl.trustStorePassword", clientKeyPassword);
}

private static void createCertificateFile(String fileName, String fileValue) throws IOException {
try (final PrintWriter out = new PrintWriter(fileName, StandardCharsets.UTF_8)) {
out.print(fileValue);
}
}

private static void runProcess(final String cmd, final Runtime run) throws IOException, InterruptedException {
final Process pr = run.exec(cmd);
if (!pr.waitFor(30, TimeUnit.SECONDS)) {
pr.destroy();
throw new RuntimeException("Timeout while executing: " + cmd);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.30
LABEL io.airbyte.version=0.4.31
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.source.postgres;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.IntegrationRunner;
Expand All @@ -29,6 +30,11 @@ public class PostgresSourceStrictEncrypt extends SpecModifyingSource implements
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
((ObjectNode) spec.getConnectionSpecification().get("properties")).remove("ssl");
ArrayNode modifiedSslModes = spec.getConnectionSpecification().get("properties").get("ssl_mode").get("oneOf").deepCopy();
// Assume that the first item is the "disable" option; remove it
modifiedSslModes.remove(0);
((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).remove("oneOf");
((ObjectNode) spec.getConnectionSpecification().get("properties").get("ssl_mode")).put("oneOf", modifiedSslModes);
return spec;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.30
LABEL io.airbyte.version=0.4.31
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ public AirbyteMessage saveState(final Map<String, String> offset, final String d
}

/**
* Here we just want to emit the state to update the list of streams in the database to mark the completion of snapshot of new added streams.
* The addition of new streams in the state is done here {@link io.airbyte.integrations.source.relationaldb.state.GlobalStateManager#toState(Optional)}
* which is called inside the {@link StateManager#emit(Optional)} method which is being triggered below.
* The toState method adds all the streams present in the catalog in the state.
* Since there is no change in the CDC state value, whatever was present in the database will again be stored.
* Here we just want to emit the state to update the list of streams in the database to mark the
* completion of snapshot of new added streams. The addition of new streams in the state is done
* here
* {@link io.airbyte.integrations.source.relationaldb.state.GlobalStateManager#toState(Optional)}
* which is called inside the {@link StateManager#emit(Optional)} method which is being triggered
* below. The toState method adds all the streams present in the catalog in the state. Since there
* is no change in the CDC state value, whatever was present in the database will again be stored.
* This is done so that we can mark the completion of snapshot of new tables.
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.DISABLE;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_MODE;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.PARAM_SSL_MODE;
import static io.airbyte.integrations.util.PostgresSslConnectionUtils.obtainConnectionOptions;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

Expand Down Expand Up @@ -36,8 +41,8 @@
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
Expand Down Expand Up @@ -66,6 +71,7 @@
public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class);

public static final String CDC_LSN = "_ab_cdc_lsn";

static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName();
Expand Down Expand Up @@ -100,9 +106,22 @@ public JsonNode toDatabaseConfigStatic(final JsonNode config) {
}

// assume ssl if not explicitly mentioned.
if (!config.has("ssl") || config.get("ssl").asBoolean()) {
additionalParameters.add("ssl=true");
additionalParameters.add("sslmode=require");
if (!config.has(PARAM_SSL) || config.get(PARAM_SSL).asBoolean()) {
if (config.has(PARAM_SSL_MODE)) {
if (DISABLE.equals(config.get(PARAM_SSL_MODE).get(PARAM_MODE).asText())) {
additionalParameters.add("sslmode=disable");
} else {
var parametersList = obtainConnectionOptions(config.get(PARAM_SSL_MODE))
.entrySet()
.stream()
.map(e -> e.getKey() + "=" + e.getValue())
.toList();
additionalParameters.addAll(parametersList);
}
} else {
additionalParameters.add("ssl=true");
additionalParameters.add("sslmode=require");
andriikorotkov marked this conversation as resolved.
Show resolved Hide resolved
}
}

if (config.has("schemas") && config.get("schemas").isArray()) {
Expand Down Expand Up @@ -262,7 +281,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final AutoCloseableIterator<AirbyteMessage> snapshotIterator = handler.getSnapshotIterators(
new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot), new PostgresCdcConnectorMetadataInjector(),
PostgresCdcProperties.getSnapshotProperties(), postgresCdcStateHandler, emittedAt);
return Collections.singletonList(AutoCloseableIterators.concatWithEagerClose(snapshotIterator, AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier)));
return Collections.singletonList(
AutoCloseableIterators.concatWithEagerClose(snapshotIterator, AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier)));

} else {
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
Expand Down
Loading