Skip to content

Commit

Permalink
[FLINK-36455] Fix PendingCommittable metric in sink
Browse files Browse the repository at this point in the history
We can only set the gauge once.

(cherry picked from commit 21c344c)
  • Loading branch information
AHeise committed Nov 15, 2024
1 parent b27a037 commit 47477d9
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -142,13 +143,13 @@ public boolean hasGloballyReceivedAll() {
@Override
public void commit(Committer<CommT> committer, int maxRetries)
throws IOException, InterruptedException {
Collection<CommitRequestImpl<CommT>> requests = getPendingRequests();
Collection<CommitRequestImpl<CommT>> requests =
getPendingRequests().collect(Collectors.toList());
for (int retry = 0; !requests.isEmpty() && retry <= maxRetries; retry++) {
requests.forEach(CommitRequestImpl::setSelected);
committer.commit(new ArrayList<>(requests));
committer.commit(Collections.unmodifiableCollection(requests));
requests.forEach(CommitRequestImpl::setCommittedIfNoError);
requests = requests.stream().filter(r -> !r.isFinished()).collect(Collectors.toList());
metricGroup.setCurrentPendingCommittablesGauge(requests::size);
}
if (!requests.isEmpty()) {
throw new IOException(
Expand All @@ -165,11 +166,10 @@ public Collection<CommT> getSuccessfulCommittables() {
.collect(Collectors.toList());
}

Collection<CommitRequestImpl<CommT>> getPendingRequests() {
Stream<CommitRequestImpl<CommT>> getPendingRequests() {
return subtasksCommittableManagers.values().stream()
.peek(this::assertReceivedAll)
.flatMap(SubtaskCommittableManager::getPendingRequests)
.collect(Collectors.toList());
.flatMap(SubtaskCommittableManager::getPendingRequests);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public CommittableCollector(SinkCommitterMetricGroup metricGroup) {
SinkCommitterMetricGroup metricGroup) {
this.checkpointCommittables = new TreeMap<>(checkNotNull(checkpointCommittables));
this.metricGroup = metricGroup;
this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
}

private int getNumPending() {
return checkpointCommittables.values().stream()
.mapToInt(m -> (int) m.getPendingRequests().count())
.sum();
}

/**
Expand Down

0 comments on commit 47477d9

Please sign in to comment.