From 7d04590d42e9e51f08a73f0fbdda4638906325ed Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Fri, 30 Aug 2024 23:32:14 +0800 Subject: [PATCH 01/10] HDDS-11376. Improve ReplicationSupervisor to record replication metrics --- .../replication/ReplicationSupervisor.java | 68 +++++-- .../ReplicationSupervisorMetrics.java | 36 +++- .../TestReplicationSupervisor.java | 191 ++++++++++++++++-- 3 files changed, 262 insertions(+), 33 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 5ceea125e81..66182727f73 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -72,10 +72,10 @@ public final class ReplicationSupervisor { private final Clock clock; private final AtomicLong requestCounter = new AtomicLong(); - private final AtomicLong successCounter = new AtomicLong(); - private final AtomicLong failureCounter = new AtomicLong(); - private final AtomicLong timeoutCounter = new AtomicLong(); - private final AtomicLong skippedCounter = new AtomicLong(); + private final Map, AtomicLong> successCounter = new ConcurrentHashMap<>(); + private final Map, AtomicLong> failureCounter = new ConcurrentHashMap<>(); + private final Map, AtomicLong> timeoutCounter = new ConcurrentHashMap<>(); + private final Map, AtomicLong> skippedCounter = new ConcurrentHashMap<>(); /** * A set of container IDs that are currently being downloaded @@ -221,6 +221,17 @@ public void addTask(AbstractReplicationTask task) { return; } + if (successCounter.get(task.getClass()) == null) { + synchronized (this) { + if (successCounter.get(task.getClass()) == null) { + successCounter.put(task.getClass(), new AtomicLong(0)); + failureCounter.put(task.getClass(), new AtomicLong(0)); + timeoutCounter.put(task.getClass(), new AtomicLong(0)); + skippedCounter.put(task.getClass(), new AtomicLong(0)); + } + } + } + if (inFlight.add(task)) { if (task.getPriority() != ReplicationCommandPriority.LOW) { // Low priority tasks are not included in the replication queue sizes @@ -337,7 +348,7 @@ public void run() { if (deadline > 0 && now > deadline) { LOG.info("Ignoring {} since the deadline has passed ({} < {})", this, Instant.ofEpochMilli(deadline), Instant.ofEpochMilli(now)); - timeoutCounter.incrementAndGet(); + timeoutCounter.get(task.getClass()).incrementAndGet(); return; } @@ -364,18 +375,18 @@ public void run() { task.runTask(); if (task.getStatus() == Status.FAILED) { LOG.warn("Failed {}", this); - failureCounter.incrementAndGet(); + failureCounter.get(task.getClass()).incrementAndGet(); } else if (task.getStatus() == Status.DONE) { LOG.info("Successful {}", this); - successCounter.incrementAndGet(); + successCounter.get(task.getClass()).incrementAndGet(); } else if (task.getStatus() == Status.SKIPPED) { LOG.info("Skipped {}", this); - skippedCounter.incrementAndGet(); + skippedCounter.get(task.getClass()).incrementAndGet(); } } catch (Exception e) { task.setStatus(Status.FAILED); LOG.warn("Failed {}", this, e); - failureCounter.incrementAndGet(); + failureCounter.get(task.getClass()).incrementAndGet(); } finally { inFlight.remove(task); decrementTaskCounter(task); @@ -438,20 +449,51 @@ public long getMaxReplicationStreams() { } } + private long getCount(Map, AtomicLong> counter) { + if (counter.isEmpty()) { + return 0; + } + AtomicLong total = new AtomicLong(0); + counter.forEach((key, value) -> { + total.set(total.get() + value.get()); + }); + return total.get(); + } + public long getReplicationSuccessCount() { - return successCounter.get(); + return getCount(successCounter); + } + + public long getReplicationSuccessCount(Class cls) { + AtomicLong counter = successCounter.get(cls); + return counter != null ? counter.get() : 0; } public long getReplicationFailureCount() { - return failureCounter.get(); + return getCount(failureCounter); + } + + public long getReplicationFailureCount(Class cls) { + AtomicLong counter = failureCounter.get(cls); + return counter != null ? counter.get() : 0; } public long getReplicationTimeoutCount() { - return timeoutCounter.get(); + return getCount(timeoutCounter); + } + + public long getReplicationTimeoutCount(Class cls) { + AtomicLong counter = timeoutCounter.get(cls); + return counter != null ? counter.get() : 0; } public long getReplicationSkippedCount() { - return skippedCounter.get(); + return getCount(skippedCounter); + } + + public long getReplicationSkippedCount(Class cls) { + AtomicLong counter = skippedCounter.get(cls); + return counter != null ? counter.get() : 0; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index 671e985d7ad..cd11bfc4d8f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -26,6 +26,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask; import java.util.Map; @@ -71,12 +72,45 @@ public void getMetrics(MetricsCollector collector, boolean all) { .addGauge(Interns.info("numRequestedReplications", "Number of requested replications"), supervisor.getReplicationRequestCount()) + .addGauge(Interns.info("numSuccessReplications", + "Number of successful replications"), + supervisor.getReplicationSuccessCount()) + .addGauge(Interns.info("numSuccessECReconstructions", + "Number of successful EC reconstructions"), + supervisor.getReplicationSuccessCount(ECReconstructionCoordinatorTask.class)) + .addGauge(Interns.info("numSuccessContainerReplications", + "Number of successful container replications"), + supervisor.getReplicationSuccessCount(ReplicationTask.class)) + .addGauge(Interns.info("numFailureReplications", + "Number of failure replications"), + supervisor.getReplicationFailureCount()) + .addGauge(Interns.info("numFailureECReconstructions", + "Number of failure EC reconstructions"), + supervisor.getReplicationFailureCount(ECReconstructionCoordinatorTask.class)) + .addGauge(Interns.info("numFailureContainerReplications", + "Number of failure container replications"), + supervisor.getReplicationFailureCount(ReplicationTask.class)) .addGauge(Interns.info("numTimeoutReplications", "Number of replication requests timed out before being processed"), supervisor.getReplicationTimeoutCount()) + .addGauge(Interns.info("numTimeoutECReconstructions", + "Number of EC reconstructions timed out before being processed"), + supervisor.getReplicationTimeoutCount(ECReconstructionCoordinatorTask.class)) + .addGauge(Interns.info("numTimeoutContainerReplications", + "Number of container replications timed out before being processed"), + supervisor.getReplicationTimeoutCount(ReplicationTask.class)) .addGauge(Interns.info("numSkippedReplications", "Number of replication requests skipped as the container is " - + "already present"), supervisor.getReplicationSkippedCount()) + + "already present"), + supervisor.getReplicationSkippedCount()) + .addGauge(Interns.info("numSkippedECReconstructions", + "Number of EC reconstructions skipped as the container is " + + "already present"), + supervisor.getReplicationSkippedCount(ECReconstructionCoordinatorTask.class)) + .addGauge(Interns.info("numSkippedContainerReplications", + "Number of container replications skipped as the container is " + + "already present"), + supervisor.getReplicationSkippedCount(ReplicationTask.class)) .addGauge(Interns.info("maxReplicationStreams", "Maximum number of " + "concurrent replication tasks which can run simultaneously"), supervisor.getMaxReplicationStreams()); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 1f69db78d62..f992a72e08a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -27,6 +27,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.List; +import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.CountDownLatch; @@ -46,6 +47,8 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority; +import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient; +import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; @@ -55,7 +58,9 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCommandInfo; +import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator; import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask; +import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics; import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; @@ -109,6 +114,8 @@ public class TestReplicationSupervisor { }; private final AtomicReference replicatorRef = new AtomicReference<>(); + private final AtomicReference ecReplicatorRef = + new AtomicReference<>(); private ContainerSet set; @@ -135,6 +142,7 @@ public void setUp() throws Exception { @AfterEach public void cleanup() { replicatorRef.set(null); + ecReplicatorRef.set(null); } @ContainerLayoutTestInfo.ContainerTest @@ -394,6 +402,97 @@ public void taskWithObsoleteTermIsDropped(ContainerLayoutVersion layout) { assertEquals(0, supervisor.getReplicationSuccessCount()); } + @ContainerLayoutTestInfo.ContainerTest + public void testMultipleReplication(ContainerLayoutVersion layout, + @TempDir File tempFile) throws IOException { + this.layoutVersion = layout; + OzoneConfiguration conf = new OzoneConfiguration(); + // GIVEN + ReplicationSupervisor replicationSupervisor = + supervisorWithReplicator(FakeReplicator::new); + ReplicationSupervisor ecReconstructionSupervisor = supervisorWithECReconstruction(); + ReplicationSupervisorMetrics replicationMetrics = + ReplicationSupervisorMetrics.create(replicationSupervisor); + ReplicationSupervisorMetrics ecReconstructionMetrics = + ReplicationSupervisorMetrics.create(ecReconstructionSupervisor); + try { + //WHEN + replicationSupervisor.addTask(createTask(1L)); + ecReconstructionSupervisor.addTask(createECTaskWithCoordinator(2L)); + replicationSupervisor.addTask(createTask(1L)); + replicationSupervisor.addTask(createTask(3L)); + ecReconstructionSupervisor.addTask(createECTaskWithCoordinator(4L)); + + SimpleContainerDownloader moc = mock(SimpleContainerDownloader.class); + Path res = Paths.get("file:/tmp/no-such-file"); + when(moc.getContainerDataFromReplicas(anyLong(), anyList(), + any(Path.class), any())).thenReturn(res); + + final String testDir = tempFile.getPath(); + MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + when(volumeSet.getVolumesList()).thenReturn(singletonList( + new HddsVolume.Builder(testDir).conf(conf).build())); + ContainerController mockedCC = mock(ContainerController.class); + ContainerImporter importer = new ContainerImporter(conf, set, mockedCC, volumeSet); + ContainerReplicator replicator = new DownloadAndImportReplicator( + conf, set, importer, moc); + replicatorRef.set(replicator); + replicationSupervisor.addTask(createTask(5L)); + + ReplicateContainerCommand cmd1 = createCommand(6L); + cmd1.setDeadline(clock.millis() + 10000); + ReplicationTask task1 = new ReplicationTask(cmd1, replicatorRef.get()); + clock.fastForward(15000); + replicationSupervisor.addTask(task1); + + ReconstructECContainersCommand cmd2 = createReconstructionCmd(7L); + cmd2.setDeadline(clock.millis() + 10000); + ECReconstructionCoordinatorTask task2 = new ECReconstructionCoordinatorTask( + ecReplicatorRef.get(), new ECReconstructionCommandInfo(cmd2)); + clock.fastForward(15000); + ecReconstructionSupervisor.addTask(task2); + ecReconstructionSupervisor.addTask(createECTask(8L)); + ecReconstructionSupervisor.addTask(createECTask(9L)); + + //THEN + assertEquals(2, replicationSupervisor.getReplicationSuccessCount()); + assertEquals(2, replicationSupervisor.getReplicationSuccessCount( + ReplicationTask.class)); + assertEquals(1, replicationSupervisor.getReplicationFailureCount()); + assertEquals(1, replicationSupervisor.getReplicationFailureCount( + ReplicationTask.class)); + assertEquals(1, replicationSupervisor.getReplicationSkippedCount()); + assertEquals(1, replicationSupervisor.getReplicationSkippedCount( + ReplicationTask.class)); + assertEquals(1, replicationSupervisor.getReplicationTimeoutCount()); + assertEquals(1, replicationSupervisor.getReplicationTimeoutCount( + ReplicationTask.class)); + + assertEquals(2, ecReconstructionSupervisor.getReplicationSuccessCount()); + assertEquals(2, ecReconstructionSupervisor.getReplicationSuccessCount( + ECReconstructionCoordinatorTask.class)); + assertEquals(1, ecReconstructionSupervisor.getReplicationTimeoutCount()); + assertEquals(1, ecReconstructionSupervisor.getReplicationTimeoutCount( + ECReconstructionCoordinatorTask.class)); + assertEquals(2, ecReconstructionSupervisor.getReplicationFailureCount()); + assertEquals(2, ecReconstructionSupervisor.getReplicationFailureCount( + ECReconstructionCoordinatorTask.class)); + + MetricsCollectorImpl replicationMetricsCollector = new MetricsCollectorImpl(); + replicationMetrics.getMetrics(replicationMetricsCollector, true); + assertEquals(1, replicationMetricsCollector.getRecords().size()); + + MetricsCollectorImpl ecReconstructionMetricsCollector = new MetricsCollectorImpl(); + ecReconstructionMetrics.getMetrics(ecReconstructionMetricsCollector, true); + assertEquals(1, ecReconstructionMetricsCollector.getRecords().size()); + } finally { + replicationMetrics.unRegister(); + ecReconstructionMetrics.unRegister(); + replicationSupervisor.stop(); + ecReconstructionSupervisor.stop(); + } + } + @ContainerLayoutTestInfo.ContainerTest public void testPriorityOrdering(ContainerLayoutVersion layout) throws InterruptedException { @@ -531,6 +630,22 @@ private ReplicationSupervisor supervisorWith( return supervisor; } + private ReplicationSupervisor supervisorWithECReconstruction() throws IOException { + ConfigurationSource conf = new OzoneConfiguration(); + ExecutorService executor = newDirectExecutorService(); + ReplicationServer.ReplicationConfig repConf = + conf.getObject(ReplicationServer.ReplicationConfig.class); + ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder() + .stateContext(context).replicationConfig(repConf).executor(executor) + .clock(clock).build(); + + FakeECReconstructionCoordinator coordinator = new FakeECReconstructionCoordinator( + new OzoneConfiguration(), null, null, context, + ECReconstructionMetrics.create(), "", supervisor); + ecReplicatorRef.set(coordinator); + return supervisor; + } + private ReplicationTask createTask(long containerId) { ReplicateContainerCommand cmd = createCommand(containerId); return new ReplicationTask(cmd, replicatorRef.get()); @@ -538,7 +653,13 @@ private ReplicationTask createTask(long containerId) { private ECReconstructionCoordinatorTask createECTask(long containerId) { return new ECReconstructionCoordinatorTask(null, - createReconstructionCmd(containerId)); + createReconstructionCmdInfo(containerId)); + } + + private ECReconstructionCoordinatorTask createECTaskWithCoordinator(long containerId) { + ECReconstructionCommandInfo ecReconstructionCommandInfo = createReconstructionCmdInfo(containerId); + return new ECReconstructionCoordinatorTask(ecReplicatorRef.get(), + ecReconstructionCommandInfo); } private static ReplicateContainerCommand createCommand(long containerId) { @@ -548,18 +669,20 @@ private static ReplicateContainerCommand createCommand(long containerId) { return cmd; } - private static ECReconstructionCommandInfo createReconstructionCmd( + private static ECReconstructionCommandInfo createReconstructionCmdInfo( long containerId) { - List sources - = new ArrayList<>(); - sources.add(new ReconstructECContainersCommand - .DatanodeDetailsAndReplicaIndex( - MockDatanodeDetails.randomDatanodeDetails(), 1)); - sources.add(new ReconstructECContainersCommand - .DatanodeDetailsAndReplicaIndex( + return new ECReconstructionCommandInfo(createReconstructionCmd(containerId)); + } + + private static ReconstructECContainersCommand createReconstructionCmd( + long containerId) { + List sources = + new ArrayList<>(); + sources.add(new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex( + MockDatanodeDetails.randomDatanodeDetails(), 1)); + sources.add(new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex( MockDatanodeDetails.randomDatanodeDetails(), 2)); - sources.add(new ReconstructECContainersCommand - .DatanodeDetailsAndReplicaIndex( + sources.add(new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex( MockDatanodeDetails.randomDatanodeDetails(), 3)); byte[] missingIndexes = new byte[1]; @@ -567,14 +690,44 @@ private static ECReconstructionCommandInfo createReconstructionCmd( List target = singletonList( MockDatanodeDetails.randomDatanodeDetails()); - ReconstructECContainersCommand cmd = - new ReconstructECContainersCommand(containerId, - sources, - target, - Proto2Utils.unsafeByteString(missingIndexes), - new ECReplicationConfig(3, 2)); - - return new ECReconstructionCommandInfo(cmd); + ReconstructECContainersCommand cmd = new ReconstructECContainersCommand(containerId, sources, target, + Proto2Utils.unsafeByteString(missingIndexes), + new ECReplicationConfig(3, 2)); + cmd.setTerm(CURRENT_TERM); + return cmd; + } + + /** + * A fake coordinator that simulates successful reconstruction of ec containers. + */ + private class FakeECReconstructionCoordinator extends ECReconstructionCoordinator { + + private final OzoneConfiguration conf = new OzoneConfiguration(); + private final ReplicationSupervisor supervisor; + + public FakeECReconstructionCoordinator(ConfigurationSource conf, + CertificateClient certificateClient, SecretKeySignerClient secretKeyClient, + StateContext context, ECReconstructionMetrics metrics, String threadNamePrefix, + ReplicationSupervisor supervisor) + throws IOException { + super(conf, certificateClient, secretKeyClient, context, metrics, threadNamePrefix); + this.supervisor = supervisor; + } + + @Override + public void reconstructECContainerGroup(long containerID, + ECReplicationConfig repConfig, SortedMap sourceNodeMap, + SortedMap targetNodeMap) { + assertEquals(1, supervisor.getTotalInFlightReplications()); + + KeyValueContainerData kvcd = new KeyValueContainerData( + containerID, layoutVersion, 100L, + UUID.randomUUID().toString(), UUID.randomUUID().toString()); + KeyValueContainer kvc = new KeyValueContainer(kvcd, conf); + assertDoesNotThrow(() -> { + set.addContainer(kvc); + }); + } } /** From 9627505f1cb991c2e338e80f728338ce461b009a Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Fri, 30 Aug 2024 23:41:05 +0800 Subject: [PATCH 02/10] Fix checkstyle. --- .../ozone/container/replication/TestReplicationSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index f992a72e08a..bdecfd51fcd 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -705,7 +705,7 @@ private class FakeECReconstructionCoordinator extends ECReconstructionCoordinato private final OzoneConfiguration conf = new OzoneConfiguration(); private final ReplicationSupervisor supervisor; - public FakeECReconstructionCoordinator(ConfigurationSource conf, + FakeECReconstructionCoordinator(ConfigurationSource conf, CertificateClient certificateClient, SecretKeySignerClient secretKeyClient, StateContext context, ECReconstructionMetrics metrics, String threadNamePrefix, ReplicationSupervisor supervisor) From e25bb91545f42811d4ae6b72e44ba1e32362d2cd Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Thu, 5 Sep 2024 12:35:43 +0800 Subject: [PATCH 03/10] Improve requestCounter to record metrics for different replication types. --- .../replication/ReplicationSupervisor.java | 16 +++++++++++----- .../ReplicationSupervisorMetrics.java | 6 ++++++ .../replication/TestReplicationSupervisor.java | 10 ++++++++++ 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 66182727f73..0a250eb4d2b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -71,7 +71,7 @@ public final class ReplicationSupervisor { private final StateContext context; private final Clock clock; - private final AtomicLong requestCounter = new AtomicLong(); + private final Map, AtomicLong> requestCounter = new ConcurrentHashMap<>(); private final Map, AtomicLong> successCounter = new ConcurrentHashMap<>(); private final Map, AtomicLong> failureCounter = new ConcurrentHashMap<>(); private final Map, AtomicLong> timeoutCounter = new ConcurrentHashMap<>(); @@ -221,9 +221,10 @@ public void addTask(AbstractReplicationTask task) { return; } - if (successCounter.get(task.getClass()) == null) { + if (requestCounter.get(task.getClass()) == null) { synchronized (this) { - if (successCounter.get(task.getClass()) == null) { + if (requestCounter.get(task.getClass()) == null) { + requestCounter.put(task.getClass(), new AtomicLong(0)); successCounter.put(task.getClass(), new AtomicLong(0)); failureCounter.put(task.getClass(), new AtomicLong(0)); timeoutCounter.put(task.getClass(), new AtomicLong(0)); @@ -341,7 +342,7 @@ public TaskRunner(AbstractReplicationTask task) { @Override public void run() { try { - requestCounter.incrementAndGet(); + requestCounter.get(task.getClass()).incrementAndGet(); final long now = clock.millis(); final long deadline = task.getDeadline(); @@ -430,7 +431,12 @@ public boolean equals(Object o) { } public long getReplicationRequestCount() { - return requestCounter.get(); + return getCount(requestCounter); + } + + public long getReplicationRequestCount(Class cls) { + AtomicLong counter = requestCounter.get(cls); + return counter != null ? counter.get() : 0; } public long getQueueSize() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index cd11bfc4d8f..6a618683a96 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -72,6 +72,12 @@ public void getMetrics(MetricsCollector collector, boolean all) { .addGauge(Interns.info("numRequestedReplications", "Number of requested replications"), supervisor.getReplicationRequestCount()) + .addGauge(Interns.info("numRequestedECReconstructions", + "Number of requested EC reconstructions"), + supervisor.getReplicationRequestCount(ECReconstructionCoordinatorTask.class)) + .addGauge(Interns.info("numRequestedContainerReplications", + "Number of requested container replications"), + supervisor.getReplicationRequestCount(ReplicationTask.class)) .addGauge(Interns.info("numSuccessReplications", "Number of successful replications"), supervisor.getReplicationSuccessCount()) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index bdecfd51fcd..44b28bda8b9 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -467,6 +467,11 @@ public void testMultipleReplication(ContainerLayoutVersion layout, assertEquals(1, replicationSupervisor.getReplicationTimeoutCount()); assertEquals(1, replicationSupervisor.getReplicationTimeoutCount( ReplicationTask.class)); + assertEquals(5, replicationSupervisor.getReplicationRequestCount()); + assertEquals(5, replicationSupervisor.getReplicationRequestCount( + ReplicationTask.class)); + assertEquals(0, replicationSupervisor.getReplicationRequestCount( + ECReconstructionCoordinatorTask.class)); assertEquals(2, ecReconstructionSupervisor.getReplicationSuccessCount()); assertEquals(2, ecReconstructionSupervisor.getReplicationSuccessCount( @@ -477,6 +482,11 @@ public void testMultipleReplication(ContainerLayoutVersion layout, assertEquals(2, ecReconstructionSupervisor.getReplicationFailureCount()); assertEquals(2, ecReconstructionSupervisor.getReplicationFailureCount( ECReconstructionCoordinatorTask.class)); + assertEquals(5, ecReconstructionSupervisor.getReplicationRequestCount()); + assertEquals(5, ecReconstructionSupervisor.getReplicationRequestCount( + ECReconstructionCoordinatorTask.class)); + assertEquals(0, ecReconstructionSupervisor.getReplicationRequestCount( + ReplicationTask.class)); MetricsCollectorImpl replicationMetricsCollector = new MetricsCollectorImpl(); replicationMetrics.getMetrics(replicationMetricsCollector, true); From 9e6ff2983662ca8511ae78fcd1dfc216fcd5b1d8 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Fri, 6 Sep 2024 17:39:48 +0800 Subject: [PATCH 04/10] Update some code. --- .../ECReconstructionCoordinatorTask.java | 10 +++ .../replication/AbstractReplicationTask.java | 8 +++ .../replication/ReplicationSupervisor.java | 62 ++++++++++--------- .../ReplicationSupervisorMetrics.java | 56 +++++++---------- .../replication/ReplicationTask.java | 10 +++ .../TestReplicationSupervisor.java | 22 +++---- 6 files changed, 95 insertions(+), 73 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java index 6d32f3a3f3e..a50a125f6d4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java @@ -46,6 +46,16 @@ public ECReconstructionCoordinatorTask( debugString = reconstructionCommandInfo.toString(); } + @Override + public String getMetricName() { + return "ECReconstructions"; + } + + @Override + public String getMetricDescriptionSegment() { + return "EC reconstructions"; + } + @Override public void runTask() { // Implement the coordinator logic to handle a container group diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java index 72fa88b35d9..fcc4eb31371 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java @@ -70,6 +70,14 @@ protected AbstractReplicationTask(long containerID, this.term = term; queued = Instant.now(clock); } + + protected String getMetricName() { + return ""; + } + + protected String getMetricDescriptionSegment() { + return ""; + } public long getContainerId() { return containerId; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 0a250eb4d2b..22aecbdaa68 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -71,11 +71,14 @@ public final class ReplicationSupervisor { private final StateContext context; private final Clock clock; - private final Map, AtomicLong> requestCounter = new ConcurrentHashMap<>(); - private final Map, AtomicLong> successCounter = new ConcurrentHashMap<>(); - private final Map, AtomicLong> failureCounter = new ConcurrentHashMap<>(); - private final Map, AtomicLong> timeoutCounter = new ConcurrentHashMap<>(); - private final Map, AtomicLong> skippedCounter = new ConcurrentHashMap<>(); + private final Map requestCounter = new ConcurrentHashMap<>(); + private final Map successCounter = new ConcurrentHashMap<>(); + private final Map failureCounter = new ConcurrentHashMap<>(); + private final Map timeoutCounter = new ConcurrentHashMap<>(); + private final Map skippedCounter = new ConcurrentHashMap<>(); + + @SuppressWarnings({"checkstyle:VisibilityModifier", "checkstyle:StaticVariableName"}) + public static Map METRICS_MAP = new HashMap<>(); /** * A set of container IDs that are currently being downloaded @@ -221,14 +224,15 @@ public void addTask(AbstractReplicationTask task) { return; } - if (requestCounter.get(task.getClass()) == null) { + if (requestCounter.get(task.getMetricName()) == null) { synchronized (this) { - if (requestCounter.get(task.getClass()) == null) { - requestCounter.put(task.getClass(), new AtomicLong(0)); - successCounter.put(task.getClass(), new AtomicLong(0)); - failureCounter.put(task.getClass(), new AtomicLong(0)); - timeoutCounter.put(task.getClass(), new AtomicLong(0)); - skippedCounter.put(task.getClass(), new AtomicLong(0)); + if (requestCounter.get(task.getMetricName()) == null) { + requestCounter.put(task.getMetricName(), new AtomicLong(0)); + successCounter.put(task.getMetricName(), new AtomicLong(0)); + failureCounter.put(task.getMetricName(), new AtomicLong(0)); + timeoutCounter.put(task.getMetricName(), new AtomicLong(0)); + skippedCounter.put(task.getMetricName(), new AtomicLong(0)); + METRICS_MAP.put(task.getMetricName(), task.getMetricDescriptionSegment()); } } } @@ -342,14 +346,14 @@ public TaskRunner(AbstractReplicationTask task) { @Override public void run() { try { - requestCounter.get(task.getClass()).incrementAndGet(); + requestCounter.get(task.getMetricName()).incrementAndGet(); final long now = clock.millis(); final long deadline = task.getDeadline(); if (deadline > 0 && now > deadline) { LOG.info("Ignoring {} since the deadline has passed ({} < {})", this, Instant.ofEpochMilli(deadline), Instant.ofEpochMilli(now)); - timeoutCounter.get(task.getClass()).incrementAndGet(); + timeoutCounter.get(task.getMetricName()).incrementAndGet(); return; } @@ -376,18 +380,18 @@ public void run() { task.runTask(); if (task.getStatus() == Status.FAILED) { LOG.warn("Failed {}", this); - failureCounter.get(task.getClass()).incrementAndGet(); + failureCounter.get(task.getMetricName()).incrementAndGet(); } else if (task.getStatus() == Status.DONE) { LOG.info("Successful {}", this); - successCounter.get(task.getClass()).incrementAndGet(); + successCounter.get(task.getMetricName()).incrementAndGet(); } else if (task.getStatus() == Status.SKIPPED) { LOG.info("Skipped {}", this); - skippedCounter.get(task.getClass()).incrementAndGet(); + skippedCounter.get(task.getMetricName()).incrementAndGet(); } } catch (Exception e) { task.setStatus(Status.FAILED); LOG.warn("Failed {}", this, e); - failureCounter.get(task.getClass()).incrementAndGet(); + failureCounter.get(task.getMetricName()).incrementAndGet(); } finally { inFlight.remove(task); decrementTaskCounter(task); @@ -434,8 +438,8 @@ public long getReplicationRequestCount() { return getCount(requestCounter); } - public long getReplicationRequestCount(Class cls) { - AtomicLong counter = requestCounter.get(cls); + public long getReplicationRequestCount(String metricsName) { + AtomicLong counter = requestCounter.get(metricsName); return counter != null ? counter.get() : 0; } @@ -455,7 +459,7 @@ public long getMaxReplicationStreams() { } } - private long getCount(Map, AtomicLong> counter) { + private long getCount(Map counter) { if (counter.isEmpty()) { return 0; } @@ -470,8 +474,8 @@ public long getReplicationSuccessCount() { return getCount(successCounter); } - public long getReplicationSuccessCount(Class cls) { - AtomicLong counter = successCounter.get(cls); + public long getReplicationSuccessCount(String metricsName) { + AtomicLong counter = successCounter.get(metricsName); return counter != null ? counter.get() : 0; } @@ -479,8 +483,8 @@ public long getReplicationFailureCount() { return getCount(failureCounter); } - public long getReplicationFailureCount(Class cls) { - AtomicLong counter = failureCounter.get(cls); + public long getReplicationFailureCount(String metricsName) { + AtomicLong counter = failureCounter.get(metricsName); return counter != null ? counter.get() : 0; } @@ -488,8 +492,8 @@ public long getReplicationTimeoutCount() { return getCount(timeoutCounter); } - public long getReplicationTimeoutCount(Class cls) { - AtomicLong counter = timeoutCounter.get(cls); + public long getReplicationTimeoutCount(String metricsName) { + AtomicLong counter = timeoutCounter.get(metricsName); return counter != null ? counter.get() : 0; } @@ -497,8 +501,8 @@ public long getReplicationSkippedCount() { return getCount(skippedCounter); } - public long getReplicationSkippedCount(Class cls) { - AtomicLong counter = skippedCounter.get(cls); + public long getReplicationSkippedCount(String metricsName) { + AtomicLong counter = skippedCounter.get(metricsName); return counter != null ? counter.get() : 0; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index 6a618683a96..9322fa75fd8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -26,7 +26,6 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask; import java.util.Map; @@ -72,55 +71,46 @@ public void getMetrics(MetricsCollector collector, boolean all) { .addGauge(Interns.info("numRequestedReplications", "Number of requested replications"), supervisor.getReplicationRequestCount()) - .addGauge(Interns.info("numRequestedECReconstructions", - "Number of requested EC reconstructions"), - supervisor.getReplicationRequestCount(ECReconstructionCoordinatorTask.class)) - .addGauge(Interns.info("numRequestedContainerReplications", - "Number of requested container replications"), - supervisor.getReplicationRequestCount(ReplicationTask.class)) .addGauge(Interns.info("numSuccessReplications", "Number of successful replications"), supervisor.getReplicationSuccessCount()) - .addGauge(Interns.info("numSuccessECReconstructions", - "Number of successful EC reconstructions"), - supervisor.getReplicationSuccessCount(ECReconstructionCoordinatorTask.class)) - .addGauge(Interns.info("numSuccessContainerReplications", - "Number of successful container replications"), - supervisor.getReplicationSuccessCount(ReplicationTask.class)) .addGauge(Interns.info("numFailureReplications", "Number of failure replications"), supervisor.getReplicationFailureCount()) - .addGauge(Interns.info("numFailureECReconstructions", - "Number of failure EC reconstructions"), - supervisor.getReplicationFailureCount(ECReconstructionCoordinatorTask.class)) - .addGauge(Interns.info("numFailureContainerReplications", - "Number of failure container replications"), - supervisor.getReplicationFailureCount(ReplicationTask.class)) .addGauge(Interns.info("numTimeoutReplications", "Number of replication requests timed out before being processed"), supervisor.getReplicationTimeoutCount()) - .addGauge(Interns.info("numTimeoutECReconstructions", - "Number of EC reconstructions timed out before being processed"), - supervisor.getReplicationTimeoutCount(ECReconstructionCoordinatorTask.class)) - .addGauge(Interns.info("numTimeoutContainerReplications", - "Number of container replications timed out before being processed"), - supervisor.getReplicationTimeoutCount(ReplicationTask.class)) .addGauge(Interns.info("numSkippedReplications", "Number of replication requests skipped as the container is " + "already present"), supervisor.getReplicationSkippedCount()) - .addGauge(Interns.info("numSkippedECReconstructions", - "Number of EC reconstructions skipped as the container is " - + "already present"), - supervisor.getReplicationSkippedCount(ECReconstructionCoordinatorTask.class)) - .addGauge(Interns.info("numSkippedContainerReplications", - "Number of container replications skipped as the container is " - + "already present"), - supervisor.getReplicationSkippedCount(ReplicationTask.class)) .addGauge(Interns.info("maxReplicationStreams", "Maximum number of " + "concurrent replication tasks which can run simultaneously"), supervisor.getMaxReplicationStreams()); + if (!ReplicationSupervisor.METRICS_MAP.isEmpty()) { + ReplicationSupervisor.METRICS_MAP.forEach((metricsName, descriptionSegment) -> { + if (!metricsName.equals("")) { + builder.addGauge(Interns.info("numRequested" + metricsName, + "Number of requested " + descriptionSegment), + supervisor.getReplicationRequestCount(metricsName)) + .addGauge(Interns.info("numSuccess" + metricsName, + "Number of successful " + descriptionSegment), + supervisor.getReplicationSuccessCount(metricsName)) + .addGauge(Interns.info("numFailure" + metricsName, + "Number of failure " + descriptionSegment), + supervisor.getReplicationFailureCount(metricsName)) + .addGauge(Interns.info("numTimeout" + metricsName, + "Number of " + descriptionSegment + " timed out before being processed"), + supervisor.getReplicationTimeoutCount(metricsName)) + .addGauge(Interns.info("numSkipped" + metricsName, + "Number of " + descriptionSegment + " skipped as the container is " + + "already present"), + supervisor.getReplicationSkippedCount(metricsName)); + } + }); + } + Map tasks = supervisor.getInFlightReplicationSummary(); for (Map.Entry entry : tasks.entrySet()) { builder.addGauge(Interns.info("numInflight" + entry.getKey(), diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java index ca0ca98906c..2168f324c24 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java @@ -65,6 +65,16 @@ protected ReplicationTask( replicator); } + @Override + public String getMetricName() { + return "ContainerReplications"; + } + + @Override + public String getMetricDescriptionSegment() { + return "container replications"; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 44b28bda8b9..ee2ce3ed9b1 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -457,36 +457,36 @@ public void testMultipleReplication(ContainerLayoutVersion layout, //THEN assertEquals(2, replicationSupervisor.getReplicationSuccessCount()); assertEquals(2, replicationSupervisor.getReplicationSuccessCount( - ReplicationTask.class)); + task1.getMetricName())); assertEquals(1, replicationSupervisor.getReplicationFailureCount()); assertEquals(1, replicationSupervisor.getReplicationFailureCount( - ReplicationTask.class)); + task1.getMetricName())); assertEquals(1, replicationSupervisor.getReplicationSkippedCount()); assertEquals(1, replicationSupervisor.getReplicationSkippedCount( - ReplicationTask.class)); + task1.getMetricName())); assertEquals(1, replicationSupervisor.getReplicationTimeoutCount()); assertEquals(1, replicationSupervisor.getReplicationTimeoutCount( - ReplicationTask.class)); + task1.getMetricName())); assertEquals(5, replicationSupervisor.getReplicationRequestCount()); assertEquals(5, replicationSupervisor.getReplicationRequestCount( - ReplicationTask.class)); + task1.getMetricName())); assertEquals(0, replicationSupervisor.getReplicationRequestCount( - ECReconstructionCoordinatorTask.class)); + task2.getMetricName())); assertEquals(2, ecReconstructionSupervisor.getReplicationSuccessCount()); assertEquals(2, ecReconstructionSupervisor.getReplicationSuccessCount( - ECReconstructionCoordinatorTask.class)); + task2.getMetricName())); assertEquals(1, ecReconstructionSupervisor.getReplicationTimeoutCount()); assertEquals(1, ecReconstructionSupervisor.getReplicationTimeoutCount( - ECReconstructionCoordinatorTask.class)); + task2.getMetricName())); assertEquals(2, ecReconstructionSupervisor.getReplicationFailureCount()); assertEquals(2, ecReconstructionSupervisor.getReplicationFailureCount( - ECReconstructionCoordinatorTask.class)); + task2.getMetricName())); assertEquals(5, ecReconstructionSupervisor.getReplicationRequestCount()); assertEquals(5, ecReconstructionSupervisor.getReplicationRequestCount( - ECReconstructionCoordinatorTask.class)); + task2.getMetricName())); assertEquals(0, ecReconstructionSupervisor.getReplicationRequestCount( - ReplicationTask.class)); + task1.getMetricName())); MetricsCollectorImpl replicationMetricsCollector = new MetricsCollectorImpl(); replicationMetrics.getMetrics(replicationMetricsCollector, true); From d143ace3e1c2d0dbc4ab966cdbbe976d7b4fa4af Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Fri, 6 Sep 2024 19:14:28 +0800 Subject: [PATCH 05/10] Fix some checkstyle --- .../ozone/container/replication/ReplicationSupervisor.java | 5 ++--- .../container/replication/ReplicationSupervisorMetrics.java | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 22aecbdaa68..da2dbb88e45 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -77,8 +77,7 @@ public final class ReplicationSupervisor { private final Map timeoutCounter = new ConcurrentHashMap<>(); private final Map skippedCounter = new ConcurrentHashMap<>(); - @SuppressWarnings({"checkstyle:VisibilityModifier", "checkstyle:StaticVariableName"}) - public static Map METRICS_MAP = new HashMap<>(); + public static final Map metricsMap = new HashMap<>(); /** * A set of container IDs that are currently being downloaded @@ -232,7 +231,7 @@ public void addTask(AbstractReplicationTask task) { failureCounter.put(task.getMetricName(), new AtomicLong(0)); timeoutCounter.put(task.getMetricName(), new AtomicLong(0)); skippedCounter.put(task.getMetricName(), new AtomicLong(0)); - METRICS_MAP.put(task.getMetricName(), task.getMetricDescriptionSegment()); + metricsMap.put(task.getMetricName(), task.getMetricDescriptionSegment()); } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index 9322fa75fd8..757e6613934 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -88,8 +88,8 @@ public void getMetrics(MetricsCollector collector, boolean all) { + "concurrent replication tasks which can run simultaneously"), supervisor.getMaxReplicationStreams()); - if (!ReplicationSupervisor.METRICS_MAP.isEmpty()) { - ReplicationSupervisor.METRICS_MAP.forEach((metricsName, descriptionSegment) -> { + if (!ReplicationSupervisor.metricsMap.isEmpty()) { + ReplicationSupervisor.metricsMap.forEach((metricsName, descriptionSegment) -> { if (!metricsName.equals("")) { builder.addGauge(Interns.info("numRequested" + metricsName, "Number of requested " + descriptionSegment), From ce67b307e1bf9ac9c274363a5eaf38200098ee78 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Fri, 6 Sep 2024 19:36:15 +0800 Subject: [PATCH 06/10] Fix some checkstyle --- .../ozone/container/replication/ReplicationSupervisor.java | 4 ++-- .../container/replication/ReplicationSupervisorMetrics.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index da2dbb88e45..813e4fece53 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -77,7 +77,7 @@ public final class ReplicationSupervisor { private final Map timeoutCounter = new ConcurrentHashMap<>(); private final Map skippedCounter = new ConcurrentHashMap<>(); - public static final Map metricsMap = new HashMap<>(); + public static final Map METRICS_MAP = new HashMap<>(); /** * A set of container IDs that are currently being downloaded @@ -231,7 +231,7 @@ public void addTask(AbstractReplicationTask task) { failureCounter.put(task.getMetricName(), new AtomicLong(0)); timeoutCounter.put(task.getMetricName(), new AtomicLong(0)); skippedCounter.put(task.getMetricName(), new AtomicLong(0)); - metricsMap.put(task.getMetricName(), task.getMetricDescriptionSegment()); + METRICS_MAP.put(task.getMetricName(), task.getMetricDescriptionSegment()); } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index 757e6613934..9322fa75fd8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -88,8 +88,8 @@ public void getMetrics(MetricsCollector collector, boolean all) { + "concurrent replication tasks which can run simultaneously"), supervisor.getMaxReplicationStreams()); - if (!ReplicationSupervisor.metricsMap.isEmpty()) { - ReplicationSupervisor.metricsMap.forEach((metricsName, descriptionSegment) -> { + if (!ReplicationSupervisor.METRICS_MAP.isEmpty()) { + ReplicationSupervisor.METRICS_MAP.forEach((metricsName, descriptionSegment) -> { if (!metricsName.equals("")) { builder.addGauge(Interns.info("numRequested" + metricsName, "Number of requested " + descriptionSegment), From f03b420e3d0c691d2ddd67a3c18a8716946c8dc1 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Fri, 6 Sep 2024 20:45:53 +0800 Subject: [PATCH 07/10] Fix some checkstyle --- .../container/replication/ReplicationSupervisor.java | 11 ++++++++++- .../replication/ReplicationSupervisorMetrics.java | 5 +++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 813e4fece53..074717e3d2a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -26,6 +26,7 @@ import java.util.Objects; import java.util.OptionalLong; import java.util.Set; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.PriorityBlockingQueue; @@ -77,7 +78,11 @@ public final class ReplicationSupervisor { private final Map timeoutCounter = new ConcurrentHashMap<>(); private final Map skippedCounter = new ConcurrentHashMap<>(); - public static final Map METRICS_MAP = new HashMap<>(); + private static final Map METRICS_MAP; + + static { + METRICS_MAP = new HashMap<>(); + } /** * A set of container IDs that are currently being downloaded @@ -190,6 +195,10 @@ public static Builder newBuilder() { return new Builder(); } + public static Map getMetricsMap() { + return Collections.unmodifiableMap(METRICS_MAP); + } + private ReplicationSupervisor(StateContext context, ExecutorService executor, ReplicationConfig replicationConfig, DatanodeConfiguration datanodeConfig, Clock clock, IntConsumer executorThreadUpdater) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java index 9322fa75fd8..a1763976af9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java @@ -88,8 +88,9 @@ public void getMetrics(MetricsCollector collector, boolean all) { + "concurrent replication tasks which can run simultaneously"), supervisor.getMaxReplicationStreams()); - if (!ReplicationSupervisor.METRICS_MAP.isEmpty()) { - ReplicationSupervisor.METRICS_MAP.forEach((metricsName, descriptionSegment) -> { + Map metricsMap = ReplicationSupervisor.getMetricsMap(); + if (!metricsMap.isEmpty()) { + metricsMap.forEach((metricsName, descriptionSegment) -> { if (!metricsName.equals("")) { builder.addGauge(Interns.info("numRequested" + metricsName, "Number of requested " + descriptionSegment), From ae8fc2c7ca9f399b816086b742be53efc0402fdf Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Fri, 6 Sep 2024 23:54:01 +0800 Subject: [PATCH 08/10] Improve some abstract methods. --- .../replication/AbstractReplicationTask.java | 8 ++------ .../TestReplicationSupervisor.java | 20 +++++++++++++++++++ 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java index fcc4eb31371..f4bf54a3d82 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java @@ -71,13 +71,9 @@ protected AbstractReplicationTask(long containerID, queued = Instant.now(clock); } - protected String getMetricName() { - return ""; - } + protected abstract String getMetricName(); - protected String getMetricDescriptionSegment() { - return ""; - } + protected abstract String getMetricDescriptionSegment(); public long getContainerId() { return containerId; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index ee2ce3ed9b1..ef37c226653 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -585,6 +585,16 @@ private static class BlockingTask extends AbstractReplicationTask { this.waitForCompleteLatch = waitForCompletion; } + @Override + protected String getMetricName() { + return "Blockings"; + } + + @Override + protected String getMetricDescriptionSegment() { + return "blockings"; + } + @Override public void runTask() { runningLatch.countDown(); @@ -611,6 +621,16 @@ private static class OrderedTask extends AbstractReplicationTask { setPriority(priority); } + @Override + protected String getMetricName() { + return "Ordereds"; + } + + @Override + protected String getMetricDescriptionSegment() { + return "ordereds"; + } + @Override public void runTask() { completeList.add(name); From 06f3db3c812a60ccacd5e69f2e7db53f0916f060 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Sat, 7 Sep 2024 09:12:34 +0800 Subject: [PATCH 09/10] Simplify some code. --- .../ozone/container/replication/ReplicationSupervisor.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 074717e3d2a..ec5132562c8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -472,9 +472,7 @@ private long getCount(Map counter) { return 0; } AtomicLong total = new AtomicLong(0); - counter.forEach((key, value) -> { - total.set(total.get() + value.get()); - }); + counter.forEach((key, value) -> total.addAndGet(value.get())); return total.get(); } From 8e06209f11ad60ce51c7ba37d224c9d94cef7ba9 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Mon, 9 Sep 2024 20:40:19 +0800 Subject: [PATCH 10/10] Improve some code. --- .../container/replication/ReplicationSupervisor.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index ec5132562c8..92ff4b6d8d6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -468,12 +468,11 @@ public long getMaxReplicationStreams() { } private long getCount(Map counter) { - if (counter.isEmpty()) { - return 0; + long total = 0; + for (Map.Entry entry : counter.entrySet()) { + total += entry.getValue().get(); } - AtomicLong total = new AtomicLong(0); - counter.forEach((key, value) -> total.addAndGet(value.get())); - return total.get(); + return total; } public long getReplicationSuccessCount() {