diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index 3dde9b20c53..3694a29e247 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -152,22 +152,22 @@ private TransformExpressionKey generateTransformExpressionKey() { } } } - if (scriptExpression.contains(TransformParser.DEFAULT_NAMESPACE_NAME) - && !argumentNames.contains(TransformParser.DEFAULT_NAMESPACE_NAME)) { - argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME); - paramTypes.add(String.class); - } - - if (scriptExpression.contains(TransformParser.DEFAULT_SCHEMA_NAME) - && !argumentNames.contains(TransformParser.DEFAULT_SCHEMA_NAME)) { - argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME); - paramTypes.add(String.class); - } - if (scriptExpression.contains(TransformParser.DEFAULT_TABLE_NAME) - && !argumentNames.contains(TransformParser.DEFAULT_TABLE_NAME)) { - argumentNames.add(TransformParser.DEFAULT_TABLE_NAME); - paramTypes.add(String.class); + for (String originalColumnName : originalColumnNames) { + switch (originalColumnName) { + case TransformParser.DEFAULT_NAMESPACE_NAME: + argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME); + paramTypes.add(String.class); + break; + case TransformParser.DEFAULT_SCHEMA_NAME: + argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME); + paramTypes.add(String.class); + break; + case TransformParser.DEFAULT_TABLE_NAME: + argumentNames.add(TransformParser.DEFAULT_TABLE_NAME); + paramTypes.add(String.class); + break; + } } argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 6260bf7b0ac..ae598b7da88 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -109,7 +109,7 @@ private static RelNode sqlToRel( List columns, SqlNode sqlNode, List udfDescriptors) { - List columnsWithMetadata = copyFillMetadataColumn(sqlNode.toString(), columns); + List columnsWithMetadata = copyFillMetadataColumn(columns); CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); SchemaPlus schema = rootSchema.plus(); Map operand = new HashMap<>(); @@ -498,29 +498,15 @@ private static SqlSelect parseProjectionExpression(String projection) { return parseSelect(statement.toString()); } - private static List copyFillMetadataColumn( - String transformStatement, List columns) { + private static List copyFillMetadataColumn(List columns) { + // Add metaColumn for SQLValidator.validate List columnsWithMetadata = new ArrayList<>(columns); - if (transformStatement.contains(DEFAULT_NAMESPACE_NAME) - && !containsMetadataColumn(columnsWithMetadata, DEFAULT_NAMESPACE_NAME)) { - columnsWithMetadata.add( - Column.physicalColumn(DEFAULT_NAMESPACE_NAME, DataTypes.STRING())); - } - if (transformStatement.contains(DEFAULT_SCHEMA_NAME) - && !containsMetadataColumn(columnsWithMetadata, DEFAULT_SCHEMA_NAME)) { - columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, DataTypes.STRING())); - } - if (transformStatement.contains(DEFAULT_TABLE_NAME) - && !containsMetadataColumn(columnsWithMetadata, DEFAULT_TABLE_NAME)) { - columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, DataTypes.STRING())); - } + columnsWithMetadata.add(Column.physicalColumn(DEFAULT_NAMESPACE_NAME, DataTypes.STRING())); + columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, DataTypes.STRING())); + columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, DataTypes.STRING())); return columnsWithMetadata; } - private static boolean containsMetadataColumn(List columns, String columnName) { - return columns.stream().anyMatch(column -> column.getName().equals(columnName)); - } - private static boolean isMetadataColumn(String columnName) { return DEFAULT_TABLE_NAME.equals(columnName) || DEFAULT_SCHEMA_NAME.equals(columnName) diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java index 009d700fee0..43a364e5153 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java @@ -917,6 +917,57 @@ public void testMetadataAndCalculatedTransform() throws Exception { .destroyHarness(); } + @Test + public void testMetadataTransformIncludeMetaColumnString() throws Exception { + TableId tableId = TableId.tableId("my_company", "my_branch", "schema_nullability"); + UnifiedTransformTestCase.of( + tableId, + "id, name, age, id + age as computed, __namespace_name__ as metaColNameSpaceName, __schema_name__ as metaColSchemaName, __table_name__ as metaColNameTableName, " + + "UPPER(__schema_name__) as metaColSchemaNameUpper, '__table_name__' as metaColStr1, '__namespace__name__schema__name__table__name__' as metaColStr2", + "id > 100", + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .physicalColumn("computed", DataTypes.INT()) + .physicalColumn("metaColNameSpaceName", DataTypes.STRING()) + .physicalColumn("metaColSchemaName", DataTypes.STRING()) + .physicalColumn("metaColNameTableName", DataTypes.STRING()) + .physicalColumn("metaColSchemaNameUpper", DataTypes.STRING()) + .physicalColumn("metaColStr1", DataTypes.STRING()) + .physicalColumn("metaColStr2", DataTypes.STRING()) + .primaryKey("id") + .build()) + .initializeHarness() + .insertSource(1000, "Alice", 17) + .insertPreTransformed(1000, "Alice", 17) + .insertPostTransformed( + 1000, + "Alice", + 17, + 1017, + "my_company", + "my_branch", + "schema_nullability", + "MY_BRANCH", + "__table_name__", + "__namespace__name__schema__name__table__name__") + .runTests() + .destroyHarness(); + } + @Test public void testTransformWithCast() throws Exception { TableId tableId = TableId.tableId("my_company", "my_branch", "transform_with_cast");