Skip to content

Commit

Permalink
[FLINK-25920] Straighten EOI handling in CommittableCollector
Browse files Browse the repository at this point in the history
In some parts of the sink, end of input (EOI) is represented as checkpointId=null, and in others as checkpointId=MAX. The code of CheckpointCommittableManagerImpl implies that null is valid; however, the serializer actually breaks in that case. In practice, checkpointId=MAX is used all the time by accident.

This commit replaces the nullable checkpointIds with a primitive long EOI=MAX, so that we always use the special value instead of null. The serializer already used that value, so it actually simplifies many places and doesn't break any existing state.
  • Loading branch information
AHeise committed Sep 17, 2024
1 parent 96b8425 commit c56def0
Show file tree
Hide file tree
Showing 20 changed files with 245 additions and 196 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -138,15 +136,15 @@ public void processElement(StreamRecord<CompactorRequest> element) throws Except
@Override
public void endInput() throws Exception {
// add collecting requests into the final snapshot
checkpointRequests.put(Long.MAX_VALUE, collectingRequests);
checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
collectingRequests = new ArrayList<>();

// submit all requests and wait until they are done
submitUntil(Long.MAX_VALUE);
submitUntil(CommittableMessage.EOI);
assert checkpointRequests.isEmpty();

getAllTasksFuture().join();
emitCompacted(null);
emitCompacted(CommittableMessage.EOI);
assert compactingRequests.isEmpty();
}

Expand Down Expand Up @@ -223,7 +221,7 @@ private void submitUntil(long checkpointId) {
canSubmit.clear();
}

private void emitCompacted(@Nullable Long checkpointId) throws Exception {
private void emitCompacted(long checkpointId) throws Exception {
List<FileSinkCommittable> compacted = new ArrayList<>();
Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
compactingRequests.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private void appendCompactingResultsToSummary(CommittableSummary<FileSinkCommitt
new CommittableSummary<>(
summary.getSubtaskId(),
summary.getNumberOfSubtasks(),
getCheckpointId(summary),
summary.getCheckpointIdOrEOI(),
summary.getNumberOfCommittables() + results.size(),
summary.getNumberOfPendingCommittables() + results.size(),
summary.getNumberOfFailedCommittables())));
Expand All @@ -180,7 +180,7 @@ private void appendCompactingResultsToSummary(CommittableSummary<FileSinkCommitt
new StreamRecord<>(
new CommittableWithLineage<>(
committable,
getCheckpointId(summary),
summary.getCheckpointIdOrEOI(),
summary.getSubtaskId())));
}
}
Expand All @@ -204,7 +204,7 @@ private void handleHiddenCommittable(CommittableWithLineage<FileSinkCommittable>
// cleanup request to the next summary, since the count of pending committable
// for this checkpoint is immutable now
Iterable<FileSinkCommittable> result = submit(request).get();
Long checkpointId = getCheckpointId(message);
Long checkpointId = message.getCheckpointIdOrEOI();
boolean pendingFileSent = false;
for (FileSinkCommittable c : result) {
if (c.hasPendingFile()) {
Expand Down Expand Up @@ -275,10 +275,6 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
remainingRequestsState.update(Collections.singletonList(requestsMap));
}

private Long getCheckpointId(CommittableMessage<FileSinkCommittable> message) {
return message.getCheckpointId().isPresent() ? message.getCheckpointId().getAsLong() : null;
}

private CompletableFuture<Iterable<FileSinkCommittable>> submit(CompactorRequest request) {
CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>();
compactService.submit(request, resultFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,33 @@

import java.util.OptionalLong;

/** The message send from {@link SinkWriter} to {@link Committer}. */
/** The message sent from {@code SinkWriter} to {@code Committer}. */
@Experimental
public interface CommittableMessage<CommT> {
/**
* Special value for checkpointId for the end of input in case of batch commit or final
* checkpoint.
*/
long EOI = Long.MAX_VALUE;

/** The subtask that created this committable. */
int getSubtaskId();

/**
* Returns the checkpoint id or empty if the message does not belong to a checkpoint. In that
* case, the committable was created at the end of input (e.g., in batch mode).
*
* @see #getCheckpointIdOrEOI()
*/
@Deprecated
default OptionalLong getCheckpointId() {
long checkpointIdOrEOI = getCheckpointIdOrEOI();
return checkpointIdOrEOI == EOI ? OptionalLong.empty() : OptionalLong.of(checkpointIdOrEOI);
}

/**
* Returns the checkpoint id or EOI if this message belongs to the final checkpoint or the batch
* commit.
*/
OptionalLong getCheckpointId();
long getCheckpointIdOrEOI();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class CommittableMessageSerializer<CommT>
@VisibleForTesting static final int VERSION = 1;
private static final int COMMITTABLE = 1;
private static final int SUMMARY = 2;
private static final long EOI = Long.MAX_VALUE;

private final SimpleVersionedSerializer<CommT> committableSerializer;

Expand All @@ -64,14 +63,14 @@ public byte[] serialize(CommittableMessage<CommT> obj) throws IOException {
committableSerializer,
((CommittableWithLineage<CommT>) obj).getCommittable(),
out);
writeCheckpointId(out, obj);
out.writeLong(obj.getCheckpointIdOrEOI());
out.writeInt(obj.getSubtaskId());
} else if (obj instanceof CommittableSummary) {
out.writeByte(SUMMARY);
out.writeInt(obj.getSubtaskId());
CommittableSummary<?> committableSummary = (CommittableSummary<?>) obj;
out.writeInt(committableSummary.getNumberOfSubtasks());
writeCheckpointId(out, obj);
out.writeLong(obj.getCheckpointIdOrEOI());
out.writeInt(committableSummary.getNumberOfCommittables());
out.writeInt(committableSummary.getNumberOfPendingCommittables());
out.writeInt(committableSummary.getNumberOfFailedCommittables());
Expand All @@ -91,13 +90,13 @@ public CommittableMessage<CommT> deserialize(int version, byte[] serialized)
return new CommittableWithLineage<>(
SimpleVersionedSerialization.readVersionAndDeSerialize(
committableSerializer, in),
readCheckpointId(in),
in.readLong(),
in.readInt());
case SUMMARY:
return new CommittableSummary<>(
in.readInt(),
in.readInt(),
readCheckpointId(in),
in.readLong(),
in.readInt(),
in.readInt(),
in.readInt());
Expand All @@ -109,14 +108,4 @@ public CommittableMessage<CommT> deserialize(int version, byte[] serialized)
+ StringUtils.byteToHexString(serialized));
}
}

private void writeCheckpointId(DataOutputSerializer out, CommittableMessage<CommT> obj)
throws IOException {
out.writeLong(obj.getCheckpointId().orElse(EOI));
}

private Long readCheckpointId(DataInputDeserializer in) throws IOException {
long checkpointId = in.readLong();
return checkpointId == EOI ? null : checkpointId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ private CommittableMessageTypeInfo(
* @param committableSerializerFactory factory to create the serializer for a {@link
* CommittableMessage}
* @param <CommT> type of the committable
* @return
*/
public static <CommT> TypeInformation<CommittableMessage<CommT>> of(
SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializerFactory) {
Expand Down Expand Up @@ -86,6 +85,7 @@ public int getTotalFields() {
return 1;
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Class<CommittableMessage<CommT>> getTypeClass() {
return (Class) CommittableMessage.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

import org.apache.flink.annotation.Experimental;

import javax.annotation.Nullable;

import java.util.OptionalLong;
import java.util.Objects;

/**
* This class tracks the information about committables belonging to one checkpoint coming from one
Expand All @@ -38,7 +36,7 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> {
/** May change after recovery. */
private final int numberOfSubtasks;

@Nullable private final Long checkpointId;
private final long checkpointId;
/** The number of committables coming from the given subtask in the particular checkpoint. */
private final int numberOfCommittables;
/** The number of committables that have not been successfully committed. */
Expand All @@ -49,7 +47,7 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> {
public CommittableSummary(
int subtaskId,
int numberOfSubtasks,
@Nullable Long checkpointId,
long checkpointId,
int numberOfCommittables,
int numberOfPendingCommittables,
int numberOfFailedCommittables) {
Expand All @@ -69,8 +67,8 @@ public int getNumberOfSubtasks() {
return numberOfSubtasks;
}

public OptionalLong getCheckpointId() {
return checkpointId == null ? OptionalLong.empty() : OptionalLong.of(checkpointId);
public long getCheckpointIdOrEOI() {
return checkpointId;
}

public int getNumberOfCommittables() {
Expand All @@ -94,4 +92,50 @@ public <NewCommT> CommittableSummary<NewCommT> map() {
numberOfPendingCommittables,
numberOfFailedCommittables);
}

/**
 * Builds a human-readable dump of this summary that lists every field as a {@code name=value}
 * pair, in declaration order.
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder();
    text.append("CommittableSummary{");
    text.append("subtaskId=").append(subtaskId);
    text.append(", numberOfSubtasks=").append(numberOfSubtasks);
    text.append(", checkpointId=").append(checkpointId);
    text.append(", numberOfCommittables=").append(numberOfCommittables);
    text.append(", numberOfPendingCommittables=").append(numberOfPendingCommittables);
    text.append(", numberOfFailedCommittables=").append(numberOfFailedCommittables);
    text.append('}');
    return text.toString();
}

/**
 * Two summaries are equal when they are of exactly the same class and all six fields (subtask
 * id, number of subtasks, checkpoint id and the three committable counters) match.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    // Exact class match (not instanceof), so an instance of a subclass never equals a base one.
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    final CommittableSummary<?> other = (CommittableSummary<?>) o;
    if (subtaskId != other.subtaskId) {
        return false;
    }
    if (numberOfSubtasks != other.numberOfSubtasks) {
        return false;
    }
    if (checkpointId != other.checkpointId) {
        return false;
    }
    if (numberOfCommittables != other.numberOfCommittables) {
        return false;
    }
    if (numberOfPendingCommittables != other.numberOfPendingCommittables) {
        return false;
    }
    return numberOfFailedCommittables == other.numberOfFailedCommittables;
}

/**
 * Hashes the same six fields that {@code equals} compares, so that equal summaries produce
 * equal hash codes.
 */
@Override
public int hashCode() {
// The primitive fields are boxed by the varargs call to Objects.hash.
return Objects.hash(
subtaskId,
numberOfSubtasks,
checkpointId,
numberOfCommittables,
numberOfPendingCommittables,
numberOfFailedCommittables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Committer;

import javax.annotation.Nullable;

import java.util.OptionalLong;
import java.util.Objects;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -35,10 +33,10 @@
@Experimental
public class CommittableWithLineage<CommT> implements CommittableMessage<CommT> {
private final CommT committable;
@Nullable private final Long checkpointId;
private final long checkpointId;
private final int subtaskId;

public CommittableWithLineage(CommT committable, @Nullable Long checkpointId, int subtaskId) {
public CommittableWithLineage(CommT committable, long checkpointId, int subtaskId) {
this.committable = checkNotNull(committable);
this.checkpointId = checkpointId;
this.subtaskId = subtaskId;
Expand All @@ -52,11 +50,42 @@ public int getSubtaskId() {
return subtaskId;
}

public OptionalLong getCheckpointId() {
return checkpointId == null ? OptionalLong.empty() : OptionalLong.of(checkpointId);
public long getCheckpointIdOrEOI() {
return checkpointId;
}

public <NewCommT> CommittableWithLineage<NewCommT> map(Function<CommT, NewCommT> mapper) {
return new CommittableWithLineage<>(mapper.apply(committable), checkpointId, subtaskId);
}

/**
 * Builds a human-readable dump of this message: the wrapped committable followed by the
 * checkpoint id and the subtask id, each as a {@code name=value} pair.
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder();
    text.append("CommittableWithLineage{");
    // append(Object) goes through String.valueOf, exactly like string concatenation does.
    text.append("committable=").append(committable);
    text.append(", checkpointId=").append(checkpointId);
    text.append(", subtaskId=").append(subtaskId);
    text.append('}');
    return text.toString();
}

/**
 * Two messages are equal when they are of exactly the same class, carry the same checkpoint id
 * and subtask id, and wrap equal committables.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    // Exact class match (not instanceof), so an instance of a subclass never equals a base one.
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    final CommittableWithLineage<?> other = (CommittableWithLineage<?>) o;
    // Compare the primitives first; the committable's own equals is only consulted last.
    if (checkpointId != other.checkpointId) {
        return false;
    }
    if (subtaskId != other.subtaskId) {
        return false;
    }
    return Objects.equals(committable, other.committable);
}

/**
 * Combines the committable, the checkpoint id and the subtask id (the fields {@code equals}
 * compares) into a single hash code.
 */
@Override
public int hashCode() {
    // Field-by-field form of the 31-based accumulation behind Objects.hash; same resulting value.
    int hash = 1;
    hash = 31 * hash + Objects.hashCode(committable);
    hash = 31 * hash + Long.hashCode(checkpointId);
    hash = 31 * hash + Integer.hashCode(subtaskId);
    return hash;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Collections;
import java.util.OptionalLong;

import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
import static org.apache.flink.util.IOUtils.closeAll;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -148,7 +149,7 @@ public void endInput() throws Exception {
endInput = true;
if (!isCheckpointingEnabled || isBatchMode) {
// There will be no final checkpoint, all committables should be committed here
notifyCheckpointComplete(Long.MAX_VALUE);
notifyCheckpointComplete(EOI);
}
}

Expand Down Expand Up @@ -208,8 +209,8 @@ public void processElement(StreamRecord<CommittableMessage<CommT>> element) thro

// in case of unaligned checkpoint, we may receive notifyCheckpointComplete before the
// committables
OptionalLong checkpointId = element.getValue().getCheckpointId();
if (checkpointId.isPresent() && checkpointId.getAsLong() <= lastCompletedCheckpointId) {
long checkpointId = element.getValue().getCheckpointIdOrEOI();
if (checkpointId <= lastCompletedCheckpointId) {
commitAndEmitCheckpoints();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ public void processWatermark(Watermark mark) throws Exception {
public void endInput() throws Exception {
endOfInput = true;
sinkWriter.flush(true);
emitCommittables(Long.MAX_VALUE);
emitCommittables(CommittableMessage.EOI);
}

private void emitCommittables(Long checkpointId) throws IOException, InterruptedException {
private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
if (!emitDownstream) {
// To support SinkV1 topologies with only a writer we have to call prepareCommit
// although no committables are forwarded
Expand Down
Loading

0 comments on commit c56def0

Please sign in to comment.