Flink: store watermark as iceberg table's property #2109
Conversation
@dixingxing0, in our implementation, we store watermarks in snapshot summary metadata. I think that's a more appropriate place for it because it is metadata about the snapshot that is produced. We also use a watermark per writer because we write in 3 different AWS regions. So I think it would make sense to be able to name each watermark, possibly with a default if you choose not to name it.
FYI @stevenzwu
@dixingxing0 can you describe the motivation for checkpointing the watermarks in Flink state? Ryan described our use of watermarks in snapshot metadata. They are used to indicate data completeness on the ingestion path so that downstream batch consumer jobs can be triggered when data is complete for a window (like hourly).
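For context, here is a minimal sketch of what writing a named watermark into the snapshot summary could look like with Iceberg's `SnapshotUpdate#set` API. The property key, the `us-east-1` suffix, and the `table`, `dataFiles`, and `currentWatermarkMillis` variables are illustrative assumptions, not code from this PR:

```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;

// Hypothetical sketch: record a per-writer watermark in the snapshot summary.
// AppendFiles extends SnapshotUpdate, so summary properties can be set before commit.
AppendFiles append = table.newAppend();
for (DataFile dataFile : dataFiles) {
  append.appendFile(dataFile);
}
// "flink.watermark-for-us-east-1" is an illustrative key; the suffix names the writer.
append.set("flink.watermark-for-us-east-1", Long.toString(currentWatermarkMillis));
append.commit();
```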
```java
if (context.isRestored()) {
  watermarkPerCheckpoint.putAll(watermarkState.get().iterator().next());
```
Will this be backwards compatible for ongoing streaming jobs that don't have any `watermarkState` when they restore? For example, for ongoing streaming jobs that are upgraded to a version of Iceberg that includes this patch?

Looking at the Flink `AppendingState` interface, it says that calling `.get()` should return `null` if the state is empty. Also, you can see that the value of `restoredFlinkJobId` below, obtained from calling `jobIdState.get().iterator().next()`, is checked for null or empty.
Actually, on further inspection of the `ListState` interface, it says that passing `null` to `putAll` is a no-op. So I don't think there should be backwards compatibility issues, but should we possibly be (1) logging something if no watermark state is restored and/or (2) doing our own `null` check rather than relying on the documented behavior of `ListState#putAll` staying consistent over time when inserting `null`?

I don't have a strong opinion about either point 1 or point 2, but I thought it might be worth bringing up for discussion.
Thanks @kbendick, I neglected the backwards compatibility issue; the current code will raise a `java.util.NoSuchElementException` when `watermarkState` is empty, so I will fix it. I will also log whether the watermark state was restored, since restore is not a high-frequency event.

BTW, `watermarkPerCheckpoint` is an instance of `HashMap`; I think the variable name misled you here 😄.
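A minimal sketch of what such a defensive restore might look like, assuming an SLF4J `LOG` field and `java.util.Iterator`/`java.util.Map` in scope; the messages and structure are illustrative, not the actual patch:

```java
// Hypothetical null-safe restore with logging; not the actual fix in this PR.
if (context.isRestored()) {
  Iterator<Map<Long, Long>> restoredWatermarks = watermarkState.get().iterator();
  if (restoredWatermarks.hasNext()) {
    watermarkPerCheckpoint.putAll(restoredWatermarks.next());
    LOG.info("Restored {} watermark entries from state.", watermarkPerCheckpoint.size());
  } else {
    // Jobs upgraded from an older Iceberg version have no watermark state yet.
    LOG.info("No watermark state found on restore; starting with an empty map.");
  }
}
```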
Thanks @rdblue, agree with you. About naming the watermark, I think we can introduce a new configuration:

```
// user specified configuration
flink.store-watermark=false // as default
flink.watermark-name=default // as default

// written by flink file committer
flink.watermark-for-default=the-watermark // use flink.watermark-name as suffix
```

@rdblue what do you think?
Thanks @stevenzwu, about the watermark state, I am just following the current restore behavior:

```java
NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
    .newTreeMap(checkpointsState.get().iterator().next())
    .tailMap(maxCommittedCheckpointId, false);
if (!uncommittedDataFiles.isEmpty()) {
  // Committed all uncommitted data files from the old flink job to iceberg table.
  long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
  commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
}
```

Since Flink will commit the last uncommitted checkpoint, I think we should also store the right watermark for that checkpoint. Our use case is exactly the same as the one you and @rdblue described, except we don't have multiple writers 😄.
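To illustrate that point, a hedged sketch of how the restored watermark map could feed the same restore path; the names are reused from the snippet above, and the lookup itself is an illustration rather than the PR's code:

```java
// Hypothetical: after restoring uncommitted data files, pick up the watermark
// recorded for the newest uncommitted checkpoint and commit it alongside them.
long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
Long restoredWatermark = watermarkPerCheckpoint.get(maxUncommittedCheckpointId);
if (restoredWatermark != null) {
  // the committer would write this value into the snapshot summary / table property
  LOG.info("Restoring watermark {} for checkpoint {}.", restoredWatermark, maxUncommittedCheckpointId);
}
commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
```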
```java
@@ -106,6 +114,9 @@
  // All pending checkpoints states for this function.
  private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
  private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
  private static final ListStateDescriptor<Map<Long, Long>> WATERMARK_DESCRIPTOR = new ListStateDescriptor<>(
      "iceberg-flink-watermark", new MapTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
  private transient ListState<Map<Long, Long>> watermarkState;
```
Should we define a `MetaData` class to hold all checkpointed metadata fields so that we don't have to define a new state for each case?

Ideally, I would prefer the metadata and the manifest file bundled in a single class (per checkpoint). That would require the complexity of handling state schema evolution, which I am not sure is worth the effort.
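A rough sketch of what such a bundled, per-checkpoint class could look like; the class and field names are hypothetical, and the state-schema-evolution cost mentioned above is exactly the open question:

```java
import java.io.Serializable;

// Hypothetical container for everything checkpointed per checkpoint id.
// Bundling avoids adding a new Flink state handle for each metadata field,
// at the cost of managing state schema evolution for this class.
public class CheckpointMetadata implements Serializable {
  private final byte[] serializedManifest;  // today's SortedMap<Long, byte[]> value
  private final long watermark;             // today's Map<Long, Long> value

  public CheckpointMetadata(byte[] serializedManifest, long watermark) {
    this.serializedManifest = serializedManifest;
    this.watermark = watermark;
  }

  public byte[] serializedManifest() {
    return serializedManifest;
  }

  public long watermark() {
    return watermark;
  }
}
```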
```java
@@ -296,6 +318,13 @@ private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int

  long start = System.currentTimeMillis();
  operation.commit(); // abort is automatically called if this fails.

  Long watermarkForCheckpoint = watermarkPerCheckpoint.get(checkpointId);
```
We need to use a table transaction here so that `operation.commit()` and `table.updateProperties()...commit()` are atomic. This may require a bigger refactoring of the code though.
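For reference, a minimal sketch of the atomic variant using Iceberg's `Transaction` API; the property key is a placeholder, and `table`, `dataFiles`, and `watermarkForCheckpoint` are assumed to come from the surrounding committer code:

```java
// Hypothetical: commit the appended files and the watermark property atomically.
Transaction txn = table.newTransaction();

AppendFiles append = txn.newAppend();
for (DataFile dataFile : dataFiles) {
  append.appendFile(dataFile);
}
append.commit();  // staged in the transaction, not yet visible

txn.updateProperties()
    .set("flink.watermark-for-default", Long.toString(watermarkForCheckpoint))
    .commit();    // also staged

txn.commitTransaction();  // both changes become visible together
```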
If we store the watermark in snapshot summary metadata as @rdblue said, we can omit the `table.updateProperties()` transaction.
We actually set it as table properties too.

I think table properties are easier for the workflow scheduler (in the batch system) to query. Otherwise, they have to iterate the snapshots and find the latest watermarks for all 3 regions. cc @rdblue
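To illustrate the difference for a downstream scheduler, a hedged sketch; the catalog, table identifier, and property key are placeholders:

```java
// Reading the watermark from table properties: a single lookup per region.
Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
long usEastWatermark = Long.parseLong(
    table.properties().getOrDefault("flink.watermark-for-us-east-1", "0"));

// The alternative with snapshot summaries: walk snapshots to find the latest
// value per region, since each snapshot only carries its own summary.
long latest = 0L;
for (Snapshot snapshot : table.snapshots()) {
  String value = snapshot.summary().get("flink.watermark-for-us-east-1");
  if (value != null) {
    latest = Math.max(latest, Long.parseLong(value));
  }
}
```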
```java
@Override
public void processWatermark(Watermark mark) throws Exception {
  super.processWatermark(mark);
  if (mark.getTimestamp() != Watermark.MAX_WATERMARK.getTimestamp()) {
```
Why do we need to ignore the `MAX_WATERMARK`? It signals the end of input.
As you said before, we use the watermark to indicate data completeness on the ingestion path, so I think we do not need to store `MAX_WATERMARK` when the Flink job runs in streaming mode.

If the Flink job runs in batch mode, even if we store one `MAX_WATERMARK`, we still can't know which partition is complete; I think in batch mode we can simply rely on the scheduling system. I'm not sure how to use the `MAX_WATERMARK`, so I just ignore it 😁.
```java
@@ -106,6 +114,9 @@
  // All pending checkpoints states for this function.
  private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = buildStateDescriptor();
  private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
  private static final ListStateDescriptor<Map<Long, Long>> WATERMARK_DESCRIPTOR = new ListStateDescriptor<>(
```
We probably should use `SortedMap` here.
@dixingxing0 thx a lot for the additional context, that is very helpful. I left a few comments.

Regarding the scenario of multiple writer jobs and a single table, I am afraid that the additional config won't help because we are talking about one table here. Somehow, we need to allow a provider to supply the suffix for the watermark property key. For us, the suffix is the AWS region. I am not sure what the cleanest way to achieve that is. We could define a provider class config and use reflection to instantiate it, but I am hesitant about reflection as it is impossible to pass dependencies to a reflectively instantiated class.
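A rough sketch of the provider idea being discussed; the interface and implementation names are hypothetical, and the dependency-injection limitation noted above still applies:

```java
import java.io.Serializable;

// Hypothetical provider that supplies the per-writer watermark key suffix.
public interface WatermarkSuffixProvider extends Serializable {
  String watermarkSuffix();
}

// A writer deployed in a given AWS region could ship its own implementation,
// e.g. returning "us-east-1", so the committer writes
// "flink.watermark-for-us-east-1" without any table-level config.
class RegionSuffixProvider implements WatermarkSuffixProvider {
  @Override
  public String watermarkSuffix() {
    return System.getenv().getOrDefault("AWS_REGION", "default");
  }
}
```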
@stevenzwu thanks for the review and comments! As you described, we cannot configure the watermark name suffix as a table property for multiple writers 😁. How about we introduce new fields in `org.apache.iceberg.flink.sink.FlinkSink.Builder`?

```
// introduce new fields in org.apache.iceberg.flink.sink.FlinkSink.Builder
private boolean storeWatermarkEnabled; // default false
private String watermarkNameSuffix; // default "default"

// iceberg `table property` or `snapshot summary` written by flink file committer
flink.watermark-for-default=the-watermark // use watermarkNameSuffix config as suffix
```
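A hedged sketch of how such builder options might be used from a Flink job; `storeWatermark` and `watermarkNameSuffix` are the hypothetical additions proposed above, while the rest follows the existing `FlinkSink` builder API:

```java
// Hypothetical usage if the proposed builder fields were exposed as options.
FlinkSink.forRowData(rowDataStream)
    .tableLoader(tableLoader)
    .table(table)
    .storeWatermark(true)             // proposed: enable writing the watermark
    .watermarkNameSuffix("us-east-1") // proposed: per-writer suffix for the property key
    .append();
```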
@dixingxing0 yeah, extending the `FlinkSink.Builder` sounds good. Small suggestion on the naming:
Thanks, I will address it.
We actually implemented this in a slightly different way of calculating the watermark. Instead of using the Flink watermark, we add some additional metadata (min, max, sum, count) per DataFile for the timestamp column. In the committer, we use the min of the mins to decide the watermark value. We never regress the watermark value. That metadata also helps us calculate metrics for ingestion latency (commit time - event/Kafka time): min, max, avg.

Just to share; by no means am I suggesting changing the approach in this PR. It is perfectly good.
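As an illustration of that alternative, a hedged sketch of deriving a non-regressing watermark from per-file column statistics in the committer. The `event_ts` column, the long timestamp type, and the `table`, `dataFiles`, and `currentWatermark` variables are assumptions; it also relies on lower bounds having been collected for that column:

```java
// Hypothetical: derive the watermark as the minimum timestamp lower bound
// across the files in this commit, never letting it move backwards.
int timestampFieldId = table.schema().findField("event_ts").fieldId();

long minOfMins = Long.MAX_VALUE;
for (DataFile dataFile : dataFiles) {
  ByteBuffer lowerBound = dataFile.lowerBounds().get(timestampFieldId);
  if (lowerBound != null) {
    Long fileMin = Conversions.fromByteBuffer(Types.LongType.get(), lowerBound);
    minOfMins = Math.min(minOfMins, fileMin);
  }
}

if (minOfMins != Long.MAX_VALUE) {
  // Only advance; never regress a previously committed watermark.
  currentWatermark = Math.max(currentWatermark, minOfMins);
}
```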
Through this PR, I have an idea: could we add a callback?
A callback sounds complicated and seems to tie too much of the back end together. I wouldn't want something plugged into the Iceberg component talking to Kafka directly. That sounds like we're trying to work around a framework limitation.
thx @stevenzwu @rdblue, that sounds great! We also need to embed the Iceberg table, which is regarded as a real-time table, into our workflow. Is there any doc or patch for your implementation?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Fixes #2108.