Skip to content

Commit

Permalink
MySQL Source : Standardize spec.json for DB connectors that support l…
Browse files Browse the repository at this point in the history
…og-based CDC replication (#16216)

* Fixed bucket naming for S3

* removed redundant configs

* MySQL Source : Standardize spec.json for DB connectors that support log-based CDC replication

* fixed strict encrypt tests

* fixed mysql tests

* bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
VitaliiMaltsev and octavia-squidington-iii authored Sep 4, 2022
1 parent 28a6add commit e9a8a05
Show file tree
Hide file tree
Showing 21 changed files with 165 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.6.8
dockerImageTag: 0.6.9
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
41 changes: 30 additions & 11 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6048,7 +6048,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.6.8"
- dockerImage: "airbyte/source-mysql:0.6.9"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down Expand Up @@ -6236,18 +6236,37 @@
airbyte_secret: true
order: 4
replication_method:
type: "string"
type: "object"
title: "Replication Method"
description: "Replication method which is used for data extraction from\
\ the database. STANDARD replication requires no setup on the DB side\
\ but will not be able to represent deletions incrementally. CDC uses\
\ the Binlog to detect inserts, updates, and deletes. This needs to be\
\ configured on the source database itself."
description: "Replication method to use for extracting data from the database."
order: 8
default: "STANDARD"
enum:
- "STANDARD"
- "CDC"
oneOf:
- title: "STANDARD"
description: "Standard replication requires no setup on the DB side but\
\ will not be able to represent deletions incrementally."
required:
- "method"
properties:
method:
type: "string"
const: "STANDARD"
enum:
- "STANDARD"
default: "STANDARD"
order: 0
- title: "Logical Replication (CDC)"
description: "CDC uses the Binlog to detect inserts, updates, and deletes.\
\ This needs to be configured on the source database itself."
required:
- "method"
properties:
method:
type: "string"
const: "CDC"
enum:
- "CDC"
default: "CDC"
order: 0
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.8
LABEL io.airbyte.version=0.6.9
LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.source.mysql_strict_encrypt;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
Expand Down Expand Up @@ -31,7 +32,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
certs = MySqlUtils.getCertificate(container, true);

var sslMode = getSslConfig();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "STANDARD")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
Expand All @@ -40,7 +43,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_KEY, true)
.put(JdbcUtils.SSL_MODE_KEY, sslMode)
.put("replication_method", ReplicationMethod.STANDARD)
.put("replication_method", replicationMethod)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.source.mysql.MySqlSource;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.CatalogHelpers;
Expand Down Expand Up @@ -46,17 +45,19 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
container.start();

var sslMode = ImmutableMap.builder()
.put(JdbcUtils.MODE_KEY, "required")
.build();

.put(JdbcUtils.MODE_KEY, "required")
.build();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "STANDARD")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_MODE_KEY, sslMode)
.put("replication_method", MySqlSource.ReplicationMethod.STANDARD)
.put("replication_method", replicationMethod)
.build());

try (final DSLContext dslContext = DSLContextFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,40 @@
]
},
"replication_method": {
"type": "string",
"type": "object",
"title": "Replication Method",
"description": "Replication method which is used for data extraction from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"description": "Replication method to use for extracting data from the database.",
"order": 8,
"default": "STANDARD",
"enum": ["STANDARD", "CDC"]
"oneOf": [
{
"title": "STANDARD",
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "STANDARD",
"enum": ["STANDARD"],
"default": "STANDARD",
"order": 0
}
}
},
{
"title": "Logical Replication (CDC)",
"description": "CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "CDC",
"enum": ["CDC"],
"default": "CDC",
"order": 0
}
}
}
]
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.8
LABEL io.airbyte.version=0.6.9
LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,16 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
}

private static boolean isCdc(final JsonNode config) {
return config.hasNonNull("replication_method")
&& ReplicationMethod.valueOf(config.get("replication_method").asText())
.equals(ReplicationMethod.CDC);
if (config.hasNonNull("replication_method")) {
if (config.get("replication_method").isTextual()) {
return ReplicationMethod.valueOf(config.get("replication_method").asText())
.equals(ReplicationMethod.CDC);
} else if (config.get("replication_method").isObject()) {
return config.get("replication_method").get("method").asText()
.equals(ReplicationMethod.CDC.name());
}
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,40 @@
]
},
"replication_method": {
"type": "string",
"type": "object",
"title": "Replication Method",
"description": "Replication method which is used for data extraction from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"description": "Replication method to use for extracting data from the database.",
"order": 8,
"default": "STANDARD",
"enum": ["STANDARD", "CDC"]
"oneOf": [
{
"title": "STANDARD",
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "STANDARD",
"enum": ["STANDARD"],
"default": "STANDARD",
"order": 0
}
}
},
{
"title": "Logical Replication (CDC)",
"description": "CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "CDC",
"enum": ["CDC"],
"default": "CDC",
"order": 0
}
}
}
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.source.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
Expand Down Expand Up @@ -32,7 +33,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
certs = getCertificates();

var sslMode = getSslConfig();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "STANDARD")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
Expand All @@ -41,7 +44,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_KEY, true)
.put(JdbcUtils.SSL_MODE_KEY, sslMode)
.put("replication_method", ReplicationMethod.STANDARD)
.put("replication_method", replicationMethod)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,16 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception
protected Database setupDatabase() throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication_method", MySqlSource.ReplicationMethod.CDC)
.put("replication_method", replicationMethod)
.build());

dslContext = DSLContextFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.source.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
Expand All @@ -29,14 +30,16 @@ protected void tearDown(final TestDestinationEnv testEnv) {
protected Database setupDatabase() throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication_method", MySqlSource.ReplicationMethod.CDC)
.put("replication_method", replicationMethod)
.put("snapshot_mode", "initial_only")
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteMessage;
Expand Down Expand Up @@ -99,14 +98,16 @@ protected JsonNode getState() {
protected void setupEnvironment(final TestDestinationEnv environment) {
container = new MySQLContainer<>("mysql:8.0");
container.start();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication_method", ReplicationMethod.CDC)
.put("replication_method", replicationMethod)
.build());

revokeAllPermissions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put("client_key", certs.getClientKey())
.put("client_key_password", "Passw0rd")
.build();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.build());

config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
Expand All @@ -119,7 +122,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_KEY, true)
.put(JdbcUtils.SSL_MODE_KEY, sslMode)
.put("replication_method", ReplicationMethod.CDC)
.put("replication_method", replicationMethod)
.build());

revokeAllPermissions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.CatalogHelpers;
Expand Down Expand Up @@ -41,14 +40,16 @@ public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest {
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "STANDARD")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication_method", ReplicationMethod.STANDARD)
.put("replication_method", replicationMethod)
.build());

try (final DSLContext dslContext = DSLContextFactory.create(
Expand Down
Loading

0 comments on commit e9a8a05

Please sign in to comment.