Skip to content

Commit

Permalink
HDDS-11444. Make Datanode Command metrics consistent across all comma…
Browse files Browse the repository at this point in the history
…nds (apache#7191)
  • Loading branch information
jianghuazhu authored Oct 2, 2024
1 parent d3b63c6 commit a0f0872
Show file tree
Hide file tree
Showing 16 changed files with 419 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.TotalRunTimeMs;
import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.QueueWaitingTaskCount;
import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.InvocationCount;
import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.AvgRunTimeMs;
import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.ThreadPoolActivePoolSize;
import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.ThreadPoolMaxPoolSize;
import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.CommandReceivedCount;
Expand All @@ -46,6 +47,7 @@ public final class CommandHandlerMetrics implements MetricsSource {
enum CommandMetricsMetricsInfo implements MetricsInfo {
Command("The type of the SCM command"),
TotalRunTimeMs("The total runtime of the command handler in milliseconds"),
AvgRunTimeMs("Average run time of the command handler in milliseconds"),
QueueWaitingTaskCount("The number of queued tasks waiting for execution"),
InvocationCount("The number of times the command handler has been invoked"),
ThreadPoolActivePoolSize("The number of active threads in the thread pool"),
Expand Down Expand Up @@ -108,6 +110,7 @@ public void getMetrics(MetricsCollector collector, boolean all) {
commandHandler.getCommandType().name());

builder.addGauge(TotalRunTimeMs, commandHandler.getTotalRunTime());
builder.addGauge(AvgRunTimeMs, commandHandler.getAverageRunTime());
builder.addGauge(QueueWaitingTaskCount, commandHandler.getQueuedCount());
builder.addGauge(InvocationCount, commandHandler.getInvocationCount());
int activePoolSize = commandHandler.getThreadPoolActivePoolSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
Expand Down Expand Up @@ -58,7 +60,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
private final AtomicLong invocationCount = new AtomicLong(0);
private final AtomicInteger queuedCount = new AtomicInteger(0);
private final ThreadPoolExecutor executor;
private long totalTime;
private final MutableRate opsLatencyMs;

/**
* Constructs a close container command handler.
Expand All @@ -72,6 +74,9 @@ public CloseContainerCommandHandler(
new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "CloseContainerThread-%d")
.build());
MetricsRegistry registry = new MetricsRegistry(
CloseContainerCommandHandler.class.getSimpleName());
this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.closeContainerCommand + "Ms");
}

/**
Expand Down Expand Up @@ -155,7 +160,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
LOG.error("Can't close container #{}", containerId, e);
} finally {
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
this.opsLatencyMs.add(endTime - startTime);
}
}, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
}
Expand Down Expand Up @@ -204,15 +209,12 @@ public int getInvocationCount() {
*/
@Override
public long getAverageRunTime() {
if (invocationCount.get() > 0) {
return totalTime / invocationCount.get();
}
return 0;
return (long) this.opsLatencyMs.lastStat().mean();
}

@Override
public long getTotalRunTime() {
return totalTime;
return (long) this.opsLatencyMs.lastStat().total();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
Expand Down Expand Up @@ -60,9 +62,9 @@ public class ClosePipelineCommandHandler implements CommandHandler {

private final AtomicLong invocationCount = new AtomicLong(0);
private final AtomicInteger queuedCount = new AtomicInteger(0);
private long totalTime;
private final Executor executor;
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
private final MutableRate opsLatencyMs;

/**
* Constructs a closePipelineCommand handler.
Expand All @@ -80,6 +82,9 @@ public ClosePipelineCommandHandler(
Executor executor) {
this.newRaftClient = newRaftClient;
this.executor = executor;
MetricsRegistry registry = new MetricsRegistry(
ClosePipelineCommandHandler.class.getSimpleName());
this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.closePipelineCommand + "Ms");
}

/**
Expand Down Expand Up @@ -155,7 +160,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
}
} finally {
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
this.opsLatencyMs.add(endTime - startTime);
}
}, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
}
Expand Down Expand Up @@ -187,15 +192,12 @@ public int getInvocationCount() {
*/
@Override
public long getAverageRunTime() {
if (invocationCount.get() > 0) {
return totalTime / invocationCount.get();
}
return 0;
return (long) this.opsLatencyMs.lastStat().mean();
}

@Override
public long getTotalRunTime() {
return totalTime;
return (long) this.opsLatencyMs.lastStat().total();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
Expand Down Expand Up @@ -59,8 +61,8 @@ public class CreatePipelineCommandHandler implements CommandHandler {
private final AtomicInteger queuedCount = new AtomicInteger(0);
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;

private long totalTime;
private final Executor executor;
private final MutableRate opsLatencyMs;

/**
* Constructs a createPipelineCommand handler.
Expand All @@ -75,6 +77,9 @@ public CreatePipelineCommandHandler(ConfigurationSource conf,
Executor executor) {
this.newRaftClient = newRaftClient;
this.executor = executor;
MetricsRegistry registry = new MetricsRegistry(
CreatePipelineCommandHandler.class.getSimpleName());
this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.createPipelineCommand + "Ms");
}

/**
Expand Down Expand Up @@ -135,7 +140,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
}
} finally {
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
this.opsLatencyMs.add(endTime - startTime);
}
}, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
}
Expand Down Expand Up @@ -167,15 +172,12 @@ public int getInvocationCount() {
*/
@Override
public long getAverageRunTime() {
if (invocationCount.get() > 0) {
return totalTime / invocationCount.get();
}
return 0;
return (long) this.opsLatencyMs.lastStat().mean();
}

@Override
public long getTotalRunTime() {
return totalTime;
return (long) this.opsLatencyMs.lastStat().total();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
Expand Down Expand Up @@ -91,14 +93,14 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
private final ContainerSet containerSet;
private final ConfigurationSource conf;
private int invocationCount;
private long totalTime;
private final ThreadPoolExecutor executor;
private final LinkedBlockingQueue<DeleteCmdInfo> deleteCommandQueues;
private final Daemon handlerThread;
private final OzoneContainer ozoneContainer;
private final BlockDeletingServiceMetrics blockDeleteMetrics;
private final long tryLockTimeoutMs;
private final Map<String, SchemaHandler> schemaHandlers;
private final MutableRate opsLatencyMs;

public DeleteBlocksCommandHandler(OzoneContainer container,
ConfigurationSource conf, DatanodeConfiguration dnConf,
Expand All @@ -121,6 +123,9 @@ public DeleteBlocksCommandHandler(OzoneContainer container,
dnConf.getBlockDeleteThreads(), threadFactory);
this.deleteCommandQueues =
new LinkedBlockingQueue<>(dnConf.getBlockDeleteQueueLimit());
MetricsRegistry registry = new MetricsRegistry(
DeleteBlocksCommandHandler.class.getSimpleName());
this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.deleteBlocksCommand + "Ms");
long interval = dnConf.getBlockDeleteCommandWorkerInterval().toMillis();
handlerThread = new Daemon(new DeleteCmdWorker(interval));
handlerThread.start();
Expand Down Expand Up @@ -403,7 +408,7 @@ private void processCmd(DeleteCmdInfo cmd) {
};
updateCommandStatus(cmd.getContext(), cmd.getCmd(), statusUpdater, LOG);
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
this.opsLatencyMs.add(endTime - startTime);
invocationCount++;
}
}
Expand Down Expand Up @@ -666,15 +671,12 @@ public int getInvocationCount() {

@Override
public long getAverageRunTime() {
if (invocationCount > 0) {
return totalTime / invocationCount;
}
return 0;
return (long) this.opsLatencyMs.lastStat().mean();
}

@Override
public long getTotalRunTime() {
return totalTime;
return (long) this.opsLatencyMs.lastStat().total();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.RejectedExecutionException;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
Expand All @@ -39,7 +41,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Handler to process the DeleteContainerCommand from SCM.
Expand All @@ -51,10 +52,10 @@ public class DeleteContainerCommandHandler implements CommandHandler {

private final AtomicInteger invocationCount = new AtomicInteger(0);
private final AtomicInteger timeoutCount = new AtomicInteger(0);
private final AtomicLong totalTime = new AtomicLong(0);
private final ThreadPoolExecutor executor;
private final Clock clock;
private int maxQueueSize;
private final MutableRate opsLatencyMs;

public DeleteContainerCommandHandler(
int threadPoolSize, Clock clock, int queueSize, String threadNamePrefix) {
Expand All @@ -73,6 +74,9 @@ protected DeleteContainerCommandHandler(Clock clock,
this.executor = executor;
this.clock = clock;
maxQueueSize = queueSize;
MetricsRegistry registry = new MetricsRegistry(
DeleteContainerCommandHandler.class.getSimpleName());
this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.deleteContainerCommand + "Ms");
}
@Override
public void handle(final SCMCommand command,
Expand Down Expand Up @@ -124,7 +128,7 @@ private void handleInternal(SCMCommand command, StateContext context,
} catch (IOException e) {
LOG.error("Exception occurred while deleting the container.", e);
} finally {
totalTime.getAndAdd(Time.monotonicNow() - startTime);
this.opsLatencyMs.add(Time.monotonicNow() - startTime);
}
}

Expand All @@ -149,14 +153,12 @@ public int getTimeoutCount() {

@Override
public long getAverageRunTime() {
final int invocations = invocationCount.get();
return invocations == 0 ?
0 : totalTime.get() / invocations;
return (long) this.opsLatencyMs.lastStat().mean();
}

@Override
public long getTotalRunTime() {
return totalTime.get();
return (long) this.opsLatencyMs.lastStat().total();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.FinalizeNewLayoutVersionCommandProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
Expand All @@ -42,12 +44,15 @@ public class FinalizeNewLayoutVersionCommandHandler implements CommandHandler {
LoggerFactory.getLogger(FinalizeNewLayoutVersionCommandHandler.class);

private AtomicLong invocationCount = new AtomicLong(0);
private long totalTime;
private final MutableRate opsLatencyMs;

/**
* Constructs a FinalizeNewLayoutVersionCommandHandler.
*/
public FinalizeNewLayoutVersionCommandHandler() {
MetricsRegistry registry = new MetricsRegistry(
FinalizeNewLayoutVersionCommandHandler.class.getSimpleName());
this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.finalizeNewLayoutVersionCommand + "Ms");
}

/**
Expand Down Expand Up @@ -82,7 +87,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
LOG.error("Exception during finalization.", e);
} finally {
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
this.opsLatencyMs.add(endTime - startTime);
}
}

Expand Down Expand Up @@ -113,15 +118,12 @@ public int getInvocationCount() {
*/
@Override
public long getAverageRunTime() {
if (invocationCount.get() > 0) {
return totalTime / invocationCount.get();
}
return 0;
return (long) this.opsLatencyMs.lastStat().mean();
}

@Override
public long getTotalRunTime() {
return totalTime;
return (long) this.opsLatencyMs.lastStat().total();
}

@Override
Expand Down
Loading

0 comments on commit a0f0872

Please sign in to comment.