HDDS-10907. DataNode StorageContainerMetrics numWriteChunk is counted multiple times #6835

Merged 7 commits on Jul 16, 2024
HddsDispatcher.java
@@ -216,7 +216,6 @@ private ContainerCommandResponseProto dispatchRequest(
long startTime = Time.monotonicNow();
Type cmdType = msg.getCmdType();
long containerID = msg.getContainerID();
metrics.incContainerOpsMetrics(cmdType);
Container container = getContainer(containerID);
boolean isWriteStage =
(cmdType == Type.WriteChunk && dispatcherContext != null
@@ -228,6 +227,16 @@ private ContainerCommandResponseProto dispatchRequest(
&& dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.COMMIT_DATA);

if (dispatcherContext == null) {
// count every op that does not go through Ratis
metrics.incContainerOpsMetrics(cmdType);
} else if (isWriteStage) {
// count WriteChunk only once, in its WRITE_DATA stage
metrics.incContainerOpsMetrics(cmdType);
} else if (cmdType != Type.WriteChunk) {
metrics.incContainerOpsMetrics(cmdType);
}

try {
if (DispatcherContext.op(dispatcherContext).validateToken()) {
validateToken(msg);
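The branching above is the heart of the fix: on the Ratis path a WriteChunk request passes through dispatchRequest twice, once in the WRITE_DATA stage and once in the COMMIT_DATA stage, so the unconditional incContainerOpsMetrics call that was removed counted numWriteChunk twice per chunk. A minimal sketch of an equivalent single predicate, assuming the DispatcherContext semantics shown above (the helper name shouldCountOp is hypothetical, not part of the PR):

    // Hypothetical helper, equivalent to the three branches above.
    private static boolean shouldCountOp(Type cmdType, boolean isWriteStage,
        DispatcherContext dispatcherContext) {
      return dispatcherContext == null     // op not routed through Ratis
          || isWriteStage                  // WriteChunk, WRITE_DATA stage only
          || cmdType != Type.WriteChunk;   // every non-WriteChunk op
    }

With it, the three branches collapse to a single guarded call: if (shouldCountOp(cmdType, isWriteStage, dispatcherContext)) { metrics.incContainerOpsMetrics(cmdType); }. The PR keeps the explicit if/else-if chain, which reads closer to the stage semantics.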
TestContainerMetrics.java
@@ -18,11 +18,13 @@
package org.apache.hadoop.ozone.container.metrics;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.client.BlockID;
@@ -32,32 +34,47 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.ozone.test.GenericTestUtils;

import com.google.common.collect.Maps;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.ozone.test.MetricsAsserts.assertCounter;
import static org.apache.ozone.test.MetricsAsserts.assertQuantileGauges;
import static org.apache.ozone.test.MetricsAsserts.getMetrics;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.ratis.util.function.CheckedBiConsumer;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.apache.ratis.util.function.CheckedConsumer;
import org.apache.ratis.util.function.CheckedFunction;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
@@ -67,72 +84,126 @@
*/
@Timeout(300)
public class TestContainerMetrics {
static final String TEST_DIR = GenericTestUtils.getRandomizedTempPath() + File.separator;
@TempDir
private Path tempDir;
private static final OzoneConfiguration CONF = new OzoneConfiguration();
private static final int DFS_METRICS_PERCENTILES_INTERVALS = 1;

@BeforeAll
public static void setup() {
DefaultMetricsSystem.setMiniClusterMode(true);
CONF.setInt(DFSConfigKeysLegacy.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
DFS_METRICS_PERCENTILES_INTERVALS);
CONF.setBoolean(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED, false);
CONF.set(OzoneConfigKeys.OZONE_METADATA_DIRS, TEST_DIR);

}

@AfterAll
public static void cleanup() {
// clean up volume dir
File file = new File(TEST_DIR);
if (file.exists()) {
FileUtil.fullyDelete(file);
}
}

@AfterEach
public void cleanUp() throws IOException {
FileUtils.deleteQuietly(new File(CONF.get(ScmConfigKeys.HDDS_DATANODE_DIR_KEY)));
FileUtils.deleteQuietly(CONF.get(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR) == null ?
null : new File(CONF.get(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR)));
}
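The ternary in cleanUp guards against new File((String) null), which throws a NullPointerException, while FileUtils.deleteQuietly itself tolerates null and simply returns false. A slightly more direct sketch of the same cleanup, behavior unchanged and using the same config keys:

    @AfterEach
    public void cleanUp() {
      FileUtils.deleteQuietly(new File(CONF.get(ScmConfigKeys.HDDS_DATANODE_DIR_KEY)));
      // Only the Ratis test sets the Ratis storage dir, so the key may be
      // absent here; construct the File only when it is present.
      String ratisDir = CONF.get(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
      if (ratisDir != null) {
        FileUtils.deleteQuietly(new File(ratisDir));
      }
    }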

@Test
public void testContainerMetrics() throws Exception {
XceiverServerGrpc server = null;
XceiverClientGrpc client = null;
runTestClientServer(pipeline -> CONF
.setInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
pipeline.getFirstNode()
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
pipeline -> new XceiverClientGrpc(pipeline, CONF),
(dn, volumeSet) -> new XceiverServerGrpc(dn, CONF,
createDispatcher(dn, volumeSet), null), (dn, p) -> {
});
}

@Test
public void testContainerMetricsRatis() throws Exception {
runTestClientServer(
pipeline -> RatisTestHelper.initRatisConf(GRPC, CONF),
pipeline -> XceiverClientRatis.newXceiverClientRatis(pipeline, CONF),
this::newXceiverServerRatis, (dn, p) ->
RatisTestHelper.initXceiverServerRatis(GRPC, dn, p));
}

private static MutableVolumeSet createVolumeSet(DatanodeDetails dn, String path) throws IOException {
CONF.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path);
return new MutableVolumeSet(
dn.getUuidString(), CONF,
null, StorageVolume.VolumeType.DATA_VOLUME, null);
}

private HddsDispatcher createDispatcher(DatanodeDetails dd, VolumeSet volumeSet) {
ContainerSet containerSet = new ContainerSet(1000);
StateContext context = ContainerTestUtils.getMockContext(
dd, CONF);
ContainerMetrics metrics = ContainerMetrics.create(CONF);
Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerProtos.ContainerType containerType :
ContainerProtos.ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, CONF,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics,
c -> { }));
}
HddsDispatcher dispatcher = new HddsDispatcher(CONF, containerSet,
volumeSet, handlers, context, metrics, null);
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
dispatcher.setClusterId(UUID.randomUUID().toString());
return dispatcher;
}

static void runTestClientServer(
CheckedConsumer<Pipeline, IOException> initConf,
CheckedFunction<Pipeline, XceiverClientSpi,
IOException> createClient,
CheckedBiFunction<DatanodeDetails, MutableVolumeSet, XceiverServerSpi,
IOException> createServer,
CheckedBiConsumer<DatanodeDetails, Pipeline, IOException> initServer)
throws Exception {
XceiverServerSpi server = null;
XceiverClientSpi client = null;
long containerID = ContainerTestHelper.getTestContainerID();
String path = GenericTestUtils.getRandomizedTempPath();
MutableVolumeSet volumeSet = null;

try {
final int interval = 1;
Pipeline pipeline = MockPipeline
.createSingleNodePipeline();
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
pipeline.getFirstNode()
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
conf.setInt(DFSConfigKeysLegacy.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
interval);

DatanodeDetails datanodeDetails = randomDatanodeDetails();
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, path);
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, path);
VolumeSet volumeSet = new MutableVolumeSet(
datanodeDetails.getUuidString(), conf,
null, StorageVolume.VolumeType.DATA_VOLUME, null);
ContainerSet containerSet = new ContainerSet(1000);
StateContext context = ContainerTestUtils.getMockContext(
datanodeDetails, conf);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerProtos.ContainerType containerType :
ContainerProtos.ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, metrics,
c -> { }));
}
HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet,
volumeSet, handlers, context, metrics, null);
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
dispatcher.setClusterId(UUID.randomUUID().toString());

server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, null);
client = new XceiverClientGrpc(pipeline, conf);
final Pipeline pipeline =
MockPipeline.createSingleNodePipeline();
initConf.accept(pipeline);

DatanodeDetails dn = pipeline.getFirstNode();
volumeSet = createVolumeSet(dn, TEST_DIR + dn.getUuidString());
server = createServer.apply(dn, volumeSet);
server.start();
initServer.accept(dn, pipeline);

client = createClient.apply(pipeline);
client.connect();

// Write Chunk
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
ContainerTestHelper.getWriteChunkRequest(
pipeline, blockID, 1024);
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
final ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(
pipeline, blockID, 1024);
ContainerCommandResponseProto response =
client.sendCommand(writeChunkRequest);
ContainerCommandResponseProto response = client.sendCommand(writeChunkRequest);
assertEquals(ContainerProtos.Result.SUCCESS,
Contributor:
We should check that the related metrics in containerMetrics are correct. Specifically, make sure numWriteChunk is as expected.

Contributor Author (@chungen0126, Jun 28, 2024):
Here are some assertions for containerMetrics:

assertCounter("NumOps", 3L, containerMetrics);
assertCounter("numCreateContainer", 1L, containerMetrics);
assertCounter("numWriteChunk", 1L, containerMetrics);
assertCounter("numReadChunk", 1L, containerMetrics);

Is it enough?

response.getResult());

//Read Chunk
ContainerProtos.ContainerCommandRequestProto readChunkRequest =
final ContainerProtos.ContainerCommandRequestProto readChunkRequest =
ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest
.getWriteChunk());
response = client.sendCommand(readChunkRequest);
@@ -147,11 +218,10 @@ public void testContainerMetrics() throws Exception {
assertCounter("bytesWriteChunk", 1024L, containerMetrics);
assertCounter("bytesReadChunk", 1024L, containerMetrics);

String sec = interval + "s";
Thread.sleep((interval + 1) * 1000);
String sec = DFS_METRICS_PERCENTILES_INTERVALS + "s";
Thread.sleep((DFS_METRICS_PERCENTILES_INTERVALS + 1) * 1000);
assertQuantileGauges("WriteChunkNanos" + sec, containerMetrics);

// Check VolumeIOStats metrics
List<HddsVolume> volumes =
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
HddsVolume hddsVolume = volumes.get(0);
@@ -161,19 +231,30 @@
assertCounter("ReadOpCount", 1L, volumeIOMetrics);
assertCounter("WriteBytes", 1024L, volumeIOMetrics);
assertCounter("WriteOpCount", 1L, volumeIOMetrics);

} finally {
ContainerMetrics.remove();
if (volumeSet != null) {
volumeSet.shutdown();
}
if (client != null) {
client.close();
}
if (server != null) {
server.stop();
}
// clean up volume dir
File file = new File(path);
if (file.exists()) {
FileUtil.fullyDelete(file);
}
}
}

private XceiverServerSpi newXceiverServerRatis(DatanodeDetails dn, MutableVolumeSet volumeSet)
throws IOException {
CONF.setInt(OzoneConfigKeys.HDDS_CONTAINER_RATIS_IPC_PORT,
dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
final String dir = TEST_DIR + dn.getUuid();
CONF.set(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = createDispatcher(dn,
volumeSet);
return XceiverServerRatis.newXceiverServerRatis(dn, CONF, dispatcher,
new ContainerController(new ContainerSet(1000), Maps.newHashMap()),
null, null);
}
}
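Per the review thread above, numWriteChunk is the counter the fix corrects: on the Ratis path each WriteChunk was dispatched once per stage and therefore counted twice. A sketch of where the suggested assertions slot in, after the write and read requests succeed; it assumes the metrics record is registered as "StorageContainerMetrics" (as the PR title suggests) and uses the getMetrics/assertCounter helpers this test already imports:

    MetricsRecordBuilder containerMetrics = getMetrics("StorageContainerMetrics");
    assertCounter("NumOps", 3L, containerMetrics);             // create + write + read
    assertCounter("numCreateContainer", 1L, containerMetrics); // one CreateContainer op
    assertCounter("numWriteChunk", 1L, containerMetrics);      // 2L before the fix, via Ratis
    assertCounter("numReadChunk", 1L, containerMetrics);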