From 28156351455591e43ec1f7a66563da9d8bbee726 Mon Sep 17 00:00:00 2001
From: dixingxing
Date: Mon, 18 Jan 2021 23:08:36 +0800
Subject: [PATCH 1/2] Support storing watermark into table properties

---
 .../flink/sink/IcebergFilesCommitter.java      | 39 ++++++++++++
 .../flink/sink/TestIcebergFilesCommitter.java  | 60 +++++++++++++++++++
 2 files changed, 99 insertions(+)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index c1d3440db41b..eaf695739cad 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -30,12 +30,15 @@
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
 import org.apache.iceberg.AppendFiles;
@@ -45,6 +48,7 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
@@ -55,6 +59,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +78,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   // avoiding committing the same data files twice. This id will be attached to iceberg's meta when committing the
   // iceberg transaction.
   private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+  static final String CURRENT_WATERMARK = "flink.current-watermark";
+  static final String STORE_WATERMARK = "flink.store-watermark";
 
   // TableLoader to load iceberg table lazily.
   private final TableLoader tableLoader;
@@ -85,6 +92,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   // any data loss in iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
   // iceberg table when the next checkpoint happen.
   private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
+  private final Map<Long, Long> watermarkPerCheckpoint = Maps.newHashMap();
 
   // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
   // 'dataFilesPerCheckpoint'.
@@ -95,6 +103,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   private transient Table table;
   private transient ManifestOutputFileFactory manifestOutputFileFactory;
   private transient long maxCommittedCheckpointId;
+  private transient long currentWatermark;
+  private transient boolean storeWatermark;
 
   // There're two cases that we restore from flink checkpoints: the first case is restoring from snapshot created by the
   // same flink job; another case is restoring from snapshot created by another different job. For the second case, we
@@ -106,6 +116,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   // All pending checkpoints states for this function.
   private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
   private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
+  private static final ListStateDescriptor<Map<Long, Long>> WATERMARK_DESCRIPTOR = new ListStateDescriptor<>(
+      "iceberg-flink-watermark", new MapTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
+  private transient ListState<Map<Long, Long>> watermarkState;
 
   IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
     this.tableLoader = tableLoader;
@@ -126,9 +139,16 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
+    Map<String, String> properties = this.table.properties();
+    this.currentWatermark = PropertyUtil.propertyAsLong(properties, CURRENT_WATERMARK, -1L);
+    this.storeWatermark = PropertyUtil.propertyAsBoolean(properties, STORE_WATERMARK, false);
+
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
     this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
+    this.watermarkState = context.getOperatorStateStore().getListState(WATERMARK_DESCRIPTOR);
+
     if (context.isRestored()) {
+      watermarkPerCheckpoint.putAll(watermarkState.get().iterator().next());
       String restoredFlinkJobId = jobIdState.get().iterator().next();
       Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
           "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
@@ -162,6 +182,10 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     checkpointsState.clear();
     checkpointsState.add(dataFilesPerCheckpoint);
 
+    watermarkPerCheckpoint.put(checkpointId, currentWatermark);
+    watermarkState.clear();
+    watermarkState.add(watermarkPerCheckpoint);
+
     jobIdState.clear();
     jobIdState.add(flinkJobId);
 
@@ -296,6 +320,13 @@ private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int
     long start = System.currentTimeMillis();
     operation.commit(); // abort is automatically called if this fails.
+
+    Long watermarkForCheckpoint = watermarkPerCheckpoint.get(checkpointId);
+    if (storeWatermark && watermarkForCheckpoint != null && watermarkForCheckpoint > 0) {
+      table.updateProperties().set(CURRENT_WATERMARK, String.valueOf(watermarkForCheckpoint)).commit();
+      LOG.info("Update {} to {}", CURRENT_WATERMARK, watermarkForCheckpoint);
+    }
+
     long duration = System.currentTimeMillis() - start;
     LOG.info("Committed in {} ms", duration);
   }
 
@@ -305,6 +336,14 @@ public void processElement(StreamRecord<WriteResult> element) {
     this.writeResultsOfCurrentCkpt.add(element.getValue());
   }
 
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    super.processWatermark(mark);
+    if (mark.getTimestamp() != Watermark.MAX_WATERMARK.getTimestamp()) {
+      currentWatermark = mark.getTimestamp();
+    }
+  }
+
   @Override
   public void endInput() throws IOException {
     // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 418e480c8722..41d7913cd041 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -181,6 +181,66 @@ public void testCommitTxn() throws Exception {
     }
   }
 
+  @Test
+  public void testCommitTxnWithWatermark() throws Exception {
+    table.updateProperties().set(IcebergFilesCommitter.STORE_WATERMARK, "true").commit();
+    JobID jobID = new JobID();
+    long baseTimestamp = System.currentTimeMillis();
+    int checkpointId = 1;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+      for (; checkpointId <= 3; checkpointId++) {
+        long timestamp = baseTimestamp + checkpointId;
+        RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+        DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+        harness.processElement(of(dataFile), timestamp);
+        rows.add(rowData);
+
+        harness.processWatermark(timestamp);
+        harness.snapshot(checkpointId, timestamp);
+        harness.notifyOfCompletedCheckpoint(checkpointId);
+
+        SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
+        table.refresh();
+        Assert.assertEquals(String.valueOf(baseTimestamp + checkpointId),
+            table.properties().get(IcebergFilesCommitter.CURRENT_WATERMARK));
+      }
+    }
+
+    OperatorSubtaskState snapshot;
+    checkpointId = 4;
+    long timestamp4 = baseTimestamp + checkpointId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), timestamp4);
+
+      harness.processWatermark(timestamp4);
+      snapshot = harness.snapshot(checkpointId, timestamp4);
+
+      table.refresh();
+      // Checkpoint 4 has not been committed yet, so flink.current-watermark should still be the watermark of checkpoint 3.
+      Assert.assertEquals(String.valueOf(baseTimestamp + 3),
+          table.properties().get(IcebergFilesCommitter.CURRENT_WATERMARK));
+    }
+
+    // Test restoring from the snapshot taken for checkpoint 4.
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      // This restores the watermark recorded for checkpoint 4.
+      harness.initializeState(snapshot);
+      harness.open();
+      table.refresh();
+      // Restoring from the snapshot for checkpoint 4 also commits the watermark for checkpoint 4.
+      Assert.assertEquals(String.valueOf(baseTimestamp + 4),
+          table.properties().get(IcebergFilesCommitter.CURRENT_WATERMARK));
+    }
+  }
+
   @Test
   public void testOrderedEventsBetweenCheckpoints() throws Exception {
     // It's possible that two checkpoints happen in the following orders:

From cfeba98feb6c2b01878f7008ad40deb6287c29fc Mon Sep 17 00:00:00 2001
From: dixingxing
Date: Tue, 19 Jan 2021 02:04:10 +0800
Subject: [PATCH 2/2] Delete unused imports

---
 .../org/apache/iceberg/flink/sink/IcebergFilesCommitter.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index eaf695739cad..6098b7b98424 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -30,7 +30,6 @@
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -48,7 +47,6 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
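
Note on usage: with flink.store-watermark set to "true", every successful commit whose
checkpoint recorded a positive watermark publishes that watermark under
flink.current-watermark, so any reader of the table can poll the property to learn how
far event time has progressed in committed snapshots. Below is a minimal sketch of such
a consumer; it assumes a Hadoop-tables layout, and the WatermarkProbe class and table
location are illustrative placeholders, not part of this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.util.PropertyUtil;

    public class WatermarkProbe {
      public static void main(String[] args) {
        // Placeholder location; point this at a real Hadoop table.
        Table table = new HadoopTables(new Configuration()).load("hdfs://warehouse/db/tbl");

        // Opt in once: the committer only writes flink.current-watermark when
        // flink.store-watermark is "true" (IcebergFilesCommitter.STORE_WATERMARK).
        table.updateProperties().set("flink.store-watermark", "true").commit();

        // Poll the last committed watermark, e.g. before scheduling a batch job.
        table.refresh();
        long watermark = PropertyUtil.propertyAsLong(
            table.properties(), "flink.current-watermark", -1L);
        if (watermark > 0) {
          System.out.println("Event time complete up to " + watermark + " (epoch millis)");
        } else {
          System.out.println("No watermark committed yet");
        }
      }
    }

The watermark > 0 guard mirrors the committer's own check before it updates the property,
so a table that has never committed a watermark reads as "not yet available" rather than 0.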