From e1701c97ebcc67f876db3500f1e63e42a89c2645 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Wed, 12 Oct 2022 09:53:25 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20=20MySQL=20Source=20:=20Expose?= =?UTF-8?q?=20serverTimezone=20debezium=20option=20via=20MySQL=20Source=20?= =?UTF-8?q?spec=20for=20CDC=20(#17815)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Expose serverTimezone debezium option via MySQL Source spec for CDC * Fix tests * Bump dockerfile version * Update Dockerfile * Update mysql.md * Update mysql.md * Documentation * Code Review Comments * Addressing comments * Addressing comments * Addressing comments * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 11 ++++++++-- .../source-mysql-strict-encrypt/Dockerfile | 2 +- .../src/test/resources/expected_spec.json | 10 +++++++-- .../connectors/source-mysql/Dockerfile | 2 +- .../source/mysql/MySqlCdcProperties.java | 11 +++++++++- .../source/mysql/MySqlSource.java | 1 + .../mysql/helpers/CdcConfigurationHelper.java | 22 +++++++++++++++++++ .../source-mysql/src/main/resources/spec.json | 10 +++++++-- .../mysql/CdcConfigurationHelperTest.java | 14 ++++++++++++ .../source/mysql/CdcMysqlSourceTest.java | 1 + docs/integrations/sources/mysql.md | 12 ++++++++++ 12 files changed, 88 insertions(+), 10 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index c875770b9b59..f5b6b96539fa 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -678,7 +678,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 1.0.3 + dockerImageTag: 1.0.4 documentationUrl: https://docs.airbyte.com/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 742ade1acb11..a67eff29f083 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6964,7 +6964,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:1.0.3" +- dockerImage: "airbyte/source-mysql:1.0.4" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql" connectionSpecification: @@ -7194,9 +7194,16 @@ \ initial waiting time." default: 300 - order: 4 min: 120 max: 1200 + order: 1 + server_time_zone: + type: "string" + title: "Configured server timezone for the MySQL source (Advanced)" + description: "Enter the configured MySQL server timezone. This should\ + \ only be done if the configured timezone in your MySQL instance\ + \ does not conform to IANNA standard." + order: 2 tunnel_method: type: "object" title: "SSH Tunnel Method" diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index a185d075b585..70596dc34839 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -17,6 +17,6 @@ ENV APPLICATION source-mysql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.3 +LABEL io.airbyte.version=1.0.4 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json index 2336c8b7f893..870f0a66f303 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/src/test/resources/expected_spec.json @@ -210,9 +210,15 @@ "title": "Initial Waiting Time in Seconds (Advanced)", "description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about initial waiting time.", "default": 300, - "order": 4, "min": 120, - "max": 1200 + "max": 1200, + "order": 1 + }, + "server_time_zone": { + "type": "string", + "title": "Configured server timezone for the MySQL source (Advanced)", + "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", + "order": 2 } } } diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 1a2ee2230018..0d6a6b405691 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.3 +LABEL io.airbyte.version=1.0.4 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java index edb7b839ccbf..82bca2a517d9 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java @@ -32,6 +32,7 @@ static Properties getDebeziumProperties(final JdbcDatabase database) { // https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode props.setProperty("snapshot.mode", "when_needed"); } + return props; } @@ -52,6 +53,15 @@ private static Properties commonProperties(final JdbcDatabase database) { props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter"); props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter"); + // For CDC mode, the user cannot provide timezone arguments as JDBC parameters - they are specifically defined in the replication_method + // config. + if (sourceConfig.get("replication_method").has("server_time_zone")) { + final String serverTimeZone = sourceConfig.get("replication_method").get("server_time_zone").asText(); + if (!serverTimeZone.isEmpty()) { + props.setProperty("database.serverTimezone", serverTimeZone); + } + } + // Check params for SSL connection in config and add properties for CDC SSL connection // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-ssl-mode if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) { @@ -111,7 +121,6 @@ private static Properties commonProperties(final JdbcDatabase database) { props.setProperty("database.include.list", sourceConfig.get("database").asText()); return props; - } static Properties getSnapshotProperties(final JdbcDatabase database) { diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index b1268f1c8630..6cd1cb72432f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -125,6 +125,7 @@ public List> getCheckOperations(final J checkOperations.add(database -> { CdcConfigurationHelper.checkFirstRecordWaitTime(config); + CdcConfigurationHelper.checkServerTimeZoneConfig(config); }); } return checkOperations; diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java index 555544107260..303efaa43242 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java @@ -5,9 +5,11 @@ package io.airbyte.integrations.source.mysql.helpers; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.db.jdbc.JdbcDatabase; import java.time.Duration; +import java.time.ZoneId; import java.util.List; import java.util.Optional; import org.slf4j.Logger; @@ -74,6 +76,26 @@ public static Optional getFirstRecordWaitSeconds(final JsonNode config) return Optional.empty(); } + private static Optional getCdcServerTimezone(final JsonNode config) { + final JsonNode replicationMethod = config.get("replication_method"); + if (replicationMethod != null && replicationMethod.has("server_time_zone")) { + final String serverTimeZone = config.get("replication_method").get("server_time_zone").asText(); + return Optional.of(serverTimeZone); + } + return Optional.empty(); + } + + public static void checkServerTimeZoneConfig(final JsonNode config) { + final Optional serverTimeZone = getCdcServerTimezone(config); + if (serverTimeZone.isPresent()) { + final String timeZone = serverTimeZone.get(); + if (!timeZone.isEmpty() && !ZoneId.getAvailableZoneIds().contains((timeZone))) { + throw new IllegalArgumentException(String.format("Given timezone %s is not valid. The given timezone must conform to the IANNA standard. " + + "See https://www.iana.org/time-zones for more details", serverTimeZone.get())); + } + } + } + public static void checkFirstRecordWaitTime(final JsonNode config) { // we need to skip the check because in tests, we set initial_waiting_seconds // to 5 seconds for performance reasons, which is shorter than the minimum diff --git a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json index 456f50136b42..c1a6f48b4e87 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json @@ -217,9 +217,15 @@ "title": "Initial Waiting Time in Seconds (Advanced)", "description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about initial waiting time.", "default": 300, - "order": 4, "min": 120, - "max": 1200 + "max": 1200, + "order": 1 + }, + "server_time_zone": { + "type": "string", + "title": "Configured server timezone for the MySQL source (Advanced)", + "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", + "order": 2 } } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcConfigurationHelperTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcConfigurationHelperTest.java index 594252bdd6c1..2c15cfd2b456 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcConfigurationHelperTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcConfigurationHelperTest.java @@ -49,4 +49,18 @@ void testGetFirstRecordWaitTime() { assertEquals(MAX_FIRST_RECORD_WAIT_TIME, CdcConfigurationHelper.getFirstRecordWaitTime(tooLongConfig)); } + @Test + void testServerTimeConfig() { + final JsonNode emptyConfig = Jsons.jsonNode(Collections.emptyMap()); + assertDoesNotThrow(() -> CdcConfigurationHelper.checkServerTimeZoneConfig(emptyConfig)); + + final JsonNode normalConfig = Jsons.jsonNode(Map.of("replication_method", + Map.of("method", "CDC", "server_time_zone", "America/Los_Angeles"))); + assertDoesNotThrow(() -> CdcConfigurationHelper.checkServerTimeZoneConfig(normalConfig)); + + final JsonNode invalidConfig = Jsons.jsonNode(Map.of("replication_method", + Map.of("method", "CDC", "server_time_zone", "CEST"))); + assertThrows(IllegalArgumentException.class, () -> CdcConfigurationHelper.checkServerTimeZoneConfig(invalidConfig)); + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index db4adfdc2905..128ef25def1d 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -82,6 +82,7 @@ private void init() { final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() .put("method", "CDC") .put("initial_waiting_seconds", INITIAL_WAITING_SECONDS) + .put("time_zone", "America/Los_Angeles") .build()); config = Jsons.jsonNode(ImmutableMap.builder() diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 70d1314b1daa..3d84b175170b 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -124,6 +124,17 @@ The connector waits for the default initial wait time of 5 minutes (300 seconds) If you know there are database changes to be synced, but the connector cannot read those changes, the root cause may be insufficient waiting time. In that case, you can increase the waiting time (example: set to 600 seconds) to test if it is indeed the root cause. On the other hand, if you know there are no database changes, you can decrease the wait time to speed up the zero record syncs. +**4. Set up server timezone\(Optional\)** + +:::warning +This is an advanced feature. Use it if absolutely necessary. +::: + +In CDC mode, the MySQl connector may need a timezone configured if the existing MySQL database been set up with a system timezone that is not recognized by the [IANA Timezone Database](https://www.iana.org/time-zones). + +In this case, you can configure the server timezone to the equivalent IANA timezone compliant timezone. (e.g. CEST -> Europe/Berlin). + + **Note** When a sync runs for the first time using CDC, Airbyte performs an initial consistent snapshot of your database. Airbyte doesn't acquire any table locks \(for tables defined with MyISAM engine, the tables would still be locked\) while creating the snapshot to allow writes by other database clients. But in order for the sync to work without any error/unexpected behaviour, it is assumed that no schema changes are happening while the snapshot is running. @@ -240,6 +251,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura ## Changelog | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.4 | 2022-10-11 | [17815](https://github.com/airbytehq/airbyte/pull/17815) | Expose setting server timezone for CDC syncs | | 1.0.3 | 2022-10-07 | [17236](https://github.com/airbytehq/airbyte/pull/17236) | Fix large table issue by fetch size | | 1.0.2 | 2022-10-03 | [17170](https://github.com/airbytehq/airbyte/pull/17170) | Make initial CDC waiting time configurable | | 1.0.1 | 2022-10-01 | [17459](https://github.com/airbytehq/airbyte/pull/17459) | Upgrade debezium version to 1.9.6 from 1.9.2 |