From 92918a346d53d17a23ebf0c58105260743da5c71 Mon Sep 17 00:00:00 2001
From: liliwei
Date: Wed, 23 Nov 2022 14:58:22 +0800
Subject: [PATCH] Flink: Write the watermark to the snapshot summary

---
 .../flink/sink/IcebergFilesCommitter.java     | 76 ++++++++++++++-----
 .../flink/sink/TestIcebergFilesCommitter.java | 43 +++++++++++
 2 files changed, 99 insertions(+), 20 deletions(-)

diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8aa2c0304eb0..15062941cfb4 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -37,6 +37,7 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
 import org.apache.iceberg.AppendFiles;
@@ -65,6 +66,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
 
   private static final long serialVersionUID = 1L;
   private static final long INITIAL_CHECKPOINT_ID = -1L;
+  private static final long INITIAL_WATERMARK = -1L;
   private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
 
   private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
@@ -76,6 +78,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   // id will be attached to iceberg's meta when committing the iceberg transaction.
   private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
   static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
+  static final String WATERMARK = "flink.watermark";
 
   // TableLoader to load iceberg table lazily.
   private final TableLoader tableLoader;
@@ -101,6 +104,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   private transient IcebergFilesCommitterMetrics committerMetrics;
   private transient ManifestOutputFileFactory manifestOutputFileFactory;
   private transient long maxCommittedCheckpointId;
+  private transient long watermark;
   private transient int continuousEmptyCheckpoints;
   private transient int maxContinuousEmptyCommits;
   // There're two cases that we restore from flink checkpoints: the first case is restoring from
@@ -177,6 +181,7 @@ public void initializeState(StateInitializationContext context) throws Exception
       // it's safe to assign the max committed checkpoint id from restored flink job to the current
       // flink job.
       this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
+      this.watermark = getWatermark(table, restoredFlinkJobId);
 
       NavigableMap<Long, byte[]> uncommittedDataFiles =
           Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -184,7 +189,8 @@ public void initializeState(StateInitializationContext context) throws Exception
       if (!uncommittedDataFiles.isEmpty()) {
         // Committed all uncommitted data files from the old flink job to iceberg table.
         long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
-        commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
+        commitUpToCheckpoint(
+            uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId, watermark);
       }
     }
   }
@@ -226,13 +232,16 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     // the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
     if (checkpointId > maxCommittedCheckpointId) {
-      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId, watermark);
       this.maxCommittedCheckpointId = checkpointId;
     }
   }
 
   private void commitUpToCheckpoint(
-      NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long checkpointId)
+      NavigableMap<Long, byte[]> deltaManifestsMap,
+      String newFlinkJobId,
+      long checkpointId,
+      long newWatermark)
       throws IOException {
     NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
     List<ManifestFile> manifests = Lists.newArrayList();
@@ -252,7 +261,7 @@ private void commitUpToCheckpoint(
     }
 
     CommitSummary summary = new CommitSummary(pendingResults);
-    commitPendingResult(pendingResults, summary, newFlinkJobId, checkpointId);
+    commitPendingResult(pendingResults, summary, newFlinkJobId, checkpointId, newWatermark);
     committerMetrics.updateCommitSummary(summary);
     pendingMap.clear();
     deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
@@ -262,14 +271,15 @@ private void commitPendingResult(
       NavigableMap<Long, WriteResult> pendingResults,
       CommitSummary summary,
       String newFlinkJobId,
-      long checkpointId) {
+      long checkpointId,
+      long newWatermark) {
     long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
     continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
     if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
       if (replacePartitions) {
-        replacePartitions(pendingResults, summary, newFlinkJobId, checkpointId);
+        replacePartitions(pendingResults, summary, newFlinkJobId, checkpointId, newWatermark);
       } else {
-        commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId);
+        commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId, newWatermark);
       }
       continuousEmptyCheckpoints = 0;
     }
@@ -300,7 +310,8 @@ private void replacePartitions(
       NavigableMap<Long, WriteResult> pendingResults,
       CommitSummary summary,
       String newFlinkJobId,
-      long checkpointId) {
+      long checkpointId,
+      long newWatermark) {
     Preconditions.checkState(
         summary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files.");
     // Commit the overwrite transaction.
@@ -312,14 +323,20 @@ private void replacePartitions(
     }
 
     commitOperation(
-        dynamicOverwrite, summary, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+        dynamicOverwrite,
+        summary,
+        "dynamic partition overwrite",
+        newFlinkJobId,
+        checkpointId,
+        newWatermark);
   }
 
   private void commitDeltaTxn(
       NavigableMap<Long, WriteResult> pendingResults,
       CommitSummary summary,
       String newFlinkJobId,
-      long checkpointId) {
+      long checkpointId,
+      long newWatermark) {
     if (summary.deleteFilesCount() == 0) {
       // To be compatible with iceberg format V1.
       AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool);
@@ -329,7 +346,7 @@ private void commitDeltaTxn(
             "Should have no referenced data files for append.");
         Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
       }
-      commitOperation(appendFiles, summary, "append", newFlinkJobId, checkpointId);
+      commitOperation(appendFiles, summary, "append", newFlinkJobId, checkpointId, newWatermark);
     } else {
       // To be compatible with iceberg format V2.
       for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
@@ -350,7 +367,7 @@ private void commitDeltaTxn(
         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
         Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
 
-        commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, e.getKey());
+        commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, e.getKey(), newWatermark);
       }
     }
   }
@@ -360,13 +377,15 @@ private void commitOperation(
       CommitSummary summary,
       String description,
       String newFlinkJobId,
-      long checkpointId) {
+      long checkpointId,
+      long newWatermark) {
     LOG.info("Committing {} to table {} with summary: {}", description, table.name(), summary);
     snapshotProperties.forEach(operation::set);
     // custom snapshot metadata properties will be overridden if they conflict with internal ones
     // used by the sink.
     operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
     operation.set(FLINK_JOB_ID, newFlinkJobId);
+    operation.set(WATERMARK, Long.toString(newWatermark));
 
     long startNano = System.nanoTime();
     operation.commit(); // abort is automatically called if this fails.
@@ -392,7 +411,13 @@ public void endInput() throws IOException {
     dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
     writeResultsOfCurrentCkpt.clear();
 
-    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
+    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId, watermark);
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    super.processWatermark(mark);
+    this.watermark = mark.getTimestamp();
   }
 
   /**
@@ -445,23 +470,34 @@ private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor
   }
 
   static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+    String lastCommittedCheckpointId =
+        getSummary(
+            table, flinkJobId, MAX_COMMITTED_CHECKPOINT_ID, Long.toString(INITIAL_CHECKPOINT_ID));
+    return Long.parseLong(lastCommittedCheckpointId);
+  }
+
+  static long getWatermark(Table table, String flinkJobId) {
+    String lastCommittedCheckpointId =
+        getSummary(table, flinkJobId, WATERMARK, Long.toString(INITIAL_WATERMARK));
+    return Long.parseLong(lastCommittedCheckpointId);
+  }
+
+  private static String getSummary(
+      Table table, String flinkJobId, String summaryName, String defaultValue) {
     Snapshot snapshot = table.currentSnapshot();
-    long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
     while (snapshot != null) {
       Map<String, String> summary = snapshot.summary();
       String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
       if (flinkJobId.equals(snapshotFlinkJobId)) {
-        String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+        String value = summary.get(summaryName);
         if (value != null) {
-          lastCommittedCheckpointId = Long.parseLong(value);
-          break;
+          return value;
         }
       }
 
       Long parentSnapshotId = snapshot.parentId();
       snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
     }
-
-    return lastCommittedCheckpointId;
+    return defaultValue;
   }
 }
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 6fc4a5639fcf..983ae76200b4 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -208,6 +208,43 @@ public void testCommitTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testWatermark() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+      for (int i = 1; i <= 3; i++) {
+        RowData rowData = SimpleDataUtil.createRowData(i, "hello" + i);
+        DataFile dataFile = writeDataFile("data-" + i, ImmutableList.of(rowData));
+        timestamp = timestamp + i;
+
+        harness.processElement(of(dataFile), timestamp);
+        rows.add(rowData);
+
+        harness.snapshot(i, timestamp);
+        assertFlinkManifests(1);
+
+        harness.processWatermark(timestamp);
+
+        harness.notifyOfCompletedCheckpoint(i);
+        assertFlinkManifests(0);
+
+        SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
+        assertSnapshotSize(i);
+        assertWatermark(jobID, timestamp);
+        Assert.assertEquals(
+            TestIcebergFilesCommitter.class.getName(),
+            table.currentSnapshot().summary().get("flink.test"));
+      }
+    }
+  }
+
   @Test
   public void testOrderedEventsBetweenCheckpoints() throws Exception {
     // It's possible that two checkpoints happen in the following orders:
@@ -817,6 +854,12 @@ private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
     Assert.assertEquals(expectedId, actualId);
   }
 
+  private void assertWatermark(JobID jobID, long expectedWatermark) {
+    table.refresh();
+    long actualWatermark = IcebergFilesCommitter.getWatermark(table, jobID.toString());
+    Assert.assertEquals(expectedWatermark, actualWatermark);
+  }
+
   private void assertSnapshotSize(int expectedSnapshotSize) {
     table.refresh();
     Assert.assertEquals(expectedSnapshotSize, Lists.newArrayList(table.snapshots()).size());
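
Not part of the patch: a minimal sketch of how a downstream consumer could read the watermark this change records in the snapshot summary under the "flink.watermark" key. The class name WatermarkSummaryReader and the -1 fallback are illustrative assumptions; it only inspects the current snapshot, whereas the patch's getSummary additionally walks parent snapshots to find the last snapshot written by a given Flink job.

// Illustrative sketch only: reads the watermark stored by IcebergFilesCommitter.
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class WatermarkSummaryReader {

  /** Returns the watermark recorded on the current snapshot, or -1 if none is present. */
  public static long readWatermark(Table table) {
    Snapshot snapshot = table.currentSnapshot();
    if (snapshot == null) {
      return -1L;
    }
    Map<String, String> summary = snapshot.summary();
    String value = summary.get("flink.watermark"); // key defined as WATERMARK in the patch
    return value != null ? Long.parseLong(value) : -1L;
  }
}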