diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 2223cf4e2f9..cd8fc0b45fe 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -128,13 +128,13 @@ public void testHeteroSchemaTransform() throws Exception {
         submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
         waitUntilJobRunning(Duration.ofSeconds(30));
         LOG.info("Pipeline job is running");
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
                 6000L);
 
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
@@ -184,19 +184,19 @@ public void testHeteroSchemaTransform() throws Exception {
             throw e;
         }
 
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
                 6000L);
 
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
                 6000L);
 
-        waitUtilSpecificEvent(
+        waitUntilSpecificEvent(
                 String.format(
                         "DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}",
                         transformRenameDatabase.getDatabaseName()),
@@ -206,17 +206,122 @@ public void testHeteroSchemaTransform() throws Exception {
         System.out.println(stdout);
     }
 
-    private void validateResult(List<String> expectedEvents) {
+    @Test
+    public void testMultipleHittingTable() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "transform:\n"
+                                + "  - source-table: %s.TABLE\\.*\n"
+                                + "    projection: \\*, ID + 1000 as UID, VERSION AS NEWVERSION\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: 1",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        transformRenameDatabase.getDatabaseName(),
+                        transformRenameDatabase.getDatabaseName());
+        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        String.format(
+                                "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` STRING,`PRICEALPHA` INT,`UID` INT,`NEWVERSION` STRING}, primaryKeys=ID, options=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 2010, 10], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 2011, 11], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 2008, 8], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 2009, 8.1], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` STRING,`CODENAMESBETA` STRING,`UID` INT,`NEWVERSION` STRING}, primaryKeys=ID, options=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 3014, 14], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 3012, 12], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 3013, 13], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()),
+                        String.format(
+                                "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 3011, 11], op=INSERT, meta=()}",
+                                transformRenameDatabase.getDatabaseName()));
+        validateResult(expectedEvents);
+        LOG.info("Begin incremental reading stage.");
+        // generate binlogs
+        String mysqlJdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s",
+                        MYSQL.getHost(),
+                        MYSQL.getDatabasePort(),
+                        transformRenameDatabase.getDatabaseName());
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+                Statement stat = conn.createStatement()) {
+            stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
+            stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79);");
+            stat.execute("DELETE FROM TABLEBETA WHERE id=2011;");
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        waitUntilSpecificEvent(
+                String.format(
+                        "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 2009, 8.1], after=[1009, 100, 0, 2009, 100], op=UPDATE, meta=()}",
+                        transformRenameDatabase.getDatabaseName()),
+                6000L);
+
+        waitUntilSpecificEvent(
+                String.format(
+                        "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 4007, 7], op=INSERT, meta=()}",
+                        transformRenameDatabase.getDatabaseName()),
+                6000L);
+
+        waitUntilSpecificEvent(
+                String.format(
+                        "DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 3011, 11], after=[], op=DELETE, meta=()}",
+                        transformRenameDatabase.getDatabaseName()),
+                6000L);
+        String stdout = taskManagerConsumer.toUtf8String();
+        System.out.println(stdout);
+    }
+
+    private void validateResult(List<String> expectedEvents) throws Exception {
         for (String event : expectedEvents) {
-            if (!stdout.contains(event)) {
-                throw new RuntimeException(
-                        "failed to get specific event: " + event + " from stdout: " + stdout);
-            }
+            waitUntilSpecificEvent(event, 6000L);
         }
     }
 
-    private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
+    private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
         boolean result = false;
         long endTimeout = System.currentTimeMillis() + timeout;
         while (System.currentTimeMillis() < endTimeout) {
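Note on the test changes above: the misspelled helper waitUtilSpecificEvent is renamed to waitUntilSpecificEvent, and validateResult now awaits each expected event instead of checking a single stdout snapshot that may not yet contain late-arriving records. Below is a minimal sketch of that polling pattern, not the test's exact code (the diff is truncated before the full helper body); the Supplier stands in for the IT case's taskManagerConsumer::toUtf8String, and the class and method names here are illustrative only.

    import java.util.List;
    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    // Hypothetical helper mirroring the polling pattern used by the test.
    final class EventPolling {
        private EventPolling() {}

        // Poll the captured task manager output until the expected event string
        // appears or the timeout elapses.
        static void waitUntilSpecificEvent(Supplier<String> stdout, String event, long timeoutMs)
                throws InterruptedException, TimeoutException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                // Re-read the output on every round so records that arrive late
                // (e.g. from the binlog phase) are still seen within the timeout.
                if (stdout.get().contains(event)) {
                    return;
                }
                Thread.sleep(300L);
            }
            throw new TimeoutException("failed to get specific event: " + event);
        }

        // Await every expected event rather than asserting against one snapshot.
        static void validateResult(Supplier<String> stdout, List<String> expectedEvents)
                throws InterruptedException, TimeoutException {
            for (String event : expectedEvents) {
                waitUntilSpecificEvent(stdout, event, 6000L);
            }
        }
    }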
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java
index 1ccbf87fd72..20479c87322 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.runtime.operators.transform;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
@@ -70,9 +71,10 @@ public class TransformDataOperator extends AbstractStreamOperator<Event>
     /** keep the relationship of TableId and table information. */
     private final Map<TableId, TableInfo> tableInfoMap;
-    private transient Map<TransformProjection, TransformProjectionProcessor>
+    private transient Map<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor>
             transformProjectionProcessorMap;
-    private transient Map<TransformFilter, TransformFilterProcessor> transformFilterProcessorMap;
+    private transient Map<Tuple2<TableId, TransformFilter>, TransformFilterProcessor>
+            transformFilterProcessorMap;
 
     public static TransformDataOperator.Builder newBuilder() {
         return new TransformDataOperator.Builder();
     }
@@ -228,13 +230,15 @@ private void transformSchema(TableId tableId, Schema schema) throws Exception {
             if (selectors.isMatch(tableId) && transform.f1.isPresent()) {
                 TransformProjection transformProjection = transform.f1.get();
                 if (transformProjection.isValid()) {
-                    if (!transformProjectionProcessorMap.containsKey(transformProjection)) {
+                    if (!transformProjectionProcessorMap.containsKey(
+                            Tuple2.of(tableId, transformProjection))) {
                         transformProjectionProcessorMap.put(
-                                transformProjection,
+                                Tuple2.of(tableId, transformProjection),
                                 TransformProjectionProcessor.of(transformProjection));
                     }
                     TransformProjectionProcessor transformProjectionProcessor =
-                            transformProjectionProcessorMap.get(transformProjection);
+                            transformProjectionProcessorMap.get(
+                                    Tuple2.of(tableId, transformProjection));
                     // update the columns of projection and add the column of projection into Schema
                     transformProjectionProcessor.processSchemaChangeEvent(schema);
                 }
@@ -258,19 +262,21 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
                     && transformProjectionOptional.isPresent()
                     && transformProjectionOptional.get().isValid()) {
                 TransformProjection transformProjection = transformProjectionOptional.get();
-                if (!transformProjectionProcessorMap.containsKey(transformProjection)
+                if (!transformProjectionProcessorMap.containsKey(
+                                Tuple2.of(tableId, transformProjection))
                         || !transformProjectionProcessorMap
-                                .get(transformProjection)
+                                .get(Tuple2.of(tableId, transformProjection))
                                 .hasTableInfo()) {
                     transformProjectionProcessorMap.put(
-                            transformProjection,
+                            Tuple2.of(tableId, transformProjection),
                             TransformProjectionProcessor.of(
                                     getTableInfoFromSchemaEvolutionClient(tableId),
                                     transformProjection,
                                     timezone));
                 }
                 TransformProjectionProcessor transformProjectionProcessor =
-                        transformProjectionProcessorMap.get(transformProjection);
+                        transformProjectionProcessorMap.get(
+                                Tuple2.of(tableId, transformProjection));
                 dataChangeEventOptional =
                         processProjection(
                                 transformProjectionProcessor,
@@ -281,16 +287,17 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
             if (transformFilterOptional.isPresent()
                     && transformFilterOptional.get().isVaild()) {
                 TransformFilter transformFilter = transformFilterOptional.get();
-                if (!transformFilterProcessorMap.containsKey(transformFilter)) {
+                if (!transformFilterProcessorMap.containsKey(
+                        Tuple2.of(tableId, transformFilter))) {
                     transformFilterProcessorMap.put(
-                            transformFilter,
+                            Tuple2.of(tableId, transformFilter),
                             TransformFilterProcessor.of(
                                     getTableInfoFromSchemaEvolutionClient(tableId),
                                     transformFilter,
                                     timezone));
                 }
                 TransformFilterProcessor transformFilterProcessor =
-                        transformFilterProcessorMap.get(transformFilter);
+                        transformFilterProcessorMap.get(Tuple2.of(tableId, transformFilter));
                 dataChangeEventOptional =
                         processFilter(
                                 transformFilterProcessor,
@@ -302,19 +309,21 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
                     && transformProjectionOptional.isPresent()
                     && transformProjectionOptional.get().isValid()) {
                 TransformProjection transformProjection = transformProjectionOptional.get();
-                if (!transformProjectionProcessorMap.containsKey(transformProjection)
+                if (!transformProjectionProcessorMap.containsKey(
+                                Tuple2.of(tableId, transformProjection))
                        || !transformProjectionProcessorMap
-                                .get(transformProjection)
+                                .get(Tuple2.of(tableId, transformProjection))
                                .hasTableInfo()) {
                     transformProjectionProcessorMap.put(
-                            transformProjection,
+                            Tuple2.of(tableId, transformProjection),
                             TransformProjectionProcessor.of(
                                     getTableInfoFromSchemaEvolutionClient(tableId),
                                     transformProjection,
                                     timezone));
                 }
                 TransformProjectionProcessor transformProjectionProcessor =
-                        transformProjectionProcessorMap.get(transformProjection);
+                        transformProjectionProcessorMap.get(
+                                Tuple2.of(tableId, transformProjection));
                 dataChangeEventOptional =
                         processProjection(
                                 transformProjectionProcessor,
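Note on the runtime change above: the projection and filter processor caches in TransformDataOperator are now keyed by a Tuple2 of table id and transform rule instead of the rule alone, so two tables matched by the same rule (for example TABLEALPHA and TABLEBETA matching TABLE\.* in the new E2E test) no longer share one processor and its cached table info. A minimal, hypothetical sketch of that keying pattern follows; PerTableProcessorCache and getOrCreate are illustrative names, not from the codebase.

    import org.apache.flink.api.java.tuple.Tuple2;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // Sketch of a processor cache keyed by (tableId, rule) rather than by rule only.
    final class PerTableProcessorCache<T, R, P> {

        private final Map<Tuple2<T, R>, P> processors = new HashMap<>();

        // Flink's Tuple2 implements equals()/hashCode() over both fields, so the
        // same rule applied to different tables maps to distinct cache entries.
        P getOrCreate(T tableId, R rule, Function<Tuple2<T, R>, P> factory) {
            return processors.computeIfAbsent(Tuple2.of(tableId, rule), factory);
        }
    }

Keeping the key composite also means a schema change on one table only refreshes that table's processor, which matches the per-table containsKey/put/get lookups added in the hunks above.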