Flink: Write watermark to the snapshot summary #6253

Closed · wants to merge 1 commit
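
This change has IcebergFilesCommitter carry the latest watermark it has observed into every Iceberg commit, writing it to the snapshot summary as flink.watermark next to the existing flink.max-committed-checkpoint-id entry. As a minimal sketch of how that metadata could be consumed, the Java snippet below walks a table's snapshot history and prints both values. It is not part of this PR, and the HadoopCatalog setup, warehouse path, and table identifier are assumptions for illustration only; the two summary keys come from the diff itself.

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class FlinkWatermarkInspector {
  public static void main(String[] args) {
    // Assumed catalog setup for illustration; replace with the catalog and table you actually use.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "file:///tmp/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("db", "tbl"));

    // Walk the snapshot history from newest to oldest and print the Flink commit metadata
    // written by IcebergFilesCommitter.
    Snapshot snapshot = table.currentSnapshot();
    while (snapshot != null) {
      Map<String, String> summary = snapshot.summary();
      System.out.printf(
          "snapshot=%d checkpoint=%s watermark=%s%n",
          snapshot.snapshotId(),
          summary.get("flink.max-committed-checkpoint-id"),
          summary.get("flink.watermark"));
      Long parentId = snapshot.parentId();
      snapshot = parentId != null ? table.snapshot(parentId) : null;
    }
  }
}

Snapshots written before this change, or by non-Flink writers, simply have no flink.watermark entry, so summary.get(...) returns null for them.
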
@@ -37,6 +37,7 @@
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
import org.apache.iceberg.AppendFiles;
@@ -65,6 +66,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>

private static final long serialVersionUID = 1L;
private static final long INITIAL_CHECKPOINT_ID = -1L;
private static final long INITIAL_WATERMARK = -1L;
private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];

private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
@@ -76,6 +78,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// id will be attached to iceberg's meta when committing the iceberg transaction.
private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
static final String WATERMARK = "flink.watermark";

// TableLoader to load iceberg table lazily.
private final TableLoader tableLoader;
@@ -101,6 +104,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
private transient IcebergFilesCommitterMetrics committerMetrics;
private transient ManifestOutputFileFactory manifestOutputFileFactory;
private transient long maxCommittedCheckpointId;
private transient long watermark;
private transient int continuousEmptyCheckpoints;
private transient int maxContinuousEmptyCommits;
// There're two cases that we restore from flink checkpoints: the first case is restoring from
@@ -177,14 +181,16 @@ public void initializeState(StateInitializationContext context) throws Exception
// it's safe to assign the max committed checkpoint id from restored flink job to the current
// flink job.
this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, restoredFlinkJobId);
this.watermark = getWatermark(table, restoredFlinkJobId);

NavigableMap<Long, byte[]> uncommittedDataFiles =
Maps.newTreeMap(checkpointsState.get().iterator().next())
.tailMap(maxCommittedCheckpointId, false);
if (!uncommittedDataFiles.isEmpty()) {
// Committed all uncommitted data files from the old flink job to iceberg table.
long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
- commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
+ commitUpToCheckpoint(
+ uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId, watermark);
}
}
}
@@ -226,13 +232,16 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
- commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+ commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId, watermark);
this.maxCommittedCheckpointId = checkpointId;
}
}

private void commitUpToCheckpoint(
- NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long checkpointId)
+ NavigableMap<Long, byte[]> deltaManifestsMap,
+ String newFlinkJobId,
+ long checkpointId,
+ long newWatermark)
throws IOException {
NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
List<ManifestFile> manifests = Lists.newArrayList();
@@ -252,7 +261,7 @@ private void commitUpToCheckpoint(
}

CommitSummary summary = new CommitSummary(pendingResults);
- commitPendingResult(pendingResults, summary, newFlinkJobId, checkpointId);
+ commitPendingResult(pendingResults, summary, newFlinkJobId, checkpointId, newWatermark);
committerMetrics.updateCommitSummary(summary);
pendingMap.clear();
deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
@@ -262,14 +271,15 @@ private void commitPendingResult(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
- long checkpointId) {
+ long checkpointId,
+ long newWatermark) {
long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
if (replacePartitions) {
- replacePartitions(pendingResults, summary, newFlinkJobId, checkpointId);
+ replacePartitions(pendingResults, summary, newFlinkJobId, checkpointId, newWatermark);
} else {
- commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId);
+ commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId, newWatermark);
}
continuousEmptyCheckpoints = 0;
}
@@ -300,7 +310,8 @@ private void replacePartitions(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
- long checkpointId) {
+ long checkpointId,
+ long newWatermark) {
Preconditions.checkState(
summary.deleteFilesCount() == 0, "Cannot overwrite partitions with delete files.");
// Commit the overwrite transaction.
@@ -312,14 +323,20 @@
}

commitOperation(
- dynamicOverwrite, summary, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+ dynamicOverwrite,
+ summary,
+ "dynamic partition overwrite",
+ newFlinkJobId,
+ checkpointId,
+ newWatermark);
}

private void commitDeltaTxn(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
- long checkpointId) {
+ long checkpointId,
+ long newWatermark) {
if (summary.deleteFilesCount() == 0) {
// To be compatible with iceberg format V1.
AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool);
@@ -329,7 +346,7 @@ private void commitDeltaTxn(
"Should have no referenced data files for append.");
Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
}
- commitOperation(appendFiles, summary, "append", newFlinkJobId, checkpointId);
+ commitOperation(appendFiles, summary, "append", newFlinkJobId, checkpointId, newWatermark);
} else {
// To be compatible with iceberg format V2.
for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
@@ -350,7 +367,7 @@ private void commitDeltaTxn(

Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
- commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, e.getKey());
+ commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, e.getKey(), newWatermark);
}
}
}
@@ -360,13 +377,15 @@ private void commitOperation(
CommitSummary summary,
String description,
String newFlinkJobId,
- long checkpointId) {
+ long checkpointId,
+ long newWatermark) {
LOG.info("Committing {} to table {} with summary: {}", description, table.name(), summary);
snapshotProperties.forEach(operation::set);
// custom snapshot metadata properties will be overridden if they conflict with internal ones
// used by the sink.
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
operation.set(WATERMARK, Long.toString(newWatermark));

long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
@@ -392,7 +411,13 @@ public void endInput() throws IOException {
dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
writeResultsOfCurrentCkpt.clear();

- commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId);
+ commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, currentCheckpointId, watermark);
}

@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
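// Track the most recent watermark from the input; it is written to the snapshot summary
// under flink.watermark with the next commit.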
this.watermark = mark.getTimestamp();
}

/**
@@ -445,23 +470,34 @@ private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor
}

static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
String lastCommittedCheckpointId =
getSummary(
table, flinkJobId, MAX_COMMITTED_CHECKPOINT_ID, Long.toString(INITIAL_CHECKPOINT_ID));
return Long.parseLong(lastCommittedCheckpointId);
}

static long getWatermark(Table table, String flinkJobId) {
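// Watermark recorded in the latest snapshot committed by the given Flink job,
// or INITIAL_WATERMARK (-1) if that job has not committed a watermark yet.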
String watermark =
getSummary(table, flinkJobId, WATERMARK, Long.toString(INITIAL_WATERMARK));
return Long.parseLong(watermark);
}

private static String getSummary(
Table table, String flinkJobId, String summaryName, String defaultValue) {
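// Walk the snapshot history backwards from the current snapshot and return the requested
// summary property from the most recent snapshot committed by the given Flink job; fall back
// to defaultValue when no matching snapshot carries the property.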
Snapshot snapshot = table.currentSnapshot();
- long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

while (snapshot != null) {
Map<String, String> summary = snapshot.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
if (flinkJobId.equals(snapshotFlinkJobId)) {
- String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+ String value = summary.get(summaryName);
if (value != null) {
- lastCommittedCheckpointId = Long.parseLong(value);
- break;
+ return value;
}
}
Long parentSnapshotId = snapshot.parentId();
snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
}

- return lastCommittedCheckpointId;
+ return defaultValue;
}
}
@@ -208,6 +208,43 @@ public void testCommitTxn() throws Exception {
}
}

@Test
public void testWatermark() throws Exception {
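// Every completed checkpoint should commit a snapshot whose summary records the most
// recently processed watermark under flink.watermark.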
long timestamp = 0;

JobID jobID = new JobID();
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
harness.setup();
harness.open();
assertSnapshotSize(0);

List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
for (int i = 1; i <= 3; i++) {
RowData rowData = SimpleDataUtil.createRowData(i, "hello" + i);
DataFile dataFile = writeDataFile("data-" + i, ImmutableList.of(rowData));
timestamp = timestamp + i;

harness.processElement(of(dataFile), timestamp);
rows.add(rowData);

harness.snapshot(i, timestamp);
assertFlinkManifests(1);

harness.processWatermark(timestamp);

harness.notifyOfCompletedCheckpoint(i);
assertFlinkManifests(0);

SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
assertSnapshotSize(i);
assertWatermark(jobID, timestamp);
Assert.assertEquals(
TestIcebergFilesCommitter.class.getName(),
table.currentSnapshot().summary().get("flink.test"));
}
}
}

@Test
public void testOrderedEventsBetweenCheckpoints() throws Exception {
// It's possible that two checkpoints happen in the following orders:
@@ -817,6 +854,12 @@ private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
Assert.assertEquals(expectedId, actualId);
}

private void assertWatermark(JobID jobID, long expectedWatermark) {
table.refresh();
long actualWatermark = IcebergFilesCommitter.getWatermark(table, jobID.toString());
Assert.assertEquals(expectedWatermark, actualWatermark);
}

private void assertSnapshotSize(int expectedSnapshotSize) {
table.refresh();
Assert.assertEquals(expectedSnapshotSize, Lists.newArrayList(table.snapshots()).size());