Flink: store watermark as iceberg table's property #2109

Closed · wants to merge 2 commits
@@ -30,12 +30,14 @@
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
import org.apache.iceberg.AppendFiles;
@@ -55,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -73,6 +76,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// avoiding committing the same data files twice. This id will be attached to iceberg's meta when committing the
// iceberg transaction.
private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
static final String CURRENT_WATERMARK = "flink.current-watermark";
static final String STORE_WATERMARK = "flink.store-watermark";
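Both keys above are ordinary Iceberg table properties, so a job can opt in through the standard UpdateProperties API. A minimal sketch, assuming a Catalog handle and a placeholder table identifier (neither is part of this PR); the test below uses the same pattern:

// Sketch only: opt a table in to watermark persistence before starting the Flink job.
// `catalog`, "db" and "events" are placeholder assumptions.
Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
table.updateProperties()
    .set("flink.store-watermark", "true")   // i.e. STORE_WATERMARK
    .commit();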

// TableLoader to load iceberg table lazily.
private final TableLoader tableLoader;
@@ -85,6 +90,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// any data loss in the iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry to commit
// the iceberg table when the next checkpoint happens.
private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
private final Map<Long, Long> watermarkPerCheckpoint = Maps.newHashMap();

// The completed files cache for the current checkpoint. Once the snapshot barrier is received, it will be flushed to
// 'dataFilesPerCheckpoint'.
@@ -95,6 +101,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
private transient Table table;
private transient ManifestOutputFileFactory manifestOutputFileFactory;
private transient long maxCommittedCheckpointId;
private transient long currentWatermark;
private transient boolean storeWatermark;

// There are two cases in which we restore from flink checkpoints: the first is restoring from a snapshot created by the
// same flink job; the other is restoring from a snapshot created by a different job. For the second case, we
@@ -106,6 +114,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
// All pending checkpoints states for this function.
private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
private static final ListStateDescriptor<Map<Long, Long>> WATERMARK_DESCRIPTOR = new ListStateDescriptor<>(
    "iceberg-flink-watermark", new MapTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
private transient ListState<Map<Long, Long>> watermarkState;

Review comment (Contributor): We probably should use SortedMap here.

Review comment (Contributor): Should we define a Metadata class to hold all checkpointed metadata fields, so that we don't have to define a new state for each case? Ideally, I would prefer the metadata and the manifest file bundled in a single class (per checkpoint). That would require the complexity of handling state schema evolution, though, which I am not sure is worth the effort.
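A minimal sketch of the SortedMap suggestion, mirroring how the existing STATE_DESCRIPTOR is built via buildStateDescriptor() from the SortedMapTypeInfo and Comparators imports already present above; buildWatermarkDescriptor is a hypothetical helper (a java.util.Comparator import is assumed), not part of this PR:

// Sketch: a SortedMap-based watermark descriptor, analogous to STATE_DESCRIPTOR.
private static ListStateDescriptor<SortedMap<Long, Long>> buildWatermarkDescriptor() {
  Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
  SortedMapTypeInfo<Long, Long> typeInfo = new SortedMapTypeInfo<>(
      BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, longComparator);
  return new ListStateDescriptor<>("iceberg-flink-watermark", typeInfo);
}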


IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
this.tableLoader = tableLoader;
@@ -126,9 +137,16 @@ public void initializeState(StateInitializationContext context) throws Exception
this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

Map<String, String> properties = this.table.properties();
this.currentWatermark = PropertyUtil.propertyAsLong(properties, CURRENT_WATERMARK, -1L);
this.storeWatermark = PropertyUtil.propertyAsBoolean(properties, STORE_WATERMARK, false);

this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
this.watermarkState = context.getOperatorStateStore().getListState(WATERMARK_DESCRIPTOR);

if (context.isRestored()) {
watermarkPerCheckpoint.putAll(watermarkState.get().iterator().next());
Review comment (Contributor): Will this be backwards compatible for ongoing streaming jobs that don't have any watermarkState when they restore — for example, jobs that are upgraded to a version of Iceberg that includes this patch? Looking at the Flink AppendingState interface, it says that calling .get() should return null if the state is empty. Also, the value of restoredFlinkJobId below, obtained from jobIdState.get().iterator().next(), is checked for null or empty.

Review comment (@kbendick, Contributor, Jan 19, 2021): Actually, on further inspection of the ListState interface, it says that passing null to putAll is a no-op. So I don't think there should be backwards-compatibility issues, but should we possibly be (1) logging something if no watermark state is restored, and/or (2) doing our own null check rather than relying on the documented behavior of ListState#putAll staying consistent over time when inserting null? I don't have a strong opinion on either point, but I thought it might be worth bringing up for discussion.

Review comment (@dixingxing0, Author): Thanks @kbendick, I had neglected backwards compatibility; the current code will raise a java.util.NoSuchElementException when watermarkState is empty. I will fix it, and also log whether the watermark state was restored, since restores are not high-frequency events. BTW, watermarkPerCheckpoint is an instance of HashMap; I think the variable name misled you here 😄.
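A minimal sketch of the fix the author describes, assuming it replaces the putAll call inside the restore branch of initializeState (java.util.Iterator and java.util.Collections imports are assumed):

// Sketch: tolerate both a null and an empty watermarkState, e.g. when restoring
// a checkpoint taken before this patch, and log the outcome either way.
Iterable<Map<Long, Long>> restoredWatermarks = watermarkState.get();
Iterator<Map<Long, Long>> iterator =
    restoredWatermarks == null ? Collections.emptyIterator() : restoredWatermarks.iterator();
if (iterator.hasNext()) {
  watermarkPerCheckpoint.putAll(iterator.next());
  LOG.info("Restored watermarks for {} checkpoint(s)", watermarkPerCheckpoint.size());
} else {
  LOG.info("No watermark state found in the restored checkpoint; starting empty");
}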

String restoredFlinkJobId = jobIdState.get().iterator().next();
Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId),
"Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
@@ -162,6 +180,10 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
checkpointsState.clear();
checkpointsState.add(dataFilesPerCheckpoint);

watermarkPerCheckpoint.put(checkpointId, currentWatermark);
watermarkState.clear();
watermarkState.add(watermarkPerCheckpoint);

jobIdState.clear();
jobIdState.add(flinkJobId);

Expand Down Expand Up @@ -296,6 +318,13 @@ private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int

long start = System.currentTimeMillis();
operation.commit(); // abort is automatically called if this fails.

Long watermarkForCheckpoint = watermarkPerCheckpoint.get(checkpointId);
Review comment (@stevenzwu, Contributor, Jan 20, 2021): We need to use a table transaction here so that operation.commit() and table.updateProperties()...commit() are atomic. This may require a bigger refactoring of the code, though.

Review comment (@dixingxing0, Author, Jan 20, 2021): If we store the watermark in the snapshot summary metadata, as @rdblue said, we can omit the table.updateProperties() transaction.

Review comment (Contributor): We actually set it as table properties too. I think table properties are easier for the workflow scheduler (in the batch system) to query. Otherwise, it has to iterate the snapshots and find the latest watermarks for all 3 regions. cc @rdblue
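Minimal sketches of the two alternatives raised in this thread; both are assumptions about how a refactor might look, not part of this PR. The first assumes the caller is restructured so the SnapshotUpdate is created from an Iceberg Transaction (org.apache.iceberg.Transaction import assumed, dataFile hypothetical); the second uses SnapshotUpdate#set to put the watermark into the snapshot summary, which is atomic with the data commit by construction:

// Alternative 1 (sketch): one transaction makes the data commit and the
// property update atomically visible.
Transaction txn = table.newTransaction();
txn.newAppend().appendFile(dataFile).commit();   // hypothetical data operation
txn.updateProperties()
    .set(CURRENT_WATERMARK, String.valueOf(watermarkForCheckpoint))
    .commit();
txn.commitTransaction();

// Alternative 2 (sketch): store the watermark in the snapshot summary instead,
// so no separate property commit is needed.
operation.set(CURRENT_WATERMARK, String.valueOf(watermarkForCheckpoint));
operation.commit();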

if (storeWatermark && watermarkForCheckpoint != null && watermarkForCheckpoint > 0) {
table.updateProperties().set(CURRENT_WATERMARK, String.valueOf(watermarkForCheckpoint)).commit();
LOG.info("Update {} to {}", CURRENT_WATERMARK, watermarkForCheckpoint);
}

long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
}
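As the last comment above notes, a downstream batch scheduler can then poll the table property directly. A minimal sketch, reusing the PropertyUtil helper from this PR; the catalog handle, table identifier, and partitionEndMillis boundary are placeholder assumptions:

// Sketch: a scheduler reads the committed watermark to decide whether data up
// to a partition boundary has fully arrived.
Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
long watermark = PropertyUtil.propertyAsLong(table.properties(), "flink.current-watermark", -1L);
boolean partitionComplete = watermark >= partitionEndMillis;   // partitionEndMillis: hypothetical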
@@ -305,6 +334,14 @@ public void processElement(StreamRecord<WriteResult> element) {
this.writeResultsOfCurrentCkpt.add(element.getValue());
}

@Override
public void processWatermark(Watermark mark) throws Exception {
  super.processWatermark(mark);
  if (mark.getTimestamp() != Watermark.MAX_WATERMARK.getTimestamp()) {
    currentWatermark = mark.getTimestamp();
  }
}

Review comment (Contributor): Why do we need to ignore MAX_WATERMARK? It signals the end of input.

Review comment (@dixingxing0, Author, Jan 20, 2021): As you said before, we use the watermark to indicate data completeness on the ingestion path, so I think we don't need to store MAX_WATERMARK when the Flink job runs in streaming mode. If the job runs in batch mode, even if we store a MAX_WATERMARK we still can't know which partition is complete; in batch mode I think we can simply rely on the scheduling system. I'm not sure how to use MAX_WATERMARK, so I just ignore it 😁.

@Override
public void endInput() throws IOException {
// Flush the buffered data files into 'dataFilesPerCheckpoint' first.
@@ -181,6 +181,66 @@ public void testCommitTxn() throws Exception {
}
}

@Test
public void testCommitTxnWithWatermark() throws Exception {
table.updateProperties().set(IcebergFilesCommitter.STORE_WATERMARK, "true").commit();
JobID jobID = new JobID();
long baseTimestamp = System.currentTimeMillis();
int checkpointId = 1;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
harness.setup();
harness.open();
List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
for (; checkpointId <= 3; checkpointId++) {
long timestamp = baseTimestamp + checkpointId;
RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
harness.processElement(of(dataFile), timestamp);
rows.add(rowData);

harness.processWatermark(timestamp);
harness.snapshot(checkpointId, timestamp);
harness.notifyOfCompletedCheckpoint(checkpointId);

SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
table.refresh();
Assert.assertEquals(String.valueOf(baseTimestamp + checkpointId),
table.properties().get(IcebergFilesCommitter.CURRENT_WATERMARK));
}
}

OperatorSubtaskState snapshot;
checkpointId = 4;
long timestamp4 = baseTimestamp + checkpointId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
harness.setup();
harness.open();
RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
harness.processElement(of(dataFile), timestamp4);

harness.processWatermark(timestamp4);
snapshot = harness.snapshot(checkpointId, timestamp4);

table.refresh();
// checkpoint 4 has not been committed yet, so flink.current-watermark should still be the watermark for checkpoint 3
Assert.assertEquals(String.valueOf(baseTimestamp + 3),
table.properties().get(IcebergFilesCommitter.CURRENT_WATERMARK));
}

// test restore
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
harness.setup();
// will restore watermark for checkpoint 4
harness.initializeState(snapshot);
harness.open();
table.refresh();
// restoring from checkpoint 4 will also commit the watermark for checkpoint 4
Assert.assertEquals(String.valueOf(baseTimestamp + 4),
table.properties().get(IcebergFilesCommitter.CURRENT_WATERMARK));
}
}

@Test
public void testOrderedEventsBetweenCheckpoints() throws Exception {
// It's possible that two checkpoints happen in the following orders: