Skip to content

Commit

Permalink
🎉 MySQL Source : Expose serverTimezone debezium option via MySQL Sour…
Browse files Browse the repository at this point in the history
…ce spec for CDC (#17815)

* Expose serverTimezone debezium option via MySQL Source spec for CDC

* Fix tests

* Bump dockerfile version

* Update Dockerfile

* Update mysql.md

* Update mysql.md

* Documentation

* Code Review Comments

* Addressing comments

* Addressing comments

* Addressing comments

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
akashkulk and octavia-squidington-iii authored Oct 12, 2022
1 parent de8895b commit e1701c9
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 1.0.3
dockerImageTag: 1.0.4
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
11 changes: 9 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6964,7 +6964,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:1.0.3"
- dockerImage: "airbyte/source-mysql:1.0.4"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql"
connectionSpecification:
Expand Down Expand Up @@ -7194,9 +7194,16 @@
\ <a href=\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\"\
>initial waiting time</a>."
default: 300
order: 4
min: 120
max: 1200
order: 1
server_time_zone:
type: "string"
title: "Configured server timezone for the MySQL source (Advanced)"
description: "Enter the configured MySQL server timezone. This should\
\ only be done if the configured timezone in your MySQL instance\
\ does not conform to IANNA standard."
order: 2
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ ENV APPLICATION source-mysql-strict-encrypt
COPY --from=build /airbyte /airbyte


LABEL io.airbyte.version=1.0.3
LABEL io.airbyte.version=1.0.4

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,15 @@
"title": "Initial Waiting Time in Seconds (Advanced)",
"description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about <a href=\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\">initial waiting time</a>.",
"default": 300,
"order": 4,
"min": 120,
"max": 1200
"max": 1200,
"order": 1
},
"server_time_zone": {
"type": "string",
"title": "Configured server timezone for the MySQL source (Advanced)",
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
"order": 2
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.3
LABEL io.airbyte.version=1.0.4

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ static Properties getDebeziumProperties(final JdbcDatabase database) {
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode
props.setProperty("snapshot.mode", "when_needed");
}

return props;
}

Expand All @@ -52,6 +53,15 @@ private static Properties commonProperties(final JdbcDatabase database) {
props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter");
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter");

// For CDC mode, the user cannot provide timezone arguments as JDBC parameters - they are specifically defined in the replication_method
// config.
if (sourceConfig.get("replication_method").has("server_time_zone")) {
final String serverTimeZone = sourceConfig.get("replication_method").get("server_time_zone").asText();
if (!serverTimeZone.isEmpty()) {
props.setProperty("database.serverTimezone", serverTimeZone);
}
}

// Check params for SSL connection in config and add properties for CDC SSL connection
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-ssl-mode
if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) {
Expand Down Expand Up @@ -111,7 +121,6 @@ private static Properties commonProperties(final JdbcDatabase database) {
props.setProperty("database.include.list", sourceConfig.get("database").asText());

return props;

}

static Properties getSnapshotProperties(final JdbcDatabase database) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J

checkOperations.add(database -> {
CdcConfigurationHelper.checkFirstRecordWaitTime(config);
CdcConfigurationHelper.checkServerTimeZoneConfig(config);
});
}
return checkOperations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package io.airbyte.integrations.source.mysql.helpers;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.db.jdbc.JdbcDatabase;
import java.time.Duration;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
Expand Down Expand Up @@ -74,6 +76,26 @@ public static Optional<Integer> getFirstRecordWaitSeconds(final JsonNode config)
return Optional.empty();
}

private static Optional<String> getCdcServerTimezone(final JsonNode config) {
final JsonNode replicationMethod = config.get("replication_method");
if (replicationMethod != null && replicationMethod.has("server_time_zone")) {
final String serverTimeZone = config.get("replication_method").get("server_time_zone").asText();
return Optional.of(serverTimeZone);
}
return Optional.empty();
}

public static void checkServerTimeZoneConfig(final JsonNode config) {
final Optional<String> serverTimeZone = getCdcServerTimezone(config);
if (serverTimeZone.isPresent()) {
final String timeZone = serverTimeZone.get();
if (!timeZone.isEmpty() && !ZoneId.getAvailableZoneIds().contains((timeZone))) {
throw new IllegalArgumentException(String.format("Given timezone %s is not valid. The given timezone must conform to the IANNA standard. "
+ "See https://www.iana.org/time-zones for more details", serverTimeZone.get()));
}
}
}

public static void checkFirstRecordWaitTime(final JsonNode config) {
// we need to skip the check because in tests, we set initial_waiting_seconds
// to 5 seconds for performance reasons, which is shorter than the minimum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,15 @@
"title": "Initial Waiting Time in Seconds (Advanced)",
"description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about <a href=\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\">initial waiting time</a>.",
"default": 300,
"order": 4,
"min": 120,
"max": 1200
"max": 1200,
"order": 1
},
"server_time_zone": {
"type": "string",
"title": "Configured server timezone for the MySQL source (Advanced)",
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
"order": 2
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,18 @@ void testGetFirstRecordWaitTime() {
assertEquals(MAX_FIRST_RECORD_WAIT_TIME, CdcConfigurationHelper.getFirstRecordWaitTime(tooLongConfig));
}

@Test
void testServerTimeConfig() {
final JsonNode emptyConfig = Jsons.jsonNode(Collections.emptyMap());
assertDoesNotThrow(() -> CdcConfigurationHelper.checkServerTimeZoneConfig(emptyConfig));

final JsonNode normalConfig = Jsons.jsonNode(Map.of("replication_method",
Map.of("method", "CDC", "server_time_zone", "America/Los_Angeles")));
assertDoesNotThrow(() -> CdcConfigurationHelper.checkServerTimeZoneConfig(normalConfig));

final JsonNode invalidConfig = Jsons.jsonNode(Map.of("replication_method",
Map.of("method", "CDC", "server_time_zone", "CEST")));
assertThrows(IllegalArgumentException.class, () -> CdcConfigurationHelper.checkServerTimeZoneConfig(invalidConfig));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ private void init() {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.put("initial_waiting_seconds", INITIAL_WAITING_SECONDS)
.put("time_zone", "America/Los_Angeles")
.build());

config = Jsons.jsonNode(ImmutableMap.builder()
Expand Down
12 changes: 12 additions & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ The connector waits for the default initial wait time of 5 minutes (300 seconds)

If you know there are database changes to be synced, but the connector cannot read those changes, the root cause may be insufficient waiting time. In that case, you can increase the waiting time (example: set to 600 seconds) to test if it is indeed the root cause. On the other hand, if you know there are no database changes, you can decrease the wait time to speed up the zero record syncs.

**4. Set up server timezone\(Optional\)**

:::warning
This is an advanced feature. Use it if absolutely necessary.
:::

In CDC mode, the MySQl connector may need a timezone configured if the existing MySQL database been set up with a system timezone that is not recognized by the [IANA Timezone Database](https://www.iana.org/time-zones).

In this case, you can configure the server timezone to the equivalent IANA timezone compliant timezone. (e.g. CEST -> Europe/Berlin).


**Note**

When a sync runs for the first time using CDC, Airbyte performs an initial consistent snapshot of your database. Airbyte doesn't acquire any table locks \(for tables defined with MyISAM engine, the tables would still be locked\) while creating the snapshot to allow writes by other database clients. But in order for the sync to work without any error/unexpected behaviour, it is assumed that no schema changes are happening while the snapshot is running.
Expand Down Expand Up @@ -240,6 +251,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura
## Changelog
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.4 | 2022-10-11 | [17815](https://github.com/airbytehq/airbyte/pull/17815) | Expose setting server timezone for CDC syncs |
| 1.0.3 | 2022-10-07 | [17236](https://github.com/airbytehq/airbyte/pull/17236) | Fix large table issue by fetch size |
| 1.0.2 | 2022-10-03 | [17170](https://github.com/airbytehq/airbyte/pull/17170) | Make initial CDC waiting time configurable |
| 1.0.1 | 2022-10-01 | [17459](https://github.com/airbytehq/airbyte/pull/17459) | Upgrade debezium version to 1.9.6 from 1.9.2 |
Expand Down

0 comments on commit e1701c9

Please sign in to comment.