Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL Source : Standardize spec.json for DB connectors that support log-based CDC replication #16216

Merged
merged 28 commits into from
Sep 4, 2022
Merged
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
979fe1f
Fixed bucket naming for S3
VitaliiMaltsev Aug 1, 2022
7d2963a
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 1, 2022
e921180
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 2, 2022
0e6f7df
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 7, 2022
7924657
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 8, 2022
b5c1106
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 10, 2022
fc34cfb
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 11, 2022
8ae35a5
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 11, 2022
e5e1d12
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 11, 2022
f003cc6
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 15, 2022
a526c81
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 15, 2022
b9a4c3d
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 17, 2022
3917237
Merge remote-tracking branch 'origin/master'
VitaliiMaltsev Aug 17, 2022
aeae719
removed redundant configs
VitaliiMaltsev Aug 17, 2022
9ec1b7a
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 19, 2022
ebbcedf
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 22, 2022
c2a123b
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Sep 1, 2022
28875bd
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Sep 1, 2022
6badf97
MySQL Source : Standardize spec.json for DB connectors that support l…
VitaliiMaltsev Sep 1, 2022
abe4d34
fixed strict encrypt tests
VitaliiMaltsev Sep 1, 2022
e7c2c7d
fixed mysql tests
VitaliiMaltsev Sep 1, 2022
1c668a7
Merge branch 'master' into vmaltsev/12917-mysql-source-standardize-spec
VitaliiMaltsev Sep 1, 2022
b3c7e13
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Sep 1, 2022
e9b8486
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Sep 3, 2022
5af7f2c
Merge branch 'master' into vmaltsev/12917-mysql-source-standardize-spec
VitaliiMaltsev Sep 3, 2022
cc46bbf
bump version
VitaliiMaltsev Sep 3, 2022
51509b5
Merge remote-tracking branch 'origin/vmaltsev/12917-mysql-source-stan…
VitaliiMaltsev Sep 3, 2022
537ff1f
auto-bump connector version [ci skip]
octavia-squidington-iii Sep 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -647,7 +647,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.6.8
dockerImageTag: 0.6.9
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
41 changes: 30 additions & 11 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
@@ -6048,7 +6048,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.6.8"
- dockerImage: "airbyte/source-mysql:0.6.9"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
@@ -6236,18 +6236,37 @@
airbyte_secret: true
order: 4
replication_method:
type: "string"
type: "object"
title: "Replication Method"
description: "Replication method which is used for data extraction from\
\ the database. STANDARD replication requires no setup on the DB side\
\ but will not be able to represent deletions incrementally. CDC uses\
\ the Binlog to detect inserts, updates, and deletes. This needs to be\
\ configured on the source database itself."
description: "Replication method to use for extracting data from the database."
order: 8
default: "STANDARD"
enum:
- "STANDARD"
- "CDC"
oneOf:
- title: "STANDARD"
description: "Standard replication requires no setup on the DB side but\
\ will not be able to represent deletions incrementally."
required:
- "method"
properties:
method:
type: "string"
const: "STANDARD"
enum:
- "STANDARD"
default: "STANDARD"
order: 0
- title: "Logical Replication (CDC)"
description: "CDC uses the Binlog to detect inserts, updates, and deletes.\
\ This needs to be configured on the source database itself."
required:
- "method"
properties:
method:
type: "string"
const: "CDC"
enum:
- "CDC"
default: "CDC"
order: 0
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
Original file line number Diff line number Diff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.8
LABEL io.airbyte.version=0.6.9
LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@

package io.airbyte.integrations.source.mysql_strict_encrypt;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
@@ -31,7 +32,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
certs = MySqlUtils.getCertificate(container, true);

var sslMode = getSslConfig();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "STANDARD")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
@@ -40,7 +43,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_KEY, true)
.put(JdbcUtils.SSL_MODE_KEY, sslMode)
.put("replication_method", ReplicationMethod.STANDARD)
.put("replication_method", replicationMethod)
.build());
}

Original file line number Diff line number Diff line change
@@ -16,7 +16,6 @@
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.source.mysql.MySqlSource;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.CatalogHelpers;
@@ -46,17 +45,19 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
container.start();

var sslMode = ImmutableMap.builder()
.put(JdbcUtils.MODE_KEY, "required")
.build();

.put(JdbcUtils.MODE_KEY, "required")
.build();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "STANDARD")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_MODE_KEY, sslMode)
.put("replication_method", MySqlSource.ReplicationMethod.STANDARD)
.put("replication_method", replicationMethod)
.build());

try (final DSLContext dslContext = DSLContextFactory.create(
Original file line number Diff line number Diff line change
@@ -174,12 +174,40 @@
]
},
"replication_method": {
"type": "string",
"type": "object",
"title": "Replication Method",
"description": "Replication method which is used for data extraction from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"description": "Replication method to use for extracting data from the database.",
"order": 8,
"default": "STANDARD",
"enum": ["STANDARD", "CDC"]
"oneOf": [
{
"title": "STANDARD",
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "STANDARD",
"enum": ["STANDARD"],
"default": "STANDARD",
"order": 0
}
}
},
{
"title": "Logical Replication (CDC)",
"description": "CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "CDC",
"enum": ["CDC"],
"default": "CDC",
"order": 0
}
}
}
]
}
}
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.8
LABEL io.airbyte.version=0.6.9
LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
@@ -172,9 +172,16 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
}

private static boolean isCdc(final JsonNode config) {
return config.hasNonNull("replication_method")
&& ReplicationMethod.valueOf(config.get("replication_method").asText())
.equals(ReplicationMethod.CDC);
if (config.hasNonNull("replication_method")) {
if (config.get("replication_method").isTextual()) {
return ReplicationMethod.valueOf(config.get("replication_method").asText())
.equals(ReplicationMethod.CDC);
} else if (config.get("replication_method").isObject()) {
return config.get("replication_method").get("method").asText()
.equals(ReplicationMethod.CDC.name());
}
}
return false;
}

@Override
Original file line number Diff line number Diff line change
@@ -181,12 +181,40 @@
]
},
"replication_method": {
"type": "string",
"type": "object",
"title": "Replication Method",
"description": "Replication method which is used for data extraction from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"description": "Replication method to use for extracting data from the database.",
"order": 8,
"default": "STANDARD",
"enum": ["STANDARD", "CDC"]
"oneOf": [
{
"title": "STANDARD",
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "STANDARD",
"enum": ["STANDARD"],
"default": "STANDARD",
"order": 0
}
}
},
{
"title": "Logical Replication (CDC)",
"description": "CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "CDC",
"enum": ["CDC"],
"default": "CDC",
"order": 0
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@

package io.airbyte.integrations.source.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
@@ -32,7 +33,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
certs = getCertificates();

var sslMode = getSslConfig();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "STANDARD")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
@@ -41,7 +44,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_KEY, true)
.put(JdbcUtils.SSL_MODE_KEY, sslMode)
.put("replication_method", ReplicationMethod.STANDARD)
.put("replication_method", replicationMethod)
.build());
}

Original file line number Diff line number Diff line change
@@ -81,14 +81,16 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception
protected Database setupDatabase() throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication_method", MySqlSource.ReplicationMethod.CDC)
.put("replication_method", replicationMethod)
.build());

dslContext = DSLContextFactory.create(
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@

package io.airbyte.integrations.source.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
@@ -29,14 +30,16 @@ protected void tearDown(final TestDestinationEnv testEnv) {
protected Database setupDatabase() throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication_method", MySqlSource.ReplicationMethod.CDC)
.put("replication_method", replicationMethod)
.put("snapshot_mode", "initial_only")
.build());

Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -99,14 +98,16 @@ protected JsonNode getState() {
protected void setupEnvironment(final TestDestinationEnv environment) {
container = new MySQLContainer<>("mysql:8.0");
container.start();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication_method", ReplicationMethod.CDC)
.put("replication_method", replicationMethod)
.build());

revokeAllPermissions();
Original file line number Diff line number Diff line change
@@ -110,6 +110,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put("client_key", certs.getClientKey())
.put("client_key_password", "Passw0rd")
.build();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.build());

config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
@@ -119,7 +122,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_KEY, true)
.put(JdbcUtils.SSL_MODE_KEY, sslMode)
.put("replication_method", ReplicationMethod.CDC)
.put("replication_method", replicationMethod)
.build());

revokeAllPermissions();
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.CatalogHelpers;
@@ -41,14 +40,16 @@ public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest {
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();

final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "STANDARD")
.build());
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication_method", ReplicationMethod.STANDARD)
.put("replication_method", replicationMethod)
.build());

try (final DSLContext dslContext = DSLContextFactory.create(
Loading