diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index c90329a8dc0..111c32992dd 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -372,6 +372,25 @@ private static Schema applyAlterColumnTypeEvent(AlterColumnTypeEvent event, Sche
return oldSchema.copy(columns);
}
+ /**
+ * This function determines if the given schema change event {@code event} should be sent to
+ * downstream based on if the given transform rule has asterisk, and what columns are
+ * referenced.
+ *
+ *
+ * <p>For example, if {@code hasAsterisk} is false, then all {@code AddColumnEvent} and {@code
+ * DropColumnEvent} should be ignored since asterisk-less transform should not emit schema
+ * change events that change number of downstream columns.
+ *
+ *
+ * <p>Also, {@code referencedColumns} will be used to determine if the schema change event
+ * affects any referenced columns, since if a column has been projected out of downstream, its
+ * corresponding schema change events should not be emitted, either.
+ *
+ *
+ * <p>For the case when {@code hasAsterisk} is true, things will be cleaner since we don't have
+ * to filter out any schema change events. All we need to do is to change {@code
+ * AddColumnEvent}'s inserting position, and replacing `FIRST` / `LAST` with column-relative
+ * position indicators. This is necessary since extra calculated columns might be added, and
+ * `FIRST` / `LAST` position might differ.
+ */
public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 88ad52fca60..b1a1c6a19b8 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -1227,6 +1227,101 @@ void testPostAsteriskWithSchemaEvolution() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15 -> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
}
+ @Test
+ void testTransformUnmatchedSchemaEvolution() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+ TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
+ List<Event> events = generateSchemaEvolutionEvents(tableId);
+
+ ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.singletonList(
+ new TransformDef(
+ "foo.bar.baz", // This doesn't match given tableId
+ "*",
+ null,
+ null,
+ null,
+ null,
+ null)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+ assertThat(outputEvents)
+ .containsExactly(
+ // Initial stage
+ "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}",
+
+ // Add column stage
+ "AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
+
+ // Alter column type stage
+ "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+ // Rename column stage
+ "RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+ // Drop column stage
+ "DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
+ }
+
private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
List<Event> events = new ArrayList<>();
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 83dd07c53eb..8a607ffb56d 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -44,6 +44,7 @@
import java.lang.reflect.InvocationTargetException;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -242,6 +243,8 @@ public void processElement(StreamRecord element) throws Exception {
private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws Exception {
TableId tableId = event.tableId();
+ List<String> columnNamesBeforeChange = Collections.emptyList();
+
if (event instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
Set<String> projectedColumnsSet =
@@ -286,6 +289,9 @@ private Optional cacheSchema(SchemaChangeEvent event) throws
createTableEvent.getSchema().getColumnNames().stream()
.filter(projectedColumnsSet::contains)
.collect(Collectors.toList()));
+ } else {
+ columnNamesBeforeChange =
+ getPostTransformChangeInfo(tableId).getPreTransformedSchema().getColumnNames();
}
Schema schema;
@@ -304,9 +310,12 @@ private Optional cacheSchema(SchemaChangeEvent event) throws
if (event instanceof CreateTableEvent) {
return Optional.of(new CreateTableEvent(tableId, projectedSchema));
+ } else if (hasAsteriskMap.getOrDefault(tableId, true)) {
+ // See comments in PreTransformOperator#cacheChangeSchema method.
+ return SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
} else {
return SchemaUtils.transformSchemaChangeEvent(
- hasAsteriskMap.get(tableId), projectedColumnsMap.get(tableId), event);
+ false, projectedColumnsMap.get(tableId), event);
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 3b050f993c9..7b43166acbd 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -29,7 +29,6 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
-import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -52,7 +51,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -73,7 +71,6 @@ public class PreTransformOperator extends AbstractStreamOperator
private List<UserDefinedFunctionDescriptor> udfDescriptors;
private Map<TableId, PreTransformProcessor> preTransformProcessorMap;
private Map<TableId, Boolean> hasAsteriskMap;
- private Map<TableId, List<String>> referencedColumnsMap;
public static PreTransformOperator.Builder newBuilder() {
return new PreTransformOperator.Builder();
@@ -165,7 +162,6 @@ public void setup(
}
this.preTransformProcessorMap = new ConcurrentHashMap<>();
this.hasAsteriskMap = new ConcurrentHashMap<>();
- this.referencedColumnsMap = new ConcurrentHashMap<>();
}
@Override
@@ -188,8 +184,7 @@ public void initializeState(StateInitializationContext context) throws Exception
new CreateTableEvent(
stateTableChangeInfo.getTableId(),
stateTableChangeInfo.getPreTransformedSchema());
- // hasAsteriskMap and referencedColumnsMap needs to be recalculated after restoring
- // from a checkpoint.
+ // hasAsteriskMap needs to be recalculated after restoring from a checkpoint.
cacheTransformRuleInfo(restoredCreateTableEvent);
// Since PostTransformOperator doesn't preserve state, pre-transformed schema
@@ -268,9 +263,27 @@ private Optional cacheChangeSchema(SchemaChangeEvent event) {
Schema originalSchema =
SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();
- Optional<SchemaChangeEvent> schemaChangeEvent =
- SchemaUtils.transformSchemaChangeEvent(
- hasAsteriskMap.get(tableId), referencedColumnsMap.get(tableId), event);
+
+ Optional<SchemaChangeEvent> schemaChangeEvent;
+ if (hasAsteriskMap.getOrDefault(tableId, true)) {
+ // If this TableId is asterisk-ful, we should use the latest upstream schema as
+ // referenced columns to perform schema evolution, not of the original ones generated
+ // when creating tables. If hasAsteriskMap has no entry for this TableId, it means that
+ // this TableId has not been referenced by any transform rules, and should be regarded
+ // as asterisk-ful by default.
+ schemaChangeEvent =
+ SchemaUtils.transformSchemaChangeEvent(
+ true, tableChangeInfo.getSourceSchema().getColumnNames(), event);
+ } else {
+ // Otherwise, we will use the pre-transformed columns to determine if the given schema
+ // change event should be passed to downstream, only when it is presented in the
+ // pre-transformed schema.
+ schemaChangeEvent =
+ SchemaUtils.transformSchemaChangeEvent(
+ false,
+ tableChangeInfo.getPreTransformedSchema().getColumnNames(),
+ event);
+ }
if (schemaChangeEvent.isPresent()) {
preTransformedSchema =
SchemaUtils.applySchemaChangeEvent(
@@ -283,26 +296,8 @@ private Optional cacheChangeSchema(SchemaChangeEvent event) {
private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
TableId tableId = createTableEvent.tableId();
- Set<String> referencedColumnsSet =
- transforms.stream()
- .filter(t -> t.getSelectors().isMatch(tableId))
- .flatMap(
- rule ->
- TransformParser.generateReferencedColumns(
- rule.getProjection()
- .map(TransformProjection::getProjection)
- .orElse(null),
- rule.getFilter()
- .map(TransformFilter::getExpression)
- .orElse(null),
- createTableEvent.getSchema().getColumns())
- .stream())
- .map(Column::getName)
- .collect(Collectors.toSet());
-
boolean notTransformed =
transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
-
if (notTransformed) {
// If this TableId isn't presented in any transform block, it should behave like a "*"
// projection and should be regarded as asterisk-ful.
@@ -320,11 +315,6 @@ private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
}
- referencedColumnsMap.put(
- createTableEvent.tableId(),
- createTableEvent.getSchema().getColumnNames().stream()
- .filter(referencedColumnsSet::contains)
- .collect(Collectors.toList()));
}
private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {