diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java index 61824ec44a0..5e8893e96cf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java @@ -21,6 +21,7 @@ import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology; @@ -61,7 +62,11 @@ public PaimonSink( @Override public PaimonWriter createWriter(InitContext context) { - return new PaimonWriter<>(catalogOptions, context.metricGroup(), commitUser, serializer); + long lastCheckpointId = + context.getRestoredCheckpointId() + .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); + return new PaimonWriter<>( + catalogOptions, context.metricGroup(), commitUser, serializer, lastCheckpointId); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java index 
87229f36c00..d7b682b47b7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java @@ -24,6 +24,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; @@ -68,11 +69,15 @@ public class PaimonWriter private final MetricGroup metricGroup; private final List committables; + /** A workaround variable tracking the checkpointId in {@link StreamOperator#snapshotState}. */ + private long lastCheckpointId; + public PaimonWriter( Options catalogOptions, MetricGroup metricGroup, String commitUser, - PaimonRecordSerializer serializer) { + PaimonRecordSerializer serializer, + long lastCheckpointId) { catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); this.metricGroup = metricGroup; this.commitUser = commitUser; @@ -85,12 +90,14 @@ public PaimonWriter( new ExecutorThreadFactory( Thread.currentThread().getName() + "-CdcMultiWrite-Compaction")); this.serializer = serializer; + this.lastCheckpointId = lastCheckpointId; } @Override public Collection prepareCommit() { Collection allCommittables = new ArrayList<>(committables); committables.clear(); + lastCheckpointId++; return allCommittables; } @@ -156,6 +163,9 @@ private FileStoreTable getTable(Identifier tableId) { * Called on checkpoint or end of input so that the writer to flush all pending data for * at-least-once. * + *

Execution order: flush(boolean endOfInput)=>prepareCommit()=>snapshotState(long + * checkpointId). + * *

this method will also be called when receiving {@link FlushEvent}, but we don't need to * commit the MultiTableCommittables immediately in this case, because {@link PaimonCommitter} * support committing data of different schemas. @@ -166,10 +176,14 @@ public void flush(boolean endOfInput) throws IOException { Identifier key = entry.getKey(); StoreSinkWrite write = entry.getValue(); boolean waitCompaction = false; - // checkpointId will be updated correctly by PreCommitOperator. - long checkpointId = 1L; committables.addAll( - write.prepareCommit(waitCompaction, checkpointId).stream() + // here we set it to lastCheckpointId+1 to + // avoid calling prepareCommit with the same checkpointId as the first round. + // One thing to note is that during schema evolution, flush and checkpoint are + // consistent, + // but as long as there is data coming in, it will not trigger any conflict + // issues + write.prepareCommit(waitCompaction, lastCheckpointId + 1).stream() .map( committable -> MultiTableCommittable.fromCommittable(key, committable)) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index f44fd47a333..cd494920ca7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -253,6 +253,19 @@ public void testSinkWithDataChange(String metastore) .forEachRemaining(result::add); Assertions.assertEquals( Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "x")), result); + + result = new 
ArrayList<>(); + tEnv.sqlQuery("select max_sequence_number from paimon_catalog.test.`table1$files`") + .execute() + .collect() + .forEachRemaining(result::add); + // Each commit will generate one sequence number(equal to checkpointId). + Assertions.assertEquals( + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1L), + Row.ofKind(RowKind.INSERT, 2L), + Row.ofKind(RowKind.INSERT, 3L)), + result); } @ParameterizedTest