diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 4a942788c23..950b0183389 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -39,10 +39,26 @@ To describe a transform rule, the following parameters can be used:
 | primary-keys | Sink table primary keys, separated by commas | optional |
 | partition-keys | Sink table partition keys, separated by commas | optional |
 | table-options | used to the configure table creation statement when automatically creating tables | optional |
+| converter-after-transform | used to add a converter that changes DataChangeEvent after the transform is applied | optional |
 | description | Transform rule description | optional |

 Multiple rules can be declared in one single pipeline YAML file.

+## converter-after-transform
+
+`converter-after-transform` is used to change the DataChangeEvent after the other transform rules have been applied. The available values of this option are as follows.
+
+- SOFT_DELETE: The delete event will be converted to an insert event. This converter should be used together with the metadata column `__data_event_type__` to implement a soft delete.
+
+For example, the following transform will not delete data in the sink when a delete event happens. Instead, the delete event is converted to an insert record and the column `op_type` is set to `-D`.
+
+```yaml
+transform:
+  - source-table: \.*.\.*
+    projection: \*, __data_event_type__ AS op_type
+    converter-after-transform: SOFT_DELETE
+```
+
 # Metadata Fields
 ## Fields definition
 There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index d1d725f92ec..9a8cf5c6106 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -31,18 +31,34 @@ What's more, it also helps users filter some unnecessary data during the synchro
 # Parameters
 To describe a transform rule, the following parameters can be used:

-| Parameter | Meaning | Optional/Required |
-|--------------|----------------------------------------------------|-------------------|
-| source-table | Source table id, supports regular expressions | required |
-| projection | Projection rule, supports syntax similar to the select clause in SQL | optional |
-| filter | Filter rule, supports syntax similar to the where clause in SQL | optional |
-| primary-keys | Sink table primary keys, separated by commas | optional |
-| partition-keys | Sink table partition keys, separated by commas | optional |
-| table-options | used to the configure table creation statement when automatically creating tables | optional |
-| description | Transform rule description | optional |
+| Parameter                 | Meaning                                                                              | Optional/Required |
+|---------------------------|--------------------------------------------------------------------------------------|-------------------|
+| source-table              | Source table id, supports regular expressions                                        | required          |
+| projection                | Projection rule, supports syntax similar to the select clause in SQL                 | optional          |
+| filter                    | Filter rule, supports syntax similar to the where clause in SQL                      | optional          |
+| primary-keys              | Sink table primary keys, separated by commas                                         | optional          |
+| partition-keys            | Sink table partition keys, separated by commas                                       | optional          |
+| table-options             | used to configure the table creation statement when
automatically creating tables                     | optional          |
+| converter-after-transform | used to add a converter that changes DataChangeEvent after the transform is applied  | optional          |
+| description               | Transform rule description                                                           | optional          |

 Multiple rules can be declared in one single pipeline YAML file.

+## converter-after-transform
+
+`converter-after-transform` is used to change the DataChangeEvent after the other transform rules have been applied. The available values of this option are as follows.
+
+- SOFT_DELETE: The delete event will be converted to an insert event. This converter should be used together with the metadata column `__data_event_type__` to implement a soft delete.
+
+For example, the following transform will not delete data in the sink when a delete event happens. Instead, the delete event is converted to an insert record and the column `op_type` is set to `-D`.
+
+```yaml
+transform:
+  - source-table: \.*.\.*
+    projection: \*, __data_event_type__ AS op_type
+    converter-after-transform: SOFT_DELETE
+```
+
 # Metadata Fields
 ## Fields definition
 There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 2f471db21ea..b594aa35bfc 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -78,6 +78,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     private static final String TRANSFORM_PROJECTION_KEY = "projection";
     private static final String TRANSFORM_FILTER_KEY = "filter";
     private static final String TRANSFORM_DESCRIPTION_KEY = "description";
+    private static final String TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY =
+            "converter-after-transform";

     // UDF related keys
     private static final String UDF_KEY = "user-defined-function";
@@ -316,6 +318,10 @@ private TransformDef toTransformDef(JsonNode transformNode) {
                 Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY))
                         .map(JsonNode::asText)
                         .orElse(null);
+        String postTransformConverter =
+                Optional.ofNullable(transformNode.get(TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY))
+                        .map(JsonNode::asText)
+                        .orElse(null);

         return new TransformDef(
                 sourceTable,
@@ -324,7 +330,8 @@ private TransformDef toTransformDef(JsonNode transformNode) {
                 primaryKeys,
                 partitionKeys,
                 tableOptions,
-                description);
+                description,
+                postTransformConverter);
     }

     private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index dca3e3154de..27cbbb1de18 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -159,7 +159,7 @@ void testValidTimeZone() throws Exception {
     }

     @Test
-    void testInvalidTimeZone() throws Exception {
+    void testInvalidTimeZone() {
         URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
         YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
         assertThatThrownBy(
@@ -336,7 +336,8 @@ private void
testSchemaEvolutionTypesParsing( "id", "product_name", "comment=app order", - "project fields from source table"), + "project fields from source table", + "SOFT_DELETE"), new TransformDef( "mydb.web_order_.*", "CONCAT(id, order_id) as uniq_id, *", @@ -344,7 +345,8 @@ private void testSchemaEvolutionTypesParsing( null, null, null, - "add new uniq_id for each row")), + "add new uniq_id for each row", + null)), Collections.emptyList(), Collections.singletonList( new ModelDef( @@ -402,6 +404,7 @@ void testParsingFullDefinitionFromString() throws Exception { + " partition-keys: product_name\n" + " table-options: comment=app order\n" + " description: project fields from source table\n" + + " converter-after-transform: SOFT_DELETE\n" + " - source-table: mydb.web_order_.*\n" + " projection: CONCAT(id, order_id) as uniq_id, *\n" + " filter: uniq_id > 10\n" @@ -469,7 +472,8 @@ void testParsingFullDefinitionFromString() throws Exception { "id", "product_name", "comment=app order", - "project fields from source table"), + "project fields from source table", + "SOFT_DELETE"), new TransformDef( "mydb.web_order_.*", "CONCAT(id, order_id) as uniq_id, *", @@ -477,7 +481,8 @@ void testParsingFullDefinitionFromString() throws Exception { null, null, null, - "add new uniq_id for each row")), + "add new uniq_id for each row", + null)), Collections.emptyList(), Collections.singletonList( new ModelDef( @@ -606,7 +611,8 @@ void testParsingFullDefinitionFromString() throws Exception { "id", "product_name", "comment=app order", - "project fields from source table"), + "project fields from source table", + "SOFT_DELETE"), new TransformDef( "mydb.web_order_.*", "CONCAT(id, order_id) as uniq_id, *", @@ -614,7 +620,8 @@ void testParsingFullDefinitionFromString() throws Exception { null, null, null, - "add new uniq_id for each row")), + "add new uniq_id for each row", + null)), Collections.emptyList(), Configuration.fromMap( ImmutableMap.builder() @@ -646,6 +653,7 @@ void testParsingFullDefinitionFromString() throws Exception { null, null, null, + null, null)), Arrays.asList( new UdfDef( diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml index 265358fb411..10c6f6e0c00 100644 --- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml @@ -49,6 +49,7 @@ transform: partition-keys: product_name table-options: comment=app order description: project fields from source table + converter-after-transform: SOFT_DELETE - source-table: mydb.web_order_.* projection: CONCAT(id, order_id) as uniq_id, * filter: uniq_id > 10 diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml index 5a8cfb004cb..46cfd951ceb 100644 --- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml @@ -47,6 +47,7 @@ transform: partition-keys: product_name table-options: comment=app order description: project fields from source table + converter-after-transform: SOFT_DELETE - source-table: mydb.web_order_.* projection: CONCAT(id, order_id) as uniq_id, * filter: uniq_id > 10 diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java index 62491917ddb..ff54eb17367 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java @@ -50,6 +50,7 @@ public class TransformDef { private final String primaryKeys; private final String partitionKeys; private final String tableOptions; + private final String postTransformConverter; public TransformDef( String sourceTable, @@ -58,7 +59,8 @@ public TransformDef( String primaryKeys, String partitionKeys, String tableOptions, - String description) { + String description, + String postTransformConverter) { this.sourceTable = sourceTable; this.projection = projection; this.filter = filter; @@ -66,6 +68,7 @@ public TransformDef( this.partitionKeys = partitionKeys; this.tableOptions = tableOptions; this.description = description; + this.postTransformConverter = postTransformConverter; } public String getSourceTable() { @@ -104,6 +107,10 @@ public String getTableOptions() { return tableOptions; } + public String getPostTransformConverter() { + return postTransformConverter; + } + @Override public String toString() { return "TransformDef{" @@ -119,6 +126,9 @@ public String toString() { + ", description='" + description + '\'' + + ", postTransformConverter='" + + postTransformConverter + + '\'' + '}'; } @@ -137,7 +147,8 @@ public boolean equals(Object o) { && Objects.equals(description, that.description) && Objects.equals(primaryKeys, that.primaryKeys) && Objects.equals(partitionKeys, that.partitionKeys) - && Objects.equals(tableOptions, that.tableOptions); + && Objects.equals(tableOptions, that.tableOptions) + && Objects.equals(postTransformConverter, that.postTransformConverter); } @Override @@ -149,6 +160,7 @@ public int hashCode() { description, primaryKeys, partitionKeys, - tableOptions); + tableOptions, + postTransformConverter); } } diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java index d8f0097215e..c84911fc76f 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java @@ -60,7 +60,8 @@ public DataStream translatePreTransform( transform.getFilter().orElse(null), transform.getPrimaryKeys(), transform.getPartitionKeys(), - transform.getTableOptions()); + transform.getTableOptions(), + transform.getPostTransformConverter()); } preTransformFunctionBuilder.addUdfFunctions( @@ -91,7 +92,8 @@ public DataStream translatePostTransform( transform.isValidFilter() ? 
transform.getFilter().get() : null, transform.getPrimaryKeys(), transform.getPartitionKeys(), - transform.getTableOptions()); + transform.getTableOptions(), + transform.getPostTransformConverter()); } } postTransformFunctionBuilder.addTimezone(timezone); diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 6008848a568..7820981ef68 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -325,7 +325,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception { "col1", "col12", "key1=value1", - ""); + "", + null); // Setup pipeline Configuration pipelineConfig = new Configuration(); @@ -387,7 +388,8 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { "col1", "col12", "key1=value1", - ""); + "", + null); // Setup pipeline Configuration pipelineConfig = new Configuration(); @@ -449,7 +451,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { "col1", "col12", "key1=value1", - ""); + "", + null); TransformDef transformDef2 = new TransformDef( "default_namespace.default_schema.table1", @@ -458,7 +461,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { null, null, null, - ""); + "", + null); // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); @@ -986,7 +990,8 @@ void testTransformMergingWithRoute() throws Exception { null, null, null, - "")); + "", + null)); // Setup route TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged"); diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java index 9d1fea7c796..2dd6b8998f1 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java @@ -370,7 +370,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception { "col1", "col12", "key1=value1", - ""); + "", + null); // Setup pipeline Configuration pipelineConfig = new Configuration(); @@ -430,7 +431,8 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { "col1", "col12", "key1=value1", - ""); + "", + null); // Setup pipeline Configuration pipelineConfig = new Configuration(); @@ -490,7 +492,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { "col1", "col12", "key1=value1", - ""); + "", + null); TransformDef transformDef2 = new TransformDef( "default_namespace.default_schema.table1", @@ -499,7 +502,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { null, null, null, - ""); + "", + null); // Setup pipeline Configuration pipelineConfig = new Configuration(); pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); @@ -1020,7 +1024,8 @@ void testTransformMergingWithRoute() throws Exception { null, null, null, - "")); + "", + null)); // Setup route TableId mergedTable = TableId.tableId("default_namespace", "default_schema", 
"merged"); diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 0c3fb5dca7e..52ce966eb61 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -138,6 +138,7 @@ void testCalculatedColumns(ValuesDataSink.SinkApi sinkApi) throws Exception { null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`uid` STRING,`double_age` INT}, primaryKeys=id, options=()}", @@ -165,6 +166,7 @@ void testMultipleReferencedColumnsInProjection(ValuesDataSink.SinkApi sinkApi) null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`cubic_age` INT}, primaryKeys=id, options=()}", @@ -191,6 +193,7 @@ void testMultipleReferencedColumnsInFilter(ValuesDataSink.SinkApi sinkApi) throw null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", @@ -212,6 +215,7 @@ void testFilteringRules(ValuesDataSink.SinkApi sinkApi) throws Exception { null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", @@ -239,6 +243,7 @@ void testMultipleDispatchTransform(ValuesDataSink.SinkApi sinkApi) throws Except null, null, null, + null, null), new TransformDef( "default_namespace.default_schema.\\.*", @@ -247,6 +252,7 @@ void testMultipleDispatchTransform(ValuesDataSink.SinkApi sinkApi) throws Except null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`category` STRING}, primaryKeys=id, options=()}", @@ -272,6 +278,7 @@ void testMultipleTransformWithDiffRefColumn(ValuesDataSink.SinkApi sinkApi) thro null, null, null, + null, null), new TransformDef( "default_namespace.default_schema.\\.*", @@ -280,6 +287,7 @@ void testMultipleTransformWithDiffRefColumn(ValuesDataSink.SinkApi sinkApi) thro null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`age` INT,`roleName` STRING}, primaryKeys=id, options=()}", @@ -305,6 +313,7 @@ void testMultiTransformWithAsterisk(ValuesDataSink.SinkApi sinkApi) throws Excep null, null, null, + null, null), new TransformDef( "default_namespace.default_schema.mytable2", @@ -313,6 +322,7 @@ void testMultiTransformWithAsterisk(ValuesDataSink.SinkApi sinkApi) throws Excep null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", @@ -338,6 +348,7 @@ void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) throws null, null, null, + null, null), new TransformDef( "default_namespace.default_schema.mytable2", @@ -346,6 +357,7 @@ void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) throws null, null, null, + null, 
null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", @@ -372,7 +384,8 @@ void testMetadataInfo(ValuesDataSink.SinkApi sinkApi) throws Exception { "id,name", "id", "replication_num=1,bucket=17", - "Just a Transform Block")), + "Just a Transform Block", + null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", @@ -401,7 +414,8 @@ void testMetadataInfoWithoutChangingSchema(ValuesDataSink.SinkApi sinkApi) throw "id,name", "id", "replication_num=1,bucket=17", - "A Transform Block without projection or filter")), + "A Transform Block without projection or filter", + null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", @@ -427,6 +441,7 @@ void testMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}", @@ -453,6 +468,7 @@ void testMetadataColumnWithWildcard(ValuesDataSink.SinkApi sinkApi) throws Excep null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}", @@ -483,6 +499,7 @@ void testUsingMetadataColumnLiteralWithWildcard(ValuesDataSink.SinkApi sinkApi) null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`string_literal` STRING}, primaryKeys=id, options=()}", @@ -495,6 +512,33 @@ void testUsingMetadataColumnLiteralWithWildcard(ValuesDataSink.SinkApi sinkApi) "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, __namespace_name____schema_name____table_name__], after=[], op=DELETE, meta=()}")); } + /** This tests if projection rule could reference metadata info correctly. 
*/ + @ParameterizedTest + @EnumSource + void testConvertDeleteAsInsert(ValuesDataSink.SinkApi sinkApi) throws Exception { + runGenericTransformTest( + sinkApi, + Collections.singletonList( + new TransformDef( + "default_namespace.default_schema.\\.*", + "id, name, age, __namespace_name__, __schema_name__, __table_name__, __data_event_type__", + null, + null, + null, + null, + null, + "SOFT_DELETE")), + Arrays.asList( + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL,`__data_event_type__` STRING NOT NULL}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, default_namespace, default_schema, mytable1, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, default_namespace, default_schema, mytable1, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, default_namespace, default_schema, mytable1, -U], after=[2, Bob, 30, default_namespace, default_schema, mytable1, +U], op=UPDATE, meta=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL,`__data_event_type__` STRING NOT NULL}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, default_namespace, default_schema, mytable2, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, default_namespace, default_schema, mytable2, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, default_namespace, default_schema, mytable2, -D], op=INSERT, meta=()}")); + } + /** This tests if built-in comparison functions work as expected. 
*/ @ParameterizedTest @EnumSource @@ -515,6 +559,7 @@ void testBuiltinComparisonFunctions(ValuesDataSink.SinkApi sinkApi) throws Excep null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}", @@ -547,6 +592,7 @@ void testBuiltinLogicalFunctions(ValuesDataSink.SinkApi sinkApi) throws Exceptio null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}", @@ -580,6 +626,7 @@ void testBuiltinArithmeticFunctions(ValuesDataSink.SinkApi sinkApi) throws Excep null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` INT,`col2` INT,`col3` INT,`col4` DOUBLE,`col5` INT,`col6` INT,`col7` DOUBLE,`col8` DOUBLE,`col9` DOUBLE,`col10` INT}, primaryKeys=id, options=()}", @@ -616,6 +663,7 @@ void testBuiltinStringFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception null, null, null, + null, null)), Arrays.asList( "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}", @@ -648,6 +696,7 @@ void testSubstringFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception { null, null, null, + null, null)), Arrays.asList("To", "be", "added")); } @@ -683,6 +732,7 @@ void testConditionalFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception { null, null, null, + null, null)), Arrays.asList("Foo", "Bar", "Baz")); } @@ -745,6 +795,7 @@ void testTransformWithTemporalFunction() throws Exception { null, null, null, + null, null)), Collections.emptyList(), pipelineConfig); @@ -1006,7 +1057,14 @@ void testWildcardTransformWithSchemaEvolution() throws Exception { Collections.emptyList(), Collections.singletonList( new TransformDef( - tableId.toString(), "*", null, null, null, null, null)), + tableId.toString(), + "*", + null, + null, + null, + null, + null, + null)), Collections.emptyList(), pipelineConfig); @@ -1100,6 +1158,7 @@ void testExplicitTransformWithSchemaEvolution() throws Exception { null, null, null, + null, null)), Collections.emptyList(), pipelineConfig); @@ -1182,6 +1241,7 @@ void testPreAsteriskWithSchemaEvolution() throws Exception { null, null, null, + null, null)), Collections.emptyList(), pipelineConfig); @@ -1276,6 +1336,7 @@ void testPostAsteriskWithSchemaEvolution() throws Exception { null, null, null, + null, null)), Collections.emptyList(), pipelineConfig); @@ -1371,6 +1432,7 @@ void testTransformWithFilterButNoProjection() throws Exception { null, null, null, + null, null)), Collections.emptyList(), pipelineConfig); @@ -1465,6 +1527,7 @@ void testTransformUnmatchedSchemaEvolution() throws Exception { null, null, null, + null, null)), Collections.emptyList(), pipelineConfig); @@ -1583,6 +1646,7 @@ void 
testTransformWithCommentsAndDefaultExpr() throws Exception { null, null, null, + null, null)), Collections.emptyList(), pipelineConfig); @@ -1647,6 +1711,7 @@ String[] runNumericCastingWith(String expression) throws Exception { null, null, null, + null, null)), Collections.emptyList(), pipelineConfig); diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index 609c62ad72a..55d5b960042 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -133,7 +133,8 @@ void testTransformWithUdf(ValuesDataSink.SinkApi sinkApi, String language) throw "col1", null, "key1=value1", - ""); + "", + null); UdfDef udfDef = new UdfDef( @@ -203,7 +204,8 @@ void testFilterWithUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E "col1", null, "key1=value1", - ""); + "", + null); UdfDef udfDef = new UdfDef( @@ -271,7 +273,8 @@ void testOverloadedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E "col1", null, "key1=value1", - ""); + "", + null); UdfDef udfDef = new UdfDef( @@ -341,7 +344,8 @@ void testUdfLifecycle(ValuesDataSink.SinkApi sinkApi, String language) throws Ex "col1", null, "key1=value1", - ""); + "", + null); UdfDef udfDef = new UdfDef( @@ -413,7 +417,8 @@ void testTypeHintedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws E "col1", null, "key1=value1", - ""); + "", + null); UdfDef udfDef = new UdfDef( @@ -483,7 +488,8 @@ void testComplicatedUdf(ValuesDataSink.SinkApi sinkApi, String language) throws "col1", null, "key1=value1", - ""); + "", + null); // Setup pipeline Configuration pipelineConfig = new Configuration(); @@ -572,7 +578,8 @@ void testTransformWithFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) "col1", null, "key1=value1", - ""); + "", + null); UdfDef udfDef = new UdfDef( @@ -641,7 +648,8 @@ void testFilterWithFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) thr "col1", null, "key1=value1", - ""); + "", + null); UdfDef udfDef = new UdfDef( @@ -708,7 +716,8 @@ void testOverloadedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) thr "col1", null, "key1=value1", - ""); + "", + null); UdfDef udfDef = new UdfDef( @@ -777,7 +786,8 @@ void testComplicatedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) th "col1", null, "key1=value1", - ""); + "", + null); // Setup pipeline Configuration pipelineConfig = new Configuration(); @@ -855,7 +865,8 @@ void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception { "col1", null, "key1=value1", - ""); + "", + null); // Setup pipeline Configuration pipelineConfig = new Configuration(); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index a8c7a8c5f67..8bd74079404 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -334,6 +334,152 @@ public void testSchemaChangeEvents() throws Exception { "DropTableEvent{tableId=%s.products}"); } + @Test + public void 
testSoftDelete() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "transform:\n" + + " - source-table: \\.*.\\.*\n" + + " projection: \\*, __data_event_type__ AS op_type\n" + + " converter-after-transform: SOFT_DELETE\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d\n" + + " schema.change.behavior: evolve", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234, +I], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null, +I], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + + validateResult( + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512),`op_type` STRING NOT NULL}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234, +I], op=INSERT, meta=()}", + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING,`op_type` STRING NOT NULL}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, 
{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, +I], op=INSERT, meta=()}"); + + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + // Perform DDL changes after the binlog is generated + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null, -U], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null, +U], op=UPDATE, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + + LOG.info("Begin schema evolution stage."); + + // Test AddColumnEvent + stat.execute("ALTER TABLE products ADD COLUMN new_col INT;"); + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110 + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111 + stat.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + stat.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + stat.execute("DELETE FROM products WHERE id=111;"); + + // Test AlterColumnTypeEvent + stat.execute("ALTER TABLE products MODIFY COLUMN new_col BIGINT;"); + stat.execute( + "INSERT INTO products VALUES (default,'derrida','forever 21',2.1728, null, null, null, 2147483649);"); // 112 + + // Test RenameColumnEvent + stat.execute("ALTER TABLE products RENAME COLUMN new_col TO new_column;"); + stat.execute( + "INSERT INTO products VALUES (default,'dynazenon','SSSS',2.1728, null, null, null, 2147483649);"); // 113 + + // Test DropColumnEvent + stat.execute("ALTER TABLE products DROP COLUMN new_column;"); + stat.execute( + "INSERT INTO products VALUES (default,'evangelion','Eva',2.1728, null, null, null);"); // 114 + + // Test TruncateTableEvent + stat.execute("TRUNCATE TABLE products;"); + + // Test DropTableEvent. It's all over. 
+ stat.execute("DROP TABLE products;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + waitUntilSpecificEvent( + String.format( + "DropTableEvent{tableId=%s.products}", + mysqlInventoryDatabase.getDatabaseName())); + + validateResult( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null, -U], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null, +U], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null, -U], after=[107, rocks, box of assorted rocks, 5.1, null, null, null, +U], op=UPDATE, meta=()}", + "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=AFTER, existedColumnName=point_c}]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1, +I], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1, -U], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1, +U], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1, -U], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1, +U], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1, -D], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}", + "DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649, +I], op=INSERT, meta=()}", + "RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}", + "DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649, +I], op=INSERT, meta=()}", + "DropColumnEvent{tableId=%s.products, droppedColumnNames=[new_column]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[114, evangelion, Eva, 2.1728, null, null, null, +I], op=INSERT, meta=()}", + "TruncateTableEvent{tableId=%s.products}", + "DropTableEvent{tableId=%s.products}"); + } + @Test public void testDanglingDropTableEventInBinlog() throws Exception { // Create a new table for later deletion diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java index 23786c76664..68f4526f057 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java @@ -33,6 +33,8 @@ import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext; import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverter; +import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverters; import 
org.apache.flink.cdc.runtime.parser.TransformParser; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -101,7 +103,8 @@ public PostTransformOperator.Builder addTransform( @Nullable String filter, String primaryKey, String partitionKey, - String tableOptions) { + String tableOptions, + String postTransformConverter) { transformRules.add( new TransformRule( tableInclusions, @@ -109,13 +112,15 @@ public PostTransformOperator.Builder addTransform( filter, primaryKey, partitionKey, - tableOptions)); + tableOptions, + postTransformConverter)); return this; } public PostTransformOperator.Builder addTransform( String tableInclusions, @Nullable String projection, @Nullable String filter) { - transformRules.add(new TransformRule(tableInclusions, projection, filter, "", "", "")); + transformRules.add( + new TransformRule(tableInclusions, projection, filter, "", "", "", null)); return this; } @@ -172,10 +177,10 @@ public void open() throws Exception { transforms = transformRules.stream() .map( - tuple3 -> { - String tableInclusions = tuple3.getTableInclusions(); - String projection = tuple3.getProjection(); - String filterExpression = tuple3.getFilter(); + transformRule -> { + String tableInclusions = transformRule.getTableInclusions(); + String projection = transformRule.getProjection(); + String filterExpression = transformRule.getFilter(); Selectors selectors = new Selectors.SelectorsBuilder() @@ -185,7 +190,9 @@ public void open() throws Exception { selectors, TransformProjection.of(projection).orElse(null), TransformFilter.of(filterExpression, udfDescriptors) - .orElse(null)); + .orElse(null), + PostTransformConverters.of( + transformRule.getPostTransformConverter())); }) .collect(Collectors.toList()); this.transformProjectionProcessorMap = new ConcurrentHashMap<>(); @@ -431,6 +438,13 @@ private Optional processDataChangeEvent(DataChangeEvent dataCha dataChangeEventOptional.get(), epochTime); } + if (dataChangeEventOptional.isPresent() + && transform.getPostTransformConverter().isPresent()) { + dataChangeEventOptional = + convertDataChangeEvent( + dataChangeEventOptional.get(), + transform.getPostTransformConverter().get()); + } transformedDataChangeEventOptionalList.add(dataChangeEventOptional); } } @@ -448,6 +462,11 @@ private Optional processDataChangeEvent(DataChangeEvent dataCha } } + private Optional convertDataChangeEvent( + DataChangeEvent dataChangeEvent, PostTransformConverter postTransformConverter) { + return postTransformConverter.convert(dataChangeEvent); + } + private Optional processFilter( TransformFilterProcessor transformFilterProcessor, DataChangeEvent dataChangeEvent, diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java index 33d4395c194..06045bf4291 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformer.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.runtime.operators.transform; import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverter; import javax.annotation.Nullable; @@ -29,14 +30,17 @@ public class PostTransformer { private final Optional projection; private final Optional 
filter; + private final Optional postTransformConverter; public PostTransformer( Selectors selectors, @Nullable TransformProjection projection, - @Nullable TransformFilter filter) { + @Nullable TransformFilter filter, + Optional postTransformConverter) { this.selectors = selectors; this.projection = projection != null ? Optional.of(projection) : Optional.empty(); this.filter = filter != null ? Optional.of(filter) : Optional.empty(); + this.postTransformConverter = postTransformConverter; } public Selectors getSelectors() { @@ -50,4 +54,8 @@ public Optional getProjection() { public Optional getFilter() { return filter; } + + public Optional getPostTransformConverter() { + return postTransformConverter; + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index 538c28ddb28..6ee0d72e984 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -88,7 +88,8 @@ public static class Builder { public PreTransformOperator.Builder addTransform( String tableInclusions, @Nullable String projection, @Nullable String filter) { - transformRules.add(new TransformRule(tableInclusions, projection, filter, "", "", "")); + transformRules.add( + new TransformRule(tableInclusions, projection, filter, "", "", "", null)); return this; } @@ -98,7 +99,8 @@ public PreTransformOperator.Builder addTransform( @Nullable String filter, String primaryKey, String partitionKey, - String tableOption) { + String tableOption, + @Nullable String postTransformConverter) { transformRules.add( new TransformRule( tableInclusions, @@ -106,7 +108,8 @@ public PreTransformOperator.Builder addTransform( filter, primaryKey, partitionKey, - tableOption)); + tableOption, + postTransformConverter)); return this; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java index 27b2652266d..7bdc5cffc74 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java @@ -33,6 +33,7 @@ public class TransformRule implements Serializable { private final String primaryKey; private final String partitionKey; private final String tableOption; + private final @Nullable String postTransformConverter; public TransformRule( String tableInclusions, @@ -40,13 +41,15 @@ public TransformRule( @Nullable String filter, String primaryKey, String partitionKey, - String tableOption) { + String tableOption, + @Nullable String postTransformConverter) { this.tableInclusions = tableInclusions; this.projection = projection; this.filter = normalizeFilter(projection, filter); this.primaryKey = primaryKey; this.partitionKey = partitionKey; this.tableOption = tableOption; + this.postTransformConverter = postTransformConverter; } public String getTableInclusions() { @@ -74,4 +77,9 @@ public String getPartitionKey() { public String getTableOption() { return tableOption; } + + @Nullable + public String getPostTransformConverter() { + return postTransformConverter; + } } diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/PostTransformConverter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/PostTransformConverter.java new file mode 100644 index 00000000000..6fb2bbd4d4e --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/PostTransformConverter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.transform.converter; + +import org.apache.flink.cdc.common.annotation.Experimental; +import org.apache.flink.cdc.common.event.DataChangeEvent; + +import java.io.Serializable; +import java.util.Optional; + +/** + * The PostTransformConverter applies to convert the {@link DataChangeEvent} after other part of + * TransformRule in PostTransformOperator. + */ +@Experimental +public interface PostTransformConverter extends Serializable { + + /** Change some parts of {@link DataChangeEvent}, like op or meta. */ + Optional convert(DataChangeEvent dataChangeEvent); +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/PostTransformConverters.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/PostTransformConverters.java new file mode 100644 index 00000000000..78a222dab8c --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/PostTransformConverters.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.transform.converter; + +import org.apache.flink.cdc.common.annotation.Experimental; +import org.apache.flink.cdc.common.utils.StringUtils; + +import java.util.Optional; + +/** The {@link PostTransformConverter} utils. 
*/ +@Experimental +public class PostTransformConverters { + public static final String SOFT_DELETE_CONVERTER = "SOFT_DELETE"; + + /** Get the {@link PostTransformConverter} by given identifier. */ + public static Optional of(String identifier) { + if (StringUtils.isNullOrWhitespaceOnly(identifier)) { + return Optional.empty(); + } + + switch (identifier) { + case SOFT_DELETE_CONVERTER: + return Optional.of(new SoftDeleteConverter()); + default: + throw new IllegalArgumentException( + String.format("Failed to find the converter %s.", identifier)); + } + } +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/SoftDeleteConverter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/SoftDeleteConverter.java new file mode 100644 index 00000000000..d26ee23cb78 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/converter/SoftDeleteConverter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.transform.converter; + +import org.apache.flink.cdc.common.event.DataChangeEvent; + +import java.util.Optional; + +import static org.apache.flink.cdc.common.event.OperationType.DELETE; + +/** This {@link PostTransformConverter} convert delete events to insert events. 
*/ +public class SoftDeleteConverter implements PostTransformConverter { + + @Override + public Optional convert(DataChangeEvent dataChangeEvent) { + if (DELETE.equals(dataChangeEvent.op())) { + return Optional.of( + DataChangeEvent.insertEvent( + dataChangeEvent.tableId(), + dataChangeEvent.before(), + dataChangeEvent.meta())); + } + return Optional.of(dataChangeEvent); + } +} diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index 94b190cf086..d266ed20185 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -434,7 +434,8 @@ void testDataChangeEventTransformTwice() throws Exception { void testDataChangeEventTransformProjectionDataTypeConvert() throws Exception { PostTransformOperator transform = PostTransformOperator.newBuilder() - .addTransform(DATATYPE_TABLEID.identifier(), "*", null, null, null, null) + .addTransform( + DATATYPE_TABLEID.identifier(), "*", null, null, null, null, null) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -571,6 +572,91 @@ void testMetadataASTransform() throws Exception { .isEqualTo(new StreamRecord<>(insertEventExpect)); } + @Test + void testSoftDeleteTransform() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + METADATA_TABLEID.identifier(), + "*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ identifier_name, __namespace_name__, __schema_name__, __table_name__, __data_event_type__", + " __table_name__ = 'metadata_table' ", + "", + "", + "", + "SOFT_DELETE") + .build(); + EventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + new EventOperatorTestHarness<>(transform, 1); + Schema expectedSchema = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("identifier_name", DataTypes.STRING()) + .physicalColumn("__namespace_name__", DataTypes.STRING().notNull()) + .physicalColumn("__schema_name__", DataTypes.STRING().notNull()) + .physicalColumn("__table_name__", DataTypes.STRING().notNull()) + .physicalColumn("__data_event_type__", DataTypes.STRING().notNull()) + .primaryKey("col1") + .build(); + + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = new CreateTableEvent(METADATA_TABLEID, METADATA_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) METADATA_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator expectedRecordDataGenerator = + new BinaryRecordDataGenerator(((RowType) expectedSchema.toRowDataType())); + // Insert + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + METADATA_TABLEID, + recordDataGenerator.generate(new Object[] {new BinaryStringData("1")})); + DataChangeEvent insertEventExpect = + DataChangeEvent.insertEvent( + METADATA_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("my_company.my_branch.metadata_table"), + new BinaryStringData("my_company"), + new BinaryStringData("my_branch"), + new BinaryStringData("metadata_table"), + new BinaryStringData("+I") + })); + 
transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>(new CreateTableEvent(METADATA_TABLEID, expectedSchema))); + transform.processElement(new StreamRecord<>(insertEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect)); + + // Delete + DataChangeEvent deleteEvent = + DataChangeEvent.deleteEvent( + METADATA_TABLEID, + recordDataGenerator.generate(new Object[] {new BinaryStringData("1")})); + DataChangeEvent deleteEventExpect = + DataChangeEvent.insertEvent( + METADATA_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("my_company.my_branch.metadata_table"), + new BinaryStringData("my_company"), + new BinaryStringData("my_branch"), + new BinaryStringData("metadata_table"), + new BinaryStringData("-D") + })); + transform.processElement(new StreamRecord<>(deleteEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(deleteEventExpect)); + } + @Test void testDataChangeEventTransformWithDuplicateColumns() throws Exception { PostTransformOperator transform = diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java index 0e3553e8c46..6f7ac12e755 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java @@ -191,7 +191,8 @@ void testEventTransform() throws Exception { null, "col2", "col12", - "key1=value1,key2=value2") + "key1=value1,key2=value2", + null) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -301,7 +302,8 @@ public void testNullabilityColumn() throws Exception { null, "id", "id", - "key1=value1,key2=value2") + "key1=value1,key2=value2", + null) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -331,7 +333,8 @@ public void testReduceTransformColumn() throws Exception { "newage > 17 and ref2 > 17", "id", "id", - "key1=value1,key2=value2") + "key1=value1,key2=value2", + null) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -438,7 +441,8 @@ public void testWildcardTransformColumn() throws Exception { "newage > 17", "id", "id", - "key1=value1,key2=value2") + "key1=value1,key2=value2", + null) .build(); EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = @@ -554,6 +558,7 @@ void testMultiTransformWithDiffRefColumns() throws Exception { "age < 18", "id", null, + null, null) .addTransform( CUSTOMERS_TABLEID.identifier(), @@ -561,6 +566,7 @@ void testMultiTransformWithDiffRefColumns() throws Exception { "age >= 18", "id", null, + null, null) .build(); EventOperatorTestHarness @@ -591,6 +597,7 @@ void testMultiTransformWithAsterisk() throws Exception { "age < 18", "id", null, + null, null) .addTransform( CUSTOMERS_TABLEID.identifier(), @@ -598,6 +605,7 @@ void testMultiTransformWithAsterisk() throws Exception { "age >= 18", "id", null, + null, null) .build(); EventOperatorTestHarness @@ -622,13 +630,20 @@ void 
testMultiTransformMissingProjection() throws Exception { PreTransformOperator transform = PreTransformOperator.newBuilder() .addTransform( - CUSTOMERS_TABLEID.identifier(), null, "age < 18", "id", null, null) + CUSTOMERS_TABLEID.identifier(), + null, + "age < 18", + "id", + null, + null, + null) .addTransform( CUSTOMERS_TABLEID.identifier(), "id, age, UPPER(name) as name, sex", "age >= 18", "id", null, + null, null) .build(); EventOperatorTestHarness
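Reviewer note: the `PostTransformConverter` SPI introduced in this change is small enough that additional converters are easy to prototype. Below is a minimal, hypothetical sketch (not part of this PR) of a converter that suppresses delete events entirely rather than re-emitting them as inserts the way `SOFT_DELETE` does. It relies only on the `convert(DataChangeEvent)` contract added in this diff; the class name `DropDeleteConverter` and its behavior are invented purely for illustration.

```java
package org.apache.flink.cdc.runtime.operators.transform.converter;

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.OperationType;

import java.util.Optional;

/**
 * Hypothetical example (not part of this PR): a {@link PostTransformConverter} that suppresses
 * delete events completely. Returning {@code Optional.empty()} is assumed to remove the event
 * from the transformed output, the same way a non-matching filter rule does.
 */
public class DropDeleteConverter implements PostTransformConverter {

    @Override
    public Optional<DataChangeEvent> convert(DataChangeEvent dataChangeEvent) {
        if (OperationType.DELETE.equals(dataChangeEvent.op())) {
            // Swallow the delete event so downstream sinks never see it.
            return Optional.empty();
        }
        // Pass every other event through unchanged.
        return Optional.of(dataChangeEvent);
    }
}
```

To be usable from a pipeline YAML, such a converter would also need its own identifier registered in `PostTransformConverters.of(...)`, since the factory added in this PR only resolves `SOFT_DELETE` and throws an `IllegalArgumentException` for unknown names.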