From 17a50f0acc945792831daba89d417027aeb523f9 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Tue, 26 Nov 2024 11:27:18 +0800 Subject: [PATCH 01/13] [FLINK-36784][common] Support to add metadata columns for data in the meta fields of DataChangeEvent at transform --- .../flink/cdc/common/source/DataSource.java | 5 + .../source/SupportedMetadataColumn.java | 40 ++++++++ .../composer/flink/FlinkPipelineComposer.java | 10 +- .../translator/DataSourceTranslator.java | 2 +- .../flink/translator/TransformTranslator.java | 13 ++- .../flink/FlinkPipelineComposerITCase.java | 28 +++--- .../FlinkPipelineComposerLenientITCase.java | 24 ++--- .../flink/FlinkPipelineUdfITCase.java | 92 +++++++++---------- .../mysql/source/MySqlDataSource.java | 6 ++ .../mysql/source/OpTsMetadataColumn.java | 51 ++++++++++ .../values/source/OpTsMetadataColumn.java | 51 ++++++++++ .../values/source/ValuesDataSource.java | 6 ++ .../values/source/ValuesDataSourceHelper.java | 35 ++++++- .../transform/PostTransformOperator.java | 51 +++++++--- .../operators/transform/PostTransformer.java | 10 +- .../transform/PreTransformOperator.java | 17 +++- .../transform/ProjectionColumnProcessor.java | 46 ++++++++-- .../transform/TransformFilterProcessor.java | 49 ++++++++-- .../TransformProjectionProcessor.java | 59 +++++++++--- .../operators/transform/TransformRule.java | 11 ++- .../cdc/runtime/parser/TransformParser.java | 52 ++++++++--- .../parser/metadata/MetadataColumns.java | 1 + .../transform/PostTransformOperatorTest.java | 10 +- .../transform/PreTransformOperatorTest.java | 31 +++++-- .../runtime/parser/TransformParserTest.java | 15 ++- 25 files changed, 558 insertions(+), 157 deletions(-) create mode 100644 flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/SupportedMetadataColumn.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/OpTsMetadataColumn.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/OpTsMetadataColumn.java diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/DataSource.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/DataSource.java index 1027207a416..096648f8b5e 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/DataSource.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/DataSource.java @@ -31,4 +31,9 @@ public interface DataSource { /** Get the {@link MetadataAccessor} for accessing metadata from external systems. */ MetadataAccessor getMetadataAccessor(); + + /** Get the {@link SupportedMetadataColumn}s of the source. */ + default SupportedMetadataColumn[] supportedMetadataColumns() { + return new SupportedMetadataColumn[0]; + } } diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/SupportedMetadataColumn.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/SupportedMetadataColumn.java new file mode 100644 index 00000000000..d4a72fdbe18 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/SupportedMetadataColumn.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.types.DataType;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/** A metadata column that the source supports. */
+@Experimental
+public interface SupportedMetadataColumn extends Serializable {
+    /** Column name. */
+    String getName();
+
+    /** The data type of this column in Flink CDC. */
+    DataType getType();
+
+    /** The Java class returned by the reader. */
+    Class<?> getJavaClass();
+
+    /** Read the metadata from the meta map of a DataChangeEvent. */
+    Object read(Map<String, String> metadata);
+}
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index 579eb960759..4aa216c7d96 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -23,6 +23,7 @@
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.source.DataSource;
 import org.apache.flink.cdc.composer.PipelineComposer;
 import org.apache.flink.cdc.composer.PipelineExecution;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
@@ -103,6 +104,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                 sourceTranslator.translate(
                         pipelineDef.getSource(), env, pipelineDefConfig, parallelism);
 
+        DataSource dataSource =
+                sourceTranslator.createDataSource(pipelineDef.getSource(), env, pipelineDefConfig);
+
         // Build PreTransformOperator for processing Schema Event
         TransformTranslator transformTranslator = new TransformTranslator();
         stream =
@@ -110,7 +114,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                         stream,
                         pipelineDef.getTransforms(),
                         pipelineDef.getUdfs(),
-                        pipelineDef.getModels());
+                        pipelineDef.getModels(),
+                        dataSource.supportedMetadataColumns());
 
         // Schema operator
         SchemaOperatorTranslator schemaOperatorTranslator =
@@ -129,7 +134,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                         pipelineDef.getTransforms(),
                         pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                         pipelineDef.getUdfs(),
-                        pipelineDef.getModels());
+                        pipelineDef.getModels(),
+                        dataSource.supportedMetadataColumns());
 
         // Build DataSink in advance as schema operator requires MetadataApplier
         DataSinkTranslator sinkTranslator = new DataSinkTranslator();
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
index 90bbea73ee3..40d49d8bb59 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
@@ -78,7 +78,7 @@ public DataStreamSource<Event> translate(
         }
     }
 
-    private DataSource createDataSource(
+    public DataSource createDataSource(
             SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
         // Search the data source factory
         DataSourceFactory sourceFactory =
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
index c84911fc76f..bd2fadbc26f 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
@@ -19,6 +19,7 @@
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.composer.definition.ModelDef;
 import org.apache.flink.cdc.composer.definition.TransformDef;
 import org.apache.flink.cdc.composer.definition.UdfDef;
@@ -46,7 +47,8 @@ public DataStream<Event> translatePreTransform(
             DataStream<Event> input,
             List<TransformDef> transforms,
             List<UdfDef> udfFunctions,
-            List<ModelDef> models) {
+            List<ModelDef> models,
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         if (transforms.isEmpty()) {
             return input;
         }
@@ -61,7 +63,8 @@
                     transform.getPrimaryKeys(),
                     transform.getPartitionKeys(),
                     transform.getTableOptions(),
-                    transform.getPostTransformConverter());
+                    transform.getPostTransformConverter(),
+                    supportedMetadataColumns);
         }
         preTransformFunctionBuilder.addUdfFunctions(
@@ -77,7 +80,8 @@ public DataStream<Event> translatePostTransform(
             List<TransformDef> transforms,
             String timezone,
             List<UdfDef> udfFunctions,
-            List<ModelDef> models) {
+            List<ModelDef> models,
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         if (transforms.isEmpty()) {
             return input;
         }
@@ -93,7 +97,8 @@
                         transform.getPrimaryKeys(),
                         transform.getPartitionKeys(),
                         transform.getTableOptions(),
-                        transform.getPostTransformConverter());
+                        transform.getPostTransformConverter(),
+                        supportedMetadataColumns);
             }
         }
         postTransformFunctionBuilder.addTimezone(timezone);
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 7820981ef68..e0223465a66 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -351,13 +351,13 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
         assertThat(outputEvents)
                 .containsExactly(
                         "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
-                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=()}",
-                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=()}",
+ 
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -383,7 +383,7 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { TransformDef transformDef = new TransformDef( "default_namespace.default_schema.table1", - "*,concat(col1,'0') as col12,__data_event_type__ as rk", + "*,concat(col1,'0') as col12,__data_event_type__ as rk,op_ts as opts", "col1 <> '3'", "col1", "col12", @@ -413,14 +413,14 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL,`opts` BIGINT}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I, 1], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U], after=[2, x, 20, +U], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D, 4], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U, 5], 
after=[2, x, 20, +U, 5], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -486,13 +486,13 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 11], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}"); } @Test diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java index 2dd6b8998f1..da33d83d59d 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java @@ -395,12 +395,12 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception { assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", - 
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 10, null, null, 1], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 20, null, null, ], after=[2, null, 20, null, null, x], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 10, null, null, 1], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 20, null, null, ], after=[2, null, 20, null, null, x], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -456,12 +456,12 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 10, -D, null, null, 1], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 20, -U, null, null, ], after=[2, null, 20, +U, null, null, x], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 10, -D, null, null, 1], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 20, -U, null, null, ], after=[2, null, 20, +U, null, null, x], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -526,12 +526,12 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, 
existedColumnName=null}]}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, position=LAST, existedColumnName=null}]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 11, null, null, 1], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 22, null, null, ], after=[2, null, 22, null, null, x], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, null, 11, null, null, 1], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, 22, null, null, ], after=[2, null, 22, null, null, x], op=UPDATE, meta=({op_ts=5})}"); } @Test diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index 55d5b960042..b97b4b6de7f 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -166,14 +166,14 @@ void testTransformWithUdf(ValuesDataSink.SinkApi sinkApi, String language) throw assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, from 1 to z is lie], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, from 2 to z is lie], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, from 3 to z is lie], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, from 1 to z is lie], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, from 2 to z is lie], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, from 3 to z is lie], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, from 1 to z is lie], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , from 2 to z is lie], after=[2, x, from 2 to z is lie], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, from 1 to z is lie], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , from 2 to z is lie], after=[2, x, from 2 to z is lie], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -237,12 +237,12 @@ void 
testFilterWithUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`collen` STRING}, primaryKeys=col1, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 3], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 4], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 3], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 4], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 3], after=[2, x, 3], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 3], after=[2, x, 3], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -306,14 +306,14 @@ void testOverloadedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`tob` STRING,`toi` STRING,`tof` STRING,`tos` STRING}, primaryKeys=col1, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , Boolean: true, Integer: 1, Double: 3.14, String: str], after=[2, x, Boolean: true, Integer: 1, 
Double: 3.14, String: str], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , Boolean: true, Integer: 1, Double: 3.14, String: str], after=[2, x, Boolean: true, Integer: 1, Double: 3.14, String: str], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -378,14 +378,14 @@ void testUdfLifecycle(ValuesDataSink.SinkApi sinkApi, String language) throws Ex .contains("[ LifecycleFunction ] opened.") .contains( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`stt` STRING}, primaryKeys=col1, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, #0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, #1], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, #2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, #0], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, #1], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, #2], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, #3], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , #4], after=[2, x, #5], op=UPDATE, meta=()}") + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, #3], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , #4], after=[2, x, #5], op=UPDATE, meta=({op_ts=5})}") .contains("[ LifecycleFunction ] closed. 
Called 6 times."); } @@ -450,14 +450,14 @@ void testTypeHintedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`ans` STRING}, primaryKeys=col1, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Forty-two], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Forty-two], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Forty-two], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Forty-two], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Forty-two], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Forty-two], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, Forty-two], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , Forty-two], after=[2, x, Forty-two], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, Forty-two], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , Forty-two], after=[2, x, Forty-two], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -535,14 +535,14 @@ void testComplicatedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws .contains("[ LifecycleFunction ] opened.") .contains( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`inccol` STRING,`typ` STRING,`fmt` STRING,`stt` STRING}, primaryKeys=col1, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 3, Integer: 42, 1-42, #0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 4, Integer: 42, 2-42, #1], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 5, Integer: 42, 3-42, #2], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 3, Integer: 42, 1-42, #0], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 4, Integer: 42, 2-42, #1], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 5, Integer: 42, 3-42, #2], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, 
nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 3, Integer: 42, 1-42, #3], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 4, Integer: 42, 2-42, #4], after=[2, x, 4, Integer: 42, 2-42, #5], op=UPDATE, meta=()}") + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 3, Integer: 42, 1-42, #3], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 4, Integer: 42, 2-42, #4], after=[2, x, 4, Integer: 42, 2-42, #5], op=UPDATE, meta=({op_ts=5})}") .contains("[ LifecycleFunction ] closed. Called 6 times."); } @@ -610,14 +610,14 @@ void testTransformWithFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, from 1 to z is lie], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, from 2 to z is lie], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, from 3 to z is lie], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, from 1 to z is lie], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, from 2 to z is lie], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, from 3 to z is lie], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, from 1 to z is lie], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , from 2 to z is lie], after=[2, x, from 2 to z is lie], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, from 1 to z is lie], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , from 2 to z is lie], after=[2, x, from 2 to z is lie], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -680,12 +680,12 @@ void testFilterWithFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) thr assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`collen` STRING}, primaryKeys=col1, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 3], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 4], op=INSERT, meta=()}", 
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 3], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 4], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 3], after=[2, x, 3], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 3], after=[2, x, 3], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -748,14 +748,14 @@ void testOverloadedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) thr assertThat(outputEvents) .containsExactly( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`tob` STRING,`toi` STRING,`tof` STRING,`tos` STRING}, primaryKeys=col1, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , Boolean: true, Integer: 1, Double: 3.14, String: str], after=[2, x, Boolean: true, Integer: 1, Double: 3.14, String: str], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , Boolean: true, Integer: 1, Double: 3.14, String: str], after=[2, x, Boolean: true, Integer: 1, Double: 3.14, String: str], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest @@ -827,14 +827,14 @@ void 
testComplicatedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) th assertThat(outputEvents) .contains( "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`inccol` STRING,`typ` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 3, Integer: 42, 1-42], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 4, Integer: 42, 2-42], op=INSERT, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 5, Integer: 42, 3-42], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 3, Integer: 42, 1-42], op=INSERT, meta=({op_ts=1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 4, Integer: 42, 2-42], op=INSERT, meta=({op_ts=2})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, 5, Integer: 42, 3-42], op=INSERT, meta=({op_ts=3})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 3, Integer: 42, 1-42], after=[], op=DELETE, meta=()}", - "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 4, Integer: 42, 2-42], after=[2, x, 4, Integer: 42, 2-42], op=UPDATE, meta=()}"); + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 3, Integer: 42, 1-42], after=[], op=DELETE, meta=({op_ts=4})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 4, Integer: 42, 2-42], after=[2, x, 4, Integer: 42, 2-42], op=UPDATE, meta=({op_ts=5})}"); } @ParameterizedTest diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index d1dc487c04e..7dd5059d178 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.source.EventSourceProvider; import org.apache.flink.cdc.common.source.FlinkSourceProvider; import org.apache.flink.cdc.common.source.MetadataAccessor; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter; @@ -67,4 +68,9 @@ public MetadataAccessor getMetadataAccessor() { public MySqlSourceConfig getSourceConfig() { return sourceConfig; } + + 
@Override
+    public SupportedMetadataColumn[] supportedMetadataColumns() {
+        return new SupportedMetadataColumn[] {new OpTsMetadataColumn()};
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/OpTsMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/OpTsMetadataColumn.java
new file mode 100644
index 00000000000..5a2393f2d79
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/OpTsMetadataColumn.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for op_ts. */
+public class OpTsMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "op_ts";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.BIGINT().notNull();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return Long.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return Long.parseLong(metadata.get(getName()));
+        }
+        throw new IllegalArgumentException("op_ts doesn't exist in the metadata: " + metadata);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/OpTsMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/OpTsMetadataColumn.java
new file mode 100644
index 00000000000..f6f1f437a90
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/OpTsMetadataColumn.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.values.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for op_ts. */
+public class OpTsMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "op_ts";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.BIGINT();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return Long.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        if (metadata.containsKey(getName())) {
+            return Long.parseLong(metadata.get(getName()));
+        }
+        return null;
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java
index 61699a5d031..60ed244a522 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java
@@ -34,6 +34,7 @@
 import org.apache.flink.cdc.common.source.EventSourceProvider;
 import org.apache.flink.cdc.common.source.FlinkSourceProvider;
 import org.apache.flink.cdc.common.source.MetadataAccessor;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.connectors.values.ValuesDatabase;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -80,6 +81,11 @@ public MetadataAccessor getMetadataAccessor() {
         return new ValuesDatabase.ValuesMetadataAccessor();
     }
 
+    @Override
+    public SupportedMetadataColumn[] supportedMetadataColumns() {
+        return new SupportedMetadataColumn[] {new OpTsMetadataColumn()};
+    }
+
     /**
      * A flink {@link Source} implementation for end to end tests, splits are created by {@link
      * ValuesDataSourceHelper}.
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
index 25999c1bf7b..af9c1890b2d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
@@ -550,7 +550,12 @@ public static List<List<Event>> transformTable() {
                         new Object[] {
                             BinaryStringData.fromString("1"),
                             BinaryStringData.fromString("1")
-                        }));
+                        }),
+                        new HashMap<String, String>() {
+                            {
+                                put("op_ts", "1");
+                            }
+                        });
         split1.add(insertEvent1);
         DataChangeEvent insertEvent2 =
                 DataChangeEvent.insertEvent(
@@ -559,7 +564,12 @@ public static List<List<Event>> transformTable() {
                         new Object[] {
                             BinaryStringData.fromString("2"),
                             BinaryStringData.fromString("2")
-                        }));
+                        }),
+                        new HashMap<String, String>() {
+                            {
+                                put("op_ts", "2");
+                            }
+                        });
         split1.add(insertEvent2);
         DataChangeEvent insertEvent3 =
                 DataChangeEvent.insertEvent(
@@ -568,7 +578,12 @@ public static List<List<Event>> transformTable() {
                         new Object[] {
                             BinaryStringData.fromString("3"),
                             BinaryStringData.fromString("3")
-                        }));
+                        }),
+                        new HashMap<String, String>() {
+                            {
+                                put("op_ts", "3");
+                            }
+                        });
         split1.add(insertEvent3);
 
         // add column
@@ -599,7 +614,12 @@ public static List<List<Event>> transformTable() {
                                 new Object[] {
                                     BinaryStringData.fromString("1"),
                                     BinaryStringData.fromString("1")
-                                })));
+                                }),
+                                new HashMap<String, String>() {
+                                    {
+                                        put("op_ts", "4");
+                                    }
+                                }));
 
         // update
         split1.add(
@@ -614,7 +634,12 @@ public static List<List<Event>> transformTable() {
                                 new Object[] {
                                     BinaryStringData.fromString("2"),
                                     BinaryStringData.fromString("x")
-                                })));
+                                }),
+                                new HashMap<String, String>() {
+                                    {
+                                        put("op_ts", "5");
+                                    }
+                                }));
 
         eventOfSplits.add(split1);
         return eventOfSplits;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 68f4526f057..56e243d3148 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -31,6 +31,7 @@
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverter;
@@ -104,7 +105,8 @@ public PostTransformOperator.Builder addTransform(
                 String primaryKey,
                 String partitionKey,
                 String tableOptions,
-                String postTransformConverter) {
+                String postTransformConverter,
+                SupportedMetadataColumn[] supportedMetadataColumns) {
             transformRules.add(
                     new TransformRule(
                             tableInclusions,
                             projection,
                             filter,
                             primaryKey,
                             partitionKey,
                             tableOptions,
-                            postTransformConverter));
+                            postTransformConverter,
+                            supportedMetadataColumns));
             return this;
        }
 
        public PostTransformOperator.Builder addTransform(
                String tableInclusions, @Nullable String projection, @Nullable String filter) {
            transformRules.add(
-                    new TransformRule(tableInclusions, projection, filter, "", "", "", null));
+                    new TransformRule(
+                            tableInclusions,
+                            projection,
+                            filter,
+                            "",
+                            "",
+                            "",
+                            null,
+                            new SupportedMetadataColumn[0]));
            return this;
        }
@@ -192,7 +203,8 @@ public void open() throws Exception {
                                     TransformFilter.of(filterExpression, udfDescriptors)
                                             .orElse(null),
                                     PostTransformConverters.of(
-                                            transformRule.getPostTransformConverter()));
+                                            transformRule.getPostTransformConverter()),
+                                    transformRule.getSupportedMetadataColumns());
                         })
                 .collect(Collectors.toList());
         this.transformProjectionProcessorMap = new ConcurrentHashMap<>();
@@ -265,7 +277,8 @@ private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws
                                                         .map(TransformProjection::getProjection)
                                                         .orElse(null),
                                                 createTableEvent.getSchema().getColumns(),
-                                                udfDescriptors)
+                                                udfDescriptors,
+                                                rule.getSupportedMetadataColumns())
                                                 .stream())
                         .map(ProjectionColumn::getColumnName)
                         .collect(Collectors.toSet());
@@ -351,13 +364,16 @@ private Schema transformSchema(TableId tableId, Schema schema) {
                                         transformProjection,
                                         timezone,
                                         udfDescriptors,
-                                        getUdfFunctionInstances()));
+                                        getUdfFunctionInstances(),
+                                        transform.getSupportedMetadataColumns()));
                     }
                     TransformProjectionProcessor postTransformProcessor =
                             transformProjectionProcessorMap.get(
                                     Tuple2.of(tableId, transformProjection));
                     // update the columns of projection and add the column of projection into Schema
-                    newSchemas.add(postTransformProcessor.processSchemaChangeEvent(schema));
+                    newSchemas.add(
+                            postTransformProcessor.processSchemaChangeEvent(
+                                    schema, transform.getSupportedMetadataColumns()));
                 }
             }
         }
@@ -401,7 +417,8 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
                                         transformFilter,
                                         timezone,
                                         udfDescriptors,
-                                        getUdfFunctionInstances()));
+                                        getUdfFunctionInstances(),
+                                        transform.getSupportedMetadataColumns()));
             }
             TransformFilterProcessor transformFilterProcessor =
                     transformFilterProcessorMap.get(Tuple2.of(tableId, transformFilter));
@@ -427,7 +444,8 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
                                         transformProjection,
                                         timezone,
                                         udfDescriptors,
-                                        getUdfFunctionInstances()));
+                                        getUdfFunctionInstances(),
+                                        transform.getSupportedMetadataColumns()));
             }
             TransformProjectionProcessor postTransformProcessor =
                     transformProjectionProcessorMap.get(
@@ -474,17 +492,18 @@ private Optional<DataChangeEvent> processFilter(
             throws Exception {
         BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
         BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
+        Map<String, String> meta = dataChangeEvent.meta();
         // insert and update event only process afterData, delete only process beforeData
         if (after != null) {
             if (transformFilterProcessor.process(
-                    after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'))) {
+                    after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'), meta)) {
                 return Optional.of(dataChangeEvent);
             } else {
                 return Optional.empty();
             }
         } else if (before != null) {
             if (transformFilterProcessor.process(
-                    before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'))) {
+                    before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'), meta)) {
                 return Optional.of(dataChangeEvent);
             } else {
                 return Optional.empty();
@@ -502,13 +521,19 @@ private Optional<DataChangeEvent> processProjection(
         if (before != null) {
             BinaryRecordData projectedBefore =
                     postTransformProcessor.processData(
-                            before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'));
before, + epochTime, + opTypeToRowKind(dataChangeEvent.op(), '-'), + dataChangeEvent.meta()); dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore); } if (after != null) { BinaryRecordData projectedAfter = postTransformProcessor.processData( - after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+')); + after, + epochTime, + opTypeToRowKind(dataChangeEvent.op(), '+'), + dataChangeEvent.meta()); dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter); } return Optional.of(dataChangeEvent); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java index 06045bf4291..c448e9903fc 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.runtime.operators.transform; import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverter; import javax.annotation.Nullable; @@ -27,6 +28,7 @@ /** Post-Transformation rule used by {@link PostTransformOperator}. */ public class PostTransformer { private final Selectors selectors; + private final SupportedMetadataColumn[] supportedMetadataColumns; private final Optional projection; private final Optional filter; @@ -36,11 +38,13 @@ public PostTransformer( Selectors selectors, @Nullable TransformProjection projection, @Nullable TransformFilter filter, - Optional postTransformConverter) { + Optional postTransformConverter, + SupportedMetadataColumn[] supportedMetadataColumns) { this.selectors = selectors; this.projection = projection != null ? Optional.of(projection) : Optional.empty(); this.filter = filter != null ? 
Optional.of(filter) : Optional.empty(); this.postTransformConverter = postTransformConverter; + this.supportedMetadataColumns = supportedMetadataColumns; } public Selectors getSelectors() { @@ -58,4 +62,8 @@ public Optional getFilter() { public Optional getPostTransformConverter() { return postTransformConverter; } + + public SupportedMetadataColumn[] getSupportedMetadataColumns() { + return supportedMetadataColumns; + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index 6ee0d72e984..4a3c51583a7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -33,6 +33,7 @@ import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.runtime.parser.TransformParser; import org.apache.flink.runtime.state.StateInitializationContext; @@ -89,7 +90,15 @@ public static class Builder { public PreTransformOperator.Builder addTransform( String tableInclusions, @Nullable String projection, @Nullable String filter) { transformRules.add( - new TransformRule(tableInclusions, projection, filter, "", "", "", null)); + new TransformRule( + tableInclusions, + projection, + filter, + "", + "", + "", + null, + new SupportedMetadataColumn[0])); return this; } @@ -100,7 +109,8 @@ public PreTransformOperator.Builder addTransform( String primaryKey, String partitionKey, String tableOption, - @Nullable String postTransformConverter) { + @Nullable String postTransformConverter, + SupportedMetadataColumn[] supportedMetadataColumns) { transformRules.add( new TransformRule( tableInclusions, @@ -109,7 +119,8 @@ public PreTransformOperator.Builder addTransform( primaryKey, partitionKey, tableOption, - postTransformConverter)); + postTransformConverter, + supportedMetadataColumns)); return this; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index ee7740d9e2a..292ba416a52 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.runtime.parser.JaninoCompiler; import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns; import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; @@ -32,6 +33,8 @@ import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; +import java.util.stream.Stream; import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS; @@ -47,6 +50,7 @@ public class 
ProjectionColumnProcessor { private String timezone; private TransformExpressionKey transformExpressionKey; private final List udfDescriptors; + private final SupportedMetadataColumn[] supportedMetadataColumns; private final transient List udfFunctionInstances; private transient ExpressionEvaluator expressionEvaluator; @@ -55,11 +59,13 @@ public ProjectionColumnProcessor( ProjectionColumn projectionColumn, String timezone, List udfDescriptors, - final List udfFunctionInstances) { + final List udfFunctionInstances, + SupportedMetadataColumn[] supportedMetadataColumns) { this.tableInfo = tableInfo; this.projectionColumn = projectionColumn; this.timezone = timezone; this.udfDescriptors = udfDescriptors; + this.supportedMetadataColumns = supportedMetadataColumns; this.transformExpressionKey = generateTransformExpressionKey(); this.expressionEvaluator = TransformExpressionCompiler.compileExpression( @@ -72,18 +78,25 @@ public static ProjectionColumnProcessor of( ProjectionColumn projectionColumn, String timezone, List udfDescriptors, - List udfFunctionInstances) { + List udfFunctionInstances, + SupportedMetadataColumn[] supportedMetadataColumns) { return new ProjectionColumnProcessor( - tableInfo, projectionColumn, timezone, udfDescriptors, udfFunctionInstances); + tableInfo, + projectionColumn, + timezone, + udfDescriptors, + udfFunctionInstances, + supportedMetadataColumns); } public ProjectionColumn getProjectionColumn() { return projectionColumn; } - public Object evaluate(BinaryRecordData record, long epochTime, String opType) { + public Object evaluate( + BinaryRecordData record, long epochTime, String opType, Map meta) { try { - return expressionEvaluator.evaluate(generateParams(record, epochTime, opType)); + return expressionEvaluator.evaluate(generateParams(record, epochTime, opType, meta)); } catch (InvocationTargetException e) { LOG.error( "Table:{} column:{} projection:{} execute failed. 
{}", @@ -95,7 +108,8 @@ public Object evaluate(BinaryRecordData record, long epochTime, String opType) { } } - private Object[] generateParams(BinaryRecordData record, long epochTime, String opType) { + private Object[] generateParams( + BinaryRecordData record, long epochTime, String opType, Map meta) { List params = new ArrayList<>(); List columns = tableInfo.getPreTransformedSchema().getColumns(); @@ -119,6 +133,18 @@ private Object[] generateParams(BinaryRecordData record, long epochTime, String continue; } + boolean foundInMeta = false; + for (SupportedMetadataColumn supportedMetadataColumn : supportedMetadataColumns) { + if (supportedMetadataColumn.getName().equals(originalColumnName)) { + params.add(supportedMetadataColumn.read(meta)); + foundInMeta = true; + break; + } + } + if (foundInMeta) { + continue; + } + boolean argumentFound = false; for (int i = 0; i < columns.size(); i++) { Column column = columns.get(i); @@ -171,6 +197,14 @@ private TransformExpressionKey generateTransformExpressionKey() { argumentNames.add(col.f0); paramTypes.add(col.f2); }); + Stream.of(supportedMetadataColumns) + .filter(col -> col.getName().equals(originalColumnName)) + .findFirst() + .ifPresent( + col -> { + argumentNames.add(col.getName()); + paramTypes.add(col.getJavaClass()); + }); } argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index 430813d7fd8..7b6b4fad728 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.runtime.parser.JaninoCompiler; import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns; import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; @@ -33,6 +34,8 @@ import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; +import java.util.stream.Stream; import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS; @@ -45,16 +48,19 @@ public class TransformFilterProcessor { private TransformExpressionKey transformExpressionKey; private final transient List udfFunctionInstances; private transient ExpressionEvaluator expressionEvaluator; + private final SupportedMetadataColumn[] supportedMetadataColumns; public TransformFilterProcessor( PostTransformChangeInfo tableInfo, TransformFilter transformFilter, String timezone, List udfDescriptors, - List udfFunctionInstances) { + List udfFunctionInstances, + SupportedMetadataColumn[] supportedMetadataColumns) { this.tableInfo = tableInfo; this.transformFilter = transformFilter; this.timezone = timezone; + this.supportedMetadataColumns = supportedMetadataColumns; this.transformExpressionKey = generateTransformExpressionKey(); this.udfFunctionInstances = udfFunctionInstances; this.expressionEvaluator = @@ -67,15 +73,22 @@ public static TransformFilterProcessor of( TransformFilter transformFilter, String timezone, List udfDescriptors, - List 
udfFunctionInstances) { + List udfFunctionInstances, + SupportedMetadataColumn[] supportedMetadataColumns) { return new TransformFilterProcessor( - tableInfo, transformFilter, timezone, udfDescriptors, udfFunctionInstances); + tableInfo, + transformFilter, + timezone, + udfDescriptors, + udfFunctionInstances, + supportedMetadataColumns); } - public boolean process(BinaryRecordData record, long epochTime, String opType) { + public boolean process( + BinaryRecordData record, long epochTime, String opType, Map meta) { try { return (Boolean) - expressionEvaluator.evaluate(generateParams(record, epochTime, opType)); + expressionEvaluator.evaluate(generateParams(record, epochTime, opType, meta)); } catch (InvocationTargetException e) { LOG.error( "Table:{} filter:{} execute failed. {}", @@ -110,10 +123,21 @@ private Tuple2, List>> generateArguments() { argTypes.add(col.f2); } }); + + Stream.of(supportedMetadataColumns) + .forEach( + col -> { + if (scriptExpression.contains(col.getName()) + && !argNames.contains(col.getName())) { + argNames.add(col.getName()); + argTypes.add(col.getJavaClass()); + } + }); return Tuple2.of(argNames, argTypes); } - private Object[] generateParams(BinaryRecordData record, long epochTime, String opType) { + private Object[] generateParams( + BinaryRecordData record, long epochTime, String opType, Map meta) { List params = new ArrayList<>(); List columns = tableInfo.getPreTransformedSchema().getColumns(); @@ -135,6 +159,19 @@ private Object[] generateParams(BinaryRecordData record, long epochTime, String params.add(opType); continue; } + + boolean foundInMeta = false; + for (SupportedMetadataColumn supportedMetadataColumn : supportedMetadataColumns) { + if (supportedMetadataColumn.getName().equals(columnName)) { + params.add(supportedMetadataColumn.read(meta)); + foundInMeta = true; + break; + } + } + if (foundInMeta) { + continue; + } + for (int i = 0; i < columns.size(); i++) { Column column = columns.get(i); if (column.getName().equals(columnName)) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java index 45ea3577094..abedd912083 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.runtime.parser.TransformParser; import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; @@ -30,6 +31,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -59,7 +61,8 @@ public TransformProjectionProcessor( TransformProjection transformProjection, String timezone, List udfDescriptors, - final List udfFunctionInstances) { + final List udfFunctionInstances, + SupportedMetadataColumn[] supportedMetadataColumns) { this.postTransformChangeInfo = postTransformChangeInfo; this.transformProjection = transformProjection; this.timezone = timezone; @@ -68,7 +71,8 @@ public 
TransformProjectionProcessor( // Create cached projection column processors after setting all other fields. this.cachedProjectionColumnProcessors = - cacheProjectionColumnProcessors(postTransformChangeInfo, transformProjection); + cacheProjectionColumnProcessors( + postTransformChangeInfo, transformProjection, supportedMetadataColumns); } public boolean hasTableInfo() { @@ -80,32 +84,54 @@ public static TransformProjectionProcessor of( TransformProjection transformProjection, String timezone, List udfDescriptors, - List udfFunctionInstances) { + List udfFunctionInstances, + SupportedMetadataColumn[] supportedMetadataColumns) { return new TransformProjectionProcessor( - tableInfo, transformProjection, timezone, udfDescriptors, udfFunctionInstances); + tableInfo, + transformProjection, + timezone, + udfDescriptors, + udfFunctionInstances, + supportedMetadataColumns); } public static TransformProjectionProcessor of( TransformProjection transformProjection, String timezone, List udfDescriptors, - List udfFunctionInstances) { + List udfFunctionInstances, + SupportedMetadataColumn[] supportedMetadataColumns) { return new TransformProjectionProcessor( - null, transformProjection, timezone, udfDescriptors, udfFunctionInstances); + null, + transformProjection, + timezone, + udfDescriptors, + udfFunctionInstances, + supportedMetadataColumns); } public static TransformProjectionProcessor of( TransformProjection transformProjection, List udfDescriptors, - List udfFunctionInstances) { + List udfFunctionInstances, + SupportedMetadataColumn[] supportedMetadataColumns) { return new TransformProjectionProcessor( - null, transformProjection, null, udfDescriptors, udfFunctionInstances); + null, + transformProjection, + null, + udfDescriptors, + udfFunctionInstances, + supportedMetadataColumns); } - public Schema processSchemaChangeEvent(Schema schema) { + public Schema processSchemaChangeEvent( + Schema schema, SupportedMetadataColumn[] supportedMetadataColumns) { List projectionColumns = TransformParser.generateProjectionColumns( - transformProjection.getProjection(), schema.getColumns(), udfDescriptors); + transformProjection.getProjection(), + schema.getColumns(), + udfDescriptors, + supportedMetadataColumns); transformProjection.setProjectionColumns(projectionColumns); return schema.copy( projectionColumns.stream() @@ -113,7 +139,8 @@ public Schema processSchemaChangeEvent(Schema schema) { .collect(Collectors.toList())); } - public BinaryRecordData processData(BinaryRecordData payload, long epochTime, String opType) { + public BinaryRecordData processData( + BinaryRecordData payload, long epochTime, String opType, Map meta) { List valueList = new ArrayList<>(); List columns = postTransformChangeInfo.getPostTransformedSchema().getColumns(); @@ -124,7 +151,8 @@ public BinaryRecordData processData(BinaryRecordData payload, long epochTime, St ProjectionColumn projectionColumn = projectionColumnProcessor.getProjectionColumn(); valueList.add( DataTypeConverter.convert( - projectionColumnProcessor.evaluate(payload, epochTime, opType), + projectionColumnProcessor.evaluate( + payload, epochTime, opType, meta), projectionColumn.getDataType())); } else { Column column = columns.get(i); @@ -159,7 +187,9 @@ private Object getValueFromBinaryRecordData( } private List cacheProjectionColumnProcessors( - PostTransformChangeInfo tableInfo, TransformProjection transformProjection) { + PostTransformChangeInfo tableInfo, + TransformProjection transformProjection, + SupportedMetadataColumn[] supportedMetadataColumns) { List 
cachedProjectionColumnProcessors = new ArrayList<>(); if (!hasTableInfo()) { return cachedProjectionColumnProcessors; @@ -184,7 +214,8 @@ private List cacheProjectionColumnProcessors( col, timezone, udfDescriptors, - udfFunctionInstances)) + udfFunctionInstances, + supportedMetadataColumns)) .orElse(null)); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java index 7bdc5cffc74..31a24525731 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.runtime.operators.transform; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; + import javax.annotation.Nullable; import java.io.Serializable; @@ -34,6 +36,7 @@ public class TransformRule implements Serializable { private final String partitionKey; private final String tableOption; private final @Nullable String postTransformConverter; + private final SupportedMetadataColumn[] supportedMetadataColumns; public TransformRule( String tableInclusions, @@ -42,7 +45,8 @@ public TransformRule( String primaryKey, String partitionKey, String tableOption, - @Nullable String postTransformConverter) { + @Nullable String postTransformConverter, + SupportedMetadataColumn[] supportedMetadataColumns) { this.tableInclusions = tableInclusions; this.projection = projection; this.filter = normalizeFilter(projection, filter); @@ -50,6 +54,7 @@ public TransformRule( this.partitionKey = partitionKey; this.tableOption = tableOption; this.postTransformConverter = postTransformConverter; + this.supportedMetadataColumns = supportedMetadataColumns; } public String getTableInclusions() { @@ -82,4 +87,8 @@ public String getTableOption() { public String getPostTransformConverter() { return postTransformConverter; } + + public SupportedMetadataColumn[] getSupportedMetadataColumns() { + return supportedMetadataColumns; + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 16416daff82..4dfcd09a52d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.io.ParseException; import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn; @@ -83,6 +84,7 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.apache.flink.cdc.common.utils.StringUtils.isNullOrWhitespaceOnly; import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS; @@ -106,8 +108,10 @@ private static SqlParser getCalciteParser(String sql) { private static RelNode sqlToRel( List columns, SqlNode sqlNode, - List udfDescriptors) { - List columnsWithMetadata = copyFillMetadataColumn(columns); + List udfDescriptors, + SupportedMetadataColumn[] 
supportedMetadataColumns) { + List columnsWithMetadata = + copyFillMetadataColumn(columns, supportedMetadataColumns); CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); SchemaPlus schema = rootSchema.plus(); Map operand = new HashMap<>(); @@ -265,7 +269,8 @@ private static void expandWildcard(SqlSelect sqlSelect, List columns) { public static List generateProjectionColumns( String projectionExpression, List columns, - List udfDescriptors) { + List udfDescriptors, + SupportedMetadataColumn[] supportedMetadataColumns) { if (isNullOrWhitespaceOnly(projectionExpression)) { return new ArrayList<>(); } @@ -273,8 +278,9 @@ public static List generateProjectionColumns( if (sqlSelect.getSelectList().isEmpty()) { return new ArrayList<>(); } + expandWildcard(sqlSelect, columns); - RelNode relNode = sqlToRel(columns, sqlSelect, udfDescriptors); + RelNode relNode = sqlToRel(columns, sqlSelect, udfDescriptors, supportedMetadataColumns); Map relDataTypeMap = relNode.getRowType().getFieldList().stream() .collect( @@ -305,7 +311,7 @@ public static List generateProjectionColumns( String columnName = aliasNode.names.get(aliasNode.names.size() - 1); Preconditions.checkArgument( - !isMetadataColumn(columnName), + !isMetadataColumn(columnName, supportedMetadataColumns), "Column name %s is reserved and shading it is not allowed.", columnName); @@ -320,7 +326,11 @@ public static List generateProjectionColumns( identifierExprNode.names.get(identifierExprNode.names.size() - 1); projectionColumn = resolveProjectionColumnFromIdentifier( - relDataTypeMap, originalColumnMap, originalName, columnName); + relDataTypeMap, + originalColumnMap, + originalName, + columnName, + supportedMetadataColumns); } else { projectionColumn = ProjectionColumn.ofCalculated( @@ -339,7 +349,11 @@ else if (sqlNode instanceof SqlIdentifier) { String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1); projectionColumn = resolveProjectionColumnFromIdentifier( - relDataTypeMap, originalColumnMap, columnName, columnName); + relDataTypeMap, + originalColumnMap, + columnName, + columnName, + supportedMetadataColumns); } else { throw new ParseException("Unrecognized projection: " + sqlNode.toString()); } @@ -366,8 +380,9 @@ public static ProjectionColumn resolveProjectionColumnFromIdentifier( Map relDataTypeMap, Map originalColumnMap, String identifier, - String projectedColumnName) { - if (isMetadataColumn(identifier)) { + String projectedColumnName, + SupportedMetadataColumn[] supportedMetadataColumns) { + if (isMetadataColumn(identifier, supportedMetadataColumns)) { // For a metadata column, we simply generate a projection column with the same return ProjectionColumn.ofCalculated( projectedColumnName, @@ -406,7 +421,8 @@ public static String translateFilterExpressionToJaninoExpression( return JaninoCompiler.translateSqlNodeToJaninoExpression(where, udfDescriptors); } - public static List parseComputedColumnNames(String projection) { + public static List parseComputedColumnNames( + String projection, SupportedMetadataColumn[] supportedMetadataColumns) { List columnNames = new ArrayList<>(); if (isNullOrWhitespaceOnly(projection)) { return columnNames; @@ -436,7 +452,8 @@ public static List parseComputedColumnNames(String projection) { } } else if (sqlNode instanceof SqlIdentifier) { String columnName = sqlNode.toString(); - if (isMetadataColumn(columnName) && !columnNames.contains(columnName)) { + if (isMetadataColumn(columnName, supportedMetadataColumns) + && !columnNames.contains(columnName)) { 
columnNames.add(columnName); } } else { @@ -499,17 +516,24 @@ private static SqlSelect parseProjectionExpression(String projection) { return parseSelect(statement.toString()); } - private static List copyFillMetadataColumn(List columns) { + private static List copyFillMetadataColumn( + List columns, SupportedMetadataColumn[] supportedMetadataColumns) { // Add metaColumn for SQLValidator.validate List columnsWithMetadata = new ArrayList<>(columns); METADATA_COLUMNS.stream() .map(col -> Column.physicalColumn(col.f0, col.f1)) .forEach(columnsWithMetadata::add); + Stream.of(supportedMetadataColumns) + .map(sCol -> Column.physicalColumn(sCol.getName(), sCol.getType())) + .forEach(columnsWithMetadata::add); return columnsWithMetadata; } - private static boolean isMetadataColumn(String columnName) { - return METADATA_COLUMNS.stream().anyMatch(col -> col.f0.equals(columnName)); + private static boolean isMetadataColumn( + String columnName, SupportedMetadataColumn[] supportedMetadataColumns) { + return METADATA_COLUMNS.stream().anyMatch(col -> col.f0.equals(columnName)) + || Stream.of(supportedMetadataColumns) + .anyMatch(col -> col.getName().equals(columnName)); } public static SqlSelect parseFilterExpression(String filterExpression) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java index f70e012f14c..0177202c29d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java @@ -26,6 +26,7 @@ /** Contains all supported metadata columns that could be used in transform expressions. 
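+ * <p>Sources may additionally expose connector-specific metadata columns through
+ * {@code SupportedMetadataColumn}; such columns are resolved alongside the ones listed here.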
*/ public class MetadataColumns { + public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__"; public static final String DEFAULT_SCHEMA_NAME = "__schema_name__"; public static final String DEFAULT_TABLE_NAME = "__table_name__"; diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index d266ed20185..96ab2aa626b 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness; @@ -435,7 +436,14 @@ void testDataChangeEventTransformProjectionDataTypeConvert() throws Exception { PostTransformOperator transform = PostTransformOperator.newBuilder() .addTransform( - DATATYPE_TABLEID.identifier(), "*", null, null, null, null, null) + DATATYPE_TABLEID.identifier(), + "*", + null, + null, + null, + null, + null, + new SupportedMetadataColumn[0]) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java index 6f7ac12e755..0ab43bc1c8a 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness; @@ -192,7 +193,8 @@ void testEventTransform() throws Exception { "col2", "col12", "key1=value1,key2=value2", - null) + null, + new SupportedMetadataColumn[0]) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -303,7 +305,8 @@ public void testNullabilityColumn() throws Exception { "id", "id", "key1=value1,key2=value2", - null) + null, + new SupportedMetadataColumn[0]) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -334,7 +337,8 @@ public void testReduceTransformColumn() throws Exception { "id", "id", "key1=value1,key2=value2", - null) + null, + new SupportedMetadataColumn[0]) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -442,7 +446,8 @@ public void testWildcardTransformColumn() throws Exception { "id", "id", "key1=value1,key2=value2", - null) + null, + new SupportedMetadataColumn[0]) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ 
-559,7 +564,8 @@ void testMultiTransformWithDiffRefColumns() throws Exception { "id", null, null, - null) + null, + new SupportedMetadataColumn[0]) .addTransform( CUSTOMERS_TABLEID.identifier(), "id, name as roleName", @@ -567,7 +573,8 @@ void testMultiTransformWithDiffRefColumns() throws Exception { "id", null, null, - null) + null, + new SupportedMetadataColumn[0]) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -598,7 +605,8 @@ void testMultiTransformWithAsterisk() throws Exception { "id", null, null, - null) + null, + new SupportedMetadataColumn[0]) .addTransform( CUSTOMERS_TABLEID.identifier(), "id, age, name, sex, 'Juvenile' as roleName", @@ -606,7 +614,8 @@ void testMultiTransformWithAsterisk() throws Exception { "id", null, null, - null) + null, + new SupportedMetadataColumn[0]) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -636,7 +645,8 @@ void testMultiTransformMissingProjection() throws Exception { "id", null, null, - null) + null, + new SupportedMetadataColumn[0]) .addTransform( CUSTOMERS_TABLEID.identifier(), "id, age, UPPER(name) as name, sex", @@ -644,7 +654,8 @@ void testMultiTransformMissingProjection() throws Exception { "id", null, null, - null) + null, + new SupportedMetadataColumn[0]) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index 0fd97c1b290..bc5d1200524 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.io.ParseException; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.source.SupportedMetadataColumn; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn; import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor; @@ -183,7 +184,8 @@ public void testCalciteRelNode() { @Test public void testParseComputedColumnNames() { List computedColumnNames = - TransformParser.parseComputedColumnNames("CONCAT(id, order_id) as uniq_id, *"); + TransformParser.parseComputedColumnNames( + "CONCAT(id, order_id) as uniq_id, *", new SupportedMetadataColumn[0]); Assertions.assertThat(computedColumnNames.toArray()).isEqualTo(new String[] {"uniq_id"}); } @@ -324,7 +326,8 @@ public void testGenerateProjectionColumns() { TransformParser.generateProjectionColumns( "id, upper(name) as name, age + 1 as newage, weight / (height * height) as bmi", testColumns, - Collections.emptyList()); + Collections.emptyList(), + new SupportedMetadataColumn[0]); List expected = Arrays.asList( @@ -338,7 +341,8 @@ public void testGenerateProjectionColumns() { TransformParser.generateProjectionColumns( "*, __namespace_name__, __schema_name__, __table_name__", testColumns, - Collections.emptyList()); + Collections.emptyList(), + new SupportedMetadataColumn[0]); List metadataExpected = Arrays.asList( @@ -359,7 +363,10 @@ public void testGenerateProjectionColumns() { Assertions.assertThatThrownBy( () -> TransformParser.generateProjectionColumns( - "id, 1 + 1", testColumns, Collections.emptyList())) + "id, 1 + 1", + 
testColumns, + Collections.emptyList(), + new SupportedMetadataColumn[0])) .isExactlyInstanceOf(IllegalArgumentException.class) .hasMessage( "Unrecognized projection expression: 1 + 1. Should be AS "); From ce46048e1935d67065561cbf38e2318c3b17e53c Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Tue, 10 Dec 2024 15:05:03 +0800 Subject: [PATCH 02/13] fix review --- .../source/SupportedMetadataColumn.java | 10 ++++-- .../transform/TransformFilterProcessor.java | 36 +++++++++---------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/SupportedMetadataColumn.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/SupportedMetadataColumn.java index d4a72fdbe18..6bd2f51d658 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/SupportedMetadataColumn.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/SupportedMetadataColumn.java @@ -18,12 +18,13 @@ package org.apache.flink.cdc.common.source; import org.apache.flink.cdc.common.annotation.Experimental; +import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.types.DataType; import java.io.Serializable; import java.util.Map; -/** A metadata column that the source supports. */ +/** A metadata column that the source supports to read from the meta field. */ @Experimental public interface SupportedMetadataColumn extends Serializable { /** Column name. */ @@ -35,6 +36,11 @@ public interface SupportedMetadataColumn extends Serializable { /** The returned java class of the reader. */ Class getJavaClass(); - /** Read the metadata from the dataChangeEvent. */ + /** + * Read the metadata from the {@link DataChangeEvent#meta()}. + * + * @param metadata the metadata returned from {@link DataChangeEvent#meta()} + * @return the value of this metadata found by metadata name + */ Object read(Map metadata); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index 7b6b4fad728..220f6110920 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -32,10 +32,10 @@ import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS; @@ -48,7 +48,7 @@ public class TransformFilterProcessor { private TransformExpressionKey transformExpressionKey; private final transient List udfFunctionInstances; private transient ExpressionEvaluator expressionEvaluator; - private final SupportedMetadataColumn[] supportedMetadataColumns; + private final Map supportedMetadataColumns; public TransformFilterProcessor( PostTransformChangeInfo tableInfo, @@ -56,7 +56,7 @@ public TransformFilterProcessor( String timezone, List udfDescriptors, List udfFunctionInstances, - SupportedMetadataColumn[] supportedMetadataColumns) { + Map supportedMetadataColumns) { this.tableInfo = tableInfo; this.transformFilter = transformFilter; this.timezone = timezone; @@ -75,13 +75,18 @@ public static TransformFilterProcessor of( 
            List<UserDefinedFunctionDescriptor> udfDescriptors,
             List<Object> udfFunctionInstances,
             SupportedMetadataColumn[] supportedMetadataColumns) {
+        Map<String, SupportedMetadataColumn> supportedMetadataColumnsMap = new HashMap<>();
+        for (SupportedMetadataColumn supportedMetadataColumn : supportedMetadataColumns) {
+            supportedMetadataColumnsMap.put(
+                    supportedMetadataColumn.getName(), supportedMetadataColumn);
+        }
         return new TransformFilterProcessor(
                 tableInfo,
                 transformFilter,
                 timezone,
                 udfDescriptors,
                 udfFunctionInstances,
-                supportedMetadataColumns);
+                supportedMetadataColumnsMap);
     }
 
     public boolean process(
@@ -124,13 +129,13 @@ private Tuple2<List<String>, List<Class<?>>> generateArguments() {
                             }
                         });
 
-        Stream.of(supportedMetadataColumns)
+        supportedMetadataColumns
+                .keySet()
                 .forEach(
-                        col -> {
-                            if (scriptExpression.contains(col.getName())
-                                    && !argNames.contains(col.getName())) {
-                                argNames.add(col.getName());
-                                argTypes.add(col.getJavaClass());
+                        colName -> {
+                            if (scriptExpression.contains(colName) && !argNames.contains(colName)) {
+                                argNames.add(colName);
+                                argTypes.add(supportedMetadataColumns.get(colName).getJavaClass());
                             }
                         });
         return Tuple2.of(argNames, argTypes);
     }
@@ -160,15 +165,8 @@ private Object[] generateParams(
                 continue;
             }
 
-            boolean foundInMeta = false;
-            for (SupportedMetadataColumn supportedMetadataColumn : supportedMetadataColumns) {
-                if (supportedMetadataColumn.getName().equals(columnName)) {
-                    params.add(supportedMetadataColumn.read(meta));
-                    foundInMeta = true;
-                    break;
-                }
-            }
-            if (foundInMeta) {
+            if (supportedMetadataColumns.containsKey(columnName)) {
+                params.add(supportedMetadataColumns.get(columnName).read(meta));
                 continue;
             }
 
From 3bb6e962af7d7fc2fdae4094940c7843c38274cf Mon Sep 17 00:00:00 2001
From: Kunni
Date: Tue, 17 Dec 2024 16:20:15 +0800
Subject: [PATCH 03/13] [FLINK-36784][pipeline-connector][mysql] Add metadata.list option in MySqlDataSource to pass metadata such as op_ts to downstream operators.
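
MySqlDataSourceFactory splits the comma-separated option value into
MySqlReadableMetadata entries, and MySqlEventDeserializer attaches each
configured metadata key to the meta map of every emitted DataChangeEvent
(op_ts is rendered as an epoch-millisecond string).

A minimal pipeline definition using the option could look like the sketch
below; hostname, credentials, table names, and the column alias are
illustrative placeholders, not part of this change:

    source:
      type: mysql
      hostname: localhost
      port: 3306
      username: root
      password: secret
      tables: app_db.orders
      metadata.list: op_ts

    transform:
      - source-table: app_db.orders
        projection: \*, op_ts AS ingestion_ts

    sink:
      type: values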
--- .../mysql/factory/MySqlDataSourceFactory.java | 24 ++- .../mysql/source/MySqlDataSource.java | 17 +- .../mysql/source/MySqlDataSourceOptions.java | 9 + .../mysql/source/MySqlEventDeserializer.java | 27 ++- .../mysql/source/MySqlPipelineITCase.java | 159 ++++++++++++++++++ 5 files changed, 233 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 118e8cdb19f..2e193370d89 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -26,12 +26,14 @@ import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.source.DataSource; +import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource; import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder; +import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils; import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils; @@ -46,6 +48,8 @@ import java.time.Duration; import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -62,6 +66,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; @@ -234,8 +239,24 @@ public DataSource createDataSource(Context context) { LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap); configFactory.chunkKeyColumn(chunkKeyColumnMap); } + String metadataList = config.get(METADATA_LIST); + List readableMetadataList = listReadableMetadata(metadataList); + return new MySqlDataSource(configFactory, readableMetadataList); + } - return new MySqlDataSource(configFactory); + private List listReadableMetadata(String metadataList) { + if (StringUtils.isNullOrWhitespaceOnly(metadataList)) { + return new ArrayList<>(); + } + List readableMetadataList = 
+ Arrays.stream(metadataList.split(",")) + .map(String::trim) + .collect(Collectors.toList()); + return Arrays.stream(MySqlReadableMetadata.values()) + .filter( + (mySqlReadableMetadata -> + readableMetadataList.contains(mySqlReadableMetadata.getKey()))) + .collect(Collectors.toList()); } @Override @@ -276,6 +297,7 @@ public Set> optionalOptions() { options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); + options.add(METADATA_LIST); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index 7dd5059d178..17cb5cd5a3a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -28,8 +28,12 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter; +import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata; import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; +import java.util.ArrayList; +import java.util.List; + /** A {@link DataSource} for mysql cdc connector. 
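+ * <p>When a non-empty list of {@link MySqlReadableMetadata} is supplied, the configured
+ * metadata (for example {@code op_ts}) is read from each source record and attached to the
+ * meta map of the emitted data change events.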
*/ @Internal public class MySqlDataSource implements DataSource { @@ -37,16 +41,27 @@ public class MySqlDataSource implements DataSource { private final MySqlSourceConfigFactory configFactory; private final MySqlSourceConfig sourceConfig; + private List readableMetadataList; + public MySqlDataSource(MySqlSourceConfigFactory configFactory) { + this(configFactory, new ArrayList<>()); + } + + public MySqlDataSource( + MySqlSourceConfigFactory configFactory, + List readableMetadataList) { this.configFactory = configFactory; this.sourceConfig = configFactory.createConfig(0); + this.readableMetadataList = readableMetadataList; } @Override public EventSourceProvider getEventSourceProvider() { MySqlEventDeserializer deserializer = new MySqlEventDeserializer( - DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges()); + DebeziumChangelogMode.ALL, + sourceConfig.isIncludeSchemaChanges(), + readableMetadataList); MySqlSource source = new MySqlSource<>( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 580d370b5aa..bfae22e3abd 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -272,4 +272,13 @@ public class MySqlDataSourceOptions { + "The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: \n" + "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n" + "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase."); + + @Experimental + public static final ConfigOption METADATA_LIST = + ConfigOptions.key("metadata.list") + .stringType() + .noDefaultValue() + .withDescription( + "List of readable metadata from SourceRecord to be passed to downstream, split by `,`. 
" + + "Refer to MySqlReadableMetadata, available readable metadata are: table_name,database_name,op_ts,row_kind."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java index 548e603fa5a..5fc30875b26 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -18,10 +18,12 @@ package org.apache.flink.cdc.connectors.mysql.source; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.connectors.mysql.source.parser.CustomMySqlAntlrDdlParser; +import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata; import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; @@ -39,6 +41,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -63,10 +66,20 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private transient Tables tables; private transient CustomMySqlAntlrDdlParser customParser; + private List readableMetadataList; + public MySqlEventDeserializer( DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) { + this(changelogMode, includeSchemaChanges, new ArrayList<>()); + } + + public MySqlEventDeserializer( + DebeziumChangelogMode changelogMode, + boolean includeSchemaChanges, + List readableMetadataList) { super(new MySqlSchemaDataTypeInference(), changelogMode); this.includeSchemaChanges = includeSchemaChanges; + this.readableMetadataList = readableMetadataList; } @Override @@ -118,7 +131,19 @@ protected TableId getTableId(SourceRecord record) { @Override protected Map getMetadata(SourceRecord record) { - return Collections.emptyMap(); + Map metadataMap = new HashMap<>(); + readableMetadataList.forEach( + (mySqlReadableMetadata -> { + Object metadata = mySqlReadableMetadata.getConverter().read(record); + if (mySqlReadableMetadata.equals(MySqlReadableMetadata.OP_TS)) { + metadataMap.put( + mySqlReadableMetadata.getKey(), + String.valueOf(((TimestampData) metadata).getMillisecond())); + } else { + metadataMap.put(mySqlReadableMetadata.getKey(), String.valueOf(metadata)); + } + })); + return metadataMap; } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index f2b4ccb82f7..2f0caccec2e 100644 --- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; @@ -28,8 +29,11 @@ import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.common.factories.Factory; +import org.apache.flink.cdc.common.factories.FactoryHelper; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.source.FlinkSourceProvider; @@ -64,8 +68,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -258,6 +264,159 @@ private static List fetchResultsExcept(Iterator iter, int size, T side return result; } + @Test + public void testInitialStartupModeWithOpTs() throws Exception { + inventoryDatabase.createAndInitialize(); + Configuration sourceConfiguration = new Configuration(); + sourceConfiguration.set(MySqlDataSourceOptions.HOSTNAME, MYSQL8_CONTAINER.getHost()); + sourceConfiguration.set(MySqlDataSourceOptions.PORT, MYSQL8_CONTAINER.getDatabasePort()); + sourceConfiguration.set(MySqlDataSourceOptions.USERNAME, TEST_USER); + sourceConfiguration.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD); + sourceConfiguration.set( + MySqlDataSourceOptions.TABLES, inventoryDatabase.getDatabaseName() + ".products"); + sourceConfiguration.set( + MySqlDataSourceOptions.SERVER_ID, getServerId(env.getParallelism())); + sourceConfiguration.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC"); + sourceConfiguration.set(MySqlDataSourceOptions.METADATA_LIST, "op_ts"); + Factory.Context context = + new FactoryHelper.DefaultContext( + sourceConfiguration, new Configuration(), this.getClass().getClassLoader()); + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new MySqlDataSourceFactory() + .createDataSource(context) + .getEventSourceProvider(); + ; + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(10_000); + TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products"); + CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId); + // generate snapshot data + Map meta = new HashMap<>(); + meta.put("op_ts", "0"); + List expectedSnapshot = + getSnapshotExpected(tableId).stream() + .map( + event -> { + DataChangeEvent dataChangeEvent = (DataChangeEvent) event; + return 
DataChangeEvent.insertEvent( + dataChangeEvent.tableId(), + dataChangeEvent.after(), + meta); + }) + .collect(Collectors.toList()); + String startTime = String.valueOf(System.currentTimeMillis()); + Thread.sleep(1000); + List expectedBinlog = new ArrayList<>(); + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + expectedBinlog.addAll(executeAlterAndProvideExpected(tableId, statement)); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(255).notNull(), + DataTypes.FLOAT(), + DataTypes.VARCHAR(45), + DataTypes.VARCHAR(55) + }, + new String[] {"id", "name", "weight", "col1", "col2"}); + BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType); + // insert more data + statement.execute( + String.format( + "INSERT INTO `%s`.`products` VALUES (default,'scooter',5.5,'c-10','c-20');", + inventoryDatabase.getDatabaseName())); // 110 + expectedBinlog.add( + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 110, + BinaryStringData.fromString("scooter"), + 5.5f, + BinaryStringData.fromString("c-10"), + BinaryStringData.fromString("c-20") + }))); + statement.execute( + String.format( + "INSERT INTO `%s`.`products` VALUES (default,'football',6.6,'c-11','c-21');", + inventoryDatabase.getDatabaseName())); // 111 + expectedBinlog.add( + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 111, + BinaryStringData.fromString("football"), + 6.6f, + BinaryStringData.fromString("c-11"), + BinaryStringData.fromString("c-21") + }))); + statement.execute( + String.format( + "UPDATE `%s`.`products` SET `col1`='c-12', `col2`='c-22' WHERE id=110;", + inventoryDatabase.getDatabaseName())); + expectedBinlog.add( + DataChangeEvent.updateEvent( + tableId, + generator.generate( + new Object[] { + 110, + BinaryStringData.fromString("scooter"), + 5.5f, + BinaryStringData.fromString("c-10"), + BinaryStringData.fromString("c-20") + }), + generator.generate( + new Object[] { + 110, + BinaryStringData.fromString("scooter"), + 5.5f, + BinaryStringData.fromString("c-12"), + BinaryStringData.fromString("c-22") + }))); + statement.execute( + String.format( + "DELETE FROM `%s`.`products` WHERE `id` = 111;", + inventoryDatabase.getDatabaseName())); + expectedBinlog.add( + DataChangeEvent.deleteEvent( + tableId, + generator.generate( + new Object[] { + 111, + BinaryStringData.fromString("football"), + 6.6f, + BinaryStringData.fromString("c-11"), + BinaryStringData.fromString("c-21") + }))); + } + List actual = + fetchResults(events, 1 + expectedSnapshot.size() + expectedBinlog.size()); + assertThat(actual.get(0)).isEqualTo(createTableEvent); + assertThat(actual.subList(1, 10)) + .containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0])); + for (int i = 0; i < expectedBinlog.size(); i++) { + if (expectedBinlog.get(i) instanceof SchemaChangeEvent) { + assertThat(expectedBinlog.get(i)).isEqualTo(actual.get(10 + i)); + } else { + DataChangeEvent expectedEvent = (DataChangeEvent) expectedBinlog.get(i); + DataChangeEvent actualEvent = (DataChangeEvent) actual.get(10 + i); + assertThat(actualEvent.op()).isEqualTo(expectedEvent.op()); + assertThat(actualEvent.before()).isEqualTo(expectedEvent.before()); + assertThat(actualEvent.after()).isEqualTo(expectedEvent.after()); + assertThat(actualEvent.meta().get("op_ts")).isGreaterThanOrEqualTo(startTime); + } + } + } + @Test public void testParseAlterStatement() throws Exception { 
env.setParallelism(1); From 853c97d1e043fa49d5bdbc84670ffe9d48b4eb2d Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Tue, 17 Dec 2024 16:26:34 +0800 Subject: [PATCH 04/13] add test --- .../pipeline/tests/TransformE2eITCase.java | 68 ++++++++++++++----- 1 file changed, 52 insertions(+), 16 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index 57cbf238f3d..e53896b1a9d 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -49,6 +49,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; import java.util.stream.Stream; /** E2e tests for the {@link PreTransformOperator} and {@link PostTransformOperator}. */ @@ -442,13 +443,14 @@ public void testWildcardWithMetadataColumnTransform() throws Exception { + " tables: %s.\\.*\n" + " server-id: 5400-5404\n" + " server-time-zone: UTC\n" + + " metadata-column.include-list: op_ts\n" + "sink:\n" + " type: values\n" + "transform:\n" + " - source-table: %s.TABLEALPHA\n" - + " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n" + + " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type, op_ts AS opts\n" + " - source-table: %s.TABLEBETA\n" - + " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n" + + " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' 
|| __table_name__ AS identifier_name, __data_event_type__ AS type, op_ts AS opts\n" + "pipeline:\n" + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, @@ -467,25 +469,25 @@ public void testWildcardWithMetadataColumnTransform() throws Exception { waitUntilSpecificEvent( String.format( - "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING,`type` STRING NOT NULL}, primaryKeys=ID, options=()}", + "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING,`type` STRING,`opts` BIGINT}, primaryKeys=ID, options=()}", transformTestDatabase.getDatabaseName()), 60000L); waitUntilSpecificEvent( String.format( - "CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING,`type` STRING NOT NULL}, primaryKeys=ID, options=()}", + "CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING,`type` STRING,`opts` BIGINT}, primaryKeys=ID, options=()}", transformTestDatabase.getDatabaseName()), 60000L); validateEvents( - "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}", - "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}", - "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}", - "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}", - "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA, +I], op=INSERT, meta=()}", - "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, +I], op=INSERT, meta=()}", - "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA, +I], op=INSERT, meta=()}", - "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA, +I], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA, +I, 0], op=INSERT, meta=({op_ts=0})}", + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA, +I, 0], op=INSERT, meta=({op_ts=0})}", + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, +I, 0], op=INSERT, meta=({op_ts=0})}", + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA, +I, 0], op=INSERT, meta=({op_ts=0})}", + "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA, +I, 0], op=INSERT, meta=({op_ts=0})}", + "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, +I, 0], op=INSERT, meta=({op_ts=0})}", + "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA, +I, 0], op=INSERT, meta=({op_ts=0})}", + "DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, 
Ventura, 23, Gus, null.%s.TABLEBETA, +I, 0], op=INSERT, meta=({op_ts=0})}"); // generate binlogs String mysqlJdbcUrl = @@ -496,10 +498,10 @@ public void testWildcardWithMetadataColumnTransform() throws Exception { transformTestDatabase.getDatabaseName()); insertBinlogEvents(mysqlJdbcUrl); - validateEvents( - "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, -U], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA, +U], op=UPDATE, meta=()}", - "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}", - "DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, -D], after=[], op=DELETE, meta=()}"); + validateEventsWithPattern( + "DataChangeEvent\\{tableId=%s.TABLEALPHA, before=\\[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, -U, \\d+\\], after=\\[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA, \\+U, \\d+\\], op=UPDATE, meta=\\(\\{op_ts=\\d+\\}\\)\\}", + "DataChangeEvent\\{tableId=%s.TABLEALPHA, before=\\[\\], after=\\[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA, \\+I, \\d+\\], op=INSERT, meta=\\(\\{op_ts=\\d+\\}\\)\\}", + "DataChangeEvent\\{tableId=%s.TABLEBETA, before=\\[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, -D, \\d+\\], after=\\[\\], op=DELETE, meta=\\(\\{op_ts=\\d+\\}\\)\\}"); } private static void insertBinlogEvents(String mysqlJdbcUrl) throws SQLException { @@ -1121,12 +1123,46 @@ private void validateEvents(String... expectedEvents) throws Exception { } } + private void validateEventsWithPattern(String... patterns) throws Exception { + for (String pattern : patterns) { + waitUntilSpecificEventWithPattern( + String.format( + pattern, + transformTestDatabase.getDatabaseName(), + transformTestDatabase.getDatabaseName(), + transformTestDatabase.getDatabaseName()), + 20000L); + } + } + private void validateResult(List expectedEvents) throws Exception { for (String event : expectedEvents) { waitUntilSpecificEvent(event, 6000L); } } + private void waitUntilSpecificEventWithPattern(String patternStr, long timeout) + throws Exception { + boolean result = false; + long endTimeout = System.currentTimeMillis() + timeout; + while (System.currentTimeMillis() < endTimeout) { + String stdout = taskManagerConsumer.toUtf8String(); + Pattern pattern = Pattern.compile(patternStr); + if (pattern.matcher(stdout).find()) { + result = true; + break; + } + Thread.sleep(1000); + } + if (!result) { + throw new TimeoutException( + "failed to get events with pattern: " + + patternStr + + " from stdout: " + + taskManagerConsumer.toUtf8String()); + } + } + private void waitUntilSpecificEvent(String event, long timeout) throws Exception { boolean result = false; long endTimeout = System.currentTimeMillis() + timeout; From 72ed34285dba82162f052a8bafd76f956054e911 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Tue, 17 Dec 2024 17:31:10 +0800 Subject: [PATCH 05/13] fix test --- .../flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 2f0caccec2e..276a2f12da2 100644 --- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -286,7 +286,6 @@ public void testInitialStartupModeWithOpTs() throws Exception { new MySqlDataSourceFactory() .createDataSource(context) .getEventSourceProvider(); - ; CloseableIterator events = env.fromSource( sourceProvider.getSource(), From 6565495637c3530dd5ffe4e85980535098700957 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Tue, 17 Dec 2024 17:50:34 +0800 Subject: [PATCH 06/13] fix test --- .../cdc/connectors/mysql/source/MySqlEventDeserializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java index 5fc30875b26..710f12ffa97 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -18,7 +18,6 @@ package org.apache.flink.cdc.connectors.mysql.source; import org.apache.flink.cdc.common.annotation.Internal; -import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; @@ -35,6 +34,7 @@ import io.debezium.data.geometry.Point; import io.debezium.relational.Tables; import io.debezium.relational.history.HistoryRecord; +import org.apache.flink.table.data.TimestampData; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; From 6c4cf00262d2b598c0cec52ac315af6c59d652fe Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Tue, 17 Dec 2024 17:56:44 +0800 Subject: [PATCH 07/13] fix test --- .../cdc/connectors/mysql/source/MySqlEventDeserializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java index 710f12ffa97..a50ac872996 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata; import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; import 
org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; +import org.apache.flink.table.data.TimestampData; import com.esri.core.geometry.ogc.OGCGeometry; import com.fasterxml.jackson.databind.JsonNode; @@ -34,7 +35,6 @@ import io.debezium.data.geometry.Point; import io.debezium.relational.Tables; import io.debezium.relational.history.HistoryRecord; -import org.apache.flink.table.data.TimestampData; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; From b8351df631763061fe7e429e3082026655c9f1f9 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Thu, 19 Dec 2024 14:04:25 +0800 Subject: [PATCH 08/13] fix test --- .../flink/cdc/composer/flink/FlinkPipelineTransformITCase.java | 1 + .../runtime/operators/transform/PostTransformOperatorTest.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index f0d1f567960..6a775e74ae5 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -1889,6 +1889,7 @@ void testTransformWithLargeLiterals() throws Exception { null, null, null, + null, null)), Collections.emptyList(), pipelineConfig); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index 96ab2aa626b..2c9baa84ff8 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -591,7 +591,8 @@ void testSoftDeleteTransform() throws Exception { "", "", "", - "SOFT_DELETE") + "SOFT_DELETE", + new SupportedMetadataColumn[0]) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = From 47fc105dadc187d1c529087d44c84f3769f1e812 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Thu, 19 Dec 2024 14:09:40 +0800 Subject: [PATCH 09/13] fix test --- .../apache/flink/cdc/pipeline/tests/TransformE2eITCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index e53896b1a9d..42e132ed4c1 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -69,7 +69,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment { public static final MySqlContainer MYSQL = (MySqlContainer) new MySqlContainer( - MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + MySqlVersion.V8_0_18) // v8 support both ARM and AMD architectures .withConfigurationOverride("docker/mysql/my.cnf") .withSetupSQL("docker/mysql/setup.sql") .withDatabaseName("flink-test") @@ -443,7 +443,7 @@ 
public void testWildcardWithMetadataColumnTransform() throws Exception { + " tables: %s.\\.*\n" + " server-id: 5400-5404\n" + " server-time-zone: UTC\n" - + " metadata-column.include-list: op_ts\n" + + " metadata.list: op_ts\n" + "sink:\n" + " type: values\n" + "transform:\n" From b6dbc2a5569fa66e4208801db70f780b8a74c687 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Thu, 19 Dec 2024 14:17:43 +0800 Subject: [PATCH 10/13] fix test --- .../flink/cdc/pipeline/tests/TransformE2eITCase.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index 42e132ed4c1..77994002953 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -69,7 +69,8 @@ public class TransformE2eITCase extends PipelineTestEnvironment { public static final MySqlContainer MYSQL = (MySqlContainer) new MySqlContainer( - MySqlVersion.V8_0_18) // v8 support both ARM and AMD architectures + MySqlVersion + .V8_0_18) // v8 support both ARM and AMD architectures .withConfigurationOverride("docker/mysql/my.cnf") .withSetupSQL("docker/mysql/setup.sql") .withDatabaseName("flink-test") @@ -469,13 +470,13 @@ public void testWildcardWithMetadataColumnTransform() throws Exception { waitUntilSpecificEvent( String.format( - "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING,`type` STRING,`opts` BIGINT}, primaryKeys=ID, options=()}", + "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING,`type` STRING NOT NULL,`opts` BIGINT NOT NULL}, primaryKeys=ID, options=()}", transformTestDatabase.getDatabaseName()), 60000L); waitUntilSpecificEvent( String.format( - "CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING,`type` STRING,`opts` BIGINT}, primaryKeys=ID, options=()}", + "CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING,`type` STRING NOT NULL,`opts` BIGINT NOT NULL}, primaryKeys=ID, options=()}", transformTestDatabase.getDatabaseName()), 60000L); From 6ea5b827584b544b38d3e06e74694c624a74448a Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Thu, 19 Dec 2024 14:20:17 +0800 Subject: [PATCH 11/13] fix test --- .../apache/flink/cdc/pipeline/tests/TransformE2eITCase.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index 77994002953..c125de59b25 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -69,8 +69,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment { public static final MySqlContainer MYSQL = (MySqlContainer) new MySqlContainer( - MySqlVersion - .V8_0_18) // v8 support both ARM and AMD architectures + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures .withConfigurationOverride("docker/mysql/my.cnf") .withSetupSQL("docker/mysql/setup.sql") .withDatabaseName("flink-test") From 51fe3fe2685180ded095f2a84bfc7d9ce46383d6 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Thu, 19 Dec 2024 15:10:29 +0800 Subject: [PATCH 12/13] fix test --- .../flink/cdc/composer/flink/FlinkPipelineComposerITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index e0223465a66..c8877ef6eae 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -413,7 +413,7 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL,`opts` BIGINT}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL,`opts` BIGINT NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I, 1], op=INSERT, meta=({op_ts=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}", From a503ee9a7d4c949e489986eaa4a8b5d41d730b51 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Thu, 19 Dec 2024 18:04:23 +0800 Subject: [PATCH 13/13] fix review --- .../composer/flink/FlinkPipelineComposer.java | 6 +-- .../translator/DataSourceTranslator.java | 43 +++++++++---------- .../mysql/factory/MySqlDataSourceFactory.java | 23 +++++++--- .../mysql/source/MySqlDataSourceOptions.java | 2 +- .../parser/metadata/MetadataColumns.java | 1 - 5 files changed, 39 insertions(+), 36 deletions(-) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index 4aa216c7d96..4578a49fdfa 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -100,12 +100,10 @@ public PipelineExecution compose(PipelineDef pipelineDef) { // Build Source Operator DataSourceTranslator sourceTranslator = new DataSourceTranslator(); - DataStream stream = - 
sourceTranslator.translate( - pipelineDef.getSource(), env, pipelineDefConfig, parallelism); - DataSource dataSource = sourceTranslator.createDataSource(pipelineDef.getSource(), env, pipelineDefConfig); + DataStream stream = + sourceTranslator.translate(pipelineDef.getSource(), env, parallelism, dataSource); // Build PreTransformOperator for processing Schema Event TransformTranslator transformTranslator = new TransformTranslator(); diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java index 40d49d8bb59..4323871ff99 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java @@ -39,14 +39,29 @@ @Internal public class DataSourceTranslator { + public DataSource createDataSource( + SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) { + // Search the data source factory + DataSourceFactory sourceFactory = + FactoryDiscoveryUtils.getFactoryByIdentifier( + sourceDef.getType(), DataSourceFactory.class); + // Add source JAR to environment + FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory) + .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar)); + DataSource dataSource = + sourceFactory.createDataSource( + new FactoryHelper.DefaultContext( + sourceDef.getConfig(), + pipelineConfig, + Thread.currentThread().getContextClassLoader())); + return dataSource; + } + public DataStreamSource translate( SourceDef sourceDef, StreamExecutionEnvironment env, - Configuration pipelineConfig, - int sourceParallelism) { - // Create data source - DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig); - + int sourceParallelism, + DataSource dataSource) { // Get source provider EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider(); if (eventSourceProvider instanceof FlinkSourceProvider) { @@ -78,24 +93,6 @@ public DataStreamSource translate( } } - public DataSource createDataSource( - SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) { - // Search the data source factory - DataSourceFactory sourceFactory = - FactoryDiscoveryUtils.getFactoryByIdentifier( - sourceDef.getType(), DataSourceFactory.class); - // Add source JAR to environment - FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory) - .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar)); - DataSource dataSource = - sourceFactory.createDataSource( - new FactoryHelper.DefaultContext( - sourceDef.getConfig(), - pipelineConfig, - Thread.currentThread().getContextClassLoader())); - return dataSource; - } - private String generateDefaultSourceName(SourceDef sourceDef) { return String.format("Flink CDC Event Source: %s", sourceDef.getType()); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 2e193370d89..3d89f18c034 100644 --- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -248,15 +248,24 @@
     private List<MySqlReadableMetadata> listReadableMetadata(String metadataList) {
         if (StringUtils.isNullOrWhitespaceOnly(metadataList)) {
             return new ArrayList<>();
         }
-        List<String> readableMetadataList =
+        Set<String> readableMetadataList =
                 Arrays.stream(metadataList.split(","))
                         .map(String::trim)
-                        .collect(Collectors.toList());
-        return Arrays.stream(MySqlReadableMetadata.values())
-                .filter(
-                        (mySqlReadableMetadata ->
-                                readableMetadataList.contains(mySqlReadableMetadata.getKey())))
-                .collect(Collectors.toList());
+                        .collect(Collectors.toSet());
+        List<MySqlReadableMetadata> foundMetadata = new ArrayList<>();
+        for (MySqlReadableMetadata metadata : MySqlReadableMetadata.values()) {
+            if (readableMetadataList.contains(metadata.getKey())) {
+                foundMetadata.add(metadata);
+                readableMetadataList.remove(metadata.getKey());
+            }
+        }
+        if (readableMetadataList.isEmpty()) {
+            return foundMetadata;
+        }
+        throw new IllegalArgumentException(
+                String.format(
+                        "[%s] cannot be found in MySQL metadata.",
+                        String.join(", ", readableMetadataList)));
     }
 
     @Override
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index bfae22e3abd..f37703f6941 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -280,5 +280,5 @@ public class MySqlDataSourceOptions {
                     .noDefaultValue()
                     .withDescription(
                             "List of readable metadata from SourceRecord to be passed to downstream, split by `,`. "
-                                    + "Refer to MySqlReadableMetadata, available readable metadata are: table_name,database_name,op_ts,row_kind.");
+                                    + "Available readable metadata are: op_ts.");
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java
index 0177202c29d..f70e012f14c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java
@@ -26,7 +26,6 @@
 /** Contains all supported metadata columns that could be used in transform expressions. */
 public class MetadataColumns {
-
     public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
     public static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
     public static final String DEFAULT_TABLE_NAME = "__table_name__";
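With the final option name (`metadata.list`) and the transform-side support above, a pipeline
definition can project source metadata next to regular columns. A minimal sketch of such a
definition, mirroring the e2e test configuration; the host, credentials, and table names below
are placeholders:

source:
  type: mysql
  hostname: localhost
  port: 3306
  username: testuser
  password: testpassword
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC
  metadata.list: op_ts

transform:
  - source-table: app_db.orders
    projection: \*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, op_ts AS opts

sink:
  type: values

pipeline:
  parallelism: 1

Declaring op_ts in `metadata.list` makes the source attach it to each DataChangeEvent's meta
map, so the transform can reference it like any other column; an unknown key in `metadata.list`
now fails fast with the IllegalArgumentException added in the hunk above.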