diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index e494243ccc1..b47116950c5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -216,7 +216,6 @@ private ContainerCommandResponseProto dispatchRequest(
     long startTime = Time.monotonicNow();
     Type cmdType = msg.getCmdType();
     long containerID = msg.getContainerID();
-    metrics.incContainerOpsMetrics(cmdType);
     Container container = getContainer(containerID);
     boolean isWriteStage =
         (cmdType == Type.WriteChunk && dispatcherContext != null
@@ -228,6 +227,16 @@ private ContainerCommandResponseProto dispatchRequest(
             && dispatcherContext.getStage()
             == DispatcherContext.WriteChunkStage.COMMIT_DATA);
 
+    if (dispatcherContext == null) {
+      // increase all op not through ratis
+      metrics.incContainerOpsMetrics(cmdType);
+    } else if (isWriteStage) {
+      // increase WriteChunk in only WRITE_STAGE
+      metrics.incContainerOpsMetrics(cmdType);
+    } else if (cmdType != Type.WriteChunk) {
+      metrics.incContainerOpsMetrics(cmdType);
+    }
+
     try {
       if (DispatcherContext.op(dispatcherContext).validateToken()) {
         validateToken(msg);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index a4a5701f549..068cb01a967 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -18,11 +18,13 @@
 package org.apache.hadoop.ozone.container.metrics;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -32,32 +34,47 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.RatisTestHelper;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.ozone.test.GenericTestUtils;
 
 import com.google.common.collect.Maps;
 
-import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.ozone.test.MetricsAsserts.assertCounter;
 import static org.apache.ozone.test.MetricsAsserts.assertQuantileGauges;
 import static org.apache.ozone.test.MetricsAsserts.getMetrics;
+import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.apache.ratis.util.function.CheckedBiFunction;
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.apache.ratis.util.function.CheckedFunction;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
@@ -67,72 +84,126 @@
  */
 @Timeout(300)
 public class TestContainerMetrics {
+  static final String TEST_DIR = GenericTestUtils.getRandomizedTempPath() + File.separator;
   @TempDir
   private Path tempDir;
+  private static final OzoneConfiguration CONF = new OzoneConfiguration();
+  private static final int DFS_METRICS_PERCENTILES_INTERVALS = 1;
+
+  @BeforeAll
+  public static void setup() {
+    DefaultMetricsSystem.setMiniClusterMode(true);
+    CONF.setInt(DFSConfigKeysLegacy.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
+        DFS_METRICS_PERCENTILES_INTERVALS);
+    CONF.setBoolean(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED, false);
+    CONF.set(OzoneConfigKeys.OZONE_METADATA_DIRS, TEST_DIR);
+
+  }
+
+  @AfterAll
+  public static void cleanup() {
+    // clean up volume dir
+    File file = new File(TEST_DIR);
+    if (file.exists()) {
+      FileUtil.fullyDelete(file);
+    }
+  }
+
+  @AfterEach
+  public void cleanUp() throws IOException {
+    FileUtils.deleteQuietly(new File(CONF.get(ScmConfigKeys.HDDS_DATANODE_DIR_KEY)));
+    FileUtils.deleteQuietly(CONF.get(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR) == null ?
+        null : new File(CONF.get(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR)));
+  }
 
   @Test
   public void testContainerMetrics() throws Exception {
-    XceiverServerGrpc server = null;
-    XceiverClientGrpc client = null;
+    runTestClientServer(pipeline -> CONF
+        .setInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
+            pipeline.getFirstNode()
+                .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
+        pipeline -> new XceiverClientGrpc(pipeline, CONF),
+        (dn, volumeSet) -> new XceiverServerGrpc(dn, CONF,
+            createDispatcher(dn, volumeSet), null), (dn, p) -> {
+        });
+  }
+
+  @Test
+  public void testContainerMetricsRatis() throws Exception {
+    runTestClientServer(
+        pipeline -> RatisTestHelper.initRatisConf(GRPC, CONF),
+        pipeline -> XceiverClientRatis.newXceiverClientRatis(pipeline, CONF),
+        this::newXceiverServerRatis, (dn, p) ->
+            RatisTestHelper.initXceiverServerRatis(GRPC, dn, p));
+  }
+
+  private static MutableVolumeSet createVolumeSet(DatanodeDetails dn, String path) throws IOException {
+    CONF.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path);
+    return new MutableVolumeSet(
+        dn.getUuidString(), CONF,
+        null, StorageVolume.VolumeType.DATA_VOLUME, null);
+  }
+
+  private HddsDispatcher createDispatcher(DatanodeDetails dd, VolumeSet volumeSet) {
+    ContainerSet containerSet = new ContainerSet(1000);
+    StateContext context = ContainerTestUtils.getMockContext(
+        dd, CONF);
+    ContainerMetrics metrics = ContainerMetrics.create(CONF);
+    Map handlers = Maps.newHashMap();
+    for (ContainerProtos.ContainerType containerType :
+        ContainerProtos.ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(containerType, CONF,
+              context.getParent().getDatanodeDetails().getUuidString(),
+              containerSet, volumeSet, metrics,
+              c -> { }));
+    }
+    HddsDispatcher dispatcher = new HddsDispatcher(CONF, containerSet,
+        volumeSet, handlers, context, metrics, null);
+    StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
+        .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
+    dispatcher.setClusterId(UUID.randomUUID().toString());
+    return dispatcher;
+  }
+
+  static void runTestClientServer(
+      CheckedConsumer initConf,
+      CheckedFunction createClient,
+      CheckedBiFunction createServer,
+      CheckedBiConsumer initServer)
+      throws Exception {
+    XceiverServerSpi server = null;
+    XceiverClientSpi client = null;
     long containerID = ContainerTestHelper.getTestContainerID();
-    String path = GenericTestUtils.getRandomizedTempPath();
+    MutableVolumeSet volumeSet = null;
 
     try {
-      final int interval = 1;
-      Pipeline pipeline = MockPipeline
-          .createSingleNodePipeline();
-      OzoneConfiguration conf = new OzoneConfiguration();
-      conf.setInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
-          pipeline.getFirstNode()
-              .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
-      conf.setInt(DFSConfigKeysLegacy.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
-          interval);
-
-      DatanodeDetails datanodeDetails = randomDatanodeDetails();
-      conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path);
-      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, path);
-      VolumeSet volumeSet = new MutableVolumeSet(
-          datanodeDetails.getUuidString(), conf,
-          null, StorageVolume.VolumeType.DATA_VOLUME, null);
-      ContainerSet containerSet = new ContainerSet(1000);
-      StateContext context = ContainerTestUtils.getMockContext(
-          datanodeDetails, conf);
-      ContainerMetrics metrics = ContainerMetrics.create(conf);
-      Map handlers = Maps.newHashMap();
-      for (ContainerProtos.ContainerType containerType :
-          ContainerProtos.ContainerType.values()) {
-        handlers.put(containerType,
-            Handler.getHandlerForContainerType(containerType, conf,
-                context.getParent().getDatanodeDetails().getUuidString(),
-                containerSet, volumeSet, metrics,
-                c -> { }));
-      }
-      HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet,
-          volumeSet, handlers, context, metrics, null);
-      StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
-          .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
-      dispatcher.setClusterId(UUID.randomUUID().toString());
-
-      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, null);
-      client = new XceiverClientGrpc(pipeline, conf);
+      final Pipeline pipeline =
+          MockPipeline.createSingleNodePipeline();
+      initConf.accept(pipeline);
+      DatanodeDetails dn = pipeline.getFirstNode();
+      volumeSet = createVolumeSet(dn, TEST_DIR + dn.getUuidString());
+      server = createServer.apply(dn, volumeSet);
       server.start();
+      initServer.accept(dn, pipeline);
+
+      client = createClient.apply(pipeline);
       client.connect();
 
       // Write Chunk
       BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
-      ContainerTestHelper.getWriteChunkRequest(
-          pipeline, blockID, 1024);
-      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+      final ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
           ContainerTestHelper.getWriteChunkRequest(
               pipeline, blockID, 1024);
-      ContainerCommandResponseProto response =
-          client.sendCommand(writeChunkRequest);
+      ContainerCommandResponseProto response = client.sendCommand(writeChunkRequest);
       assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
 
       //Read Chunk
-      ContainerProtos.ContainerCommandRequestProto readChunkRequest =
+      final ContainerProtos.ContainerCommandRequestProto readChunkRequest =
           ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest
              .getWriteChunk());
       response = client.sendCommand(readChunkRequest);
@@ -147,11 +218,10 @@ public void testContainerMetrics() throws Exception {
       assertCounter("bytesWriteChunk", 1024L, containerMetrics);
       assertCounter("bytesReadChunk", 1024L, containerMetrics);
 
-      String sec = interval + "s";
-      Thread.sleep((interval + 1) * 1000);
+      String sec = DFS_METRICS_PERCENTILES_INTERVALS + "s";
+      Thread.sleep((DFS_METRICS_PERCENTILES_INTERVALS + 1) * 1000);
       assertQuantileGauges("WriteChunkNanos" + sec, containerMetrics);
 
-      // Check VolumeIOStats metrics
       List volumes =
           StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
       HddsVolume hddsVolume = volumes.get(0);
@@ -161,19 +231,30 @@
       assertCounter("ReadOpCount", 1L, volumeIOMetrics);
       assertCounter("WriteBytes", 1024L, volumeIOMetrics);
       assertCounter("WriteOpCount", 1L, volumeIOMetrics);
-
     } finally {
+      ContainerMetrics.remove();
+      if (volumeSet != null) {
+        volumeSet.shutdown();
+      }
       if (client != null) {
        client.close();
      }
      if (server != null) {
        server.stop();
      }
-      // clean up volume dir
-      File file = new File(path);
-      if (file.exists()) {
-        FileUtil.fullyDelete(file);
-      }
    }
  }
+
+  private XceiverServerSpi newXceiverServerRatis(DatanodeDetails dn, MutableVolumeSet volumeSet)
+      throws IOException {
+    CONF.setInt(OzoneConfigKeys.HDDS_CONTAINER_RATIS_IPC_PORT,
+        dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
+    final String dir = TEST_DIR + dn.getUuid();
+    CONF.set(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
+    final ContainerDispatcher dispatcher = createDispatcher(dn,
+        volumeSet);
+    return XceiverServerRatis.newXceiverServerRatis(dn, CONF, dispatcher,
+        new ContainerController(new ContainerSet(1000), Maps.newHashMap()),
+        null, null);
+  }
 }