From 468f52b173724d26ab7148985c7c5ca5604625be Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Fri, 9 Aug 2024 22:18:53 +0800 Subject: [PATCH 1/3] Fix: Job crashes when metadata column names present in transform rules --- .../transform/ProjectionColumnProcessor.java | 30 ++++++------ .../cdc/runtime/parser/TransformParser.java | 26 +++-------- .../UnifiedTransformOperatorTest.java | 46 +++++++++++++++++++ 3 files changed, 67 insertions(+), 35 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index 3dde9b20c53..3694a29e247 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -152,22 +152,22 @@ private TransformExpressionKey generateTransformExpressionKey() { } } } - if (scriptExpression.contains(TransformParser.DEFAULT_NAMESPACE_NAME) - && !argumentNames.contains(TransformParser.DEFAULT_NAMESPACE_NAME)) { - argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME); - paramTypes.add(String.class); - } - - if (scriptExpression.contains(TransformParser.DEFAULT_SCHEMA_NAME) - && !argumentNames.contains(TransformParser.DEFAULT_SCHEMA_NAME)) { - argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME); - paramTypes.add(String.class); - } - if (scriptExpression.contains(TransformParser.DEFAULT_TABLE_NAME) - && !argumentNames.contains(TransformParser.DEFAULT_TABLE_NAME)) { - argumentNames.add(TransformParser.DEFAULT_TABLE_NAME); - paramTypes.add(String.class); + for (String originalColumnName : originalColumnNames) { + switch (originalColumnName) { + case TransformParser.DEFAULT_NAMESPACE_NAME: + argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME); + paramTypes.add(String.class); + break; + case TransformParser.DEFAULT_SCHEMA_NAME: + argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME); + paramTypes.add(String.class); + break; + case TransformParser.DEFAULT_TABLE_NAME: + argumentNames.add(TransformParser.DEFAULT_TABLE_NAME); + paramTypes.add(String.class); + break; + } } argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 6260bf7b0ac..e0232412fd6 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -109,7 +109,7 @@ private static RelNode sqlToRel( List columns, SqlNode sqlNode, List udfDescriptors) { - List columnsWithMetadata = copyFillMetadataColumn(sqlNode.toString(), columns); + List columnsWithMetadata = copyFillMetadataColumn(columns); CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); SchemaPlus schema = rootSchema.plus(); Map operand = new HashMap<>(); @@ -498,29 +498,15 @@ private static SqlSelect parseProjectionExpression(String projection) { return parseSelect(statement.toString()); } - private static List copyFillMetadataColumn( - String transformStatement, List columns) { + private static List copyFillMetadataColumn(List columns) { + // Add meteColumn for SQLValidater.validate List columnsWithMetadata = new ArrayList<>(columns); - if (transformStatement.contains(DEFAULT_NAMESPACE_NAME) - && !containsMetadataColumn(columnsWithMetadata, DEFAULT_NAMESPACE_NAME)) { - columnsWithMetadata.add( - Column.physicalColumn(DEFAULT_NAMESPACE_NAME, DataTypes.STRING())); - } - if (transformStatement.contains(DEFAULT_SCHEMA_NAME) - && !containsMetadataColumn(columnsWithMetadata, DEFAULT_SCHEMA_NAME)) { - columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, DataTypes.STRING())); - } - if (transformStatement.contains(DEFAULT_TABLE_NAME) - && !containsMetadataColumn(columnsWithMetadata, DEFAULT_TABLE_NAME)) { - columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, DataTypes.STRING())); - } + columnsWithMetadata.add(Column.physicalColumn(DEFAULT_NAMESPACE_NAME, DataTypes.STRING())); + columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, DataTypes.STRING())); + columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, DataTypes.STRING())); return columnsWithMetadata; } - private static boolean containsMetadataColumn(List columns, String columnName) { - return columns.stream().anyMatch(column -> column.getName().equals(columnName)); - } - private static boolean isMetadataColumn(String columnName) { return DEFAULT_TABLE_NAME.equals(columnName) || DEFAULT_SCHEMA_NAME.equals(columnName) diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java index 009d700fee0..921549ae8cc 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java @@ -917,6 +917,52 @@ public void testMetadataAndCalculatedTransform() throws Exception { .destroyHarness(); } + @Test + public void testMetadataTransformIncludeMetaColumnString() throws Exception { + TableId tableId = TableId.tableId("my_company", "my_branch", "schema_nullability"); + UnifiedTransformTestCase.of( + tableId, + "id, name, age, id + age as computed, __table_name__ as metaCol1, '__table_name__' as metaCol2, '__namespace__name__schema__name__table__name__' as metaCol3, UPPER(__schema_name__) as metaCol4", + "id > 100", + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .physicalColumn("computed", DataTypes.INT()) + .physicalColumn("metaCol1", DataTypes.STRING()) + .physicalColumn("metaCol2", DataTypes.STRING()) + .physicalColumn("metaCol3", DataTypes.STRING()) + .physicalColumn("metaCol4", DataTypes.STRING()) + .primaryKey("id") + .build()) + .initializeHarness() + .insertSource(1000, "Alice", 17) + .insertPreTransformed(1000, "Alice", 17) + .insertPostTransformed( + 1000, + "Alice", + 17, + 1017, + "schema_nullability", + "__table_name__", + "__namespace__name__schema__name__table__name__", + "MY_BRANCH") + .runTests() + .destroyHarness(); + } + @Test public void testTransformWithCast() throws Exception { TableId tableId = TableId.tableId("my_company", "my_branch", "transform_with_cast"); From b458fee3cefe8b7430b13d0400045a1fc3b4ded4 Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Sat, 10 Aug 2024 00:56:49 +0800 Subject: [PATCH 2/3] fix comment --- .../cdc/runtime/parser/TransformParser.java | 2 +- .../UnifiedTransformOperatorTest.java | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index e0232412fd6..47bedc638e7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -499,7 +499,7 @@ private static SqlSelect parseProjectionExpression(String projection) { } private static List copyFillMetadataColumn(List columns) { - // Add meteColumn for SQLValidater.validate + // Add metaColumn for SQLValidater.validate List columnsWithMetadata = new ArrayList<>(columns); columnsWithMetadata.add(Column.physicalColumn(DEFAULT_NAMESPACE_NAME, DataTypes.STRING())); columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, DataTypes.STRING())); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java index 921549ae8cc..43a364e5153 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java @@ -922,7 +922,8 @@ public void testMetadataTransformIncludeMetaColumnString() throws Exception { TableId tableId = TableId.tableId("my_company", "my_branch", "schema_nullability"); UnifiedTransformTestCase.of( tableId, - "id, name, age, id + age as computed, __table_name__ as metaCol1, '__table_name__' as metaCol2, '__namespace__name__schema__name__table__name__' as metaCol3, UPPER(__schema_name__) as metaCol4", + "id, name, age, id + age as computed, __namespace_name__ as metaColNameSpaceName, __schema_name__ as metaColSchemaName, __table_name__ as metaColNameTableName, " + + "UPPER(__schema_name__) as metaColSchemaNameUpper, '__table_name__' as metaColStr1, '__namespace__name__schema__name__table__name__' as metaColStr2", "id > 100", Schema.newBuilder() .physicalColumn("id", DataTypes.INT()) @@ -941,10 +942,12 @@ public void testMetadataTransformIncludeMetaColumnString() throws Exception { .physicalColumn("name", DataTypes.STRING().notNull()) .physicalColumn("age", DataTypes.INT().notNull()) .physicalColumn("computed", DataTypes.INT()) - .physicalColumn("metaCol1", DataTypes.STRING()) - .physicalColumn("metaCol2", DataTypes.STRING()) - .physicalColumn("metaCol3", DataTypes.STRING()) - .physicalColumn("metaCol4", DataTypes.STRING()) + .physicalColumn("metaColNameSpaceName", DataTypes.STRING()) + .physicalColumn("metaColSchemaName", DataTypes.STRING()) + .physicalColumn("metaColNameTableName", DataTypes.STRING()) + .physicalColumn("metaColSchemaNameUpper", DataTypes.STRING()) + .physicalColumn("metaColStr1", DataTypes.STRING()) + .physicalColumn("metaColStr2", DataTypes.STRING()) .primaryKey("id") .build()) .initializeHarness() @@ -955,10 +958,12 @@ public void testMetadataTransformIncludeMetaColumnString() throws Exception { "Alice", 17, 1017, + "my_company", + "my_branch", "schema_nullability", + "MY_BRANCH", "__table_name__", - "__namespace__name__schema__name__table__name__", - "MY_BRANCH") + "__namespace__name__schema__name__table__name__") .runTests() .destroyHarness(); } From 8137c90a1ee326326089f94dd8785450067e8184 Mon Sep 17 00:00:00 2001 From: MOBIN <18814118038@163.com> Date: Sat, 10 Aug 2024 12:43:21 +0800 Subject: [PATCH 3/3] fix comment Co-authored-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../org/apache/flink/cdc/runtime/parser/TransformParser.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 47bedc638e7..ae598b7da88 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -499,7 +499,7 @@ private static SqlSelect parseProjectionExpression(String projection) { } private static List copyFillMetadataColumn(List columns) { - // Add metaColumn for SQLValidater.validate + // Add metaColumn for SQLValidator.validate List columnsWithMetadata = new ArrayList<>(columns); columnsWithMetadata.add(Column.physicalColumn(DEFAULT_NAMESPACE_NAME, DataTypes.STRING())); columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, DataTypes.STRING()));