Skip to content

Commit

Permalink
Source postgres: disable allow and prefer ssl modes in CDC mode (#18256)
Browse files Browse the repository at this point in the history
* Source postgres: disable ssl_mode allow and prefer in CDC mode

* Source postgres: code review fixes

* Source postgres: code review fixes

* Source postgres: improve ssl mode handling

* Source postgres: bump version

* auto-bump connector version

* Source postgres: bump version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
2 people authored and nataly committed Nov 3, 2022
1 parent 1245fe2 commit 16ff7b1
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
- name: AlloyDB for PostgreSQL
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerRepository: airbyte/source-alloydb
dockerImageTag: 1.0.15
dockerImageTag: 1.0.16
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
Expand Down Expand Up @@ -864,7 +864,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.20
dockerImageTag: 1.0.21
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-alloydb:1.0.15"
- dockerImage: "airbyte/source-alloydb:1.0.16"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -8792,7 +8792,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.20"
- dockerImage: "airbyte/source-postgres:1.0.21"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.15
LABEL io.airbyte.version=1.0.16
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-alloydb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.15
LABEL io.airbyte.version=1.0.16
LABEL io.airbyte.name=airbyte/source-alloydb
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.20
LABEL io.airbyte.version=1.0.21
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.20
LABEL io.airbyte.version=1.0.21
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedConsumer;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Sour
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;

public static final String PARAM_SSLMODE = "sslmode";
public static final String SSL_MODE = "ssl_mode";
public static final String PARAM_SSL = "ssl";
public static final String PARAM_SSL_TRUE = "true";
public static final String PARAM_SSL_FALSE = "false";
Expand All @@ -87,11 +89,13 @@ public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Sour
public static final String CA_CERTIFICATE_PATH = "ca_certificate_path";
public static final String SSL_KEY = "sslkey";
public static final String SSL_PASSWORD = "sslpassword";
public static final String MODE = "mode";
static final Map<String, String> SSL_JDBC_PARAMETERS = ImmutableMap.of(
"ssl", "true",
"sslmode", "require");
private List<String> schemas;
private final FeatureFlags featureFlags;
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");

public static Source sshWrappedSource() {
return new SshWrappedSource(new PostgresSource(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
Expand Down Expand Up @@ -464,6 +468,23 @@ public static void main(final String[] args) throws Exception {
LOGGER.info("completed source: {}", PostgresSource.class);
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
if (PostgresUtils.isCdc(config)) {
if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)){
String sslModeValue = config.get(SSL_MODE).get(MODE).asText();
if (INVALID_CDC_SSL_MODES.contains(sslModeValue)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage(String.format(
"In CDC replication mode ssl value '%s' is invalid. Please use one of the following SSL modes: disable, require, verify-ca, verify-full",
sslModeValue));
}
}
}
return super.check(config);
}

@Override
protected String toSslJdbcParam(final SslMode sslMode) {
return toSslJdbcParamInternal(sslMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
Expand All @@ -35,6 +36,7 @@
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -197,4 +199,34 @@ void testIsCdc() {
assertTrue(PostgresUtils.isCdc(config));
}

@Test
void testAllowSSLWithCdcReplicationMethod() throws Exception {

JsonNode config = getCDCAndSslModeConfig("allow");

final AirbyteConnectionStatus actual = new PostgresSource().check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, actual.getStatus());
assertTrue(actual.getMessage().contains("In CDC replication mode ssl value 'allow' is invalid"));
}

@Test
void testPreferSSLWithCdcReplicationMethod() throws Exception {

JsonNode config = getCDCAndSslModeConfig("prefer");

final AirbyteConnectionStatus actual = new PostgresSource().check(config);
assertEquals(AirbyteConnectionStatus.Status.FAILED, actual.getStatus());
assertTrue(actual.getMessage().contains("In CDC replication mode ssl value 'prefer' is invalid"));
}

private JsonNode getCDCAndSslModeConfig(String sslMode) {
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.SSL_KEY, true)
.put(JdbcUtils.SSL_MODE_KEY, Map.of(JdbcUtils.MODE_KEY, sslMode))
.put("replication_method", Map.of("method", "CDC",
"replication_slot", "slot",
"publication", "ab_pub"))
.build());
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/alloydb.md
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------|
| 1.0.16 | 2022-10-25 | [18256](https://github.com/airbytehq/airbyte/pull/18256) | Disable allow and prefer ssl modes in CDC mode |
| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/16238) | Update incremental query to avoid data missing when new data is inserted at the same time as a sync starts under non-CDC incremental mode |
| 1.0.15 | 2022-10-11 | [17782](https://github.com/airbytehq/airbyte/pull/17782) | Align with Postgres source v.1.0.15 |
| 1.0.0 | 2022-09-15 | [16776](https://github.com/airbytehq/airbyte/pull/16776) | Align with strict-encrypt version |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ The root causes is that the WALs needed for the incremental sync has been remove

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.21 | 2022-10-25 | [18256](https://github.com/airbytehq/airbyte/pull/18256) | Disable allow and prefer ssl modes in CDC mode |
| 1.0.20 | 2022-10-25 | [18383](https://github.com/airbytehq/airbyte/pull/18383) | Better SSH error handling + messages |
| 1.0.19 | 2022-10-21 | [18263](https://github.com/airbytehq/airbyte/pull/18263) | Fixes bug introduced in [15833](https://github.com/airbytehq/airbyte/pull/15833) and adds better error messaging for SSH tunnel in Destinations |
| 1.0.18 | 2022-10-19 | [18087](https://github.com/airbytehq/airbyte/pull/18087) | Better error messaging for configuration errors (SSH configs, choosing an invalid cursor) |
Expand Down

0 comments on commit 16ff7b1

Please sign in to comment.