diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java new file mode 100644 index 00000000000..b6ac0b2265d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/utils/ConvertTimeBceUtil.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.debezium.utils; + +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.chrono.IsoEra; + +/** Convert And Check TimeBce Util. */ +public class ConvertTimeBceUtil { + + private static final Date ONE_CE = Date.valueOf("0001-01-01"); + + public static String resolveEra(boolean isBce, String value) { + String mangledValue = value; + if (isBce) { + if (mangledValue.startsWith("-")) { + mangledValue = mangledValue.substring(1); + } + if (!mangledValue.endsWith(" BC")) { + mangledValue += " BC"; + } + } + return mangledValue; + } + + public static boolean isBce(LocalDate date) { + return date.getEra() == IsoEra.BCE; + } + + public static String resolveEra(LocalDate date, String value) { + return resolveEra(isBce(date), value); + } + + public static String resolveEra(Date date, String value) { + return resolveEra(date.before(ONE_CE), value); + } + + public static String resolveEra(Timestamp timestamp, String value) { + return resolveEra(timestamp.before(ONE_CE), value); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java new file mode 100644 index 00000000000..493fd682c20 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.mysql.converters; + +import org.apache.flink.cdc.debezium.utils.ConvertTimeBceUtil; + +import io.debezium.spi.converter.CustomConverter; +import io.debezium.spi.converter.RelationalColumn; +import io.debezium.time.Conversions; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Locale; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +/** + * Debezium converts the datetime type in MySQL into a UTC timestamp by default ({@link + * io.debezium.time.Timestamp} ),The time zone is hard-coded and cannot be changed. causing + * conversion errors part of the time Enable this converter to convert the four times "DATE", + * "DATETIME", "TIME", and "TIMESTAMP" into the format corresponding to the configured time zone + * (for example, yyyy-MM-dd) + * + * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter + */ +public class MysqlDebeziumTimeConverter + implements CustomConverter { + + private static final Logger log = LoggerFactory.getLogger(MysqlDebeziumTimeConverter.class); + + private static boolean loggedUnknownTimestampClass = false; + private static boolean loggedUnknownDateClass = false; + private static boolean loggedUnknownTimeClass = false; + private static boolean loggedUnknownTimestampWithTimeZoneClass = false; + + private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMESTAMP"}; + + protected static final String DATE_FORMAT = "yyyy-MM-dd"; + protected static final String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; + protected static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; + protected ZoneId zoneId; + protected static final String DEFAULT_DATE_FORMAT_PATTERN = "1970-01-01 00:00:00"; + protected DateTimeFormatter dateFormatter; + protected DateTimeFormatter timeFormatter; + protected DateTimeFormatter datetimeFormatter; + protected DateTimeFormatter timestampFormatter; + protected String schemaNamePrefix; + protected static DateTimeFormatter originalFormat = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + protected Boolean parseNullDefaultValue = true; + + @Override + public void configure(Properties properties) { + String dateFormat = properties.getProperty("format.date", DATE_FORMAT); + String timeFormat = properties.getProperty("format.time", TIME_FORMAT); + String datetimeFormat = properties.getProperty("format.datetime", DATETIME_FORMAT); + String timestampFormat = properties.getProperty("format.timestamp", DATETIME_FORMAT); + this.parseNullDefaultValue = + Boolean.parseBoolean( + properties.getProperty("format.default.value.convert", "true")); + String className = this.getClass().getName(); + this.schemaNamePrefix = properties.getProperty("schema.name.prefix", className + ".mysql"); + this.dateFormatter = DateTimeFormatter.ofPattern(dateFormat); + this.timeFormatter = DateTimeFormatter.ofPattern(timeFormat); + this.datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat); + this.timestampFormatter = DateTimeFormatter.ofPattern(timestampFormat); + this.zoneId = + ZoneId.of( + properties.getProperty( + "format.timezone", ZoneId.systemDefault().toString())); + } + + @Override + public void converterFor( + final RelationalColumn field, final ConverterRegistration registration) { + if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { + registerDateConverter(field, registration); + } + } + + private void registerDateConverter( + final RelationalColumn field, final ConverterRegistration registration) { + String columnType = field.typeName().toUpperCase(); + String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase(); + registration.register( + SchemaBuilder.string().name(schemaName).optional(), + value -> { + log.debug( + "find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field " + + "default:{}", + field.name(), + columnType, + value == null ? "null" : value, + field.hasDefaultValue() ? field.defaultValue() : "null"); + if (value == null) { + return convertDateDefaultValue(field); + } + switch (columnType.toUpperCase(Locale.ROOT)) { + case "DATE": + if (value instanceof Integer) { + return this.convertToDate( + columnType, LocalDate.ofEpochDay((Integer) value)); + } + return this.convertToDate(columnType, value); + case "TIME": + if (value instanceof Long) { + long l = + Math.multiplyExact( + (Long) value, TimeUnit.MICROSECONDS.toNanos(1)); + return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l)); + } + return this.convertToTime(columnType, value); + case "DATETIME": + if (value instanceof Long) { + if (getTimePrecision(field) <= 3) { + return this.convertToTimestamp( + columnType, + Conversions.toInstantFromMillis((Long) value)); + } + if (getTimePrecision(field) <= 6) { + return this.convertToTimestamp( + columnType, + Conversions.toInstantFromMicros((Long) value)); + } + } + return this.convertToTimestamp(columnType, value); + case "TIMESTAMP": + return this.convertToTimestampWithTimezone(columnType, value); + default: + throw new IllegalArgumentException( + "Unknown field type " + columnType.toUpperCase(Locale.ROOT)); + } + }); + } + + private Object convertToTimestampWithTimezone(String columnType, Object timestamp) { + // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type. + // Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually + // mangles the value for ancient dates, because leap years weren't applied consistently in + // ye olden days. Additionally, toInstant() (and toLocalDateTime()) actually lose the era + // indicator, + // so we can't rely on their getEra() methods. + // So we have special handling for this case, which sidesteps the toInstant conversion. + if (timestamp instanceof Timestamp) { + Timestamp value = (Timestamp) timestamp; + ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId); + return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter)); + } else if (timestamp instanceof OffsetDateTime) { + OffsetDateTime value = (OffsetDateTime) timestamp; + return ConvertTimeBceUtil.resolveEra( + value.toLocalDate(), value.format(timestampFormatter)); + } else if (timestamp instanceof ZonedDateTime) { + ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp; + return ConvertTimeBceUtil.resolveEra( + zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter)); + } else if (timestamp instanceof Instant) { + OffsetDateTime dateTime = OffsetDateTime.ofInstant((Instant) timestamp, zoneId); + ZonedDateTime timestampZt = ZonedDateTime.from(dateTime); + LocalDate localDate = timestampZt.toLocalDate(); + return ConvertTimeBceUtil.resolveEra(localDate, timestampZt.format(timestampFormatter)); + } else { + if (!loggedUnknownTimestampWithTimeZoneClass) { + printUnknownDateClassLogs(columnType, timestamp); + loggedUnknownTimestampWithTimeZoneClass = true; + } + // If init 1970-01-01T00:00:00Zd need to change + Instant instant = Instant.parse(timestamp.toString()); + OffsetDateTime dateTime = OffsetDateTime.ofInstant(instant, zoneId); + ZonedDateTime timestampZt = ZonedDateTime.from(dateTime); + LocalDate localDate = timestampZt.toLocalDate(); + return ConvertTimeBceUtil.resolveEra(localDate, timestampZt.format(timestampFormatter)); + } + } + + private Object convertToTimestamp(String columnType, Object timestamp) { + if (timestamp instanceof Timestamp) { + // Snapshot mode + LocalDateTime localDateTime = ((Timestamp) timestamp).toLocalDateTime(); + return ConvertTimeBceUtil.resolveEra( + ((Timestamp) timestamp), localDateTime.format(datetimeFormatter)); + } else if (timestamp instanceof Instant) { + // Incremental mode + Instant time = (Instant) timestamp; + ZonedDateTime zonedDateTime = time.atZone(zoneId); + return ConvertTimeBceUtil.resolveEra( + zonedDateTime.toLocalDate(), + time.atOffset(zonedDateTime.getOffset()) + .toLocalDateTime() + .format(datetimeFormatter)); + } else if (timestamp instanceof LocalDateTime) { + LocalDateTime dateTime = (LocalDateTime) timestamp; + LocalDate localDateTime = dateTime.toLocalDate(); + return ConvertTimeBceUtil.resolveEra(localDateTime, dateTime.format(datetimeFormatter)); + } + if (!loggedUnknownTimestampClass) { + printUnknownDateClassLogs(columnType, timestamp); + loggedUnknownTimestampClass = true; + } + LocalDateTime localDateTime = LocalDateTime.parse(timestamp.toString()); + LocalDate localDate = localDateTime.toLocalDate(); + return ConvertTimeBceUtil.resolveEra(localDate, localDateTime.format(datetimeFormatter)); + } + + private Object convertToTime(String columnType, Object time) { + if (time instanceof Time) { + return formatTime(((Time) time).toLocalTime()); + } else if (time instanceof LocalTime) { + return formatTime((LocalTime) time); + } else if (time instanceof java.time.Duration) { + long value = ((java.time.Duration) time).toNanos(); + if (value >= 0 && value < TimeUnit.DAYS.toNanos(1)) { + return formatTime(LocalTime.ofNanoOfDay(value)); + } else { + long updatedValue = Math.min(Math.abs(value), LocalTime.MAX.toNanoOfDay()); + log.debug( + "Time values must use number of nanoseconds greater than 0 and less than 86400000000000 but its {}, " + + "converting to {} ", + value, + updatedValue); + return formatTime(LocalTime.ofNanoOfDay(updatedValue)); + } + } else { + if (!loggedUnknownTimeClass) { + printUnknownDateClassLogs(columnType, time); + loggedUnknownTimeClass = true; + } + + String valueAsString = time.toString(); + if (valueAsString.startsWith("24")) { + log.debug("Time value {} is above range, converting to 23:59:59", valueAsString); + return LocalTime.MAX.toString(); + } + return formatTime(LocalTime.parse(valueAsString)); + } + } + + private String formatTime(LocalTime localTime) { + return localTime.format(timeFormatter); + } + + private int getTimePrecision(final RelationalColumn field) { + return field.length().orElse(-1); + } + + private String convertToDate(String columnType, Object date) { + if (date instanceof Date) { + // Snapshot mode + LocalDate localDate = ((Date) date).toLocalDate(); + return ConvertTimeBceUtil.resolveEra(localDate, localDate.format(dateFormatter)); + } else if (date instanceof LocalDate) { + // Incremental mode + return dateFormatter.format((LocalDate) date); + } else if (date instanceof LocalDateTime) { + return datetimeFormatter.format((LocalDateTime) date); + } else if (date instanceof Integer) { + return LocalDate.ofEpochDay(((Integer) date).longValue()).format(dateFormatter); + } else { + if (!loggedUnknownDateClass) { + printUnknownDateClassLogs(columnType, date); + loggedUnknownDateClass = true; + } + LocalDate localDate = LocalDate.parse(date.toString()); + return ConvertTimeBceUtil.resolveEra(localDate, localDate.format(dateFormatter)); + } + } + + public Object convertDateDefaultValue(RelationalColumn field) { + if (field.isOptional()) { + return null; + } else if (field.hasDefaultValue()) { + // There is an extreme case where the field defaultValue is 0, resulting in a Kafka + // Schema mismatch + if (parseNullDefaultValue) { + LocalDateTime dateTime = + LocalDateTime.parse(DEFAULT_DATE_FORMAT_PATTERN, originalFormat); + String columnType = field.typeName().toUpperCase(); + switch (columnType.toUpperCase(Locale.ROOT)) { + case "DATE": + return dateTime.format(dateFormatter); + case "DATETIME": + return dateTime.format(datetimeFormatter); + case "TIME": + return dateTime.format(timeFormatter); + case "TIMESTAMP": + return dateTime.format(timestampFormatter); + } + } + } + return null; + } + + private static void printUnknownDateClassLogs(String type, Object value) { + log.warn( + "MySql Date Convert Database type : {} Unknown class for Date data type {}", + type, + value.getClass()); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java new file mode 100644 index 00000000000..d14f31f9787 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.debezium.converters; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.mysql.MySqlValidatorTest; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import io.debezium.connector.mysql.converters.MysqlDebeziumTimeConverter; +import io.debezium.data.Envelope; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.UUID; +import java.util.stream.Stream; + +import static java.lang.String.format; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInAnyOrder; + +/** Test for {@link MysqlDebeziumTimeConverter}. */ +public class MysqlDebeziumTimeConverterITCase { + + private static TemporaryFolder tempFolder; + private static File resourceFolder; + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + private static final Logger LOG = + LoggerFactory.getLogger(MysqlDebeziumTimeConverterITCase.class); + + @Rule public final Timeout timeoutPerTest = Timeout.seconds(300); + + @Before + public void setup() throws Exception { + resourceFolder = + Paths.get( + Objects.requireNonNull( + MySqlValidatorTest.class + .getClassLoader() + .getResource(".")) + .toURI()) + .toFile(); + tempFolder = new TemporaryFolder(resourceFolder); + tempFolder.create(); + env.setParallelism(1); + } + + @Test + public void testReadDateConvertDataStreamInJvmTime() throws Exception { + testReadDateConvertDataStreamSource(ZoneId.systemDefault().toString()); + } + + @Test + public void testReadDateConvertDataStreamInAsia() throws Exception { + testReadDateConvertDataStreamSource("Asia/Shanghai"); + } + + @Test + public void testReadDateConvertDataStreamInBerlin() throws Exception { + testReadDateConvertDataStreamSource("Europe/Berlin"); + } + + @Test + public void testReadDateConvertSQLSourceInAsia() throws Exception { + testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai"); + } + + @Test + public void testReadDateConvertSQLSourceInBerlin() throws Exception { + testTemporalTypesWithMySqlServerTimezone("Europe/Berlin"); + } + + private void testReadDateConvertDataStreamSource(String timezone) throws Exception { + MySqlContainer mySqlContainer = createMySqlContainer(timezone); + startContainers(mySqlContainer, timezone); + UniqueDatabase db = getUniqueDatabase(mySqlContainer); + db.createAndInitialize(); + env.enableCheckpointing(1000L); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + MySqlSourceBuilder builder = + MySqlSource.builder() + .hostname(db.getHost()) + .port(db.getDatabasePort()) + .databaseList(db.getDatabaseName()) + .tableList(db.getDatabaseName() + ".date_convert_test") + .startupOptions(StartupOptions.initial()) + .serverTimeZone(timezone) + .username(db.getUsername()) + .password(db.getPassword()) + .debeziumProperties(getDebeziumConfigurations(timezone)); + builder.deserializer(new JsonDebeziumDeserializationSchema()); + DataStreamSource convertDataStreamSource = + env.fromSource( + builder.build(), + WatermarkStrategy.noWatermarks(), + "testDataStreamSourceConvertData"); + List result = convertDataStreamSource.executeAndCollect(3); + validTimestampValue(result); + } + + private void validTimestampValue(List result) throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00"}; + for (String after : result) { + JsonNode jsonNode = mapper.readTree(after); + Assert.assertEquals( + timestampValues[jsonNode.get(Envelope.FieldName.AFTER).get("id").asInt() - 1], + jsonNode.get("after").get("test_timestamp").asText()); + } + } + + private void testTemporalTypesWithMySqlServerTimezone(String timezone) throws Exception { + MySqlContainer mySqlContainer = createMySqlContainer(timezone); + startContainers(mySqlContainer, timezone); + UniqueDatabase db = getUniqueDatabase(mySqlContainer); + db.createAndInitialize(); + env.enableCheckpointing(1000L); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + String sourceDDL = + format( + "CREATE TABLE customers (" + + " id BIGINT NOT NULL," + + " test_timestamp STRING," + + " test_datetime STRING," + + " test_date STRING," + + " test_time STRING, " + + "primary key (id) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = 'date_convert_test'," + + " 'scan.startup.mode' = '%s'," + + " 'server-time-zone' = '%s'," + + " 'debezium.converters' = 'datetime'," + + " 'debezium.datetime.type' = '%s'," + + " 'debezium.database.connectionTimeZone' = '%s'," + + " 'debezium.datetime.format.time' = 'HH:mm:ss'," + + " 'debezium.datetime.format.timezone' = '%s'," + + " 'debezium.datetime.format.timestamp' = 'HH:mm:ss'," + + " 'debezium.datetime.format.default.value.convert' = 'true'" + + ")", + mySqlContainer.getHost(), + mySqlContainer.getDatabasePort(), + db.getUsername(), + db.getPassword(), + db.getDatabaseName(), + "initial", + timezone, + MysqlDebeziumTimeConverter.class.getName(), + timezone, + timezone); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from customers"); + checkData(tableResult); + } + + private Properties getDebeziumConfigurations(String timezone) { + Properties debeziumProperties = new Properties(); + // set properties + debeziumProperties.setProperty("converters", "datetime"); + debeziumProperties.setProperty("datetime.type", MysqlDebeziumTimeConverter.class.getName()); + debeziumProperties.setProperty("datetime.format.timestamp", "HH:mm:ss"); + debeziumProperties.setProperty("datetime.format.default.value.convert", "false"); + // If not set time convert maybe error + debeziumProperties.setProperty("database.connectionTimeZone", timezone); + debeziumProperties.setProperty("datetime.format.timezone", timezone); + LOG.info("Supplied debezium properties: {}", debeziumProperties); + return debeziumProperties; + } + + private void checkData(TableResult tableResult) { + String[] snapshotForSingleTable = + new String[] { + "+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]", + "+I[3, 00:00:00, null, null, 00:01:20]", + "+I[2, 00:00:00, null, null, 00:00:00]" + }; + + List expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable)); + CloseableIterator collect = tableResult.collect(); + tableResult.getJobClient().get().getJobID(); + assertEqualsInAnyOrder( + expectedSnapshotData, fetchRows(collect, expectedSnapshotData.size())); + } + + private static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + size--; + } + return rows; + } + + protected MySqlContainer createMySqlContainer(String timezone) { + return (MySqlContainer) + new MySqlContainer(MySqlVersion.V5_7) + .withConfigurationOverride(buildMySqlConfigWithTimezone(timezone)) + .withSetupSQL("docker/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + } + + protected void startContainers(MySqlContainer mySqlContainer, String timezone) { + LOG.info("Starting containers with timezone {} ...", timezone); + Startables.deepStart(Stream.of(mySqlContainer)).join(); + LOG.info("Containers are started."); + LOG.info("JVM System Clock Zone Id : {}", ZoneId.systemDefault()); + } + + protected UniqueDatabase getUniqueDatabase(MySqlContainer mySqlContainer) { + return new UniqueDatabase(mySqlContainer, "date_convert_test", "mysqluser", "mysqlpw"); + } + + private String buildMySqlConfigWithTimezone(String timezone) { + try { + File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID())); + Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf")); + String mysqldConf = + "[mysqld]\n" + + "binlog_format = row\n" + + "log_bin = mysql-bin\n" + + "server-id = 223344\n" + + "binlog_row_image = FULL\n"; + String timezoneConf = "default-time_zone = '" + timezone + "'\n"; + Files.write( + cnf, + Collections.singleton(mysqldConf + timezoneConf), + StandardCharsets.UTF_8, + StandardOpenOption.APPEND); + return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString(); + } catch (Exception e) { + throw new RuntimeException("Failed to create my.cnf file.", e); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql new file mode 100644 index 00000000000..262c1ceb19f --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql @@ -0,0 +1,36 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the `License`); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an `AS IS` BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: column_type_test +-- ---------------------------------------------------------------------------------------------------------------- + +-- datetime NOT NULL + +CREATE TABLE date_convert_test +( + `id` bigint NOT NULL AUTO_INCREMENT, + `test_timestamp` timestamp NULL, + `test_datetime` datetime NULL, + `test_date` date DEFAULT NULL, + `test_time` time DEFAULT NULL, + PRIMARY KEY (`id`) +) DEFAULT CHARSET=utf8; + +INSERT INTO date_convert_test (id,test_timestamp, test_datetime, test_date, test_time) +VALUES +(1,'2023-04-01 14:23:00', '2023-04-01 14:24:00', '2023-04-01', '14:25:00'), +(2,'2024-04-23 00:00:00', DEFAULT, NULL ,'00:00:00'), +(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120); \ No newline at end of file