HDDS-10907. DataNode StorageContainerMetrics numWriteChunk is counted multiple times (apache#6835)


Co-authored-by: Doroszlai, Attila <adoroszlai@apache.org>
chungen0126 and adoroszlai authored Jul 16, 2024
1 parent 9b29eae commit 63a232b
Showing 2 changed files with 147 additions and 57 deletions.
HddsDispatcher.java
@@ -216,7 +216,6 @@ private ContainerCommandResponseProto dispatchRequest(
long startTime = Time.monotonicNow();
Type cmdType = msg.getCmdType();
long containerID = msg.getContainerID();
metrics.incContainerOpsMetrics(cmdType);
Container container = getContainer(containerID);
boolean isWriteStage =
(cmdType == Type.WriteChunk && dispatcherContext != null
@@ -228,6 +227,16 @@ private ContainerCommandResponseProto dispatchRequest(
&& dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.COMMIT_DATA);

if (dispatcherContext == null) {
// count every op that does not go through Ratis
metrics.incContainerOpsMetrics(cmdType);
} else if (isWriteStage) {
// count WriteChunk only in the WRITE_DATA stage
metrics.incContainerOpsMetrics(cmdType);
} else if (cmdType != Type.WriteChunk) {
metrics.incContainerOpsMetrics(cmdType);
}

try {
if (DispatcherContext.op(dispatcherContext).validateToken()) {
validateToken(msg);
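
For orientation: a WriteChunk that arrives through Ratis is dispatched twice, once in the WRITE_DATA stage and once in COMMIT_DATA, which is what inflated numWriteChunk before this change. Below is a minimal, hypothetical sketch of the counting rule the hunk above introduces; names are simplified and this is not the actual HddsDispatcher code.

import java.util.EnumMap;
import java.util.Map;

final class OpCountSketch {
  enum Cmd { WriteChunk, PutBlock, ReadChunk }
  enum Stage { WRITE_DATA, COMMIT_DATA }

  private final Map<Cmd, Long> opCount = new EnumMap<>(Cmd.class);

  // stage == null models a request that did not arrive through Ratis.
  void onDispatch(Cmd cmd, Stage stage) {
    if (stage == null) {
      bump(cmd);                 // standalone path: every op is dispatched once
    } else if (cmd == Cmd.WriteChunk) {
      if (stage == Stage.WRITE_DATA) {
        bump(cmd);               // Ratis WriteChunk: dispatched per stage, counted only once
      }
    } else {
      bump(cmd);                 // other Ratis ops: dispatched once
    }
  }

  private void bump(Cmd cmd) {
    opCount.merge(cmd, 1L, Long::sum);
  }

  long count(Cmd cmd) {
    return opCount.getOrDefault(cmd, 0L);
  }
}
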
TestContainerMetrics.java
@@ -18,11 +18,13 @@
package org.apache.hadoop.ozone.container.metrics;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.client.BlockID;
@@ -32,32 +34,47 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.ozone.test.GenericTestUtils;

import com.google.common.collect.Maps;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.ozone.test.MetricsAsserts.assertCounter;
import static org.apache.ozone.test.MetricsAsserts.assertQuantileGauges;
import static org.apache.ozone.test.MetricsAsserts.getMetrics;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.ratis.util.function.CheckedBiConsumer;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.apache.ratis.util.function.CheckedConsumer;
import org.apache.ratis.util.function.CheckedFunction;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
@@ -67,72 +84,126 @@
*/
@Timeout(300)
public class TestContainerMetrics {
static final String TEST_DIR = GenericTestUtils.getRandomizedTempPath() + File.separator;
@TempDir
private Path tempDir;
private static final OzoneConfiguration CONF = new OzoneConfiguration();
private static final int DFS_METRICS_PERCENTILES_INTERVALS = 1;

@BeforeAll
public static void setup() {
DefaultMetricsSystem.setMiniClusterMode(true);
CONF.setInt(DFSConfigKeysLegacy.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
DFS_METRICS_PERCENTILES_INTERVALS);
CONF.setBoolean(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED, false);
CONF.set(OzoneConfigKeys.OZONE_METADATA_DIRS, TEST_DIR);

}

@AfterAll
public static void cleanup() {
// clean up volume dir
File file = new File(TEST_DIR);
if (file.exists()) {
FileUtil.fullyDelete(file);
}
}

@AfterEach
public void cleanUp() throws IOException {
FileUtils.deleteQuietly(new File(CONF.get(ScmConfigKeys.HDDS_DATANODE_DIR_KEY)));
FileUtils.deleteQuietly(CONF.get(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR) == null ?
null : new File(CONF.get(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR)));
}

@Test
public void testContainerMetrics() throws Exception {
XceiverServerGrpc server = null;
XceiverClientGrpc client = null;
runTestClientServer(pipeline -> CONF
.setInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
pipeline.getFirstNode()
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
pipeline -> new XceiverClientGrpc(pipeline, CONF),
(dn, volumeSet) -> new XceiverServerGrpc(dn, CONF,
createDispatcher(dn, volumeSet), null), (dn, p) -> {
});
}

@Test
public void testContainerMetricsRatis() throws Exception {
runTestClientServer(
pipeline -> RatisTestHelper.initRatisConf(GRPC, CONF),
pipeline -> XceiverClientRatis.newXceiverClientRatis(pipeline, CONF),
this::newXceiverServerRatis, (dn, p) ->
RatisTestHelper.initXceiverServerRatis(GRPC, dn, p));
}

private static MutableVolumeSet createVolumeSet(DatanodeDetails dn, String path) throws IOException {
CONF.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path);
return new MutableVolumeSet(
dn.getUuidString(), CONF,
null, StorageVolume.VolumeType.DATA_VOLUME, null);
}

private HddsDispatcher createDispatcher(DatanodeDetails dd, VolumeSet volumeSet) {
ContainerSet containerSet = new ContainerSet(1000);
StateContext context = ContainerTestUtils.getMockContext(
dd, CONF);
ContainerMetrics metrics = ContainerMetrics.create(CONF);
Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerProtos.ContainerType containerType :
ContainerProtos.ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, CONF,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics,
c -> { }));
}
HddsDispatcher dispatcher = new HddsDispatcher(CONF, containerSet,
volumeSet, handlers, context, metrics, null);
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
dispatcher.setClusterId(UUID.randomUUID().toString());
return dispatcher;
}

static void runTestClientServer(
CheckedConsumer<Pipeline, IOException> initConf,
CheckedFunction<Pipeline, XceiverClientSpi,
IOException> createClient,
CheckedBiFunction<DatanodeDetails, MutableVolumeSet, XceiverServerSpi,
IOException> createServer,
CheckedBiConsumer<DatanodeDetails, Pipeline, IOException> initServer)
throws Exception {
XceiverServerSpi server = null;
XceiverClientSpi client = null;
long containerID = ContainerTestHelper.getTestContainerID();
String path = GenericTestUtils.getRandomizedTempPath();
MutableVolumeSet volumeSet = null;

try {
final int interval = 1;
Pipeline pipeline = MockPipeline
.createSingleNodePipeline();
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
pipeline.getFirstNode()
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
conf.setInt(DFSConfigKeysLegacy.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
interval);

DatanodeDetails datanodeDetails = randomDatanodeDetails();
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path);
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, path);
VolumeSet volumeSet = new MutableVolumeSet(
datanodeDetails.getUuidString(), conf,
null, StorageVolume.VolumeType.DATA_VOLUME, null);
ContainerSet containerSet = new ContainerSet(1000);
StateContext context = ContainerTestUtils.getMockContext(
datanodeDetails, conf);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerProtos.ContainerType containerType :
ContainerProtos.ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics,
c -> { }));
}
HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet,
volumeSet, handlers, context, metrics, null);
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
dispatcher.setClusterId(UUID.randomUUID().toString());

server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, null);
client = new XceiverClientGrpc(pipeline, conf);
final Pipeline pipeline =
MockPipeline.createSingleNodePipeline();
initConf.accept(pipeline);

DatanodeDetails dn = pipeline.getFirstNode();
volumeSet = createVolumeSet(dn, TEST_DIR + dn.getUuidString());
server = createServer.apply(dn, volumeSet);
server.start();
initServer.accept(dn, pipeline);

client = createClient.apply(pipeline);
client.connect();

// Write Chunk
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
ContainerTestHelper.getWriteChunkRequest(
pipeline, blockID, 1024);
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
final ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(
pipeline, blockID, 1024);
ContainerCommandResponseProto response =
client.sendCommand(writeChunkRequest);
ContainerCommandResponseProto response = client.sendCommand(writeChunkRequest);
assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());

//Read Chunk
ContainerProtos.ContainerCommandRequestProto readChunkRequest =
final ContainerProtos.ContainerCommandRequestProto readChunkRequest =
ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest
.getWriteChunk());
response = client.sendCommand(readChunkRequest);
@@ -147,11 +218,10 @@ public void testContainerMetrics() throws Exception {
assertCounter("bytesWriteChunk", 1024L, containerMetrics);
assertCounter("bytesReadChunk", 1024L, containerMetrics);

String sec = interval + "s";
Thread.sleep((interval + 1) * 1000);
String sec = DFS_METRICS_PERCENTILES_INTERVALS + "s";
Thread.sleep((DFS_METRICS_PERCENTILES_INTERVALS + 1) * 1000);
assertQuantileGauges("WriteChunkNanos" + sec, containerMetrics);

// Check VolumeIOStats metrics
List<HddsVolume> volumes =
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
HddsVolume hddsVolume = volumes.get(0);
@@ -161,19 +231,30 @@ public void testContainerMetrics() throws Exception {
assertCounter("ReadOpCount", 1L, volumeIOMetrics);
assertCounter("WriteBytes", 1024L, volumeIOMetrics);
assertCounter("WriteOpCount", 1L, volumeIOMetrics);

} finally {
ContainerMetrics.remove();
if (volumeSet != null) {
volumeSet.shutdown();
}
if (client != null) {
client.close();
}
if (server != null) {
server.stop();
}
// clean up volume dir
File file = new File(path);
if (file.exists()) {
FileUtil.fullyDelete(file);
}
}
}

private XceiverServerSpi newXceiverServerRatis(DatanodeDetails dn, MutableVolumeSet volumeSet)
throws IOException {
CONF.setInt(OzoneConfigKeys.HDDS_CONTAINER_RATIS_IPC_PORT,
dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
final String dir = TEST_DIR + dn.getUuid();
CONF.set(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = createDispatcher(dn,
volumeSet);
return XceiverServerRatis.newXceiverServerRatis(dn, CONF, dispatcher,
new ContainerController(new ContainerSet(1000), Maps.newHashMap()),
null, null);
}
}
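
The test refactor above folds the old inline gRPC-only setup into a transport-agnostic harness: runTestClientServer receives the config tweak, client factory, server factory and server initializer as Ratis Checked* functional interfaces, so testContainerMetrics (gRPC) and testContainerMetricsRatis exercise the same request-and-assert path. A hypothetical, self-contained illustration of that pattern follows, using java.util.function instead of the Ratis Checked* interfaces and stand-in types rather than the real Ozone classes.

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

final class HarnessSketch {
  interface Client extends AutoCloseable { String send(String cmd); }
  interface Server { void start(); void stop(); }

  // The transport-specific pieces are injected; the body stays identical for
  // every transport under test.
  static void runClientServer(Consumer<String> initConf,
                              Function<String, Client> createClient,
                              BiFunction<String, String, Server> createServer) throws Exception {
    String pipeline = "single-node-pipeline";      // stand-in for MockPipeline
    initConf.accept(pipeline);
    Server server = createServer.apply("dn-1", "volume-set");
    server.start();
    try (Client client = createClient.apply(pipeline)) {
      client.send("WriteChunk");                   // issue requests, then assert on metrics
    } finally {
      server.stop();
    }
  }
}
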
