Skip to content

Commit

Permalink
[FLINK-36368] Do not prematurely merge CommittableManager (apache#25405)
Browse files Browse the repository at this point in the history
When a sink contains a shuffle between writer and committer, a committer may receive committables coming from multiple subtasks. So far, we merged them immediately upon receipt. However, that makes it impossible to trace later whether we have received all messages from an upstream task.

It also made rescaling of the committer awkward: during normal processing all committables of a committer have the same subtaskId as the committer. On downscale, these subtaskIds suddenly don't match and need to be replaced, which we solved by merging the SubtaskCommittableManagers.

This commit decouples the collection of committables from changing the subtaskId for emission. Committables retain the upstream subtask id in the CommittableCollection, which survives serialization and deserialization. Only upon emission do we substitute the subtask id with that of the emitting committer.

This is particularly useful for a global committer, where all subtasks are collected. As a side fix, the new serialization also contains the numberOfSubtasks, so that different checkpoints may have different degrees of parallelism.

The old approach probably has edge cases where scaling an unaligned checkpoint (UC) would result in stalled pipelines because certain metadata doesn't match. This would not affect pipelines that chain Writer/Committer (no channel state), pipelines where Writer and Committer have the same DOP (this results in a Forward channel, which doesn't use UC for exactly these reasons), or pipelines with non-keyed shuffles (because they don't provide any guarantees). Since a keyed shuffle must use the subtask id of the committables, the new approach should be safe. However, since we disabled UC entirely for sinks to adhere to the contract of notifyCheckpointComplete, this shouldn't matter going forward. It is still important to consider these cases, though, when restoring from Flink 1 checkpoints.
  • Loading branch information
AHeise authored Oct 2, 2024
1 parent d7c87f2 commit 8dc212c
Show file tree
Hide file tree
Showing 16 changed files with 288 additions and 258 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public <NewCommT> CommittableWithLineage<NewCommT> map(Function<CommT, NewCommT>
return new CommittableWithLineage<>(mapper.apply(committable), checkpointId, subtaskId);
}

/**
 * Returns a shallow copy of this message that carries the given subtask id.
 *
 * <p>The committable and the checkpoint id are handed to the copy unchanged.
 *
 * @param subtaskId subtask id to store in the copy
 * @return a new instance that references the same committable
 */
public CommittableWithLineage<CommT> withSubtaskId(int subtaskId) {
    // The parameter shadows the field of the same name, so the other fields are qualified.
    return new CommittableWithLineage<>(this.committable, this.checkpointId, subtaskId);
}

@Override
public String toString() {
return "CommittableWithLineage{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.function.SerializableSupplier;
Expand All @@ -45,7 +44,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -88,7 +86,7 @@ protected void setup(
super.setup(containingTask, config, output);
committer = committerFactory.get();
metricGroup = InternalSinkCommitterMetricGroup.wrap(metrics);
committableCollector = CommittableCollector.of(getRuntimeContext(), metricGroup);
committableCollector = CommittableCollector.of(metricGroup);
committableSerializer = committableSerializerFactory.get();
}

Expand All @@ -113,11 +111,7 @@ public void initializeState(StateInitializationContext context) throws Exception
metricGroup);
final SimpleVersionedSerializer<GlobalCommittableWrapper<CommT, GlobalCommT>> serializer =
new GlobalCommitterSerializer<>(
committableCollectorSerializer,
globalCommittableSerializer,
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
metricGroup);
committableCollectorSerializer, globalCommittableSerializer, metricGroup);
globalCommitterState =
new SimpleVersionedListState<>(
context.getOperatorStateStore()
Expand All @@ -144,25 +138,17 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
commit(lastCompletedCheckpointId);
}

private Collection<? extends CheckpointCommittableManager<CommT>> getCommittables(
long checkpointId) {
final Collection<? extends CheckpointCommittableManager<CommT>> committables =
committableCollector.getCheckpointCommittablesUpTo(checkpointId);
if (committables == null) {
return Collections.emptyList();
}
return committables;
}

private void commit(long checkpointId) throws IOException, InterruptedException {
for (CheckpointCommittableManager<CommT> committable : getCommittables(checkpointId)) {
committable.commit(committer);
for (CheckpointCommittableManager<CommT> checkpoint :
committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
checkpoint.commit(committer);
}
committableCollector.compact();
}

@Override
public void endInput() throws Exception {
final CommittableManager<CommT> endOfInputCommittable =
final CheckpointCommittableManager<CommT> endOfInputCommittable =
committableCollector.getEndOfInputCommittable();
if (endOfInputCommittable != null) {
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,14 @@ class GlobalCommitterSerializer<CommT, GlobalCommT>

private final CommittableCollectorSerializer<CommT> committableCollectorSerializer;
@Nullable private final SimpleVersionedSerializer<GlobalCommT> globalCommittableSerializer;
private final int subtaskId;
private final int numberOfSubtasks;
private final SinkCommitterMetricGroup metricGroup;

GlobalCommitterSerializer(
CommittableCollectorSerializer<CommT> committableCollectorSerializer,
@Nullable SimpleVersionedSerializer<GlobalCommT> globalCommittableSerializer,
int subtaskId,
int numberOfSubtasks,
SinkCommitterMetricGroup metricGroup) {
this.committableCollectorSerializer = checkNotNull(committableCollectorSerializer);
this.globalCommittableSerializer = globalCommittableSerializer;
this.subtaskId = subtaskId;
this.numberOfSubtasks = numberOfSubtasks;
this.metricGroup = metricGroup;
}

Expand Down Expand Up @@ -111,8 +105,7 @@ private GlobalCommittableWrapper<CommT, GlobalCommT> deserializeV1(DataInputView
SinkV1CommittableDeserializer.readVersionAndDeserializeList(
globalCommittableSerializer, in);
return new GlobalCommittableWrapper<>(
new CommittableCollector<>(subtaskId, numberOfSubtasks, metricGroup),
globalCommittables);
new CommittableCollector<>(metricGroup), globalCommittables);
}

private GlobalCommittableWrapper<CommT, GlobalCommT> deserializeV2(DataInputView in)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
Expand Down Expand Up @@ -114,7 +113,7 @@ protected void setup(
Output<StreamRecord<CommittableMessage<CommT>>> output) {
super.setup(containingTask, config, output);
metricGroup = InternalSinkCommitterMetricGroup.wrap(getMetricGroup());
committableCollector = CommittableCollector.of(getRuntimeContext(), metricGroup);
committableCollector = CommittableCollector.of(metricGroup);
}

@Override
Expand Down Expand Up @@ -179,15 +178,19 @@ private void commitAndEmitCheckpoints() throws IOException, InterruptedException
// if not endInput, we can schedule retrying later
retryWithDelay();
}
committableCollector.compact();
}

private void commitAndEmit(CommittableManager<CommT> committableManager)
private void commitAndEmit(CheckpointCommittableManager<CommT> committableManager)
throws IOException, InterruptedException {
Collection<CommittableWithLineage<CommT>> committed = committableManager.commit(committer);
if (emitDownstream && !committed.isEmpty()) {
output.collect(new StreamRecord<>(committableManager.getSummary()));
int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
output.collect(
new StreamRecord<>(committableManager.getSummary(subtaskId, numberOfSubtasks)));
for (CommittableWithLineage<CommT> committable : committed) {
output.collect(new StreamRecord<>(committable));
output.collect(new StreamRecord<>(committable.withSubtaskId(subtaskId)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,52 @@
package org.apache.flink.streaming.runtime.operators.sink.committables;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;

import java.io.IOException;
import java.util.Collection;

/**
* This interface adds checkpoint meta information to the committable.
* A {@code CheckpointCommittableManager} collects committables for one checkpoint across
* potentially multiple upstream subtasks.
*
* <p>While it collects committables from multiple upstream subtasks, it belongs to exactly one
* committer subtask.
*
* <p>Each upstream subtask of this particular checkpoint is represented by a {@link
* SubtaskCommittableManager}.
*
* @param <CommT> type of the committable
*/
@Internal
public interface CheckpointCommittableManager<CommT> extends CommittableManager<CommT> {
public interface CheckpointCommittableManager<CommT> {
/**
* Returns the checkpoint id in which the committable was created.
* Returns the checkpoint id in which the committables were created.
*
* @return checkpoint id
*/
long getCheckpointId();

/** Returns the number of upstream subtasks belonging to the checkpoint. */
int getNumberOfSubtasks();

/**
* Returns a summary of the current commit progress for the emitting subtask identified by the
* parameters.
*/
CommittableSummary<CommT> getSummary(int emittingSubtaskId, int emittingNumberOfSubtasks);

/**
 * Commits all due committables if all respective committables of the specific subtask and
 * checkpoint have been received.
 *
 * @param committer used to commit to the external system
 * @return successfully committed committables with meta information
 * @throws IOException if committing fails with an I/O error (NOTE(review): presumably
 *     propagated from the given committer; confirm against the implementation)
 * @throws InterruptedException if the commit is interrupted (NOTE(review): presumably
 *     propagated from the given committer; confirm against the implementation)
 */
Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committer)
throws IOException, InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.runtime.operators.sink.committables;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
Expand All @@ -32,6 +33,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand All @@ -42,39 +44,43 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa
private final Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers;

private final long checkpointId;
private final int subtaskId;
private final int numberOfSubtasks;
private final SinkCommitterMetricGroup metricGroup;

private static final Logger LOG =
LoggerFactory.getLogger(CheckpointCommittableManagerImpl.class);

CheckpointCommittableManagerImpl(
int subtaskId,
int numberOfSubtasks,
long checkpointId,
SinkCommitterMetricGroup metricGroup) {
this(new HashMap<>(), subtaskId, numberOfSubtasks, checkpointId, metricGroup);
}

@VisibleForTesting
CheckpointCommittableManagerImpl(
Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers,
int subtaskId,
int numberOfSubtasks,
long checkpointId,
SinkCommitterMetricGroup metricGroup) {
this.subtasksCommittableManagers = checkNotNull(subtasksCommittableManagers);
this.subtaskId = subtaskId;
this.numberOfSubtasks = numberOfSubtasks;
this.checkpointId = checkpointId;
this.metricGroup = metricGroup;
}

/**
 * Creates a manager without any subtask committable managers for the given summary.
 *
 * <p>The number of subtasks and the checkpoint id (as reported by {@code
 * getCheckpointIdOrEOI()}) are taken from the summary.
 *
 * @param summary summary providing the number of subtasks and the checkpoint id
 * @param metricGroup metric group handed to the new manager
 * @return a new manager backed by an empty map
 */
public static <CommT> CheckpointCommittableManagerImpl<CommT> forSummary(
        CommittableSummary<CommT> summary, SinkCommitterMetricGroup metricGroup) {
    final int upstreamSubtasks = summary.getNumberOfSubtasks();
    final long summaryCheckpointId = summary.getCheckpointIdOrEOI();
    return new CheckpointCommittableManagerImpl<>(
            new HashMap<>(), upstreamSubtasks, summaryCheckpointId, metricGroup);
}

/** Returns the checkpoint id this manager was constructed with. */
@Override
public long getCheckpointId() {
    return this.checkpointId;
}

/** Returns the number of subtasks this manager was constructed with. */
@Override
public int getNumberOfSubtasks() {
    return this.numberOfSubtasks;
}

/**
 * Returns the subtask committable managers currently held by this manager.
 *
 * <p>The result is the {@code values()} view of the internal map rather than a copy.
 */
Collection<SubtaskCommittableManager<CommT>> getSubtaskCommittableManagers() {
    return this.subtasksCommittableManagers.values();
}
Expand All @@ -83,7 +89,10 @@ void addSummary(CommittableSummary<CommT> summary) {
long checkpointId = summary.getCheckpointIdOrEOI();
SubtaskCommittableManager<CommT> manager =
new SubtaskCommittableManager<>(
summary.getNumberOfCommittables(), subtaskId, checkpointId, metricGroup);
summary.getNumberOfCommittables(),
summary.getSubtaskId(),
checkpointId,
metricGroup);
if (checkpointId == CommittableMessage.EOI) {
SubtaskCommittableManager<CommT> merged =
subtasksCommittableManagers.merge(
Expand Down Expand Up @@ -117,10 +126,11 @@ SubtaskCommittableManager<CommT> getSubtaskCommittableManager(int subtaskId) {
}

@Override
public CommittableSummary<CommT> getSummary() {
public CommittableSummary<CommT> getSummary(
int emittingSubtaskId, int emittingNumberOfSubtasks) {
return new CommittableSummary<>(
subtaskId,
numberOfSubtasks,
emittingSubtaskId,
emittingNumberOfSubtasks,
checkpointId,
subtasksCommittableManagers.values().stream()
.mapToInt(SubtaskCommittableManager::getNumCommittables)
Expand Down Expand Up @@ -180,9 +190,39 @@ CheckpointCommittableManagerImpl<CommT> copy() {
return new CheckpointCommittableManagerImpl<>(
subtasksCommittableManagers.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, (e) -> e.getValue().copy())),
subtaskId,
numberOfSubtasks,
checkpointId,
metricGroup);
}

/**
 * Compares this manager with another object.
 *
 * <p>Two managers are equal when they are of the same class and agree on checkpoint id, number
 * of subtasks and the map of subtask committable managers. The metric group is not compared.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    final CheckpointCommittableManagerImpl<?> other = (CheckpointCommittableManagerImpl<?>) o;
    if (checkpointId != other.checkpointId) {
        return false;
    }
    if (numberOfSubtasks != other.numberOfSubtasks) {
        return false;
    }
    return Objects.equals(subtasksCommittableManagers, other.subtasksCommittableManagers);
}

/** Hashes the same three fields that {@code equals} compares. */
@Override
public int hashCode() {
    return Objects.hash(
            this.subtasksCommittableManagers, this.checkpointId, this.numberOfSubtasks);
}

/** Renders the number of subtasks, the checkpoint id and the subtask managers for debugging. */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("CheckpointCommittableManagerImpl{");
    text.append("numberOfSubtasks=").append(numberOfSubtasks);
    text.append(", checkpointId=").append(checkpointId);
    text.append(", subtasksCommittableManagers=").append(subtasksCommittableManagers);
    text.append('}');
    return text.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;

import java.util.Objects;

/**
* Internal implementation to commit a specific committable and handle the response.
*
Expand Down Expand Up @@ -118,4 +120,35 @@ void setCommittedIfNoError() {
/** Creates a new request from this request's committable, retry count, state and metric group. */
CommitRequestImpl<CommT> copy() {
    return new CommitRequestImpl<>(
            this.committable, this.numRetries, this.state, this.metricGroup);
}

/**
 * Compares this request with another object.
 *
 * <p>Two requests are equal when they are of the same class and agree on retry count,
 * committable and state. The metric group is not compared.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    final CommitRequestImpl<?> other = (CommitRequestImpl<?>) o;
    if (numRetries != other.numRetries) {
        return false;
    }
    if (!Objects.equals(committable, other.committable)) {
        return false;
    }
    return state == other.state;
}

/** Hashes the same three fields that {@code equals} compares. */
@Override
public int hashCode() {
    return Objects.hash(this.committable, this.numRetries, this.state);
}

/** Renders the state, the retry count and the committable for debugging. */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("CommitRequestImpl{");
    text.append("state=").append(state);
    text.append(", numRetries=").append(numRetries);
    text.append(", committable=").append(committable);
    text.append('}');
    return text.toString();
}
}
Loading

0 comments on commit 8dc212c

Please sign in to comment.