From e9a8a052677027cc3c88c41f36206bcf7d59e7f0 Mon Sep 17 00:00:00 2001 From: VitaliiMaltsev <39538064+VitaliiMaltsev@users.noreply.github.com> Date: Sun, 4 Sep 2022 15:30:04 +0300 Subject: [PATCH] MySQL Source : Standardize spec.json for DB connectors that support log-based CDC replication (#16216) * Fixed bucket naming for S3 * removed redundant configs * MySQL Source : Standardize spec.json for DB connectors that support log-based CDC replication * fixed strict encrypt tests * fixed mysql tests * bump version * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 41 ++++++++++++++----- .../source-mysql-strict-encrypt/Dockerfile | 2 +- ...cateStrictEncryptSourceAcceptanceTest.java | 7 +++- ...ySqlStrictEncryptSourceAcceptanceTest.java | 11 ++--- .../src/test/resources/expected_spec.json | 36 ++++++++++++++-- .../connectors/source-mysql/Dockerfile | 2 +- .../source/mysql/MySqlSource.java | 13 ++++-- .../source-mysql/src/main/resources/spec.json | 36 ++++++++++++++-- ...SqlSslCertificateSourceAcceptanceTest.java | 7 +++- .../CdcBinlogsMySqlSourceDatatypeTest.java | 6 ++- ...nitialSnapshotMySqlSourceDatatypeTest.java | 7 +++- .../mysql/CdcMySqlSourceAcceptanceTest.java | 7 ++-- ...lSslCaCertificateSourceAcceptanceTest.java | 5 ++- .../mysql/MySqlSourceAcceptanceTest.java | 7 ++-- .../source/mysql/MySqlSourceDatatypeTest.java | 8 ++-- .../mysql/MySqlSslSourceAcceptanceTest.java | 9 +++- .../SshKeyMySqlSourceAcceptanceTest.java | 2 +- .../SshPasswordMySqlSourceAcceptanceTest.java | 2 +- .../mysql/FillMySqlTestDbScriptTest.java | 7 +++- docs/integrations/sources/mysql.md | 3 +- 21 files changed, 165 insertions(+), 55 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 3996c7fe6fc2..c730776d3fd1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -647,7 +647,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 0.6.8 + dockerImageTag: 0.6.9 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 1a558151a514..95c732a60e9b 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6048,7 +6048,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:0.6.8" +- dockerImage: "airbyte/source-mysql:0.6.9" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql" connectionSpecification: @@ -6236,18 +6236,37 @@ airbyte_secret: true order: 4 replication_method: - type: "string" + type: "object" title: "Replication Method" - description: "Replication method which is used for data extraction from\ - \ the database. STANDARD replication requires no setup on the DB side\ - \ but will not be able to represent deletions incrementally. CDC uses\ - \ the Binlog to detect inserts, updates, and deletes. This needs to be\ - \ configured on the source database itself." + description: "Replication method to use for extracting data from the database." order: 8 - default: "STANDARD" - enum: - - "STANDARD" - - "CDC" + oneOf: + - title: "STANDARD" + description: "Standard replication requires no setup on the DB side but\ + \ will not be able to represent deletions incrementally." + required: + - "method" + properties: + method: + type: "string" + const: "STANDARD" + enum: + - "STANDARD" + default: "STANDARD" + order: 0 + - title: "Logical Replication (CDC)" + description: "CDC uses the Binlog to detect inserts, updates, and deletes.\ + \ This needs to be configured on the source database itself." + required: + - "method" + properties: + method: + type: "string" + const: "CDC" + enum: + - "CDC" + default: "CDC" + order: 0 tunnel_method: type: "object" title: "SSH Tunnel Method" diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index bea4a4b304d6..b385f3d6571a 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.8 +LABEL io.airbyte.version=0.6.9 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTest.java index ecfa9e92953a..b637cf07a011 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/AbstractMySqlSslCertificateStrictEncryptSourceAcceptanceTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.mysql_strict_encrypt; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; @@ -31,7 +32,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc certs = MySqlUtils.getCertificate(container, true); var sslMode = getSslConfig(); - + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "STANDARD") + .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) @@ -40,7 +43,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) .put(JdbcUtils.SSL_KEY, true) .put(JdbcUtils.SSL_MODE_KEY, sslMode) - .put("replication_method", ReplicationMethod.STANDARD) + .put("replication_method", replicationMethod) .build()); } diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java index 43493837f006..6a5ef2c3e2b5 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mysql_strict_encrypt/MySqlStrictEncryptSourceAcceptanceTest.java @@ -16,7 +16,6 @@ import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.ssh.SshHelpers; -import io.airbyte.integrations.source.mysql.MySqlSource; import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.CatalogHelpers; @@ -46,9 +45,11 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc container.start(); var sslMode = ImmutableMap.builder() - .put(JdbcUtils.MODE_KEY, "required") - .build(); - + .put(JdbcUtils.MODE_KEY, "required") + .build(); + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "STANDARD") + .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) @@ -56,7 +57,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) .put(JdbcUtils.SSL_MODE_KEY, sslMode) - .put("replication_method", MySqlSource.ReplicationMethod.STANDARD) + .put("replication_method", replicationMethod) .build()); try (final DSLContext dslContext = DSLContextFactory.create( diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json index 2c6da3511475..c964d26ecad9 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json @@ -174,12 +174,40 @@ ] }, "replication_method": { - "type": "string", + "type": "object", "title": "Replication Method", - "description": "Replication method which is used for data extraction from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", + "description": "Replication method to use for extracting data from the database.", "order": 8, - "default": "STANDARD", - "enum": ["STANDARD", "CDC"] + "oneOf": [ + { + "title": "STANDARD", + "description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "STANDARD", + "enum": ["STANDARD"], + "default": "STANDARD", + "order": 0 + } + } + }, + { + "title": "Logical Replication (CDC)", + "description": "CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "CDC", + "enum": ["CDC"], + "default": "CDC", + "order": 0 + } + } + } + ] } } } diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 49e9e93fb676..92d7a9ff378f 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.8 +LABEL io.airbyte.version=0.6.9 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 73351a988a5e..0d7da1036287 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -172,9 +172,16 @@ public JsonNode toDatabaseConfig(final JsonNode config) { } private static boolean isCdc(final JsonNode config) { - return config.hasNonNull("replication_method") - && ReplicationMethod.valueOf(config.get("replication_method").asText()) - .equals(ReplicationMethod.CDC); + if (config.hasNonNull("replication_method")) { + if (config.get("replication_method").isTextual()) { + return ReplicationMethod.valueOf(config.get("replication_method").asText()) + .equals(ReplicationMethod.CDC); + } else if (config.get("replication_method").isObject()) { + return config.get("replication_method").get("method").asText() + .equals(ReplicationMethod.CDC.name()); + } + } + return false; } @Override diff --git a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json index eefb37c035ba..c09509ff9498 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json @@ -181,12 +181,40 @@ ] }, "replication_method": { - "type": "string", + "type": "object", "title": "Replication Method", - "description": "Replication method which is used for data extraction from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", + "description": "Replication method to use for extracting data from the database.", "order": 8, - "default": "STANDARD", - "enum": ["STANDARD", "CDC"] + "oneOf": [ + { + "title": "STANDARD", + "description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "STANDARD", + "enum": ["STANDARD"], + "default": "STANDARD", + "order": 0 + } + } + }, + { + "title": "Logical Replication (CDC)", + "description": "CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", + "required": ["method"], + "properties": { + "method": { + "type": "string", + "const": "CDC", + "enum": ["CDC"], + "default": "CDC", + "order": 0 + } + } + } + ] } } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSslCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSslCertificateSourceAcceptanceTest.java index 9f615d331b23..68efdaa1be3a 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSslCertificateSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSslCertificateSourceAcceptanceTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.mysql; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; @@ -32,7 +33,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc certs = getCertificates(); var sslMode = getSslConfig(); - + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "STANDARD") + .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) @@ -41,7 +44,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) .put(JdbcUtils.SSL_KEY, true) .put(JdbcUtils.SSL_MODE_KEY, sslMode) - .put("replication_method", ReplicationMethod.STANDARD) + .put("replication_method", replicationMethod) .build()); } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcBinlogsMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcBinlogsMySqlSourceDatatypeTest.java index 0867feaf2b32..045b78b828b9 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcBinlogsMySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcBinlogsMySqlSourceDatatypeTest.java @@ -81,14 +81,16 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception protected Database setupDatabase() throws Exception { container = new MySQLContainer<>("mysql:8.0"); container.start(); - + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "CDC") + .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) - .put("replication_method", MySqlSource.ReplicationMethod.CDC) + .put("replication_method", replicationMethod) .build()); dslContext = DSLContextFactory.create( diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcInitialSnapshotMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcInitialSnapshotMySqlSourceDatatypeTest.java index 566d5b6cecab..d52defbdaa99 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcInitialSnapshotMySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcInitialSnapshotMySqlSourceDatatypeTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.mysql; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; @@ -29,14 +30,16 @@ protected void tearDown(final TestDestinationEnv testEnv) { protected Database setupDatabase() throws Exception { container = new MySQLContainer<>("mysql:8.0"); container.start(); - + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "CDC") + .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) - .put("replication_method", MySqlSource.ReplicationMethod.CDC) + .put("replication_method", replicationMethod) .put("snapshot_mode", "initial_only") .build()); diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java index be71ec84fc2d..fb64826100b8 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java @@ -18,7 +18,6 @@ import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.ssh.SshHelpers; -import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod; import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.AirbyteMessage; @@ -99,14 +98,16 @@ protected JsonNode getState() { protected void setupEnvironment(final TestDestinationEnv environment) { container = new MySQLContainer<>("mysql:8.0"); container.start(); - + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "CDC") + .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) - .put("replication_method", ReplicationMethod.CDC) + .put("replication_method", replicationMethod) .build()); revokeAllPermissions(); diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSslCaCertificateSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSslCaCertificateSourceAcceptanceTest.java index 53a9cfa1ef29..3141173e8b86 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSslCaCertificateSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSslCaCertificateSourceAcceptanceTest.java @@ -110,6 +110,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put("client_key", certs.getClientKey()) .put("client_key_password", "Passw0rd") .build(); + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "CDC") + .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) @@ -119,7 +122,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) .put(JdbcUtils.SSL_KEY, true) .put(JdbcUtils.SSL_MODE_KEY, sslMode) - .put("replication_method", ReplicationMethod.CDC) + .put("replication_method", replicationMethod) .build()); revokeAllPermissions(); diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceAcceptanceTest.java index 0c276801038a..5aaf3979c4ab 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceAcceptanceTest.java @@ -13,7 +13,6 @@ import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.ssh.SshHelpers; -import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod; import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.CatalogHelpers; @@ -41,14 +40,16 @@ public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest { protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { container = new MySQLContainer<>("mysql:8.0"); container.start(); - + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "STANDARD") + .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) - .put("replication_method", ReplicationMethod.STANDARD) + .put("replication_method", replicationMethod) .build()); try (final DSLContext dslContext = DSLContextFactory.create( diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java index 584565b3b6d3..a77f81dbeb63 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeTest.java @@ -4,13 +4,13 @@ package io.airbyte.integrations.source.mysql; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import java.util.Map; import org.jooq.SQLDialect; @@ -27,14 +27,16 @@ protected void tearDown(final TestDestinationEnv testEnv) { protected Database setupDatabase() throws Exception { container = new MySQLContainer<>("mysql:8.0"); container.start(); - + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "STANDARD") + .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, container.getHost()) .put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()) .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) - .put("replication_method", ReplicationMethod.STANDARD) + .put("replication_method", replicationMethod) .build()); final Database database = new Database( diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java index 9d8dfdb6bd86..0c7d79c48d41 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSslSourceAcceptanceTest.java @@ -4,13 +4,15 @@ package io.airbyte.integrations.source.mysql; +import static io.airbyte.integrations.source.mysql.MySqlSource.SSL_PARAMETERS; + +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import org.jooq.DSLContext; import org.jooq.SQLDialect; @@ -22,6 +24,9 @@ public class MySqlSslSourceAcceptanceTest extends MySqlSourceAcceptanceTest { protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { container = new MySQLContainer<>("mysql:8.0"); container.start(); + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "STANDARD") + .build()); var sslMode = ImmutableMap.builder() .put(JdbcUtils.MODE_KEY, "required") @@ -35,7 +40,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) .put(JdbcUtils.SSL_KEY, true) .put(JdbcUtils.SSL_MODE_KEY, sslMode) - .put("replication_method", ReplicationMethod.STANDARD) + .put("replication_method", replicationMethod) .build()); try (final DSLContext dslContext = DSLContextFactory.create( diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshKeyMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshKeyMySqlSourceAcceptanceTest.java index 9da81da967b2..9c975b65ffcb 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshKeyMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshKeyMySqlSourceAcceptanceTest.java @@ -10,7 +10,7 @@ public class SshKeyMySqlSourceAcceptanceTest extends AbstractSshMySqlSourceAccep @Override public Path getConfigFilePath() { - return Path.of("secrets/ssh-key-config.json"); + return Path.of("secrets/ssh-key-repl-config.json"); } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshPasswordMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshPasswordMySqlSourceAcceptanceTest.java index 2a7ac07cf677..fb32a73a8bc3 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshPasswordMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/SshPasswordMySqlSourceAcceptanceTest.java @@ -10,7 +10,7 @@ public class SshPasswordMySqlSourceAcceptanceTest extends AbstractSshMySqlSource @Override public Path getConfigFilePath() { - return Path.of("secrets/ssh-pwd-config.json"); + return Path.of("secrets/ssh-pwd-repl-config.json"); } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-performance/java/io/airbyte/integrations/source/mysql/FillMySqlTestDbScriptTest.java b/airbyte-integrations/connectors/source-mysql/src/test-performance/java/io/airbyte/integrations/source/mysql/FillMySqlTestDbScriptTest.java index 9bedc8f6af48..3af06df21c26 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-performance/java/io/airbyte/integrations/source/mysql/FillMySqlTestDbScriptTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-performance/java/io/airbyte/integrations/source/mysql/FillMySqlTestDbScriptTest.java @@ -11,7 +11,6 @@ import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.integrations.standardtest.source.performancetest.AbstractSourceFillDbWithTestData; import java.util.Map; @@ -38,13 +37,17 @@ protected String getImageName() { @Override protected Database setupDatabase(final String dbName) throws Exception { + + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "STANDARD") + .build()); config = Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, "your_host") .put(JdbcUtils.PORT_KEY, 3306) .put(JdbcUtils.DATABASE_KEY, dbName) // set your db name .put(JdbcUtils.USERNAME_KEY, "your_username") .put(JdbcUtils.PASSWORD_KEY, "your_pass") - .put("replication_method", ReplicationMethod.STANDARD) + .put("replication_method", replicationMethod) .build()); final Database database = new Database( diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 0b1226c2d200..b73fd2e5ee72 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -188,7 +188,8 @@ If you do not see a type in this list, assume that it is coerced into a string. | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------| -| 0.6.8 | 2022-09-01 | [16259](https://github.com/airbytehq/airbyte/pull/16259) | Emit state messages more frequently | +| 0.6.9 | 2022-09-03 | [16216](https://github.com/airbytehq/airbyte/pull/16216) | Standardize spec for CDC replication. Replace the `replication_method` enum with a config object with a `method` enum field. | +| 0.6.8 | 2022-09-01 | [16259](https://github.com/airbytehq/airbyte/pull/16259) | Emit state messages more frequently | | 0.6.7 | 2022-08-30 | [16114](https://github.com/airbytehq/airbyte/pull/16114) | Prevent traffic going on an unsecured channel in strict-encryption version of source mysql | | 0.6.6 | 2022-08-25 | [15993](https://github.com/airbytehq/airbyte/pull/15993) | Improved support for connecting over SSL | | 0.6.5 | 2022-08-25 | [15917](https://github.com/airbytehq/airbyte/pull/15917) | Fix temporal data type default value bug |