, Void>, BoundedOneInput {
+/**
+ * Implements the {@code GlobalCommitter}.
+ *
+ * <p>This operator usually trails behind a {@code CommitterOperator}. In this case, the global
+ * committer will receive committables from the committer operator through {@link
+ * #processElement(StreamRecord)}. Once all committables from all subtasks have been received, the
+ * global committer will commit them. This approach also works for any number of intermediate custom
+ * operators between the committer and the global committer in a custom post-commit topology.
+ *
+ * <p>That means that the global committer will not wait for {@link
+ * #notifyCheckpointComplete(long)}. In many cases, it receives the callback before the actual
+ * committables anyway. So it would effectively globally commit one checkpoint later.
+ *
+ * <p>However, we can leverage the following observation: the global committer will only receive
+ * committables iff the respective checkpoint was completed and upstream committers received the
+ * {@link #notifyCheckpointComplete(long)}. So by waiting for all committables of a given
+ * checkpoint, we implicitly know that the checkpoint was successful and the global committer is
+ * supposed to globally commit.
+ *
+ * <p>Note that committables of checkpoint X are not checkpointed in X because the global committer
+ * is trailing behind the checkpoint. They are replayed from the committer state in case of an
+ * error. The state only includes incomplete checkpoints coming from upstream committers not
+ * receiving {@link #notifyCheckpointComplete(long)}. All committables received are successful.
+ *
+ * <p>In rare cases, the GlobalCommitterOperator may not be connected (in)directly to a committer
+ * but instead is connected (in)directly to a writer. In this case, the global committer needs to
+ * perform the 2PC protocol instead of the committer. Thus, we absolutely need to use {@link
+ * #notifyCheckpointComplete(long)} similarly to the {@code CommitterOperator}. Hence, {@link
+ * #commitOnInput} is set to false in this case. In particular, the following three prerequisites
+ * must be met:
+ *
+ * <ul>
+ *   <li>No committer is upstream of which we could implicitly infer {@link
+ *       #notifyCheckpointComplete(long)} as sketched above.
+ *   <li>The application runs in streaming mode.
+ *   <li>Checkpointing is enabled.
+ * </ul>
+ *
+ * <p>In all other cases (batch or upstream committer or checkpointing is disabled), the global
+ * committer commits on input.
+ */
+@Internal
+public class GlobalCommitterOperator extends AbstractStreamOperator
+ implements OneInputStreamOperator, Void> {
/** The operator's state descriptor. */
private static final ListStateDescriptor GLOBAL_COMMITTER_OPERATOR_RAW_STATES_DESC =
@@ -65,6 +106,11 @@ class GlobalCommitterOperator extends AbstractStreamOperator
private final SerializableSupplier> committerFactory;
private final SerializableSupplier>
committableSerializerFactory;
+ /**
+ * Depending on whether there is an upstream committer or it's connected to a writer, we may
+ * either wait for notifyCheckpointCompleted or not.
+ */
+ private final boolean commitOnInput;
private ListState> globalCommitterState;
private Committer committer;
@@ -72,16 +118,19 @@ class GlobalCommitterOperator extends AbstractStreamOperator
private long lastCompletedCheckpointId = -1;
private SimpleVersionedSerializer committableSerializer;
private SinkCommitterMetricGroup metricGroup;
+ private int maxRetries;
@Nullable private GlobalCommitter globalCommitter;
@Nullable private SimpleVersionedSerializer globalCommittableSerializer;
private List sinkV1State = new ArrayList<>();
- GlobalCommitterOperator(
+ public GlobalCommitterOperator(
SerializableSupplier> committerFactory,
- SerializableSupplier> committableSerializerFactory) {
+ SerializableSupplier> committableSerializerFactory,
+ boolean commitOnInput) {
this.committerFactory = checkNotNull(committerFactory);
this.committableSerializerFactory = checkNotNull(committableSerializerFactory);
+ this.commitOnInput = commitOnInput;
}
@Override
@@ -92,7 +141,7 @@ public void setup(
super.setup(containingTask, config, output);
committer = committerFactory.get();
metricGroup = InternalSinkCommitterMetricGroup.wrap(metrics);
- committableCollector = CommittableCollector.of(getRuntimeContext(), metricGroup);
+ committableCollector = CommittableCollector.of(metricGroup);
committableSerializer = committableSerializerFactory.get();
if (committer instanceof SinkV1Adapter.GlobalCommitterAdapter) {
final SinkV1Adapter, CommT, ?, GlobalCommT>.GlobalCommitterAdapter gc =
@@ -100,6 +149,7 @@ public void setup(
globalCommitter = gc.getGlobalCommitter();
globalCommittableSerializer = gc.getGlobalCommittableSerializer();
}
+ maxRetries = config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES);
}
@Override
@@ -115,24 +165,11 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
- final CommittableCollectorSerializer committableCollectorSerializer =
- new CommittableCollectorSerializer<>(
- committableSerializer,
- getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
- getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
- metricGroup);
- final SimpleVersionedSerializer> serializer =
- new GlobalCommitterSerializer<>(
- committableCollectorSerializer,
- globalCommittableSerializer,
- getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
- getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
- metricGroup);
globalCommitterState =
new SimpleVersionedListState<>(
context.getOperatorStateStore()
.getListState(GLOBAL_COMMITTER_OPERATOR_RAW_STATES_DESC),
- serializer);
+ getCommitterStateSerializer());
if (context.isRestored()) {
globalCommitterState
.get()
@@ -141,57 +178,65 @@ public void initializeState(StateInitializationContext context) throws Exception
sinkV1State.addAll(cc.getGlobalCommittables());
committableCollector.merge(cc.getCommittableCollector());
});
- lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong();
if (globalCommitter != null) {
sinkV1State = globalCommitter.filterRecoveredCommittables(sinkV1State);
}
// try to re-commit recovered transactions as quickly as possible
- commit(lastCompletedCheckpointId);
+ if (context.getRestoredCheckpointId().isPresent()) {
+ commit(context.getRestoredCheckpointId().getAsLong());
+ }
}
}
+ private SimpleVersionedSerializer>
+ getCommitterStateSerializer() {
+ final CommittableCollectorSerializer committableCollectorSerializer =
+ new CommittableCollectorSerializer<>(
+ committableSerializer,
+ getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
+ getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
+ metricGroup);
+ return new GlobalCommitterSerializer<>(
+ committableCollectorSerializer, globalCommittableSerializer, metricGroup);
+ }
+
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
checkState(
globalCommitter != null || sinkV1State.isEmpty(),
"GlobalCommitter is required to commit SinkV1 state.");
- lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
- commit(lastCompletedCheckpointId);
- }
-
- private Collection extends CheckpointCommittableManager> getCommittables(
- long checkpointId) {
- final Collection extends CheckpointCommittableManager> committables =
- committableCollector.getCheckpointCommittablesUpTo(checkpointId);
- if (committables == null) {
- return Collections.emptyList();
+ if (!commitOnInput) {
+ commit(checkpointId);
}
- return committables;
}
- private void commit(long checkpointId) throws IOException, InterruptedException {
+ private void commit(long checkpointIdOrEOI) throws IOException, InterruptedException {
if (globalCommitter != null && !sinkV1State.isEmpty()) {
sinkV1State = globalCommitter.commit(sinkV1State);
}
- for (CheckpointCommittableManager committable : getCommittables(checkpointId)) {
- committable.commit(committer);
- }
- }
- @Override
- public void endInput() throws Exception {
- final CommittableManager endOfInputCommittable =
- committableCollector.getEndOfInputCommittable();
- if (endOfInputCommittable != null) {
- do {
- endOfInputCommittable.commit(committer);
- } while (!committableCollector.isFinished());
+ lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointIdOrEOI);
+ for (CheckpointCommittableManager checkpointManager :
+ committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
+ if (!checkpointManager.hasGloballyReceivedAll()) {
+ return;
+ }
+ checkpointManager.commit(committer, maxRetries);
+ committableCollector.remove(checkpointManager);
}
}
@Override
public void processElement(StreamRecord> element) throws Exception {
committableCollector.addMessage(element.getValue());
+
+ // commitOnInput implies that the global committer is not using notifyCheckpointComplete.
+ // Instead, it commits as soon as it receives all committables of a specific checkpoint.
+ // For commitOnInput=false, lastCompletedCheckpointId is only updated on
+ // notifyCheckpointComplete.
+ if (commitOnInput) {
+ commit(element.getValue().getCheckpointIdOrEOI());
+ }
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java
index 36b385835175c..a8c5ae5497883 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java
@@ -48,20 +48,14 @@ class GlobalCommitterSerializer
private final CommittableCollectorSerializer committableCollectorSerializer;
@Nullable private final SimpleVersionedSerializer globalCommittableSerializer;
- private final int subtaskId;
- private final int numberOfSubtasks;
private final SinkCommitterMetricGroup metricGroup;
GlobalCommitterSerializer(
CommittableCollectorSerializer committableCollectorSerializer,
@Nullable SimpleVersionedSerializer globalCommittableSerializer,
- int subtaskId,
- int numberOfSubtasks,
SinkCommitterMetricGroup metricGroup) {
this.committableCollectorSerializer = checkNotNull(committableCollectorSerializer);
this.globalCommittableSerializer = globalCommittableSerializer;
- this.subtaskId = subtaskId;
- this.numberOfSubtasks = numberOfSubtasks;
this.metricGroup = metricGroup;
}
@@ -111,8 +105,7 @@ private GlobalCommittableWrapper deserializeV1(DataInputView
SinkV1CommittableDeserializer.readVersionAndDeserializeList(
globalCommittableSerializer, in);
return new GlobalCommittableWrapper<>(
- new CommittableCollector<>(subtaskId, numberOfSubtasks, metricGroup),
- globalCommittables);
+ new CommittableCollector<>(metricGroup), globalCommittables);
}
private GlobalCommittableWrapper deserializeV2(DataInputView in)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java
index 956675c2b8e23..691a797752eee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java
@@ -19,12 +19,10 @@
package org.apache.flink.streaming.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform;
import org.apache.flink.util.function.SerializableSupplier;
/** This utility class provides building blocks for custom topologies. */
@@ -43,19 +41,10 @@ public static void addGlobalCommitter(
DataStream> committables,
SerializableSupplier> committerFactory,
SerializableSupplier> committableSerializer) {
- final PhysicalTransformation transformation =
- (PhysicalTransformation)
- committables
- .global()
- .transform(
- GLOBAL_COMMITTER_TRANSFORMATION_NAME,
- Types.VOID,
- new GlobalCommitterOperator<>(
- committerFactory, committableSerializer))
- .getTransformation();
- transformation.setChainingStrategy(ChainingStrategy.ALWAYS);
- transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME);
- transformation.setParallelism(1);
- transformation.setMaxParallelism(1);
+ committables
+ .getExecutionEnvironment()
+ .addOperator(
+ new GlobalCommitterTransform<>(
+ committables, committerFactory, committableSerializer));
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
index 67f277b1b45be..5470b92383aa9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -30,6 +31,18 @@
*
* It is recommended to use immutable committables because mutating committables can have
* unexpected side-effects.
+ *
+ * <p>It's important that all {@link CommittableMessage}s are modified appropriately, such that all
+ * messages with the same subtask id will also be processed by the same {@link Committer} subtask
+ * and the {@link CommittableSummary} matches the respective count. If committables are combined or
+ * split in any way, the summary needs to be adjusted.
+ *
+ * <p>There is also no requirement to keep the subtask ids of the writer, they can be changed as
+ * long as there are no two summaries with the same subtask ids (and corresponding {@link
+ * CommittableWithLineage}). Subtask ids don't need to be consecutive or small. The global committer
+ * will use {@link CommittableSummary#getNumberOfSubtasks()} to determine if all committables have
+ * been received, so that number needs to correctly reflect the number of distinct subtask ids. The
+ * easiest way to guarantee all of this is to use {@link RuntimeContext#getTaskInfo()}.
*/
@Experimental
public interface SupportsPreCommitTopology {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 0e20431516e3a..2a64fabc7111b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -53,6 +53,7 @@
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform;
import org.apache.flink.streaming.api.transformations.KeyedBroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
@@ -72,6 +73,7 @@
import org.apache.flink.streaming.api.transformations.WithBoundedness;
import org.apache.flink.streaming.runtime.translators.BroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator;
+import org.apache.flink.streaming.runtime.translators.GlobalCommitterTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.KeyedBroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator;
@@ -184,6 +186,7 @@ public class StreamGraphGenerator {
tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
+ tmp.put(GlobalCommitterTransform.class, new GlobalCommitterTransformationTranslator<>());
tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java
new file mode 100644
index 0000000000000..a7dc311e89676
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Transformation for global committer. Only used to fetch if the pipeline is streaming or batch
+ * with the respective {@link
+ * org.apache.flink.streaming.runtime.translators.GlobalCommitterTransformationTranslator}.
+ *
+ * @param <CommT> the type of the committable
+ */
+@Internal
+public class GlobalCommitterTransform extends Transformation {
+
+ private final DataStream> inputStream;
+ private final SerializableSupplier> committerFactory;
+ private final SerializableSupplier> committableSerializer;
+
+ public GlobalCommitterTransform(
+ DataStream> inputStream,
+ SerializableSupplier> committerFactory,
+ SerializableSupplier> committableSerializer) {
+ super(StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME, Types.VOID, 1, true);
+ this.inputStream = inputStream;
+ this.committerFactory = committerFactory;
+ this.committableSerializer = committableSerializer;
+ }
+
+ @Override
+ protected List> getTransitivePredecessorsInternal() {
+ final List> result = Lists.newArrayList();
+ result.add(this);
+ result.addAll(inputStream.getTransformation().getTransitivePredecessors());
+ return result;
+ }
+
+ @Override
+ public List> getInputs() {
+ return Collections.singletonList(inputStream.getTransformation());
+ }
+
+ public DataStream> getInputStream() {
+ return inputStream;
+ }
+
+ public SerializableSupplier> getCommitterFactory() {
+ return committerFactory;
+ }
+
+ public SerializableSupplier> getCommittableSerializer() {
+ return committableSerializer;
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index e33594475e806..10ae86cf10de0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -22,12 +22,14 @@
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -39,7 +41,6 @@
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
-import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -66,7 +67,6 @@ class CommitterOperator extends AbstractStreamOperator, CommittableMessage>,
BoundedOneInput {
- private static final long RETRY_DELAY = 1000;
private final SimpleVersionedSerializer committableSerializer;
private final FunctionWithException, IOException>
committerSupplier;
@@ -77,6 +77,7 @@ class CommitterOperator extends AbstractStreamOperator committer;
private CommittableCollector committableCollector;
private long lastCompletedCheckpointId = -1;
+ private int maxRetries;
private boolean endInput = false;
@@ -111,7 +112,8 @@ public void setup(
Output>> output) {
super.setup(containingTask, config, output);
metricGroup = InternalSinkCommitterMetricGroup.wrap(getMetricGroup());
- committableCollector = CommittableCollector.of(getRuntimeContext(), metricGroup);
+ committableCollector = CommittableCollector.of(metricGroup);
+ maxRetries = config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES);
}
@Override
@@ -162,49 +164,47 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
- do {
- for (CheckpointCommittableManager manager :
- committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
- commitAndEmit(manager);
- }
- // !committableCollector.isFinished() indicates that we should retry
- // Retry should be done here if this is a final checkpoint (indicated by endInput)
- // WARN: this is an endless retry, may make the job stuck while finishing
- } while (!committableCollector.isFinished() && endInput);
-
- if (!committableCollector.isFinished()) {
- // if not endInput, we can schedule retrying later
- retryWithDelay();
+ for (CheckpointCommittableManager checkpointManager :
+ committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+ // ensure that all committables of the first checkpoint are fully committed before
+ // attempting the next committable
+ commitAndEmit(checkpointManager);
+ committableCollector.remove(checkpointManager);
}
}
- private void commitAndEmit(CommittableManager committableManager)
+ private void commitAndEmit(CheckpointCommittableManager committableManager)
throws IOException, InterruptedException {
- Collection> committed = committableManager.commit(committer);
- if (emitDownstream && !committed.isEmpty()) {
- output.collect(new StreamRecord<>(committableManager.getSummary()));
- for (CommittableWithLineage committable : committed) {
- output.collect(new StreamRecord<>(committable));
- }
+ committableManager.commit(committer, maxRetries);
+ if (emitDownstream) {
+ emit(committableManager);
}
}
- private void retryWithDelay() {
- processingTimeService.registerTimer(
- processingTimeService.getCurrentProcessingTime() + RETRY_DELAY,
- ts -> commitAndEmitCheckpoints());
+ private void emit(CheckpointCommittableManager committableManager) {
+ int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+ int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+ long checkpointId = committableManager.getCheckpointId();
+ Collection committables = committableManager.getSuccessfulCommittables();
+ output.collect(
+ new StreamRecord<>(
+ new CommittableSummary<>(
+ subtaskId,
+ numberOfSubtasks,
+ checkpointId,
+ committables.size(),
+ 0,
+ 0)));
+ for (CommT committable : committables) {
+ output.collect(
+ new StreamRecord<>(
+ new CommittableWithLineage<>(committable, checkpointId, subtaskId)));
+ }
}
@Override
public void processElement(StreamRecord> element) throws Exception {
committableCollector.addMessage(element.getValue());
-
- // in case of unaligned checkpoint, we may receive notifyCheckpointComplete before the
- // committables
- long checkpointId = element.getValue().getCheckpointIdOrEOI();
- if (checkpointId <= lastCompletedCheckpointId) {
- commitAndEmitCheckpoints();
- }
}
@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
index f6525dd27f746..4e34fbbe698c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
@@ -19,18 +19,52 @@
package org.apache.flink.streaming.runtime.operators.sink.committables;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+
+import java.io.IOException;
+import java.util.Collection;
/**
- * This interface adds checkpoint meta information to the committable.
+ * A {@code CheckpointCommittableManager} collects committables for one checkpoint across
+ * potentially multiple upstream subtasks.
+ *
+ * While it collects committables from multiple upstream subtasks, it belongs to exactly one
+ * committer subtask.
+ *
+ * <p>Each upstream subtask of this particular checkpoint is represented by a {@link
+ * SubtaskCommittableManager}.
*
* @param type of the committable
*/
@Internal
-public interface CheckpointCommittableManager extends CommittableManager {
+public interface CheckpointCommittableManager {
/**
- * Returns the checkpoint id in which the committable was created.
+ * Returns the checkpoint id in which the committables were created.
*
* @return checkpoint id
*/
long getCheckpointId();
+
+ /** Returns the number of upstream subtasks belonging to the checkpoint. */
+ int getNumberOfSubtasks();
+
+ boolean isFinished();
+
+ /**
+ * Returns true if all committables of all upstream subtasks arrived, which is only guaranteed
+ * to happen if the DOP of the caller is 1.
+ */
+ boolean hasGloballyReceivedAll();
+
+ /**
+ * Commits all due committables if all respective committables of the specific subtask and
+ * checkpoint have been received.
+ *
+ * @param committer used to commit to the external system
+ * @param maxRetries maximum number of retries for committing before giving up
+ */
+ void commit(Committer committer, int maxRetries)
+ throws IOException, InterruptedException;
+
+ Collection getSuccessfulCommittables();
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index eeac41eb3c98f..da4491cda617d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -18,21 +18,25 @@
package org.apache.flink.streaming.runtime.operators.sink.committables;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -42,39 +46,43 @@ class CheckpointCommittableManagerImpl implements CheckpointCommittableMa
private final Map> subtasksCommittableManagers;
private final long checkpointId;
- private final int subtaskId;
private final int numberOfSubtasks;
private final SinkCommitterMetricGroup metricGroup;
private static final Logger LOG =
LoggerFactory.getLogger(CheckpointCommittableManagerImpl.class);
- CheckpointCommittableManagerImpl(
- int subtaskId,
- int numberOfSubtasks,
- long checkpointId,
- SinkCommitterMetricGroup metricGroup) {
- this(new HashMap<>(), subtaskId, numberOfSubtasks, checkpointId, metricGroup);
- }
-
+ @VisibleForTesting
CheckpointCommittableManagerImpl(
Map> subtasksCommittableManagers,
- int subtaskId,
int numberOfSubtasks,
long checkpointId,
SinkCommitterMetricGroup metricGroup) {
this.subtasksCommittableManagers = checkNotNull(subtasksCommittableManagers);
- this.subtaskId = subtaskId;
this.numberOfSubtasks = numberOfSubtasks;
this.checkpointId = checkpointId;
this.metricGroup = metricGroup;
}
+ public static CheckpointCommittableManagerImpl forSummary(
+ CommittableSummary summary, SinkCommitterMetricGroup metricGroup) {
+ return new CheckpointCommittableManagerImpl<>(
+ new HashMap<>(),
+ summary.getNumberOfSubtasks(),
+ summary.getCheckpointIdOrEOI(),
+ metricGroup);
+ }
+
@Override
public long getCheckpointId() {
return checkpointId;
}
+ @Override
+ public int getNumberOfSubtasks() {
+ return numberOfSubtasks;
+ }
+
Collection<SubtaskCommittableManager<CommT>> getSubtaskCommittableManagers() {
return subtasksCommittableManagers.values();
}
@@ -83,7 +91,10 @@ void addSummary(CommittableSummary<CommT> summary) {
long checkpointId = summary.getCheckpointIdOrEOI();
SubtaskCommittableManager<CommT> manager =
new SubtaskCommittableManager<>(
- summary.getNumberOfCommittables(), subtaskId, checkpointId, metricGroup);
+ summary.getNumberOfCommittables(),
+ summary.getSubtaskId(),
+ checkpointId,
+ metricGroup);
if (checkpointId == CommittableMessage.EOI) {
SubtaskCommittableManager<CommT> merged =
subtasksCommittableManagers.merge(
@@ -117,50 +128,72 @@ SubtaskCommittableManager getSubtaskCommittableManager(int subtaskId) {
}
@Override
- public CommittableSummary<CommT> getSummary() {
- return new CommittableSummary<>(
- subtaskId,
- numberOfSubtasks,
- checkpointId,
- subtasksCommittableManagers.values().stream()
- .mapToInt(SubtaskCommittableManager::getNumCommittables)
- .sum(),
- subtasksCommittableManagers.values().stream()
- .mapToInt(SubtaskCommittableManager::getNumPending)
- .sum(),
- subtasksCommittableManagers.values().stream()
- .mapToInt(SubtaskCommittableManager::getNumFailed)
- .sum());
- }
-
- boolean isFinished() {
+ public boolean isFinished() {
return subtasksCommittableManagers.values().stream()
.allMatch(SubtaskCommittableManager::isFinished);
}
@Override
- public Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committer)
+ public boolean hasGloballyReceivedAll() {
+ return subtasksCommittableManagers.size() == numberOfSubtasks
+ && subtasksCommittableManagers.values().stream()
+ .allMatch(SubtaskCommittableManager::hasReceivedAll);
+ }
+
+ @Override
+ public void commit(Committer<CommT> committer, int maxRetries)
throws IOException, InterruptedException {
- Collection<CommitRequestImpl<CommT>> requests = getPendingRequests(true);
- requests.forEach(CommitRequestImpl::setSelected);
- committer.commit(new ArrayList<>(requests));
- requests.forEach(CommitRequestImpl::setCommittedIfNoError);
- Collection<CommittableWithLineage<CommT>> committed = drainFinished();
- metricGroup.setCurrentPendingCommittablesGauge(() -> getPendingRequests(false).size());
- return committed;
+ Collection<CommitRequestImpl<CommT>> requests =
+ getPendingRequests().collect(Collectors.toList());
+ for (int retry = 0; !requests.isEmpty() && retry <= maxRetries; retry++) {
+ requests.forEach(CommitRequestImpl::setSelected);
+ committer.commit(Collections.unmodifiableCollection(requests));
+ requests.forEach(CommitRequestImpl::setCommittedIfNoError);
+ requests = requests.stream().filter(r -> !r.isFinished()).collect(Collectors.toList());
+ }
+ if (!requests.isEmpty()) {
+ throw new IOException(
+ String.format(
+ "Failed to commit %s committables after %s retries: %s",
+ requests.size(), maxRetries, requests));
+ }
}
- Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean onlyIfFullyReceived) {
+ @Override
+ public Collection<CommT> getSuccessfulCommittables() {
return subtasksCommittableManagers.values().stream()
- .filter(subtask -> !onlyIfFullyReceived || subtask.hasReceivedAll())
- .flatMap(SubtaskCommittableManager::getPendingRequests)
+ .flatMap(SubtaskCommittableManager::getSuccessfulCommittables)
.collect(Collectors.toList());
}
- Collection<CommittableWithLineage<CommT>> drainFinished() {
+ Stream<CommitRequestImpl<CommT>> getPendingRequests() {
return subtasksCommittableManagers.values().stream()
- .flatMap(subtask -> subtask.drainCommitted().stream())
- .collect(Collectors.toList());
+ .peek(this::assertReceivedAll)
+ .flatMap(SubtaskCommittableManager::getPendingRequests);
+ }
+
+ /**
+ * For committers: Sinks don't use unaligned checkpoints, so we receive all committables of a
+ * given upstream task before the respective barrier. Thus, when the barrier reaches the
+ * committer, all committables of a specific checkpoint must have been received. Committing
+ * happens even later on notifyCheckpointComplete.
+ *
+ * <p>Global committers need to ensure that all committables of all subtasks have been received
+ * with {@link #hasGloballyReceivedAll()} before trying to commit. Naturally, this method then
+ * becomes a no-op.
+ *
+ * <p>Note that by transitivity, the assertion also holds for committables of subsumed
+ * checkpoints.
+ *
+ * <p>This assertion will fail in case of bugs in the writer or in the pre-commit topology if
+ * present.
+ */
+ private void assertReceivedAll(SubtaskCommittableManager<CommT> subtask) {
+ Preconditions.checkArgument(
+ subtask.hasReceivedAll(),
+ "Trying to commit incomplete batch of committables subtask=%s, manager=%s",
+ subtask.getSubtaskId(),
+ this);
}
CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> other) {
@@ -179,9 +212,39 @@ CheckpointCommittableManagerImpl copy() {
return new CheckpointCommittableManagerImpl<>(
subtasksCommittableManagers.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, (e) -> e.getValue().copy())),
- subtaskId,
numberOfSubtasks,
checkpointId,
metricGroup);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CheckpointCommittableManagerImpl<?> that = (CheckpointCommittableManagerImpl<?>) o;
+ return checkpointId == that.checkpointId
+ && numberOfSubtasks == that.numberOfSubtasks
+ && Objects.equals(subtasksCommittableManagers, that.subtasksCommittableManagers);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subtasksCommittableManagers, checkpointId, numberOfSubtasks);
+ }
+
+ @Override
+ public String toString() {
+ return "CheckpointCommittableManagerImpl{"
+ + "numberOfSubtasks="
+ + numberOfSubtasks
+ + ", checkpointId="
+ + checkpointId
+ + ", subtasksCommittableManagers="
+ + subtasksCommittableManagers
+ + '}';
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java
index 8837f2afb3291..e03ef3060784f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java
@@ -22,6 +22,8 @@
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import java.util.Objects;
+
/**
* Internal implementation to commit a specific committable and handle the response.
*
@@ -118,4 +120,35 @@ void setCommittedIfNoError() {
CommitRequestImpl copy() {
return new CommitRequestImpl<>(committable, numRetries, state, metricGroup);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CommitRequestImpl<?> that = (CommitRequestImpl<?>) o;
+ return numRetries == that.numRetries
+ && Objects.equals(committable, that.committable)
+ && state == that.state;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(committable, numRetries, state);
+ }
+
+ @Override
+ public String toString() {
+ return "CommitRequestImpl{"
+ + "state="
+ + state
+ + ", numRetries="
+ + numRetries
+ + ", committable="
+ + committable
+ + '}';
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 2dac78c71ea7b..4e49d73279e48 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.runtime.operators.sink.committables;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.connector.sink2.InitContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
@@ -27,21 +26,24 @@
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import javax.annotation.Nullable;
-
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
- * It handles the emission of committables and the {@link CommittableSummary}.
+ * This class is responsible to book-keep the committing progress across checkpoints and upstream
+ * subtasks.
+ *
+ * <p>Each checkpoint in turn is handled by a {@link CheckpointCommittableManager}.
*
* @param <CommT> type of committable
*/
@@ -51,47 +53,38 @@ public class CommittableCollector<CommT> {
/** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */
private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
checkpointCommittables;
- /** Denotes the subtask id the collector is running. */
- private final int subtaskId;
- private final int numberOfSubtasks;
private final SinkCommitterMetricGroup metricGroup;
- public CommittableCollector(
- int subtaskId, int numberOfSubtasks, SinkCommitterMetricGroup metricGroup) {
- this.subtaskId = subtaskId;
- this.numberOfSubtasks = numberOfSubtasks;
- this.checkpointCommittables = new TreeMap<>();
- this.metricGroup = metricGroup;
+ public CommittableCollector(SinkCommitterMetricGroup metricGroup) {
+ this(new TreeMap<>(), metricGroup);
}
/** For deep-copy. */
CommittableCollector(
Map<Long, CheckpointCommittableManagerImpl<CommT>> checkpointCommittables,
- int subtaskId,
- int numberOfSubtasks,
SinkCommitterMetricGroup metricGroup) {
this.checkpointCommittables = new TreeMap<>(checkNotNull(checkpointCommittables));
- this.subtaskId = subtaskId;
- this.numberOfSubtasks = numberOfSubtasks;
this.metricGroup = metricGroup;
+ this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
+ }
+
+ private int getNumPending() {
+ return checkpointCommittables.values().stream()
+ .mapToInt(m -> (int) m.getPendingRequests().count())
+ .sum();
}
/**
* Creates a {@link CommittableCollector} based on the current runtime information. This method
* should be used for to instantiate a collector for all Sink V2.
*
- * @param context holding runtime of information
* @param metricGroup storing the committable metrics
* @param <CommT> type of the committable
* @return {@link CommittableCollector}
*/
- public static <CommT> CommittableCollector<CommT> of(
- RuntimeContext context, SinkCommitterMetricGroup metricGroup) {
- return new CommittableCollector<>(
- context.getTaskInfo().getIndexOfThisSubtask(),
- context.getTaskInfo().getNumberOfParallelSubtasks(),
- metricGroup);
+ public static <CommT> CommittableCollector<CommT> of(SinkCommitterMetricGroup metricGroup) {
+ return new CommittableCollector<>(metricGroup);
}
/**
@@ -105,8 +98,7 @@ public static CommittableCollector of(
*/
static <CommT> CommittableCollector<CommT> ofLegacy(
List<CommT> committables, SinkCommitterMetricGroup metricGroup) {
- CommittableCollector<CommT> committableCollector =
- new CommittableCollector<>(0, 1, metricGroup);
+ CommittableCollector<CommT> committableCollector = new CommittableCollector<>(metricGroup);
// add a checkpoint with the lowest checkpoint id, this will be merged into the next
// checkpoint data, subtask id is arbitrary
CommittableSummary<CommT> summary =
@@ -148,22 +140,16 @@ public void addMessage(CommittableMessage message) {
*/
public Collection<? extends CheckpointCommittableManager<CommT>> getCheckpointCommittablesUpTo(
long checkpointId) {
- // clean up fully committed previous checkpoints
- // this wouldn't work with concurrent unaligned checkpoints
- Collection<CheckpointCommittableManagerImpl<CommT>> checkpoints =
- checkpointCommittables.headMap(checkpointId, true).values();
- checkpoints.removeIf(CheckpointCommittableManagerImpl::isFinished);
- return checkpoints;
+ return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values());
}
/**
- * Returns {@link CommittableManager} belonging to the last input.
+ * Returns {@link CheckpointCommittableManager} belonging to the last input.
*
* @return {@link CheckpointCommittableManager}
*/
- @Nullable
- public CommittableManager<CommT> getEndOfInputCommittable() {
- return checkpointCommittables.get(EOI);
+ public Optional<CheckpointCommittableManager<CommT>> getEndOfInputCommittable() {
+ return Optional.ofNullable(checkpointCommittables.get(EOI));
}
/**
@@ -194,24 +180,6 @@ public void merge(CommittableCollector cc) {
}
}
- /**
- * Returns number of subtasks.
- *
- * @return number of subtasks
- */
- public int getNumberOfSubtasks() {
- return numberOfSubtasks;
- }
-
- /**
- * Returns subtask id.
- *
- * @return subtask id.
- */
- public int getSubtaskId() {
- return subtaskId;
- }
-
/**
* Returns a new committable collector that deep copies all internals.
*
@@ -222,8 +190,6 @@ public CommittableCollector copy() {
checkpointCommittables.entrySet().stream()
.map(e -> Tuple2.of(e.getKey(), e.getValue().copy()))
.collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1)),
- subtaskId,
- numberOfSubtasks,
metricGroup);
}
@@ -235,12 +201,7 @@ private void addSummary(CommittableSummary summary) {
checkpointCommittables
.computeIfAbsent(
summary.getCheckpointIdOrEOI(),
- key ->
- new CheckpointCommittableManagerImpl<>(
- subtaskId,
- numberOfSubtasks,
- summary.getCheckpointIdOrEOI(),
- metricGroup))
+ key -> CheckpointCommittableManagerImpl.forSummary(summary, metricGroup))
.addSummary(summary);
}
@@ -254,4 +215,31 @@ private CheckpointCommittableManagerImpl getCheckpointCommittables(
this.checkpointCommittables.get(committable.getCheckpointIdOrEOI());
return checkNotNull(committables, "Unknown checkpoint for %s", committable);
}
+
+ /** Removes the manager for a specific checkpoint and all it's metadata. */
+ public void remove(CheckpointCommittableManager<CommT> manager) {
+ checkpointCommittables.remove(manager.getCheckpointId());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CommittableCollector<?> that = (CommittableCollector<?>) o;
+ return Objects.equals(checkpointCommittables, that.checkpointCommittables);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(checkpointCommittables);
+ }
+
+ @Override
+ public String toString() {
+ return "CommittableCollector{" + "checkpointCommittables=" + checkpointCommittables + '}';
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
index 0cd5930578678..f41350bd25866 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
@@ -48,18 +48,21 @@ public final class CommittableCollectorSerializer
private static final int MAGIC_NUMBER = 0xb91f252c;
private final SimpleVersionedSerializer<CommT> committableSerializer;
- private final int subtaskId;
- private final int numberOfSubtasks;
+ /** Default values are used to deserialize from Flink 1 that didn't store the information. */
+ private final int owningSubtaskId;
+ /** Default values are used to deserialize from Flink 1 that didn't store the information. */
+ private final int owningNumberOfSubtasks;
+
private final SinkCommitterMetricGroup metricGroup;
public CommittableCollectorSerializer(
SimpleVersionedSerializer<CommT> committableSerializer,
- int subtaskId,
- int numberOfSubtasks,
+ int owningSubtaskId,
+ int owningNumberOfSubtasks,
SinkCommitterMetricGroup metricGroup) {
this.committableSerializer = checkNotNull(committableSerializer);
- this.subtaskId = subtaskId;
- this.numberOfSubtasks = numberOfSubtasks;
+ this.owningSubtaskId = owningSubtaskId;
+ this.owningNumberOfSubtasks = owningNumberOfSubtasks;
this.metricGroup = metricGroup;
}
@@ -116,8 +119,6 @@ private CommittableCollector deserializeV2(DataInputDeserializer in) thro
.collect(
Collectors.toMap(
CheckpointCommittableManagerImpl::getCheckpointId, e -> e)),
- subtaskId,
- numberOfSubtasks,
metricGroup);
}
@@ -134,7 +135,7 @@ private class CheckpointSimpleVersionedSerializer
@Override
public int getVersion() {
- return 0;
+ return 1;
}
@Override
@@ -142,6 +143,7 @@ public byte[] serialize(CheckpointCommittableManagerImpl checkpoint)
throws IOException {
DataOutputSerializer out = new DataOutputSerializer(256);
out.writeLong(checkpoint.getCheckpointId());
+ out.writeInt(checkpoint.getNumberOfSubtasks());
SimpleVersionedSerialization.writeVersionAndSerializeList(
new SubtaskSimpleVersionedSerializer(),
new ArrayList<>(checkpoint.getSubtaskCommittableManagers()),
@@ -155,6 +157,7 @@ public CheckpointCommittableManagerImpl deserialize(int version, byte[] s
DataInputDeserializer in = new DataInputDeserializer(serialized);
long checkpointId = in.readLong();
+ int numberOfSubtasks = version == 0 ? owningNumberOfSubtasks : in.readInt();
List<SubtaskCommittableManager<CommT>> subtaskCommittableManagers =
SimpleVersionedSerialization.readVersionAndDeserializeList(
@@ -165,28 +168,16 @@ public CheckpointCommittableManagerImpl deserialize(int version, byte[] s
for (SubtaskCommittableManager<CommT> subtaskCommittableManager :
subtaskCommittableManagers) {
-
- // check if we already have manager for current
- // subtaskCommittableManager.getSubtaskId() if yes,
- // then merge them.
- SubtaskCommittableManager<CommT> mergedManager =
- subtasksCommittableManagers.computeIfPresent(
- subtaskId,
- (key, manager) -> manager.merge(subtaskCommittableManager));
-
- // This is new subtaskId, lets add the mapping.
- if (mergedManager == null) {
- subtasksCommittableManagers.put(
- subtaskCommittableManager.getSubtaskId(), subtaskCommittableManager);
- }
+ // merge in case we already have a manager for that subtaskId
+ // merging is only necessary for recovering Flink 1 unaligned checkpoints
+ subtasksCommittableManagers.merge(
+ subtaskCommittableManager.getSubtaskId(),
+ subtaskCommittableManager,
+ SubtaskCommittableManager::merge);
}
return new CheckpointCommittableManagerImpl<>(
- subtasksCommittableManagers,
- subtaskId,
- numberOfSubtasks,
- checkpointId,
- metricGroup);
+ subtasksCommittableManagers, numberOfSubtasks, checkpointId, metricGroup);
}
}
@@ -216,12 +207,13 @@ public SubtaskSimpleVersionedSerializer() {
@Override
public int getVersion() {
- return 0;
+ return 1;
}
@Override
public byte[] serialize(SubtaskCommittableManager<CommT> subtask) throws IOException {
DataOutputSerializer out = new DataOutputSerializer(256);
+ out.writeInt(subtask.getSubtaskId());
SimpleVersionedSerialization.writeVersionAndSerializeList(
new RequestSimpleVersionedSerializer(),
new ArrayList<>(subtask.getRequests()),
@@ -236,6 +228,8 @@ public byte[] serialize(SubtaskCommittableManager subtask) throws IOExcep
public SubtaskCommittableManager<CommT> deserialize(int version, byte[] serialized)
throws IOException {
DataInputDeserializer in = new DataInputDeserializer(serialized);
+ // Version 0 didn't store the subtaskId, so use default value.
+ int subtaskId = version == 0 ? owningSubtaskId : in.readInt();
List<CommitRequestImpl<CommT>> requests =
SimpleVersionedSerialization.readVersionAndDeserializeList(
new RequestSimpleVersionedSerializer(), in);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java
deleted file mode 100644
index f6f176d67483d..0000000000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.sink.committables;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * Internal wrapper to handle the committing of committables.
- *
- * @param <CommT> type of the committable
- */
-@Internal
-public interface CommittableManager<CommT> {
- /** Returns a summary of the current commit progress. */
- CommittableSummary<CommT> getSummary();
-
- /**
- * Commits all due committables if all respective committables of the specific subtask and
- * checkpoint have been received.
- *
- * @param committer used to commit to the external system
- * @return successfully committed committables with meta information
- * @throws IOException
- * @throws InterruptedException
- */
- Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committer)
- throws IOException, InterruptedException;
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
index c647c8c5d9a7e..3128a2d083c7f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
@@ -22,6 +22,8 @@
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
+
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -29,14 +31,16 @@
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState.COMMITTED;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
-/** Manages the committables coming from one subtask. */
+/** Manages the committables coming from one upstream subtask. */
class SubtaskCommittableManager<CommT> {
private final Deque<CommitRequestImpl<CommT>> requests;
private int numExpectedCommittables;
@@ -135,6 +139,12 @@ Stream> getPendingRequests() {
return requests.stream().filter(c -> !c.isFinished());
}
+ Stream<CommT> getSuccessfulCommittables() {
+ return getRequests().stream()
+ .filter(c -> c.getState() == COMMITTED)
+ .map(CommitRequestImpl::getCommittable);
+ }
+
/**
* Iterates through all currently registered {@link #requests} and returns all {@link
* CommittableWithLineage} that could be successfully committed.
@@ -181,7 +191,7 @@ long getCheckpointId() {
return checkpointId;
}
- Deque<CommitRequestImpl<CommT>> getRequests() {
+ Collection<CommitRequestImpl<CommT>> getRequests() {
return requests;
}
@@ -205,6 +215,29 @@ SubtaskCommittableManager copy() {
metricGroup);
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubtaskCommittableManager<?> that = (SubtaskCommittableManager<?>) o;
+ return numExpectedCommittables == that.numExpectedCommittables
+ && checkpointId == that.checkpointId
+ && subtaskId == that.subtaskId
+ && numDrained == that.numDrained
+ && numFailed == that.numFailed
+ && Iterables.elementsEqual(requests, that.requests);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ requests, numExpectedCommittables, checkpointId, subtaskId, numDrained, numFailed);
+ }
+
@Override
public String toString() {
return "SubtaskCommittableManager{"
@@ -220,8 +253,6 @@ public String toString() {
+ numDrained
+ ", numFailed="
+ numFailed
- + ", metricGroup="
- + metricGroup
+ '}';
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java
new file mode 100644
index 0000000000000..d46e6a295107a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.translators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.graph.TransformationTranslator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME;
+
+/**
+ * A {@link TransformationTranslator} for the {@link GlobalCommitterOperator}. The main purpose is
+ * to detect whether we set {@link GlobalCommitterOperator#commitOnInput} or not.
+ */
+@Internal
+public class GlobalCommitterTransformationTranslator<CommT>
+ implements TransformationTranslator<Void, GlobalCommitterTransform<CommT>> {
+
+ @Override
+ public Collection<Integer> translateForBatch(
+ GlobalCommitterTransform<CommT> transformation, Context context) {
+ return translateInternal(transformation, true);
+ }
+
+ @Override
+ public Collection<Integer> translateForStreaming(
+ GlobalCommitterTransform<CommT> transformation, Context context) {
+ return translateInternal(transformation, false);
+ }
+
+ private Collection<Integer> translateInternal(
+ GlobalCommitterTransform<CommT> globalCommitterTransform, boolean batch) {
+ DataStream<CommittableMessage<CommT>> inputStream =
+ globalCommitterTransform.getInputStream();
+ boolean checkpointingEnabled =
+ inputStream
+ .getExecutionEnvironment()
+ .getCheckpointConfig()
+ .isCheckpointingEnabled();
+ boolean commitOnInput = batch || !checkpointingEnabled || hasUpstreamCommitter(inputStream);
+
+ // Create a global shuffle and add the global committer with parallelism 1.
+ final PhysicalTransformation<Void> transformation =
+ (PhysicalTransformation<Void>)
+ inputStream
+ .global()
+ .transform(
+ GLOBAL_COMMITTER_TRANSFORMATION_NAME,
+ Types.VOID,
+ new GlobalCommitterOperator<>(
+ globalCommitterTransform.getCommitterFactory(),
+ globalCommitterTransform.getCommittableSerializer(),
+ commitOnInput))
+ .getTransformation();
+ transformation.setChainingStrategy(ChainingStrategy.ALWAYS);
+ transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME);
+ transformation.setParallelism(1);
+ transformation.setMaxParallelism(1);
+ return Collections.emptyList();
+ }
+
+ /**
+ * Looks for a committer in the pipeline and aborts on writer. The GlobalCommitter behaves
+ * differently if there is a committer after the writer.
+ */
+ private static boolean hasUpstreamCommitter(DataStream<?> ds) {
+ Transformation<?> dsTransformation = ds.getTransformation();
+
+ Set<Integer> seenIds = new HashSet<>();
+ Queue<Transformation<?>> pendingsTransformations =
+ new ArrayDeque<>(Collections.singleton(dsTransformation));
+ while (!pendingsTransformations.isEmpty()) {
+ Transformation<?> transformation = pendingsTransformations.poll();
+ if (transformation instanceof OneInputTransformation) {
+ StreamOperatorFactory<?> operatorFactory =
+ ((OneInputTransformation<?, ?>) transformation).getOperatorFactory();
+ if (operatorFactory instanceof CommitterOperatorFactory) {
+ return true;
+ }
+ if (operatorFactory instanceof SinkWriterOperatorFactory) {
+ // don't look at the inputs of the writer
+ continue;
+ }
+ }
+ for (Transformation<?> input : transformation.getInputs()) {
+ if (seenIds.add(input.getId())) {
+ pendingsTransformations.add(input);
+ }
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
index ed6ea6440ee21..14416da2d99a1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
@@ -21,34 +21,34 @@
import org.assertj.core.api.AbstractObjectAssert;
/** Custom assertions for {@link CommittableSummary}. */
-public class CommittableSummaryAssert
-        extends AbstractObjectAssert<CommittableSummaryAssert, CommittableSummary<?>> {
+public class CommittableSummaryAssert<CommT>
+        extends AbstractObjectAssert<CommittableSummaryAssert<CommT>, CommittableSummary<CommT>> {
-    public CommittableSummaryAssert(CommittableSummary<?> summary) {
+    public CommittableSummaryAssert(CommittableSummary<CommT> summary) {
         super(summary, CommittableSummaryAssert.class);
     }
-    public CommittableSummaryAssert hasSubtaskId(int subtaskId) {
+    public CommittableSummaryAssert<CommT> hasSubtaskId(int subtaskId) {
         return returns(subtaskId, CommittableSummary::getSubtaskId);
     }
-    public CommittableSummaryAssert hasNumberOfSubtasks(int numberOfSubtasks) {
+    public CommittableSummaryAssert<CommT> hasNumberOfSubtasks(int numberOfSubtasks) {
         return returns(numberOfSubtasks, CommittableSummary::getNumberOfSubtasks);
     }
-    public CommittableSummaryAssert hasOverallCommittables(int committableNumber) {
+    public CommittableSummaryAssert<CommT> hasOverallCommittables(int committableNumber) {
         return returns(committableNumber, CommittableSummary::getNumberOfCommittables);
     }
-    public CommittableSummaryAssert hasPendingCommittables(int committableNumber) {
+    public CommittableSummaryAssert<CommT> hasPendingCommittables(int committableNumber) {
         return returns(committableNumber, CommittableSummary::getNumberOfPendingCommittables);
     }
-    public CommittableSummaryAssert hasFailedCommittables(int committableNumber) {
+    public CommittableSummaryAssert<CommT> hasFailedCommittables(int committableNumber) {
         return returns(committableNumber, CommittableSummary::getNumberOfFailedCommittables);
     }
-    public CommittableSummaryAssert hasCheckpointId(long checkpointId) {
+    public CommittableSummaryAssert<CommT> hasCheckpointId(long checkpointId) {
         return returns(checkpointId, CommittableSummary::getCheckpointIdOrEOI);
     }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineageAssert.java
similarity index 70%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineageAssert.java
index 853fe6235d7ad..1e0dcc1a81147 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineageAssert.java
@@ -24,22 +24,23 @@
* Custom assertions for {@link
* org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage}.
*/
-public class CommittableWithLinageAssert
-        extends AbstractObjectAssert<CommittableWithLinageAssert, CommittableWithLineage<?>> {
+public class CommittableWithLineageAssert<CommT>
+        extends AbstractObjectAssert<
+                CommittableWithLineageAssert<CommT>, CommittableWithLineage<CommT>> {
-    public CommittableWithLinageAssert(CommittableWithLineage<?> summary) {
-        super(summary, CommittableWithLinageAssert.class);
+    public CommittableWithLineageAssert(CommittableWithLineage<CommT> summary) {
+        super(summary, CommittableWithLineageAssert.class);
     }
-    public CommittableWithLinageAssert hasCommittable(Object committable) {
+    public CommittableWithLineageAssert<CommT> hasCommittable(CommT committable) {
         return returns(committable, CommittableWithLineage::getCommittable);
     }
-    public CommittableWithLinageAssert hasCheckpointId(long checkpointId) {
+    public CommittableWithLineageAssert<CommT> hasCheckpointId(long checkpointId) {
         return returns(checkpointId, CommittableWithLineage::getCheckpointIdOrEOI);
     }
-    public CommittableWithLinageAssert hasSubtaskId(int subtaskId) {
+    public CommittableWithLineageAssert<CommT> hasSubtaskId(int subtaskId) {
         return returns(subtaskId, CommittableWithLineage::getSubtaskId);
     }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
index dc45e939b1c78..641a651e2e406 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
@@ -24,6 +24,8 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,66 +37,113 @@
class GlobalCommitterOperatorTest {
- @Test
- void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testWaitForCommittablesOfLatestCheckpointBeforeCommitting(boolean commitOnInput)
+ throws Exception {
final MockCommitter committer = new MockCommitter();
- final OneInputStreamOperatorTestHarness, Void> testHarness =
- createTestHarness(committer);
- testHarness.open();
+ try (OneInputStreamOperatorTestHarness, Void> testHarness =
+ createTestHarness(committer, commitOnInput)) {
+ testHarness.open();
+
+ long cid = 1L;
+ testHarness.processElement(
+ new StreamRecord<>(new CommittableSummary<>(1, 1, cid, 2, 0, 0)));
+
+ testHarness.processElement(new StreamRecord<>(new CommittableWithLineage<>(1, cid, 1)));
+
+ testHarness.notifyOfCompletedCheckpoint(cid);
+
+ assertThat(testHarness.getOutput()).isEmpty();
+ // Not committed because incomplete
+ assertThat(committer.committed).isEmpty();
+
+ // immediately commit on receiving the second committable iff commitOnInput is true
+ testHarness.processElement(new StreamRecord<>(new CommittableWithLineage<>(2, cid, 1)));
+ if (commitOnInput) {
+ assertThat(committer.committed).containsExactly(1, 2);
+ } else {
+ // 3PC behavior
+ assertThat(committer.committed).isEmpty();
+ testHarness.notifyOfCompletedCheckpoint(cid + 1);
+ assertThat(committer.committed).containsExactly(1, 2);
+ }
+
+ assertThat(testHarness.getOutput()).isEmpty();
+ }
+ }
- testHarness.processElement(new StreamRecord<>(new CommittableSummary<>(1, 1, 1L, 2, 0, 0)));
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testWaitForNotifyCheckpointCompleted(boolean commitOnInput) throws Exception {
+ final MockCommitter committer = new MockCommitter();
+ try (OneInputStreamOperatorTestHarness, Void> testHarness =
+ createTestHarness(committer, commitOnInput)) {
+ testHarness.open();
- testHarness.processElement(new StreamRecord<>(new CommittableWithLineage<>(1, 1L, 1)));
+ long cid = 1L;
+ testHarness.processElement(
+ new StreamRecord<>(new CommittableSummary<>(1, 1, cid, 2, 0, 0)));
- testHarness.notifyOfCompletedCheckpoint(1);
+ testHarness.processElement(new StreamRecord<>(new CommittableWithLineage<>(1, cid, 1)));
- assertThat(testHarness.getOutput()).isEmpty();
- // Not committed because incomplete
- assertThat(committer.committed).isEmpty();
+ assertThat(testHarness.getOutput()).isEmpty();
+ // Not committed because incomplete
+ assertThat(committer.committed).isEmpty();
- testHarness.processElement(new StreamRecord<>(new CommittableWithLineage<>(2, 1L, 1)));
+ // immediately commit on receiving the second committable iff commitOnInput is true
+ testHarness.processElement(new StreamRecord<>(new CommittableWithLineage<>(2, cid, 1)));
+ if (commitOnInput) {
+ assertThat(committer.committed).containsExactly(1, 2);
+ } else {
+ assertThat(committer.committed).isEmpty();
+ }
- testHarness.notifyOfCompletedCheckpoint(2);
+ // for commitOnInput = false, the committer waits for notifyCheckpointComplete
+ testHarness.notifyOfCompletedCheckpoint(cid);
- assertThat(testHarness.getOutput()).isEmpty();
- assertThat(committer.committed).containsExactly(1, 2);
- testHarness.close();
+ assertThat(committer.committed).containsExactly(1, 2);
+
+ assertThat(testHarness.getOutput()).isEmpty();
+ }
}
@Test
void testStateRestore() throws Exception {
final MockCommitter committer = new MockCommitter();
- final OneInputStreamOperatorTestHarness, Void> testHarness =
- createTestHarness(committer);
- testHarness.open();
-
- final CommittableSummary committableSummary =
- new CommittableSummary<>(1, 1, 0L, 1, 1, 0);
- testHarness.processElement(new StreamRecord<>(committableSummary));
- final CommittableWithLineage first = new CommittableWithLineage<>(1, 0L, 1);
- testHarness.processElement(new StreamRecord<>(first));
-
- final OperatorSubtaskState snapshot = testHarness.snapshot(0L, 2L);
- assertThat(testHarness.getOutput()).isEmpty();
- testHarness.close();
- assertThat(committer.committed).isEmpty();
-
- final OneInputStreamOperatorTestHarness, Void> restored =
- createTestHarness(committer);
-
- restored.initializeState(snapshot);
- restored.open();
-
- assertThat(testHarness.getOutput()).isEmpty();
- assertThat(committer.committed).containsExactly(1);
- restored.close();
+ try (OneInputStreamOperatorTestHarness, Void> testHarness =
+ createTestHarness(committer, false)) {
+ testHarness.open();
+
+ final CommittableSummary committableSummary =
+ new CommittableSummary<>(1, 1, 0L, 1, 1, 0);
+ testHarness.processElement(new StreamRecord<>(committableSummary));
+ final CommittableWithLineage first = new CommittableWithLineage<>(1, 0L, 1);
+ testHarness.processElement(new StreamRecord<>(first));
+
+ final OperatorSubtaskState snapshot = testHarness.snapshot(0L, 2L);
+ assertThat(testHarness.getOutput()).isEmpty();
+ testHarness.close();
+ assertThat(committer.committed).isEmpty();
+
+ try (OneInputStreamOperatorTestHarness, Void> restored =
+ createTestHarness(committer, true)) {
+
+ restored.initializeState(snapshot);
+ restored.open();
+
+ assertThat(testHarness.getOutput()).isEmpty();
+ assertThat(committer.committed).containsExactly(1);
+ }
+ }
}
- @Test
- void testCommitAllCommittablesOnEndOfInput() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception {
final MockCommitter committer = new MockCommitter();
final OneInputStreamOperatorTestHarness, Void> testHarness =
- createTestHarness(committer);
+ createTestHarness(committer, commitOnInput);
testHarness.open();
final CommittableSummary committableSummary =
@@ -109,16 +158,23 @@ void testCommitAllCommittablesOnEndOfInput() throws Exception {
final CommittableWithLineage second = new CommittableWithLineage<>(2, EOI, 2);
testHarness.processElement(new StreamRecord<>(second));
- testHarness.endInput();
+ // commitOnInput implies that the global committer is not using notifyCheckpointComplete
+ if (commitOnInput) {
+ assertThat(committer.committed).containsExactly(1, 2);
+ } else {
+ assertThat(committer.committed).isEmpty();
+ testHarness.notifyOfCompletedCheckpoint(EOI);
+ assertThat(committer.committed).containsExactly(1, 2);
+ }
assertThat(testHarness.getOutput()).isEmpty();
- assertThat(committer.committed).containsExactly(1, 2);
}
     private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> createTestHarness(
-            Committer<Integer> committer) throws Exception {
+            Committer<Integer> committer, boolean commitOnInput) throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
-                new GlobalCommitterOperator<>(() -> committer, IntegerSerializer::new));
+                new GlobalCommitterOperator<>(
+                        () -> committer, IntegerSerializer::new, commitOnInput));
}
private static class MockCommitter implements Committer {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java
index 0df8f1674e16b..12a5644fe718f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java
@@ -23,6 +23,7 @@
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
+import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.operators.sink.committables.SinkV1CommittableDeserializer;
@@ -49,11 +50,7 @@ class GlobalCommitterSerializerTest {
new IntegerSerializer(), SUBTASK_ID, NUMBER_OF_SUBTASKS, METRIC_GROUP);
private static final GlobalCommitterSerializer SERIALIZER =
new GlobalCommitterSerializer<>(
- COMMITTABLE_COLLECTOR_SERIALIZER,
- new StringSerializer(),
- SUBTASK_ID,
- NUMBER_OF_SUBTASKS,
- METRIC_GROUP);
+ COMMITTABLE_COLLECTOR_SERIALIZER, new StringSerializer(), METRIC_GROUP);
@ParameterizedTest
@ValueSource(booleans = {true, false})
@@ -62,11 +59,8 @@ void testSerDe(boolean withSinkV1State) throws IOException {
new GlobalCommitterSerializer<>(
COMMITTABLE_COLLECTOR_SERIALIZER,
withSinkV1State ? new StringSerializer() : null,
- SUBTASK_ID,
- NUMBER_OF_SUBTASKS,
METRIC_GROUP);
- final CommittableCollector collector =
- new CommittableCollector<>(SUBTASK_ID, NUMBER_OF_SUBTASKS, METRIC_GROUP);
+ final CommittableCollector collector = new CommittableCollector<>(METRIC_GROUP);
collector.addMessage(new CommittableSummary<>(2, 3, 1L, 1, 1, 0));
collector.addMessage(new CommittableWithLineage<>(1, 1L, 2));
final List v1State =
@@ -76,10 +70,11 @@ void testSerDe(boolean withSinkV1State) throws IOException {
final GlobalCommittableWrapper copy =
serializer.deserialize(2, serializer.serialize(wrapper));
assertThat(copy.getGlobalCommittables()).containsExactlyInAnyOrderElementsOf(v1State);
- assertThat(collector.getNumberOfSubtasks()).isEqualTo(1);
- assertThat(collector.isFinished()).isFalse();
- assertThat(collector.getSubtaskId()).isEqualTo(0);
- assertThat(collector.getCheckpointCommittablesUpTo(2)).hasSize(1);
+ assertThat(collector).returns(false, CommittableCollector::isFinished);
+ assertThat(collector.getCheckpointCommittablesUpTo(2))
+ .singleElement()
+ .returns(1L, CheckpointCommittableManager::getCheckpointId)
+ .returns(3, CheckpointCommittableManager::getNumberOfSubtasks);
}
@Test
@@ -101,8 +96,6 @@ void testDeserializationV1() throws IOException {
assertThat(wrapper.getGlobalCommittables()).containsExactlyInAnyOrder(state1, state2);
final CommittableCollector collector = wrapper.getCommittableCollector();
- assertThat(collector.getNumberOfSubtasks()).isEqualTo(1);
- assertThat(collector.getSubtaskId()).isEqualTo(0);
assertThat(collector.getCheckpointCommittablesUpTo(Long.MAX_VALUE)).isEmpty();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/SinkV2Assertions.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/SinkV2Assertions.java
index 1ea890024ff14..2e810550319d4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/SinkV2Assertions.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/SinkV2Assertions.java
@@ -18,15 +18,33 @@
package org.apache.flink.streaming.api.connector.sink2;
+import org.assertj.core.api.InstanceOfAssertFactory;
+
/** Custom assertions for Sink V2 related classes. */
public class SinkV2Assertions {
+    @SuppressWarnings({"rawtypes"})
+    public static <CommT>
+            InstanceOfAssertFactory<CommittableWithLineage, CommittableWithLineageAssert<CommT>>
+                    committableWithLineage() {
+        return new InstanceOfAssertFactory<>(
+                CommittableWithLineage.class, SinkV2Assertions::assertThat);
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    public static <CommT>
+            InstanceOfAssertFactory<CommittableSummary, CommittableSummaryAssert<CommT>>
+                    committableSummary() {
+        return new InstanceOfAssertFactory<>(
+                CommittableSummary.class, SinkV2Assertions::assertThat);
+    }
-    public static CommittableSummaryAssert assertThat(CommittableSummary<?> summary) {
-        return new CommittableSummaryAssert(summary);
+    public static <CommT> CommittableSummaryAssert<CommT> assertThat(
+            CommittableSummary<CommT> summary) {
+        return new CommittableSummaryAssert<>(summary);
     }
-    public static CommittableWithLinageAssert assertThat(
-            CommittableWithLineage<?> committableWithLineage) {
-        return new CommittableWithLinageAssert(committableWithLineage);
+    public static <CommT> CommittableWithLineageAssert<CommT> assertThat(
+            CommittableWithLineage<CommT> committableWithLineage) {
+        return new CommittableWithLineageAssert<>(committableWithLineage);
     }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index dec6347847517..756ea0c8022f0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -19,26 +19,26 @@
package org.apache.flink.streaming.runtime.operators.sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.assertj.core.api.AbstractThrowableAssert;
+import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import java.util.List;
import java.util.function.IntSupplier;
import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
-import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput;
-import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableSummary;
-import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableWithLinage;
+import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
+import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
+import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -73,13 +73,14 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
if (withPostCommitTopology) {
- final List output = fromOutput(testHarness.getOutput());
- SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
+ ListAssert