Skip to content

Commit

Permalink
[FLINK-25920] Fix AbstractStreamingWriter sending after EOI
Browse files Browse the repository at this point in the history
AbstractStreamingWriter send partition info twice on EOI. This commit ensures that we are not resending partition information even after restarting from a final checkpoint.

(cherry picked from commit 6d60f41)
  • Loading branch information
AHeise committed Nov 8, 2024
1 parent bd87a2a commit f163030
Showing 1 changed file with 51 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.connector.file.table.stream;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
Expand All @@ -33,6 +36,12 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;

import java.util.List;

import static org.apache.flink.util.Preconditions.checkState;

/**
* Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send
* file and bucket information to downstream.
Expand All @@ -58,6 +67,16 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe

protected transient long currentWatermark;

/**
* Used to remember that EOI has already happened so that we don't emit the last committables of
* the final checkpoints twice.
*/
private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE);

private boolean endOfInput;
private ListState<Boolean> endOfInputState;

public AbstractStreamingWriter(
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<
Expand Down Expand Up @@ -123,6 +142,27 @@ public void bucketInactive(Bucket<IN, String> bucket) {
bucketCheckInterval);

currentWatermark = Long.MIN_VALUE;

// Figure out if we have seen end of input before and if we should anything downstream. We
// have the following
// cases:
// 1. state is empty:
// - First time initialization
// - Restoring from a previous version of Flink that didn't handle EOI
// - Upscaled from a final or regular checkpoint
// In all cases, we regularly handle EOI, potentially resulting in unnecessary .
// 2. state is not empty:
// - This implies Flink restores from a version that handles EOI.
// - If there is one entry, no rescaling happened (for this subtask), so if it's true,
// we recover from a final checkpoint (for this subtask) and can ignore another EOI
// else we have a regular checkpoint.
// - If there are multiple entries, Flink downscaled, and we need to check if all are
// true and do the same as above. As soon as one entry is false, we regularly start
// the writer and potentially emit duplicate summaries if we indeed recovered from a
// final checkpoint.
endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
List<Boolean> previousState = Lists.newArrayList(endOfInputState.get());
endOfInput = !previousState.isEmpty() && !previousState.contains(false);
}

@Override
Expand All @@ -139,6 +179,7 @@ public void processWatermark(Watermark mark) throws Exception {

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
checkState(!endOfInput, "Received element after endOfInput: %s", element);
helper.onElement(
element.getValue(),
getProcessingTimeService().getCurrentProcessingTime(),
Expand All @@ -149,15 +190,20 @@ public void processElement(StreamRecord<IN> element) throws Exception {
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
commitUpToCheckpoint(checkpointId);
if (!this.endOfInput) {
commitUpToCheckpoint(checkpointId);
}
}

@Override
public void endInput() throws Exception {
buckets.onProcessingTime(Long.MAX_VALUE);
helper.snapshotState(Long.MAX_VALUE);
output.emitWatermark(new Watermark(Long.MAX_VALUE));
commitUpToCheckpoint(Long.MAX_VALUE);
if (!this.endOfInput) {
this.endOfInput = true;
buckets.onProcessingTime(Long.MAX_VALUE);
helper.snapshotState(Long.MAX_VALUE);
output.emitWatermark(new Watermark(Long.MAX_VALUE));
commitUpToCheckpoint(Long.MAX_VALUE);
}
}

@Override
Expand Down

0 comments on commit f163030

Please sign in to comment.