From d6c09a6a909fcf10a5ba368cbb84bf8d594b929c Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 12 Feb 2025 09:24:44 +0800 Subject: [PATCH] [hotfix][tests] Fix MySQL ITCase failure in non-UTC timezone --- .../MysqlDebeziumTimeConverterITCase.java | 5 + .../MySqlMultipleTablesRenamingITCase.java | 110 +++--------------- .../mysql/source/MySqlSourceITCase.java | 1 + .../source/SpecificStartingOffsetITCase.java | 5 + .../mysql/table/MySqlTimezoneITCase.java | 5 + 5 files changed, 34 insertions(+), 92 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java index 8adb92454ae..599c8ed84e3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java @@ -276,6 +276,11 @@ protected UniqueDatabase getUniqueDatabase(MySqlContainer mySqlContainer) { } private String buildMySqlConfigWithTimezone(String timezone) { + // JVM timezone is in "GMT+XX:XX" or "GMT-XX:XX" format + // while MySQL configuration file requires "+XX:XX" or "-XX:XX" + if (timezone.startsWith("GMT")) { + timezone = timezone.substring(3); + } try { File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID())); Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf")); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleTablesRenamingITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleTablesRenamingITCase.java index e8f4ac8c24a..b4cb578ff29 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleTablesRenamingITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleTablesRenamingITCase.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cdc.common.utils.TestCaseUtils; import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; -import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; import org.apache.flink.cdc.connectors.mysql.testutils.TestTable; import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas; import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; @@ -38,61 +37,34 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; -import org.apache.flink.test.junit5.MiniClusterExtension; -import org.apache.flink.util.FlinkRuntimeException; import io.debezium.connector.mysql.MySqlConnection; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.rules.TemporaryFolder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import java.io.File; import java.lang.reflect.Field; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; +import java.util.Random; import java.util.UUID; /** * Integration tests for handling schema changes regard to renaming multiple tables within a single * statement. */ -public class MySqlMultipleTablesRenamingITCase { +public class MySqlMultipleTablesRenamingITCase extends MySqlSourceTestBase { private static final Logger LOG = LoggerFactory.getLogger(MySqlMultipleTablesRenamingITCase.class); - @RegisterExtension static MiniClusterExtension miniCluster = new MiniClusterExtension(); - - @SuppressWarnings("unchecked") - private static final MySqlContainer MYSQL_CONTAINER = - (MySqlContainer) - new MySqlContainer() - .withConfigurationOverride( - buildMySqlConfigWithTimezone( - getResourceFolder(), getSystemTimeZone())) - .withSetupSQL("docker/setup.sql") - .withDatabaseName("flink-test") - .withUsername("flinkuser") - .withPassword("flinkpw") - .withLogConsumer(new Slf4jLogConsumer(LOG)); private final UniqueDatabase customDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); @@ -101,25 +73,15 @@ public class MySqlMultipleTablesRenamingITCase { private MySqlConnection connection; - @BeforeAll - public static void before() throws Exception { - MYSQL_CONTAINER.start(); - } - - @AfterAll - public static void after() throws Exception { - MYSQL_CONTAINER.stop(); - } - - @BeforeEach - void prepare() throws Exception { + @Before + public void prepare() throws Exception { connection = getConnection(); customDatabase.createAndInitialize(); flushLogs(); } - @AfterEach - void tearDown() throws Exception { + @After + public void tearDown() throws Exception { customDatabase.dropDatabase(); connection.close(); } @@ -146,7 +108,7 @@ void tearDown() throws Exception { * during schema updates. */ @Test - void testRenameTablesWithinSingleStatement() throws Exception { + public void testRenameTablesWithinSingleStatement() throws Exception { // Build Flink job StreamExecutionEnvironment env = getExecutionEnvironment(); MySqlSource source = getSourceBuilder().build(); @@ -269,6 +231,8 @@ private MySqlSourceBuilder getSourceBuilder() { .password(customDatabase.getPassword()) .databaseList(customDatabase.getDatabaseName()) .tableList(customers.getTableId()) + .serverId(getServerId()) + .serverTimeZone("UTC") .deserializer(new JsonDebeziumDeserializationSchema()); } @@ -323,50 +287,6 @@ private static List fetchRow(Iterator iter, int size) { return rows; } - private static String buildMySqlConfigWithTimezone(File resourceDirectory, String timezone) { - try { - TemporaryFolder tempFolder = new TemporaryFolder(resourceDirectory); - tempFolder.create(); - File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID())); - Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf")); - String mysqldConf = - "[mysqld]\n" - + "binlog_format = row\n" - + "log_bin = mysql-bin\n" - + "server-id = 223344\n" - + "binlog_row_image = FULL\n" - + "gtid_mode = on\n" - + "enforce_gtid_consistency = on\n"; - String timezoneConf = "default-time_zone = '" + timezone + "'\n"; - Files.write( - cnf, - Collections.singleton(mysqldConf + timezoneConf), - StandardCharsets.UTF_8, - StandardOpenOption.APPEND); - return Paths.get(resourceDirectory.getAbsolutePath()).relativize(cnf).toString(); - } catch (Exception e) { - throw new RuntimeException("Failed to create my.cnf file.", e); - } - } - - private static File getResourceFolder() { - try { - return Paths.get( - Objects.requireNonNull( - SpecificStartingOffsetITCase.class - .getClassLoader() - .getResource(".")) - .toURI()) - .toFile(); - } catch (Exception e) { - throw new FlinkRuntimeException("Get Resource File Directory fail"); - } - } - - private static String getSystemTimeZone() { - return ZoneId.systemDefault().toString(); - } - private void setupSavepoint(StreamExecutionEnvironment env, String savepointPath) throws Exception { // restore from savepoint @@ -392,4 +312,10 @@ private StreamExecutionEnvironment getExecutionEnvironment() { env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); return env; } + + private String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + return serverId + "-" + (serverId + DEFAULT_PARALLELISM); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java index 7be090a8fef..2e73e9363ec 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -714,6 +714,7 @@ public void testSourceMetrics() throws Exception { .password(customDatabase.getPassword()) .deserializer(new StringDebeziumDeserializationSchema()) .serverId(getServerId()) + .serverTimeZone("UTC") .build(); DataStreamSource stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source"); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java index 7a178842e76..fe454154ba8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java @@ -512,6 +512,11 @@ private List fetchRowData( } private static String buildMySqlConfigWithTimezone(File resourceDirectory, String timezone) { + // JVM timezone is in "GMT+XX:XX" or "GMT-XX:XX" format + // while MySQL configuration file requires "+XX:XX" or "-XX:XX" + if (timezone.startsWith("GMT")) { + timezone = timezone.substring(3); + } try { TemporaryFolder tempFolder = new TemporaryFolder(resourceDirectory); tempFolder.create(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTimezoneITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTimezoneITCase.java index 7d994e8001d..7778d0e7f6a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTimezoneITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTimezoneITCase.java @@ -239,6 +239,11 @@ private static List fetchRows(Iterator iter, int size) { } private String buildMySqlConfigWithTimezone(String timezone) { + // JVM timezone is in "GMT+XX:XX" or "GMT-XX:XX" format + // while MySQL configuration file requires "+XX:XX" or "-XX:XX" + if (timezone.startsWith("GMT")) { + timezone = timezone.substring(3); + } try { File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID())); Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf"));