Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,8 @@ public void suspendSchedule() {

static class ResourceCommitterService extends Thread {
private final CapacityScheduler cs;
private BlockingQueue<ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>>
backlogs = new LinkedBlockingQueue<>();
private final BlockingQueue<ResourceCommitRequest<FiCaSchedulerApp,
FiCaSchedulerNode>> backlogs = new LinkedBlockingQueue<>();

public ResourceCommitterService(CapacityScheduler cs) {
this.cs = cs;
Expand All @@ -690,6 +690,7 @@ public void run() {
try {
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
backlogs.take();
CapacitySchedulerMetrics.getMetrics().decrBacklogs();
cs.writeLock.lock();
try {
cs.tryCommit(cs.getClusterResource(), request, true);
Expand All @@ -708,6 +709,7 @@ public void run() {
public void addNewCommitRequest(
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> proposal) {
backlogs.add(proposal);
CapacitySchedulerMetrics.getMetrics().incrBacklogs();
}

public int getPendingBacklogs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsInfo;
Expand Down Expand Up @@ -47,6 +48,7 @@ public class CapacitySchedulerMetrics {
"Metrics for the Yarn Capacity Scheduler");

@Metric("Scheduler allocate containers") MutableRate allocate;
@Metric("Scheduler commit backlogs") MutableGaugeLong backlogs;
@Metric("Scheduler commit success") MutableRate commitSuccess;
@Metric("Scheduler commit failure") MutableRate commitFailure;
@Metric("Scheduler node update") MutableRate nodeUpdate;
Expand Down Expand Up @@ -93,6 +95,14 @@ public void addAllocate(long latency) {
this.allocate.add(latency);
}

public void incrBacklogs() {
this.backlogs.incr();
}

public void decrBacklogs() {
this.backlogs.decr();
}

public void addCommitSuccess(long latency) {
this.commitSuccess.add(latency);
}
Expand Down Expand Up @@ -128,4 +138,9 @@ public void addSchedulerNodeHBInterval(long heartbeatInterval) {
public long getNumOfSchedulerNodeHBInterval() {
return this.schedulerNodeHBInterval.getEstimator().getCount();
}

@VisibleForTesting
public long getSizeOfBacklogs() {
return backlogs.value();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
}

Assert.assertEquals(0, csMetrics.getNumOfAllocates());
Assert.assertEquals(0, csMetrics.getSizeOfBacklogs());
Assert.assertEquals(0, csMetrics.getNumOfCommitSuccess());

RMApp rmApp = MockRMAppSubmitter.submit(rm,
Expand Down