From a0f0872fc88a6eb360c5460830e5ded21a17d8bb Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Thu, 3 Oct 2024 00:12:28 +0800 Subject: [PATCH] HDDS-11444. Make Datanode Command metrics consistent across all commands (#7191) --- .../common/helpers/CommandHandlerMetrics.java | 3 + .../CloseContainerCommandHandler.java | 16 +- .../ClosePipelineCommandHandler.java | 16 +- .../CreatePipelineCommandHandler.java | 16 +- .../DeleteBlocksCommandHandler.java | 16 +- .../DeleteContainerCommandHandler.java | 16 +- ...inalizeNewLayoutVersionCommandHandler.java | 16 +- ...ReconstructECContainersCommandHandler.java | 26 +++- .../RefreshVolumeUsageCommandHandler.java | 16 +- .../ReplicateContainerCommandHandler.java | 31 ++-- ...SetNodeOperationalStateCommandHandler.java | 26 ++-- .../replication/ReplicationSupervisor.java | 33 +++++ .../ReplicationSupervisorMetrics.java | 7 +- ...ReconstructECContainersCommandHandler.java | 139 ++++++++++++++++++ .../TestReplicateContainerCommandHandler.java | 118 +++++++++++++++ .../TestReplicationSupervisor.java | 10 ++ 16 files changed, 419 insertions(+), 86 deletions(-) create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconstructECContainersCommandHandler.java create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/CommandHandlerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/CommandHandlerMetrics.java index a6e4d6258d9..e52565952a5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/CommandHandlerMetrics.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/CommandHandlerMetrics.java @@ -34,6 +34,7 @@ import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.TotalRunTimeMs; import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.QueueWaitingTaskCount; import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.InvocationCount; +import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.AvgRunTimeMs; import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.ThreadPoolActivePoolSize; import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.ThreadPoolMaxPoolSize; import static org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics.CommandMetricsMetricsInfo.CommandReceivedCount; @@ -46,6 +47,7 @@ public final class CommandHandlerMetrics implements MetricsSource { enum CommandMetricsMetricsInfo implements MetricsInfo { Command("The type of the SCM command"), TotalRunTimeMs("The total runtime of the command handler in milliseconds"), + AvgRunTimeMs("Average run time of the command handler in milliseconds"), QueueWaitingTaskCount("The number of queued tasks waiting for execution"), InvocationCount("The number of times the command handler has been invoked"), ThreadPoolActivePoolSize("The number of active threads in the thread pool"), @@ -108,6 +110,7 @@ public void getMetrics(MetricsCollector collector, boolean all) { commandHandler.getCommandType().name()); builder.addGauge(TotalRunTimeMs, commandHandler.getTotalRunTime()); + builder.addGauge(AvgRunTimeMs, commandHandler.getAverageRunTime()); builder.addGauge(QueueWaitingTaskCount, commandHandler.getQueuedCount()); builder.addGauge(InvocationCount, 
commandHandler.getInvocationCount()); int activePoolSize = commandHandler.getThreadPoolActivePoolSize(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index bc703ac6a55..cd032d4b275 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; import org.apache.hadoop.hdds.tracing.TracingUtil; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.statemachine .SCMConnectionManager; @@ -58,7 +60,7 @@ public class CloseContainerCommandHandler implements CommandHandler { private final AtomicLong invocationCount = new AtomicLong(0); private final AtomicInteger queuedCount = new AtomicInteger(0); private final ThreadPoolExecutor executor; - private long totalTime; + private final MutableRate opsLatencyMs; /** * Constructs a close container command handler. 
@@ -72,6 +74,9 @@ public CloseContainerCommandHandler( new ThreadFactoryBuilder() .setNameFormat(threadNamePrefix + "CloseContainerThread-%d") .build()); + MetricsRegistry registry = new MetricsRegistry( + CloseContainerCommandHandler.class.getSimpleName()); + this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.closeContainerCommand + "Ms"); } /** @@ -155,7 +160,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer, LOG.error("Can't close container #{}", containerId, e); } finally { long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; + this.opsLatencyMs.add(endTime - startTime); } }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet()); } @@ -204,15 +209,12 @@ public int getInvocationCount() { */ @Override public long getAverageRunTime() { - if (invocationCount.get() > 0) { - return totalTime / invocationCount.get(); - } - return 0; + return (long) this.opsLatencyMs.lastStat().mean(); } @Override public long getTotalRunTime() { - return totalTime; + return (long) this.opsLatencyMs.lastStat().total(); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java index 241abb6f4ae..be39277fdfa 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import 
org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.container.common.statemachine .SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; @@ -60,9 +62,9 @@ public class ClosePipelineCommandHandler implements CommandHandler { private final AtomicLong invocationCount = new AtomicLong(0); private final AtomicInteger queuedCount = new AtomicInteger(0); - private long totalTime; private final Executor executor; private final BiFunction newRaftClient; + private final MutableRate opsLatencyMs; /** * Constructs a closePipelineCommand handler. @@ -80,6 +82,9 @@ public ClosePipelineCommandHandler( Executor executor) { this.newRaftClient = newRaftClient; this.executor = executor; + MetricsRegistry registry = new MetricsRegistry( + ClosePipelineCommandHandler.class.getSimpleName()); + this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.closePipelineCommand + "Ms"); } /** @@ -155,7 +160,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer, } } finally { long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; + this.opsLatencyMs.add(endTime - startTime); } }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet()); } @@ -187,15 +192,12 @@ public int getInvocationCount() { */ @Override public long getAverageRunTime() { - if (invocationCount.get() > 0) { - return totalTime / invocationCount.get(); - } - return 0; + return (long) this.opsLatencyMs.lastStat().mean(); } @Override public long getTotalRunTime() { - return totalTime; + return (long) this.opsLatencyMs.lastStat().total(); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java index 4a36a1987de..62fc8a919d8 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; @@ -59,8 +61,8 @@ public class CreatePipelineCommandHandler implements CommandHandler { private final AtomicInteger queuedCount = new AtomicInteger(0); private final BiFunction newRaftClient; - private long totalTime; private final Executor executor; + private final MutableRate opsLatencyMs; /** * Constructs a createPipelineCommand handler. 
@@ -75,6 +77,9 @@ public CreatePipelineCommandHandler(ConfigurationSource conf, Executor executor) { this.newRaftClient = newRaftClient; this.executor = executor; + MetricsRegistry registry = new MetricsRegistry( + CreatePipelineCommandHandler.class.getSimpleName()); + this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.createPipelineCommand + "Ms"); } /** @@ -135,7 +140,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer, } } finally { long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; + this.opsLatencyMs.add(endTime - startTime); } }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet()); } @@ -167,15 +172,12 @@ public int getInvocationCount() { */ @Override public long getAverageRunTime() { - if (invocationCount.get() > 0) { - return totalTime / invocationCount.get(); - } - return 0; + return (long) this.opsLatencyMs.lastStat().mean(); } @Override public long getTotalRunTime() { - return totalTime; + return (long) this.opsLatencyMs.lastStat().total(); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index bd7431c6145..1a630f8f0be 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; import 
org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; @@ -91,7 +93,6 @@ public class DeleteBlocksCommandHandler implements CommandHandler { private final ContainerSet containerSet; private final ConfigurationSource conf; private int invocationCount; - private long totalTime; private final ThreadPoolExecutor executor; private final LinkedBlockingQueue deleteCommandQueues; private final Daemon handlerThread; @@ -99,6 +100,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler { private final BlockDeletingServiceMetrics blockDeleteMetrics; private final long tryLockTimeoutMs; private final Map schemaHandlers; + private final MutableRate opsLatencyMs; public DeleteBlocksCommandHandler(OzoneContainer container, ConfigurationSource conf, DatanodeConfiguration dnConf, @@ -121,6 +123,9 @@ public DeleteBlocksCommandHandler(OzoneContainer container, dnConf.getBlockDeleteThreads(), threadFactory); this.deleteCommandQueues = new LinkedBlockingQueue<>(dnConf.getBlockDeleteQueueLimit()); + MetricsRegistry registry = new MetricsRegistry( + DeleteBlocksCommandHandler.class.getSimpleName()); + this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.deleteBlocksCommand + "Ms"); long interval = dnConf.getBlockDeleteCommandWorkerInterval().toMillis(); handlerThread = new Daemon(new DeleteCmdWorker(interval)); handlerThread.start(); @@ -403,7 +408,7 @@ private void processCmd(DeleteCmdInfo cmd) { }; updateCommandStatus(cmd.getContext(), cmd.getCmd(), statusUpdater, LOG); long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; + this.opsLatencyMs.add(endTime - startTime); invocationCount++; } } @@ -666,15 +671,12 @@ public int getInvocationCount() { @Override public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; + return (long) 
this.opsLatencyMs.lastStat().mean(); } @Override public long getTotalRunTime() { - return totalTime; + return (long) this.opsLatencyMs.lastStat().total(); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java index b76e306e1c0..59aaacc1c80 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -22,6 +22,8 @@ import java.util.concurrent.RejectedExecutionException; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.container.common.statemachine .SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; @@ -39,7 +41,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; /** * Handler to process the DeleteContainerCommand from SCM. 
@@ -51,10 +52,10 @@ public class DeleteContainerCommandHandler implements CommandHandler { private final AtomicInteger invocationCount = new AtomicInteger(0); private final AtomicInteger timeoutCount = new AtomicInteger(0); - private final AtomicLong totalTime = new AtomicLong(0); private final ThreadPoolExecutor executor; private final Clock clock; private int maxQueueSize; + private final MutableRate opsLatencyMs; public DeleteContainerCommandHandler( int threadPoolSize, Clock clock, int queueSize, String threadNamePrefix) { @@ -73,6 +74,9 @@ protected DeleteContainerCommandHandler(Clock clock, this.executor = executor; this.clock = clock; maxQueueSize = queueSize; + MetricsRegistry registry = new MetricsRegistry( + DeleteContainerCommandHandler.class.getSimpleName()); + this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.deleteContainerCommand + "Ms"); } @Override public void handle(final SCMCommand command, @@ -124,7 +128,7 @@ private void handleInternal(SCMCommand command, StateContext context, } catch (IOException e) { LOG.error("Exception occurred while deleting the container.", e); } finally { - totalTime.getAndAdd(Time.monotonicNow() - startTime); + this.opsLatencyMs.add(Time.monotonicNow() - startTime); } } @@ -149,14 +153,12 @@ public int getTimeoutCount() { @Override public long getAverageRunTime() { - final int invocations = invocationCount.get(); - return invocations == 0 ? 
- 0 : totalTime.get() / invocations; + return (long) this.opsLatencyMs.lastStat().mean(); } @Override public long getTotalRunTime() { - return totalTime.get(); + return (long) this.opsLatencyMs.lastStat().total(); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java index bd7ec5710d9..77e152447b9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java @@ -20,6 +20,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.FinalizeNewLayoutVersionCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine .SCMConnectionManager; @@ -42,12 +44,15 @@ public class FinalizeNewLayoutVersionCommandHandler implements CommandHandler { LoggerFactory.getLogger(FinalizeNewLayoutVersionCommandHandler.class); private AtomicLong invocationCount = new AtomicLong(0); - private long totalTime; + private final MutableRate opsLatencyMs; /** * Constructs a FinalizeNewLayoutVersionCommandHandler. 
*/ public FinalizeNewLayoutVersionCommandHandler() { + MetricsRegistry registry = new MetricsRegistry( + FinalizeNewLayoutVersionCommandHandler.class.getSimpleName()); + this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.finalizeNewLayoutVersionCommand + "Ms"); } /** @@ -82,7 +87,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer, LOG.error("Exception during finalization.", e); } finally { long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; + this.opsLatencyMs.add(endTime - startTime); } } @@ -113,15 +118,12 @@ public int getInvocationCount() { */ @Override public long getAverageRunTime() { - if (invocationCount.get() > 0) { - return totalTime / invocationCount.get(); - } - return 0; + return (long) this.opsLatencyMs.lastStat().mean(); } @Override public long getTotalRunTime() { - return totalTime; + return (long) this.opsLatencyMs.lastStat().total(); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java index 602687d7a00..030d169e9b8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java @@ -36,6 +36,7 @@ public class ReconstructECContainersCommandHandler implements CommandHandler { private final ReplicationSupervisor supervisor; private final ECReconstructionCoordinator coordinator; private final ConfigurationSource conf; + private String metricsName; public ReconstructECContainersCommandHandler(ConfigurationSource conf, ReplicationSupervisor supervisor, @@ -52,8 
+53,16 @@ public void handle(SCMCommand command, OzoneContainer container, (ReconstructECContainersCommand) command; ECReconstructionCommandInfo reconstructionCommandInfo = new ECReconstructionCommandInfo(ecContainersCommand); - this.supervisor.addTask(new ECReconstructionCoordinatorTask( - coordinator, reconstructionCommandInfo)); + ECReconstructionCoordinatorTask task = new ECReconstructionCoordinatorTask( + coordinator, reconstructionCommandInfo); + if (this.metricsName == null) { + this.metricsName = task.getMetricName(); + } + this.supervisor.addTask(task); + } + + public String getMetricsName() { + return this.metricsName; } @Override @@ -63,23 +72,26 @@ public Type getCommandType() { @Override public int getInvocationCount() { - return 0; + return this.metricsName == null ? 0 : (int) this.supervisor + .getReplicationRequestCount(metricsName); } @Override public long getAverageRunTime() { - return 0; + return this.metricsName == null ? 0 : (int) this.supervisor + .getReplicationRequestAvgTime(metricsName); } @Override public long getTotalRunTime() { - return 0; + return this.metricsName == null ? 0 : this.supervisor + .getReplicationRequestTotalTime(metricsName); } @Override public int getQueuedCount() { - return supervisor - .getInFlightReplications(ECReconstructionCoordinatorTask.class); + return this.metricsName == null ? 
0 : (int) this.supervisor + .getReplicationQueuedCount(metricsName); } public ConfigurationSource getConf() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java index 3c14b2fb161..1ab31ba1c41 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java @@ -18,6 +18,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; @@ -27,7 +29,6 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; /** * Command handler to refresh usage info of all volumes. 
@@ -38,9 +39,12 @@ public class RefreshVolumeUsageCommandHandler implements CommandHandler { LoggerFactory.getLogger(RefreshVolumeUsageCommandHandler.class); private final AtomicInteger invocationCount = new AtomicInteger(0); - private final AtomicLong totalTime = new AtomicLong(0); + private final MutableRate opsLatencyMs; public RefreshVolumeUsageCommandHandler() { + MetricsRegistry registry = new MetricsRegistry( + RefreshVolumeUsageCommandHandler.class.getSimpleName()); + this.opsLatencyMs = registry.newRate(Type.refreshVolumeUsageInfo + "Ms"); } @Override @@ -50,7 +54,7 @@ public void handle(SCMCommand command, OzoneContainer container, invocationCount.incrementAndGet(); final long startTime = Time.monotonicNow(); container.getVolumeSet().refreshAllVolumeUsage(); - totalTime.getAndAdd(Time.monotonicNow() - startTime); + this.opsLatencyMs.add(Time.monotonicNow() - startTime); } @Override @@ -66,14 +70,12 @@ public int getInvocationCount() { @Override public long getAverageRunTime() { - final int invocations = invocationCount.get(); - return invocations == 0 ? 
- 0 : totalTime.get() / invocations; + return (long) this.opsLatencyMs.lastStat().mean(); } @Override public long getTotalRunTime() { - return totalTime.get(); + return (long) this.opsLatencyMs.lastStat().total(); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index 21b26339e23..242a4eb74be 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -43,29 +43,28 @@ public class ReplicateContainerCommandHandler implements CommandHandler { static final Logger LOG = LoggerFactory.getLogger(ReplicateContainerCommandHandler.class); - private int invocationCount; - - private long totalTime; - - private ConfigurationSource conf; - private ReplicationSupervisor supervisor; private ContainerReplicator downloadReplicator; private ContainerReplicator pushReplicator; + private String metricsName; + public ReplicateContainerCommandHandler( ConfigurationSource conf, ReplicationSupervisor supervisor, ContainerReplicator downloadReplicator, ContainerReplicator pushReplicator) { - this.conf = conf; this.supervisor = supervisor; this.downloadReplicator = downloadReplicator; this.pushReplicator = pushReplicator; } + public String getMetricsName() { + return this.metricsName; + } + @Override public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { @@ -86,12 +85,16 @@ public void handle(SCMCommand command, OzoneContainer container, downloadReplicator : pushReplicator; 
ReplicationTask task = new ReplicationTask(replicateCommand, replicator); + if (metricsName == null) { + metricsName = task.getMetricName(); + } supervisor.addTask(task); } @Override public int getQueuedCount() { - return supervisor.getInFlightReplications(ReplicationTask.class); + return this.metricsName == null ? 0 : (int) this.supervisor + .getReplicationQueuedCount(metricsName); } @Override @@ -101,19 +104,19 @@ public SCMCommandProto.Type getCommandType() { @Override public int getInvocationCount() { - return this.invocationCount; + return this.metricsName == null ? 0 : (int) this.supervisor + .getReplicationRequestCount(metricsName); } @Override public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; + return this.metricsName == null ? 0 : (int) this.supervisor + .getReplicationRequestAvgTime(metricsName); } @Override public long getTotalRunTime() { - return totalTime; + return this.metricsName == null ? 0 : this.supervisor + .getReplicationRequestTotalTime(metricsName); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java index 6f7f4414eeb..33563624795 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java @@ -21,8 +21,10 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SetNodeOperationalStateCommandProto; import org.apache.hadoop.hdds.utils.HddsServerUtil; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; @@ -39,7 +41,6 @@ import java.io.File; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -54,7 +55,7 @@ public class SetNodeOperationalStateCommandHandler implements CommandHandler { private final ConfigurationSource conf; private final Consumer replicationSupervisor; private final AtomicInteger invocationCount = new AtomicInteger(0); - private final AtomicLong totalTime = new AtomicLong(0); + private final MutableRate opsLatencyMs; /** * Set Node State command handler. 
@@ -65,6 +66,9 @@ public SetNodeOperationalStateCommandHandler(ConfigurationSource conf, Consumer replicationSupervisor) { this.conf = conf; this.replicationSupervisor = replicationSupervisor; + MetricsRegistry registry = new MetricsRegistry( + SetNodeOperationalStateCommandHandler.class.getSimpleName()); + this.opsLatencyMs = registry.newRate(Type.setNodeOperationalStateCommand + "Ms"); } /** @@ -80,9 +84,6 @@ public void handle(SCMCommand command, OzoneContainer container, StateContext context, SCMConnectionManager connectionManager) { long startTime = Time.monotonicNow(); invocationCount.incrementAndGet(); - StorageContainerDatanodeProtocolProtos.SetNodeOperationalStateCommandProto - setNodeCmdProto = null; - if (command.getType() != Type.setNodeOperationalStateCommand) { LOG.warn("Skipping handling command, expected command " + "type {} but found {}", @@ -91,7 +92,7 @@ public void handle(SCMCommand command, OzoneContainer container, } SetNodeOperationalStateCommand setNodeCmd = (SetNodeOperationalStateCommand) command; - setNodeCmdProto = setNodeCmd.getProto(); + SetNodeOperationalStateCommandProto setNodeCmdProto = setNodeCmd.getProto(); DatanodeDetails dni = context.getParent().getDatanodeDetails(); HddsProtos.NodeOperationalState state = setNodeCmdProto.getNodeOperationalState(); @@ -106,7 +107,7 @@ public void handle(SCMCommand command, OzoneContainer container, // handler interface. 
} replicationSupervisor.accept(state); - totalTime.addAndGet(Time.monotonicNow() - startTime); + this.opsLatencyMs.add(Time.monotonicNow() - startTime); } // TODO - this duplicates code in HddsDatanodeService and InitDatanodeState @@ -125,8 +126,7 @@ private void persistDatanodeDetails(DatanodeDetails dnDetails) * @return Type */ @Override - public StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type - getCommandType() { + public Type getCommandType() { return Type.setNodeOperationalStateCommand; } @@ -147,14 +147,12 @@ public int getInvocationCount() { */ @Override public long getAverageRunTime() { - final int invocations = invocationCount.get(); - return invocations == 0 ? - 0 : totalTime.get() / invocations; + return (long) this.opsLatencyMs.lastStat().mean(); } @Override public long getTotalRunTime() { - return totalTime.get(); + return (long) this.opsLatencyMs.lastStat().total(); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 92ff4b6d8d6..9513cac84ef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -43,6 +43,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; @@ -50,6 +52,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +80,10 @@ public final class ReplicationSupervisor { private final Map failureCounter = new ConcurrentHashMap<>(); private final Map timeoutCounter = new ConcurrentHashMap<>(); private final Map skippedCounter = new ConcurrentHashMap<>(); + private final Map queuedCounter = new ConcurrentHashMap<>(); + + private final MetricsRegistry registry; + private final Map opsLatencyMs = new ConcurrentHashMap<>(); private static final Map METRICS_MAP; @@ -218,6 +225,7 @@ private ReplicationSupervisor(StateContext context, ExecutorService executor, nodeStateUpdated(dn.getPersistedOpState()); } } + registry = new MetricsRegistry(ReplicationSupervisor.class.getSimpleName()); } /** @@ -240,6 +248,9 @@ public void addTask(AbstractReplicationTask task) { failureCounter.put(task.getMetricName(), new AtomicLong(0)); timeoutCounter.put(task.getMetricName(), new AtomicLong(0)); skippedCounter.put(task.getMetricName(), new AtomicLong(0)); + queuedCounter.put(task.getMetricName(), new AtomicLong(0)); + opsLatencyMs.put(task.getMetricName(), registry.newRate( + task.getClass().getSimpleName() + "Ms")); METRICS_MAP.put(task.getMetricName(), task.getMetricDescriptionSegment()); } } @@ -253,6 +264,7 @@ public void addTask(AbstractReplicationTask task) { taskCounter.computeIfAbsent(task.getClass(), k -> new AtomicInteger()).incrementAndGet(); } + queuedCounter.get(task.getMetricName()).incrementAndGet(); executor.execute(new TaskRunner(task)); } } @@ -353,6 +365,7 @@ public TaskRunner(AbstractReplicationTask task) { @Override public void run() { + final long startTime = Time.monotonicNow(); try { requestCounter.get(task.getMetricName()).incrementAndGet(); @@ -401,6 +414,8 @@ 
public void run() { LOG.warn("Failed {}", this, e); failureCounter.get(task.getMetricName()).incrementAndGet(); } finally { + queuedCounter.get(task.getMetricName()).decrementAndGet(); + opsLatencyMs.get(task.getMetricName()).add(Time.monotonicNow() - startTime); inFlight.remove(task); decrementTaskCounter(task); } @@ -511,4 +526,22 @@ public long getReplicationSkippedCount(String metricsName) { return counter != null ? counter.get() : 0; } + public long getReplicationQueuedCount() { + return getCount(queuedCounter); + } + + public long getReplicationQueuedCount(String metricsName) { + AtomicLong counter = queuedCounter.get(metricsName); + return counter != null ? counter.get() : 0; + } + + public long getReplicationRequestAvgTime(String metricsName) { + MutableRate rate = opsLatencyMs.get(metricsName); + return rate != null ? (long) rate.lastStat().mean() : 0; + } + + public long getReplicationRequestTotalTime(String metricsName) { + MutableRate rate = opsLatencyMs.get(metricsName); + return rate != null ? 
(long) rate.lastStat().total() : 0; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index a1763976af9..cd1103a0c46 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -67,7 +67,7 @@ public void getMetrics(MetricsCollector collector, boolean all) { supervisor.getTotalInFlightReplications()) .addGauge(Interns.info("numQueuedReplications", "Number of replications in queue"), - supervisor.getQueueSize()) + supervisor.getReplicationQueuedCount()) .addGauge(Interns.info("numRequestedReplications", "Number of requested replications"), supervisor.getReplicationRequestCount()) @@ -107,7 +107,10 @@ public void getMetrics(MetricsCollector collector, boolean all) { .addGauge(Interns.info("numSkipped" + metricsName, "Number of " + descriptionSegment + " skipped as the container is " + "already present"), - supervisor.getReplicationSkippedCount(metricsName)); + supervisor.getReplicationSkippedCount(metricsName)) + .addGauge(Interns.info("numQueued" + metricsName, + "Number of " + descriptionSegment + " in queue"), + supervisor.getReplicationQueuedCount(metricsName)); } }); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconstructECContainersCommandHandler.java new file mode 100644 index 00000000000..7e6c7608180 --- /dev/null +++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconstructECContainersCommandHandler.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

<p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p>

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Proto2Utils; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; +import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +/** + * Test cases to verify {@link ReconstructECContainersCommandHandler}. 
+ */ +public class TestReconstructECContainersCommandHandler { + private OzoneConfiguration conf; + private ReplicationSupervisor supervisor; + private ECReconstructionCoordinator coordinator; + private OzoneContainer ozoneContainer; + private StateContext stateContext; + private SCMConnectionManager connectionManager; + + @BeforeEach + public void setUp() { + supervisor = mock(ReplicationSupervisor.class); + coordinator = mock(ECReconstructionCoordinator.class); + conf = new OzoneConfiguration(); + ozoneContainer = mock(OzoneContainer.class); + connectionManager = mock(SCMConnectionManager.class); + stateContext = mock(StateContext.class); + } + + @Test + public void testMetrics() { + ReconstructECContainersCommandHandler commandHandler = + new ReconstructECContainersCommandHandler(conf, supervisor, coordinator); + doNothing().when(supervisor).addTask(any()); + Map handlerMap = new HashMap<>(); + handlerMap.put(commandHandler.getCommandType(), commandHandler); + CommandHandlerMetrics metrics = CommandHandlerMetrics.create(handlerMap); + try { + byte[] missingIndexes = {1, 2}; + ByteString missingContainerIndexes = Proto2Utils.unsafeByteString(missingIndexes); + ECReplicationConfig ecReplicationConfig = new ECReplicationConfig(3, 2); + List dnDetails = getDNDetails(5); + List sources = + dnDetails.stream().map(a -> new ReconstructECContainersCommand + .DatanodeDetailsAndReplicaIndex(a, dnDetails.indexOf(a))) + .collect(Collectors.toList()); + List targets = getDNDetails(2); + ReconstructECContainersCommand reconstructECContainersCommand = + new ReconstructECContainersCommand(1L, sources, targets, + missingContainerIndexes, ecReplicationConfig); + + commandHandler.handle(reconstructECContainersCommand, ozoneContainer, + stateContext, connectionManager); + String metricsName = "ECReconstructions"; + assertEquals(commandHandler.getMetricsName(), metricsName); + when(supervisor.getReplicationRequestCount(metricsName)).thenReturn(1L); + 
assertEquals(commandHandler.getInvocationCount(), 1); + + commandHandler.handle(new ReconstructECContainersCommand(2L, sources, + targets, missingContainerIndexes, ecReplicationConfig), ozoneContainer, + stateContext, connectionManager); + commandHandler.handle(new ReconstructECContainersCommand(3L, sources, + targets, missingContainerIndexes, ecReplicationConfig), ozoneContainer, + stateContext, connectionManager); + commandHandler.handle(new ReconstructECContainersCommand(4L, sources, + targets, missingContainerIndexes, ecReplicationConfig), ozoneContainer, + stateContext, connectionManager); + commandHandler.handle(new ReconstructECContainersCommand(5L, sources, + targets, missingContainerIndexes, ecReplicationConfig), ozoneContainer, + stateContext, connectionManager); + commandHandler.handle(new ReconstructECContainersCommand(6L, sources, + targets, missingContainerIndexes, ecReplicationConfig), ozoneContainer, + stateContext, connectionManager); + + when(supervisor.getReplicationRequestCount(metricsName)).thenReturn(5L); + when(supervisor.getReplicationRequestTotalTime(metricsName)).thenReturn(10L); + when(supervisor.getReplicationRequestAvgTime(metricsName)).thenReturn(2L); + when(supervisor.getReplicationQueuedCount(metricsName)).thenReturn(1L); + assertEquals(commandHandler.getInvocationCount(), 5); + assertEquals(commandHandler.getQueuedCount(), 1); + assertEquals(commandHandler.getTotalRunTime(), 10); + assertEquals(commandHandler.getAverageRunTime(), 2); + + MetricsCollectorImpl metricsCollector = new MetricsCollectorImpl(); + metrics.getMetrics(metricsCollector, true); + assertEquals(1, metricsCollector.getRecords().size()); + } finally { + metrics.unRegister(); + } + } + + private List getDNDetails(int numDns) { + List dns = new ArrayList<>(); + for (int i = 0; i < numDns; i++) { + dns.add(MockDatanodeDetails.randomDatanodeDetails()); + } + return dns; + } +} diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java new file mode 100644 index 00000000000..9de00877e5b --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

<p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p>

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.ozone.container.common.helpers.CommandHandlerMetrics; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.container.replication.ContainerReplicator; +import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doNothing; + +/** + * Test cases to verify {@link ReplicateContainerCommandHandler}. 
+ */ +public class TestReplicateContainerCommandHandler { + private OzoneConfiguration conf; + private ReplicationSupervisor supervisor; + private ContainerReplicator downloadReplicator; + private ContainerReplicator pushReplicator; + private OzoneContainer ozoneContainer; + private StateContext stateContext; + private SCMConnectionManager connectionManager; + + @BeforeEach + public void setUp() { + conf = new OzoneConfiguration(); + supervisor = mock(ReplicationSupervisor.class); + downloadReplicator = mock(ContainerReplicator.class); + pushReplicator = mock(ContainerReplicator.class); + ozoneContainer = mock(OzoneContainer.class); + connectionManager = mock(SCMConnectionManager.class); + stateContext = mock(StateContext.class); + } + + @Test + public void testMetrics() { + ReplicateContainerCommandHandler commandHandler = + new ReplicateContainerCommandHandler(conf, supervisor, + downloadReplicator, pushReplicator); + Map handlerMap = new HashMap<>(); + handlerMap.put(commandHandler.getCommandType(), commandHandler); + CommandHandlerMetrics metrics = CommandHandlerMetrics.create(handlerMap); + try { + doNothing().when(supervisor).addTask(any()); + DatanodeDetails source = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails(); + List sourceList = new ArrayList<>(); + sourceList.add(source); + + ReplicateContainerCommand command = ReplicateContainerCommand.fromSources( + 1, sourceList); + commandHandler.handle(command, ozoneContainer, stateContext, connectionManager); + String metricsName = "ContainerReplications"; + assertEquals(commandHandler.getMetricsName(), metricsName); + when(supervisor.getReplicationRequestCount(metricsName)).thenReturn(1L); + assertEquals(commandHandler.getInvocationCount(), 1); + + commandHandler.handle(ReplicateContainerCommand.fromSources(2, sourceList), + ozoneContainer, stateContext, connectionManager); + commandHandler.handle(ReplicateContainerCommand.fromSources(3, 
sourceList), + ozoneContainer, stateContext, connectionManager); + commandHandler.handle(ReplicateContainerCommand.toTarget(4, target), + ozoneContainer, stateContext, connectionManager); + commandHandler.handle(ReplicateContainerCommand.toTarget(5, target), + ozoneContainer, stateContext, connectionManager); + commandHandler.handle(ReplicateContainerCommand.fromSources(6, sourceList), + ozoneContainer, stateContext, connectionManager); + + when(supervisor.getReplicationRequestCount(metricsName)).thenReturn(5L); + when(supervisor.getReplicationRequestTotalTime(metricsName)).thenReturn(10L); + when(supervisor.getReplicationRequestAvgTime(metricsName)).thenReturn(3L); + when(supervisor.getReplicationQueuedCount(metricsName)).thenReturn(1L); + assertEquals(commandHandler.getInvocationCount(), 5); + assertEquals(commandHandler.getQueuedCount(), 1); + assertEquals(commandHandler.getTotalRunTime(), 10); + assertEquals(commandHandler.getAverageRunTime(), 3); + + MetricsCollectorImpl metricsCollector = new MetricsCollectorImpl(); + metrics.getMetrics(metricsCollector, true); + assertEquals(1, metricsCollector.getRecords().size()); + } finally { + metrics.unRegister(); + } + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index ef37c226653..315e0c0253b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -87,6 +87,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertTrue; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL; import static org.mockito.Mockito.any; @@ -488,6 +489,15 @@ public void testMultipleReplication(ContainerLayoutVersion layout, assertEquals(0, ecReconstructionSupervisor.getReplicationRequestCount( task1.getMetricName())); + assertTrue(replicationSupervisor.getReplicationRequestTotalTime( + task1.getMetricName()) > 0); + assertTrue(ecReconstructionSupervisor.getReplicationRequestTotalTime( + task2.getMetricName()) > 0); + assertTrue(replicationSupervisor.getReplicationRequestAvgTime( + task1.getMetricName()) > 0); + assertTrue(ecReconstructionSupervisor.getReplicationRequestAvgTime( + task2.getMetricName()) > 0); + MetricsCollectorImpl replicationMetricsCollector = new MetricsCollectorImpl(); replicationMetrics.getMetrics(replicationMetricsCollector, true); assertEquals(1, replicationMetricsCollector.getRecords().size());