From ecd20b11b2ab57a5a005cb66a3d821d9ac015984 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 12 Nov 2024 14:35:01 +0800 Subject: [PATCH] tests: add mysql-source it case for parallelized schema evolution Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../pom.xml | 14 ++ .../MySqlParallelizedPipelineITCase.java | 207 ++++++++++++++++++ .../mysql/testutils/MySqSourceTestUtils.java | 17 ++ .../cdc/pipeline/tests/RouteE2eITCase.java | 7 +- 4 files changed, 242 insertions(+), 3 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml index 944175e52a0..155c8240dc5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/pom.xml @@ -46,6 +46,20 @@ limitations under the License. 
<type>test-jar</type> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-composer</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-pipeline-connector-values</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-test-util</artifactId> diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java new file mode 100644 index 00000000000..4bd9806a580 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.definition.SourceDef; +import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.connectors.values.ValuesDatabase; +import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory; +import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.getServerId; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.loopCheck; +import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL; +import static org.assertj.core.api.Assertions.assertThat; + +/** Parallelized Integration test for MySQL connector. 
*/ +public class MySqlParallelizedPipelineITCase extends MySqlSourceTestBase { + + private static final int PARALLELISM = 4; + private static final int TEST_TABLE_NUMBER = 100; + + // Always use parent-first classloader for CDC classes. + // The reason is that ValuesDatabase uses static field for holding data, we need to make sure + // the class is loaded by AppClassloader so that we can verify data in the test case. + private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG = + new org.apache.flink.configuration.Configuration(); + + static { + MINI_CLUSTER_CONFIG.set( + ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, + Collections.singletonList("org.apache.flink.cdc")); + } + + private final PrintStream standardOut = System.out; + private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream(); + + private final UniqueDatabase parallelismDatabase = + new UniqueDatabase( + MYSQL_CONTAINER, "extreme_parallelism_test_database", TEST_USER, TEST_PASSWORD); + + @Before + public void init() { + // Take over STDOUT as we need to check the output of values sink + System.setOut(new PrintStream(outCaptor)); + // Initialize in-memory database + ValuesDatabase.clear(); + } + + @After + public void cleanup() { + System.setOut(standardOut); + } + + @Test + public void testExtremeParallelizedSchemaChange() throws Exception { + final String databaseName = parallelismDatabase.getDatabaseName(); + try (Connection conn = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), TEST_USER, TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute(String.format("CREATE DATABASE %s;", databaseName)); + stat.execute(String.format("USE %s;", databaseName)); + for (int i = 1; i <= TEST_TABLE_NUMBER; i++) { + stat.execute(String.format("DROP TABLE IF EXISTS TABLE%d;", i)); + stat.execute( + String.format( + "CREATE TABLE TABLE%d (ID INT NOT NULL PRIMARY KEY,VERSION VARCHAR(17));", + i)); + stat.execute(String.format("INSERT 
INTO TABLE%d VALUES (%d, 'No.%d');", i, i, i)); + } + } catch (SQLException e) { + LOG.error("Initialize table failed.", e); + throw e; + } + LOG.info("Table initialized successfully."); + + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup MySQL source + Configuration sourceConfig = new Configuration(); + sourceConfig.set(MySqlDataSourceOptions.HOSTNAME, MYSQL_CONTAINER.getHost()); + sourceConfig.set(MySqlDataSourceOptions.PORT, MYSQL_CONTAINER.getDatabasePort()); + sourceConfig.set(MySqlDataSourceOptions.USERNAME, TEST_USER); + sourceConfig.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD); + sourceConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC"); + sourceConfig.set(MySqlDataSourceOptions.TABLES, "\\.*.\\.*"); + sourceConfig.set(MySqlDataSourceOptions.SERVER_ID, getServerId(PARALLELISM)); + + SourceDef sourceDef = + new SourceDef(MySqlDataSourceFactory.IDENTIFIER, "MySQL Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, PARALLELISM); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + Thread executeThread = + new Thread( + () -> { + try { + execution.execute(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + executeThread.start(); + + try { + loopCheck( + () -> + outCaptor.toString().trim().split("\n").length + >= TEST_TABLE_NUMBER * (PARALLELISM + 
1), + "collect enough rows", + Duration.ofSeconds(120), + Duration.ofSeconds(1)); + } finally { + executeThread.interrupt(); + } + + // Check the order and content of all received events + String outputEvents = outCaptor.toString(); + assertThat(outputEvents) + .contains( + IntStream.rangeClosed(1, TEST_TABLE_NUMBER) + .boxed() + .flatMap( + i -> + Stream.concat( + IntStream.range(0, PARALLELISM) + .boxed() + .map( + subTaskId -> + String.format( + "%d> CreateTableEvent{tableId=%s.TABLE%d, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + subTaskId, + parallelismDatabase + .getDatabaseName(), + i)), + Stream.of( + String.format( + "> DataChangeEvent{tableId=%s.TABLE%d, before=[], after=[%d, No.%d], op=INSERT, meta=()}", + parallelismDatabase + .getDatabaseName(), + i, + i, + i)))) + .collect(Collectors.toList())); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java index a76838d664c..fee7ac5b9c0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java @@ -20,10 +20,13 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.CreateTableEvent; +import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Random; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; /** Test utilities for MySQL event 
source. */ public class MySqSourceTestUtils { @@ -64,4 +67,18 @@ public static String getServerId(int parallelism) { } private MySqSourceTestUtils() {} + + public static void loopCheck( + Supplier<Boolean> runnable, String description, Duration timeout, Duration interval) + throws Exception { + long deadline = System.currentTimeMillis() + timeout.toMillis(); + while (System.currentTimeMillis() < deadline) { + if (runnable.get()) { + return; + } + Thread.sleep(interval.toMillis()); + } + throw new TimeoutException( + "Ran out of time when waiting for " + description + " to success."); + } } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java index 708507b9ca2..c9b26c7818b 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java @@ -55,6 +55,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; protected static final long EVENT_DEFAULT_TIMEOUT = 60000L; + protected static final int TEST_TABLE_NUMBER = 100; @ClassRule public static final MySqlContainer MYSQL = @@ -826,7 +827,7 @@ public void testExtremeMergeTableRoute() throws Exception { Statement stat = conn.createStatement()) { stat.execute(String.format("CREATE DATABASE %s;", databaseName)); stat.execute(String.format("USE %s;", databaseName)); - for (int i = 1; i <= 100; i++) { + for (int i = 1; i <= TEST_TABLE_NUMBER; i++) { stat.execute(String.format("DROP TABLE IF EXISTS TABLE%d;", i)); stat.execute( String.format( @@ -871,7 +872,7 @@ public void 
testExtremeMergeTableRoute() throws Exception { LOG.info("Verifying CreateTableEvents..."); validateResult( 180_000L, - IntStream.rangeClosed(1, 100) + IntStream.rangeClosed(1, TEST_TABLE_NUMBER) .mapToObj( i -> String.format( @@ -882,7 +883,7 @@ public void testExtremeMergeTableRoute() throws Exception { LOG.info("Verifying DataChangeEvents..."); validateResult( 180_000L, - IntStream.rangeClosed(1, 100) + IntStream.rangeClosed(1, TEST_TABLE_NUMBER) .mapToObj( i -> String.format(