HDDS-11376. Improve ReplicationSupervisor to record replication metrics #7140

Merged (10 commits) on Sep 10, 2024
@@ -46,6 +46,16 @@ public ECReconstructionCoordinatorTask(
debugString = reconstructionCommandInfo.toString();
}

@Override
public String getMetricName() {
return "ECReconstructions";
}

@Override
public String getMetricDescriptionSegment() {
return "EC reconstructions";
}

@Override
public void runTask() {
// Implement the coordinator logic to handle a container group
@@ -70,6 +70,10 @@ protected AbstractReplicationTask(long containerID,
this.term = term;
queued = Instant.now(clock);
}

protected abstract String getMetricName();

protected abstract String getMetricDescriptionSegment();

public long getContainerId() {
return containerId;
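The two abstract methods above are the whole contract each task type now has to satisfy: getMetricName() supplies the key the supervisor counts the task under, and getMetricDescriptionSegment() supplies the phrase spliced into the generated gauge descriptions. A minimal, self-contained sketch of that contract (the class names here are illustrative; only the two strings come from this patch):

// Illustrative only: mirrors the naming contract added to AbstractReplicationTask.
abstract class TaskSketch {
  protected abstract String getMetricName();               // e.g. "ECReconstructions"
  protected abstract String getMetricDescriptionSegment(); // e.g. "EC reconstructions"
}

class EcReconstructionTaskSketch extends TaskSketch {
  @Override
  protected String getMetricName() {
    return "ECReconstructions";
  }

  @Override
  protected String getMetricDescriptionSegment() {
    return "EC reconstructions";
  }
}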
@@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
@@ -71,11 +72,17 @@ public final class ReplicationSupervisor {
private final StateContext context;
private final Clock clock;

private final AtomicLong requestCounter = new AtomicLong();
private final AtomicLong successCounter = new AtomicLong();
private final AtomicLong failureCounter = new AtomicLong();
private final AtomicLong timeoutCounter = new AtomicLong();
private final AtomicLong skippedCounter = new AtomicLong();
private final Map<String, AtomicLong> requestCounter = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> successCounter = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> failureCounter = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> timeoutCounter = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> skippedCounter = new ConcurrentHashMap<>();

private static final Map<String, String> METRICS_MAP;

static {
METRICS_MAP = new HashMap<>();
}

/**
* A set of container IDs that are currently being downloaded
@@ -188,6 +195,10 @@ public static Builder newBuilder() {
return new Builder();
}

public static Map<String, String> getMetricsMap() {
return Collections.unmodifiableMap(METRICS_MAP);
}

private ReplicationSupervisor(StateContext context, ExecutorService executor,
ReplicationConfig replicationConfig, DatanodeConfiguration datanodeConfig,
Clock clock, IntConsumer executorThreadUpdater) {
@@ -221,6 +232,19 @@ public void addTask(AbstractReplicationTask task) {
return;
}

if (requestCounter.get(task.getMetricName()) == null) {
synchronized (this) {
if (requestCounter.get(task.getMetricName()) == null) {
requestCounter.put(task.getMetricName(), new AtomicLong(0));
successCounter.put(task.getMetricName(), new AtomicLong(0));
failureCounter.put(task.getMetricName(), new AtomicLong(0));
timeoutCounter.put(task.getMetricName(), new AtomicLong(0));
skippedCounter.put(task.getMetricName(), new AtomicLong(0));
METRICS_MAP.put(task.getMetricName(), task.getMetricDescriptionSegment());
}
}
}

if (inFlight.add(task)) {
if (task.getPriority() != ReplicationCommandPriority.LOW) {
// Low priority tasks are not included in the replication queue sizes
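The null check outside the lock keeps the hot path cheap, while the repeated check inside synchronized guarantees that only the first task of a given type creates its counters and registers its description. Once one container replication and one EC reconstruction have been queued, the maps would hold roughly the following (illustrative contents; the keys and descriptions come from ReplicationTask and ECReconstructionCoordinatorTask in this patch):

// requestCounter, successCounter, failureCounter, timeoutCounter, skippedCounter:
//   "ContainerReplications" -> AtomicLong
//   "ECReconstructions"     -> AtomicLong
// METRICS_MAP:
//   "ContainerReplications" -> "container replications"
//   "ECReconstructions"     -> "EC reconstructions"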
@@ -330,14 +354,14 @@ public TaskRunner(AbstractReplicationTask task) {
@Override
public void run() {
try {
requestCounter.incrementAndGet();
requestCounter.get(task.getMetricName()).incrementAndGet();

final long now = clock.millis();
final long deadline = task.getDeadline();
if (deadline > 0 && now > deadline) {
LOG.info("Ignoring {} since the deadline has passed ({} < {})",
this, Instant.ofEpochMilli(deadline), Instant.ofEpochMilli(now));
timeoutCounter.incrementAndGet();
timeoutCounter.get(task.getMetricName()).incrementAndGet();
return;
}

@@ -364,18 +388,18 @@ public void run() {
task.runTask();
if (task.getStatus() == Status.FAILED) {
LOG.warn("Failed {}", this);
failureCounter.incrementAndGet();
failureCounter.get(task.getMetricName()).incrementAndGet();
} else if (task.getStatus() == Status.DONE) {
LOG.info("Successful {}", this);
successCounter.incrementAndGet();
successCounter.get(task.getMetricName()).incrementAndGet();
} else if (task.getStatus() == Status.SKIPPED) {
LOG.info("Skipped {}", this);
skippedCounter.incrementAndGet();
skippedCounter.get(task.getMetricName()).incrementAndGet();
}
} catch (Exception e) {
task.setStatus(Status.FAILED);
LOG.warn("Failed {}", this, e);
failureCounter.incrementAndGet();
failureCounter.get(task.getMetricName()).incrementAndGet();
} finally {
inFlight.remove(task);
decrementTaskCounter(task);
@@ -419,7 +443,12 @@ public boolean equals(Object o) {
}

public long getReplicationRequestCount() {
return requestCounter.get();
return getCount(requestCounter);
}

public long getReplicationRequestCount(String metricsName) {
AtomicLong counter = requestCounter.get(metricsName);
return counter != null ? counter.get() : 0;
}

public long getQueueSize() {
@@ -438,20 +467,48 @@ public long getMaxReplicationStreams() {
}
}

private long getCount(Map<String, AtomicLong> counter) {
long total = 0;
for (Map.Entry<String, AtomicLong> entry : counter.entrySet()) {
total += entry.getValue().get();
}
return total;
}

public long getReplicationSuccessCount() {
return successCounter.get();
return getCount(successCounter);
}

public long getReplicationSuccessCount(String metricsName) {
AtomicLong counter = successCounter.get(metricsName);
return counter != null ? counter.get() : 0;
}

public long getReplicationFailureCount() {
return failureCounter.get();
return getCount(failureCounter);
}

public long getReplicationFailureCount(String metricsName) {
AtomicLong counter = failureCounter.get(metricsName);
return counter != null ? counter.get() : 0;
}

public long getReplicationTimeoutCount() {
return timeoutCounter.get();
return getCount(timeoutCounter);
}

public long getReplicationTimeoutCount(String metricsName) {
AtomicLong counter = timeoutCounter.get(metricsName);
return counter != null ? counter.get() : 0;
}

public long getReplicationSkippedCount() {
return skippedCounter.get();
return getCount(skippedCounter);
}

public long getReplicationSkippedCount(String metricsName) {
AtomicLong counter = skippedCounter.get(metricsName);
return counter != null ? counter.get() : 0;
}

}
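The no-argument getters keep their old meaning by summing every per-type counter, while the new String overloads return a single type's count and fall back to 0 for an unknown name. A standalone sketch of that pattern, independent of the Ozone classes (all names here are illustrative):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Standalone illustration of the per-type counter pattern used by ReplicationSupervisor:
// one AtomicLong per metric name, an aggregate view that sums all of them, and a
// null-safe per-type lookup that reports 0 for names that were never registered.
public class PerTypeCounterSketch {
  private final Map<String, AtomicLong> requestCounter = new ConcurrentHashMap<>();

  public void increment(String metricName) {
    requestCounter.computeIfAbsent(metricName, k -> new AtomicLong()).incrementAndGet();
  }

  public long getCount(String metricName) {
    AtomicLong counter = requestCounter.get(metricName);
    return counter != null ? counter.get() : 0;
  }

  public long getTotalCount() {
    long total = 0;
    for (AtomicLong value : requestCounter.values()) {
      total += value.get();
    }
    return total;
  }

  public static void main(String[] args) {
    PerTypeCounterSketch counters = new PerTypeCounterSketch();
    counters.increment("ContainerReplications");
    counters.increment("ContainerReplications");
    counters.increment("ECReconstructions");
    System.out.println(counters.getCount("ContainerReplications")); // 2
    System.out.println(counters.getCount("ECReconstructions"));     // 1
    System.out.println(counters.getTotalCount());                   // 3
    System.out.println(counters.getCount("NoSuchTaskType"));        // 0
  }
}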
@@ -71,16 +71,47 @@ public void getMetrics(MetricsCollector collector, boolean all) {
.addGauge(Interns.info("numRequestedReplications",
"Number of requested replications"),
supervisor.getReplicationRequestCount())
.addGauge(Interns.info("numSuccessReplications",
"Number of successful replications"),
supervisor.getReplicationSuccessCount())
.addGauge(Interns.info("numFailureReplications",
"Number of failure replications"),
supervisor.getReplicationFailureCount())
.addGauge(Interns.info("numTimeoutReplications",
"Number of replication requests timed out before being processed"),
supervisor.getReplicationTimeoutCount())
.addGauge(Interns.info("numSkippedReplications",
"Number of replication requests skipped as the container is "
+ "already present"), supervisor.getReplicationSkippedCount())
+ "already present"),
supervisor.getReplicationSkippedCount())
.addGauge(Interns.info("maxReplicationStreams", "Maximum number of "
+ "concurrent replication tasks which can run simultaneously"),
supervisor.getMaxReplicationStreams());

Map<String, String> metricsMap = ReplicationSupervisor.getMetricsMap();
if (!metricsMap.isEmpty()) {
metricsMap.forEach((metricsName, descriptionSegment) -> {
if (!metricsName.equals("")) {
builder.addGauge(Interns.info("numRequested" + metricsName,
"Number of requested " + descriptionSegment),
supervisor.getReplicationRequestCount(metricsName))
.addGauge(Interns.info("numSuccess" + metricsName,
"Number of successful " + descriptionSegment),
supervisor.getReplicationSuccessCount(metricsName))
.addGauge(Interns.info("numFailure" + metricsName,
"Number of failure " + descriptionSegment),
supervisor.getReplicationFailureCount(metricsName))
.addGauge(Interns.info("numTimeout" + metricsName,
"Number of " + descriptionSegment + " timed out before being processed"),
supervisor.getReplicationTimeoutCount(metricsName))
.addGauge(Interns.info("numSkipped" + metricsName,
"Number of " + descriptionSegment + " skipped as the container is "
+ "already present"),
supervisor.getReplicationSkippedCount(metricsName));
}
});
}

Map<String, Integer> tasks = supervisor.getInFlightReplicationSummary();
for (Map.Entry<String, Integer> entry : tasks.entrySet()) {
builder.addGauge(Interns.info("numInflight" + entry.getKey(),
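Given the metric names registered by the two task types, the loop above should emit per-type gauges alongside the existing aggregate ones: numRequestedContainerReplications, numSuccessContainerReplications, numFailureContainerReplications, numTimeoutContainerReplications and numSkippedContainerReplications for container replications, and the matching numRequestedECReconstructions, numSuccessECReconstructions, numFailureECReconstructions, numTimeoutECReconstructions and numSkippedECReconstructions for EC reconstructions, for example the pair Interns.info("numRequestedECReconstructions", "Number of requested EC reconstructions").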
@@ -65,6 +65,16 @@ protected ReplicationTask(
replicator);
}

@Override
public String getMetricName() {
return "ContainerReplications";
}

@Override
public String getMetricDescriptionSegment() {
return "container replications";
}

@Override
public boolean equals(Object o) {
if (this == o) {