Skip to content

Commit

Permalink
HDDS-10376. Add a Datanode API to supply a merkle tree for a given container. (#6945)
Browse files Browse the repository at this point in the history
  • Loading branch information
aswinshakil authored Aug 7, 2024
1 parent d7f302e commit ab35173
Show file tree
Hide file tree
Showing 14 changed files with 578 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,17 @@ public static ContainerCommandResponseProto getEchoResponse(
.build();
}

/**
 * Builds a success response carrying the serialized merkle tree for the
 * container named in {@code request}.
 *
 * @param request the originating container command request
 * @param checksumTree serialized container merkle tree bytes
 * @return a success {@link ContainerCommandResponseProto} with the tree attached
 */
public static ContainerCommandResponseProto getGetContainerMerkleTreeResponse(
    ContainerCommandRequestProto request, ByteString checksumTree) {
  ContainerProtos.GetContainerMerkleTreeResponseProto.Builder treeResponse =
      ContainerProtos.GetContainerMerkleTreeResponseProto.newBuilder();
  treeResponse.setContainerID(request.getContainerID());
  treeResponse.setContainerMerkleTree(checksumTree);
  return getSuccessResponseBuilder(request)
      .setGetContainerMerkleTree(treeResponse)
      .build();
}

// Utility class holding only static factory methods; never instantiated.
private ContainerCommandResponseBuilders() {
throw new UnsupportedOperationException("no instances");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,40 @@ public static EchoResponseProto echo(XceiverClientSpi client, String encodedCont
return response.getEcho();
}

/**
 * Gets the container merkle tree for a container from a datanode.
 * The request is routed to the closest node in the client's pipeline.
 *
 * @param client client that communicates with the container's datanode
 * @param containerID ID of the container whose merkle tree is requested
 * @param encodedContainerID encoded container token; may be null when
 *     security is disabled, in which case no token is attached
 * @return the response proto carrying the serialized container merkle tree
 * @throws IOException if the datanode call fails or the response fails validation
 */
public static ContainerProtos.GetContainerMerkleTreeResponseProto getContainerMerkleTree(
    XceiverClientSpi client, long containerID, String encodedContainerID) throws IOException {
  ContainerProtos.GetContainerMerkleTreeRequestProto containerMerkleTreeRequestProto =
      ContainerProtos.GetContainerMerkleTreeRequestProto
      .newBuilder()
      .setContainerID(containerID)
      .build();
  // Target the closest replica in the pipeline for the read.
  String id = client.getPipeline().getClosestNode().getUuidString();

  ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
      .newBuilder()
      .setCmdType(Type.GetContainerMerkleTree)
      .setContainerID(containerID)
      .setDatanodeUuid(id)
      .setGetContainerMerkleTree(containerMerkleTreeRequestProto);
  if (encodedContainerID != null) {
    builder.setEncodedToken(encodedContainerID);
  }
  // Propagate the current tracing span, if any, for cross-service tracing.
  String traceId = TracingUtil.exportCurrentSpan();
  if (traceId != null) {
    builder.setTraceID(traceId);
  }
  ContainerCommandRequestProto request = builder.build();
  ContainerCommandResponseProto response =
      client.sendCommand(request, getValidatorList());
  return response.getGetContainerMerkleTree();
}

/**
* Validates a response from a container protocol call. Any non-successful
* return code is mapped to a corresponding exception and thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public enum DNAction implements AuditAction {
CLOSE_CONTAINER,
GET_COMMITTED_BLOCK_LENGTH,
STREAM_INIT,
ECHO;
ECHO,
GET_CONTAINER_MERKLE_TREE;

@Override
public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collection;
Expand All @@ -35,6 +36,7 @@

import com.google.common.util.concurrent.Striped;
import org.apache.hadoop.hdds.utils.SimpleStriped;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -175,6 +177,30 @@ private void write(ContainerData data, ContainerProtos.ContainerChecksumInfo che
}
}

/**
 * Reads the raw bytes of the checksum (merkle tree) file for the given
 * container, under the container's striped read lock.
 *
 * @param data metadata of the container whose checksum file should be read
 * @return the file contents, or {@link ByteString#EMPTY} if no checksum file
 *     exists yet for this container
 * @throws IOException if the checksum file exists but cannot be read
 */
public ByteString getContainerChecksumInfo(KeyValueContainerData data)
    throws IOException {
  long containerID = data.getContainerID();
  Lock readLock = getReadLock(containerID);
  readLock.lock();
  try {
    File checksumFile = getContainerChecksumFile(data);

    try (FileInputStream inStream = new FileInputStream(checksumFile)) {
      return ByteString.readFrom(inStream);
    } catch (FileNotFoundException ex) {
      // A missing file is expected for containers that have not yet built a
      // tree; fall through and return an empty result instead of failing.
      // TODO: Build the container checksum tree when it doesn't exist.
      LOG.debug("No checksum file currently exists for container {} at the path {}. Returning an empty instance.",
          containerID, checksumFile, ex);
    } catch (IOException ex) {
      // Fixed typo: "occured" -> "occurred" in the wrapped error message.
      throw new IOException("Error occurred when reading checksum file for container " + containerID +
          " at the path " + checksumFile, ex);
    }
    return ByteString.EMPTY;
  } finally {
    readLock.unlock();
  }
}

@VisibleForTesting
public ContainerMerkleTreeMetrics getMetrics() {
return this.metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.container.checksum;

import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
Expand All @@ -31,6 +32,11 @@ public class ContainerMerkleTreeMetrics {

/**
 * Creates and registers a new metrics source, first unregistering any
 * previously registered source with the same name.
 *
 * @return the freshly registered {@link ContainerMerkleTreeMetrics} instance
 */
public static ContainerMerkleTreeMetrics create() {
  MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
  // TODO: Remove when checksum manager is moved from KeyValueHandler.
  if (metricsSystem.getSource(METRICS_SOURCE_NAME) != null) {
    metricsSystem.unregisterSource(METRICS_SOURCE_NAME);
  }
  ContainerMerkleTreeMetrics metrics = new ContainerMerkleTreeMetrics();
  return metricsSystem.register(METRICS_SOURCE_NAME, "Container Merkle Tree Metrics", metrics);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.checksum;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import jakarta.annotation.Nonnull;
import org.apache.hadoop.ozone.container.common.helpers.TokenHelper;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static org.apache.hadoop.ozone.container.common.helpers.TokenHelper.encode;

/**
* This class wraps necessary container-level rpc calls for container reconciliation.
* - GetContainerMerkleTree
*/
public class DNContainerOperationClient implements AutoCloseable {

private static final Logger LOG =
LoggerFactory.getLogger(DNContainerOperationClient.class);
private final TokenHelper tokenHelper;
private final XceiverClientManager xceiverClientManager;

public DNContainerOperationClient(ConfigurationSource conf,
CertificateClient certificateClient,
SecretKeySignerClient secretKeyClient) throws IOException {
this.tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
this.xceiverClientManager = createClientManager(conf, certificateClient);
}

@Nonnull
private static XceiverClientManager createClientManager(
ConfigurationSource conf, CertificateClient certificateClient)
throws IOException {
ClientTrustManager trustManager = null;
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
CACertificateProvider localCaCerts =
() -> HAUtils.buildCAX509List(certificateClient, conf);
CACertificateProvider remoteCacerts =
() -> HAUtils.buildCAX509List(null, conf);
trustManager = new ClientTrustManager(remoteCacerts, localCaCerts);
}
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
return new XceiverClientManager(conf,
new XceiverClientManager.XceiverClientManagerConfigBuilder()
.setMaxCacheSize(dnConf.getContainerClientCacheSize())
.setStaleThresholdMs(dnConf.getContainerClientCacheStaleThreshold())
.build(), trustManager);
}

public XceiverClientManager getXceiverClientManager() {
return xceiverClientManager;
}

public ByteString getContainerMerkleTree(long containerId, DatanodeDetails dn)
throws IOException {
XceiverClientSpi xceiverClient = this.xceiverClientManager.acquireClient(createSingleNodePipeline(dn));
try {
String containerToken = encode(tokenHelper.getContainerToken(
ContainerID.valueOf(containerId)));
ContainerProtos.GetContainerMerkleTreeResponseProto response =
ContainerProtocolCalls.getContainerMerkleTree(xceiverClient,
containerId, containerToken);
return response.getContainerMerkleTree();
} finally {
this.xceiverClientManager.releaseClient(xceiverClient, false);
}
}

public static Pipeline createSingleNodePipeline(DatanodeDetails dn) {
return Pipeline.newBuilder()
.setNodes(ImmutableList.of(dn))
.setId(PipelineID.valueOf(dn.getUuid()))
.setState(Pipeline.PipelineState.CLOSED)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE)).build();
}

@Override
public void close() throws IOException {
if (xceiverClientManager != null) {
xceiverClientManager.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ec.reconstruction;
package org.apache.hadoop.ozone.container.common.helpers;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
Expand All @@ -40,16 +40,16 @@
/**
* Wraps block and container token managers for datanode.
*/
class TokenHelper {
public class TokenHelper {

private final OzoneBlockTokenSecretManager blockTokenMgr;
private final ContainerTokenSecretManager containerTokenMgr;
private final String user;
private static final Set<AccessModeProto> MODES =
EnumSet.of(READ, WRITE, DELETE);

TokenHelper(SecurityConfig securityConfig,
SecretKeySignerClient secretKeyClient) throws IOException {
public TokenHelper(SecurityConfig securityConfig,
SecretKeySignerClient secretKeyClient) throws IOException {

boolean blockTokenEnabled = securityConfig.isBlockTokenEnabled();
boolean containerTokenEnabled = securityConfig.isContainerTokenEnabled();
Expand Down Expand Up @@ -83,19 +83,19 @@ class TokenHelper {
}
}

Token<OzoneBlockTokenIdentifier> getBlockToken(BlockID blockID, long length) {
/**
 * Generates a block token for the given block, or returns null when block
 * tokens are disabled (no block token manager configured).
 */
public Token<OzoneBlockTokenIdentifier> getBlockToken(BlockID blockID, long length) {
  if (blockTokenMgr == null) {
    return null;
  }
  return blockTokenMgr.generateToken(user, blockID, MODES, length);
}

Token<ContainerTokenIdentifier> getContainerToken(ContainerID containerID) {
/**
 * Generates a container token for the given container, or returns null when
 * container tokens are disabled (no container token manager configured).
 */
public Token<ContainerTokenIdentifier> getContainerToken(ContainerID containerID) {
  if (containerTokenMgr == null) {
    return null;
  }
  return containerTokenMgr.generateToken(user, containerID);
}

static String encode(Token<?> token) throws IOException {
/**
 * Encodes a token to its URL-safe string form; null-safe (returns null for a
 * null token, e.g. when the corresponding token type is disabled).
 */
public static String encode(Token<?> token) throws IOException {
  if (token == null) {
    return null;
  }
  return token.encodeToUrlString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ private static DNAction getAuditAction(Type cmdType) {
case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
case StreamInit : return DNAction.STREAM_INIT;
case Echo : return DNAction.ECHO;
case GetContainerMerkleTree : return DNAction.GET_CONTAINER_MERKLE_TREE;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
public static final String CONTAINER_SCHEMA_V3_ENABLED =
"hdds.datanode.container.schema.v3.enabled";
public static final String CONTAINER_CHECKSUM_LOCK_STRIPES_KEY = "hdds.datanode.container.checksum.lock.stripes";
public static final String CONTAINER_CLIENT_CACHE_SIZE = "hdds.datanode.container.client.cache.size";
public static final String CONTAINER_CLIENT_CACHE_STALE_THRESHOLD =
"hdds.datanode.container.client.cache.stale.threshold";

static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;

Expand Down Expand Up @@ -111,6 +114,9 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
public static final Boolean
OZONE_DATANODE_CHECK_EMPTY_CONTAINER_DIR_ON_DELETE_DEFAULT = false;
public static final int CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT = 127;
public static final int CONTAINER_CLIENT_CACHE_SIZE_DEFAULT = 100;
public static final int
CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_MILLISECONDS_DEFAULT = 10000;

/**
* Number of threads per volume that Datanode will use for chunk read.
Expand Down Expand Up @@ -567,6 +573,25 @@ public void setWaitOnAllFollowers(boolean val) {
)
private int containerChecksumLockStripes = CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT;

@Config(key = "container.client.cache.size",
type = ConfigType.INT,
defaultValue = "100",
tags = { DATANODE },
description = "The maximum number of clients to be cached by the datanode client manager"
)
private int containerClientCacheSize = CONTAINER_CLIENT_CACHE_SIZE_DEFAULT;

@Config(key = "container.client.cache.stale.threshold",
type = ConfigType.INT,
defaultValue = "10000",
tags = { DATANODE },
description = "The stale threshold in ms for a client in cache. After this threshold the client " +
"is evicted from cache."
)
private int containerClientCacheStaleThreshold =
CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_MILLISECONDS_DEFAULT;

@SuppressWarnings("checkstyle:MethodLength")
@PostConstruct
public void validate() {
if (containerDeleteThreads < 1) {
Expand Down Expand Up @@ -706,6 +731,19 @@ public void validate() {
CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT);
containerChecksumLockStripes = CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT;
}

if (containerClientCacheSize < 1) {
LOG.warn("{} must be at least 1. Defaulting to {}", CONTAINER_CLIENT_CACHE_SIZE,
CONTAINER_CLIENT_CACHE_SIZE_DEFAULT);
containerClientCacheSize = CONTAINER_CLIENT_CACHE_SIZE_DEFAULT;
}

if (containerClientCacheStaleThreshold < 1) {
LOG.warn("{} must be at least 1. Defaulting to {}", CONTAINER_CLIENT_CACHE_STALE_THRESHOLD,
CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_MILLISECONDS_DEFAULT);
containerClientCacheStaleThreshold =
CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_MILLISECONDS_DEFAULT;
}
}

public void setContainerDeleteThreads(int containerDeleteThreads) {
Expand Down Expand Up @@ -937,4 +975,12 @@ public void setAutoCompactionSmallSstFileNum(int num) {
/** @return number of stripes in the striped lock guarding container checksum files. */
public int getContainerChecksumLockStripes() {
return containerChecksumLockStripes;
}

/** @return maximum number of clients cached by the datanode client manager. */
public int getContainerClientCacheSize() {
return containerClientCacheSize;
}

/** @return staleness threshold in milliseconds after which a cached client is evicted. */
public int getContainerClientCacheStaleThreshold() {
return containerClientCacheStaleThreshold;
}
}
Loading

0 comments on commit ab35173

Please sign in to comment.