diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java
index 2191fd519409f..1f886899fa0d2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.optimize;
 
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
 import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
@@ -31,6 +32,7 @@
 import org.apache.flink.table.planner.plan.trait.UpdateKindTrait$;
 import org.apache.flink.table.planner.plan.trait.UpdateKindTraitDef$;
 import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
@@ -68,6 +70,12 @@ public static boolean isRequired(StreamPhysicalChangelogNormalize normalize) {
             return true;
         }
 
+        // the changelog normalize is requested to perform deduplication on a retract stream
+        if (ShortcutUtils.unwrapTableConfig(normalize)
+                .get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE)) {
+            return true;
+        }
+
         // check if metadata columns are accessed
         final RelNode input = normalize.getInput();
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
index ff5d9c592d9b0..a5e817d1d3300 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
@@ -30,8 +30,10 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -94,7 +96,9 @@ static List<TestSpec> getTests() {
                         SinkTable.UPSERT_SINK_METADATA),
                 TestSpec.selectWithoutMetadata(
                         SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA_NO_PUSHDOWN,
-                        SinkTable.UPSERT_SINK));
+                        SinkTable.UPSERT_SINK),
+                TestSpec.select(SourceTable.RETRACT_SOURCE_PARTIAL_DELETES, SinkTable.UPSERT_SINK)
+                        .withSessionOption("table.exec.source.cdc-events-duplicate", "true"));
     }
 
     @AfterEach
@@ -106,6 +110,7 @@ void tearDown() {
     @ParameterizedTest()
     @MethodSource("getTests")
     void testChangelogNormalizePlan(TestSpec spec) {
+        spec.sessionOptions.forEach((key, value) -> util.tableEnv().getConfig().set(key, value));
         for (TableProperties tableProperties : spec.tablesToCreate) {
             final String additionalColumns =
                     String.join(",\n", tableProperties.getAdditionalColumns());
@@ -149,6 +154,10 @@ public enum SourceTable implements TableProperties {
                 "'changelog-mode' = 'UA,D'",
                 "'source.produces-delete-by-key'='true'",
                 "'readable-metadata' = 'offset:BIGINT'"),
+        RETRACT_SOURCE_PARTIAL_DELETES(
+                "retract_table_partial_deletes",
+                "'changelog-mode' = 'I,UA,UB,D'",
+                "'source.produces-delete-by-key'='true'"),
         UPSERT_SOURCE_PARTIAL_DELETES_METADATA_NO_PUSHDOWN(
                 "upsert_table_partial_deletes_metadata_no_pushdown",
                 List.of("`offset` BIGINT METADATA"),
@@ -254,6 +263,7 @@ public List<String> getAdditionalColumns() {
 
     private static class TestSpec {
         private final Set<TableProperties> tablesToCreate;
+        private final Map<String, String> sessionOptions = new HashMap<>();
         private final String query;
         private final String description;
@@ -312,6 +322,11 @@ public static TestSpec join(
                             rightTable.getTableName()));
         }
 
+        public TestSpec withSessionOption(String key, String value) {
+            this.sessionOptions.put(key, value);
+            return this;
+        }
+
         @Override
         public String toString() {
             return description;
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml
index e312e3f2f1b62..325cb889bc108 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml
@@ -293,6 +293,27 @@ LogicalSink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, col1, col2])
 Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, col1, col2], changelogMode=[NONE])
 +- Calc(select=[id, col1, col2], changelogMode=[UA,PD])
    +- TableSourceScan(table=[[default_catalog, default_database, upsert_table_partial_deletes_metadata_no_pushdown, metadata=[offset]]], fields=[id, col1, col2, offset], changelogMode=[UA,PD])
 ]]>
     </Resource>
   </TestCase>
[21 added lines: a new <TestCase> block with the expected plan for the retract-source / cdc-events-duplicate test; its XML content was stripped in this extract and is not reproduced here.]
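
For context, a minimal sketch (not part of the patch) of how the option read by the new check in ChangelogNormalizeRequirementResolver can be enabled from a user program. The class name CdcEventsDuplicateOptionExample is made up for illustration; the string-based form mirrors the withSessionOption(...) call added in the test.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

// Hypothetical example class, not part of this change.
public class CdcEventsDuplicateOptionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // With this option enabled, the new check above returns true, so the
        // StreamPhysicalChangelogNormalize node is kept to deduplicate CDC events.
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true);

        // Equivalent string-based form, as used by TestSpec.withSessionOption(...) in the test:
        // tEnv.getConfig().set("table.exec.source.cdc-events-duplicate", "true");
    }
}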