diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index 25b26aab3ca..c8e1eb5ae44 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -335,6 +335,17 @@ public static ContainerCommandResponseProto getEchoResponse(
.build();
}
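+
+ /**
+ * Builds a success response carrying the serialized container merkle tree for the
+ * requested container.
+ */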
+ public static ContainerCommandResponseProto getGetContainerMerkleTreeResponse(
+ ContainerCommandRequestProto request, ByteString checksumTree) {
+
+ ContainerProtos.GetContainerMerkleTreeResponseProto.Builder containerMerkleTree =
+ ContainerProtos.GetContainerMerkleTreeResponseProto.newBuilder()
+ .setContainerID(request.getContainerID())
+ .setContainerMerkleTree(checksumTree);
+ return getSuccessResponseBuilder(request)
+ .setGetContainerMerkleTree(containerMerkleTree).build();
+ }
+
private ContainerCommandResponseBuilders() {
throw new UnsupportedOperationException("no instances");
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 659ddf2738b..90f930dd40c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -726,6 +726,40 @@ public static EchoResponseProto echo(XceiverClientSpi client, String encodedCont
return response.getEcho();
}
+ /**
+ * Gets the container merkle tree for a container from a datanode.
+ * @param client - client that communicates with the container
+ * @param containerID - ID of the container
+ * @param encodedContainerID - encoded container token if security is enabled
+ * @return response containing the serialized container merkle tree
+ */
+ public static ContainerProtos.GetContainerMerkleTreeResponseProto getContainerMerkleTree(
+ XceiverClientSpi client, long containerID, String encodedContainerID) throws IOException {
+ ContainerProtos.GetContainerMerkleTreeRequestProto containerMerkleTreeRequestProto =
+ ContainerProtos.GetContainerMerkleTreeRequestProto
+ .newBuilder()
+ .setContainerID(containerID)
+ .build();
+ String id = client.getPipeline().getClosestNode().getUuidString();
+
+ ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
+ .newBuilder()
+ .setCmdType(Type.GetContainerMerkleTree)
+ .setContainerID(containerID)
+ .setDatanodeUuid(id)
+ .setGetContainerMerkleTree(containerMerkleTreeRequestProto);
+ if (encodedContainerID != null) {
+ builder.setEncodedToken(encodedContainerID);
+ }
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ builder.setTraceID(traceId);
+ }
+ ContainerCommandRequestProto request = builder.build();
+ ContainerCommandResponseProto response =
+ client.sendCommand(request, getValidatorList());
+ return response.getGetContainerMerkleTree();
+ }
+
/**
* Validates a response from a container protocol call. Any non-successful
* return code is mapped to a corresponding exception and thrown.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index 3d6c9795b47..e170bf85915 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -40,7 +40,8 @@ public enum DNAction implements AuditAction {
CLOSE_CONTAINER,
GET_COMMITTED_BLOCK_LENGTH,
STREAM_INIT,
- ECHO;
+ ECHO,
+ GET_CONTAINER_MERKLE_TREE;
@Override
public String getAction() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
index f05d69cdceb..aa2edfdef25 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
@@ -25,6 +25,7 @@
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collection;
@@ -35,6 +36,7 @@
import com.google.common.util.concurrent.Striped;
import org.apache.hadoop.hdds.utils.SimpleStriped;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,6 +177,30 @@ private void write(ContainerData data, ContainerProtos.ContainerChecksumInfo che
}
}
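+
+ /**
+ * Reads this container's checksum file under the container read lock and returns its
+ * contents. Returns an empty ByteString if the file does not exist yet.
+ */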
+ public ByteString getContainerChecksumInfo(KeyValueContainerData data)
+ throws IOException {
+ long containerID = data.getContainerID();
+ Lock readLock = getReadLock(containerID);
+ readLock.lock();
+ try {
+ File checksumFile = getContainerChecksumFile(data);
+
+ try (FileInputStream inStream = new FileInputStream(checksumFile)) {
+ return ByteString.readFrom(inStream);
+ } catch (FileNotFoundException ex) {
+ // TODO: Build the container checksum tree when it doesn't exist.
+ LOG.debug("No checksum file currently exists for container {} at the path {}. Returning an empty instance.",
+ containerID, checksumFile, ex);
+ } catch (IOException ex) {
+ throw new IOException("Error occured when reading checksum file for container " + containerID +
+ " at the path " + checksumFile, ex);
+ }
+ return ByteString.EMPTY;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
@VisibleForTesting
public ContainerMerkleTreeMetrics getMetrics() {
return this.metrics;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
index 5bcf2bc04e1..3d76288616e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.container.checksum;
+import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -31,6 +32,11 @@ public class ContainerMerkleTreeMetrics {
public static ContainerMerkleTreeMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
+ // TODO: Remove when checksum manager is moved from KeyValueHandler.
+ MetricsSource source = ms.getSource(METRICS_SOURCE_NAME);
+ if (source != null) {
+ ms.unregisterSource(METRICS_SOURCE_NAME);
+ }
return ms.register(METRICS_SOURCE_NAME, "Container Merkle Tree Metrics",
new ContainerMerkleTreeMetrics());
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
new file mode 100644
index 00000000000..bdf75763e04
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.checksum;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.security.SecurityConfig;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import jakarta.annotation.Nonnull;
+import org.apache.hadoop.ozone.container.common.helpers.TokenHelper;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.ozone.container.common.helpers.TokenHelper.encode;
+
+/**
+ * This class wraps the container-level RPC calls needed for container reconciliation:
+ * - GetContainerMerkleTree
+ */
+public class DNContainerOperationClient implements AutoCloseable {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DNContainerOperationClient.class);
+ private final TokenHelper tokenHelper;
+ private final XceiverClientManager xceiverClientManager;
+
+ public DNContainerOperationClient(ConfigurationSource conf,
+ CertificateClient certificateClient,
+ SecretKeySignerClient secretKeyClient) throws IOException {
+ this.tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
+ this.xceiverClientManager = createClientManager(conf, certificateClient);
+ }
+
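+ /**
+ * Builds an XceiverClientManager whose client cache size and stale threshold come from
+ * DatanodeConfiguration, and wires in a ClientTrustManager only when security is enabled.
+ */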
+ @Nonnull
+ private static XceiverClientManager createClientManager(
+ ConfigurationSource conf, CertificateClient certificateClient)
+ throws IOException {
+ ClientTrustManager trustManager = null;
+ if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+ CACertificateProvider localCaCerts =
+ () -> HAUtils.buildCAX509List(certificateClient, conf);
+ CACertificateProvider remoteCacerts =
+ () -> HAUtils.buildCAX509List(null, conf);
+ trustManager = new ClientTrustManager(remoteCacerts, localCaCerts);
+ }
+ DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
+ return new XceiverClientManager(conf,
+ new XceiverClientManager.XceiverClientManagerConfigBuilder()
+ .setMaxCacheSize(dnConf.getContainerClientCacheSize())
+ .setStaleThresholdMs(dnConf.getContainerClientCacheStaleThreshold())
+ .build(), trustManager);
+ }
+
+ public XceiverClientManager getXceiverClientManager() {
+ return xceiverClientManager;
+ }
+
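+ /**
+ * Fetches the serialized container merkle tree for the given container from the given
+ * datanode over a temporary single-node standalone pipeline, attaching a container token
+ * when container tokens are enabled.
+ */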
+ public ByteString getContainerMerkleTree(long containerId, DatanodeDetails dn)
+ throws IOException {
+ XceiverClientSpi xceiverClient = this.xceiverClientManager.acquireClient(createSingleNodePipeline(dn));
+ try {
+ String containerToken = encode(tokenHelper.getContainerToken(
+ ContainerID.valueOf(containerId)));
+ ContainerProtos.GetContainerMerkleTreeResponseProto response =
+ ContainerProtocolCalls.getContainerMerkleTree(xceiverClient,
+ containerId, containerToken);
+ return response.getContainerMerkleTree();
+ } finally {
+ this.xceiverClientManager.releaseClient(xceiverClient, false);
+ }
+ }
+
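+ /**
+ * Creates a closed single-node standalone pipeline so that a client can address one
+ * specific datanode directly.
+ */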
+ public static Pipeline createSingleNodePipeline(DatanodeDetails dn) {
+ return Pipeline.newBuilder()
+ .setNodes(ImmutableList.of(dn))
+ .setId(PipelineID.valueOf(dn.getUuid()))
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.ONE)).build();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (xceiverClientManager != null) {
+ xceiverClientManager.close();
+ }
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/TokenHelper.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/TokenHelper.java
similarity index 88%
rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/TokenHelper.java
rename to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/TokenHelper.java
index d916300a7c2..98b4d29635d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/TokenHelper.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/TokenHelper.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.ozone.container.ec.reconstruction;
+package org.apache.hadoop.ozone.container.common.helpers;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
@@ -40,7 +40,7 @@
/**
* Wraps block and container token managers for datanode.
*/
-class TokenHelper {
+public class TokenHelper {
private final OzoneBlockTokenSecretManager blockTokenMgr;
private final ContainerTokenSecretManager containerTokenMgr;
@@ -48,8 +48,8 @@ class TokenHelper {
private static final Set<AccessModeProto> MODES =
EnumSet.of(READ, WRITE, DELETE);
- TokenHelper(SecurityConfig securityConfig,
- SecretKeySignerClient secretKeyClient) throws IOException {
+ public TokenHelper(SecurityConfig securityConfig,
+ SecretKeySignerClient secretKeyClient) throws IOException {
boolean blockTokenEnabled = securityConfig.isBlockTokenEnabled();
boolean containerTokenEnabled = securityConfig.isContainerTokenEnabled();
@@ -83,19 +83,19 @@ class TokenHelper {
}
}
- Token<OzoneBlockTokenIdentifier> getBlockToken(BlockID blockID, long length) {
+ public Token<OzoneBlockTokenIdentifier> getBlockToken(BlockID blockID, long length) {
return blockTokenMgr != null
? blockTokenMgr.generateToken(user, blockID, MODES, length)
: null;
}
- Token<ContainerTokenIdentifier> getContainerToken(ContainerID containerID) {
+ public Token<ContainerTokenIdentifier> getContainerToken(ContainerID containerID) {
return containerTokenMgr != null
? containerTokenMgr.generateToken(user, containerID)
: null;
}
- static String encode(Token> token) throws IOException {
+ public static String encode(Token> token) throws IOException {
return token != null ? token.encodeToUrlString() : null;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index b47116950c5..d7852ac972b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -824,6 +824,7 @@ private static DNAction getAuditAction(Type cmdType) {
case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
case StreamInit : return DNAction.STREAM_INIT;
case Echo : return DNAction.ECHO;
+ case GetContainerMerkleTree : return DNAction.GET_CONTAINER_MERKLE_TREE;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 28bbb17aa8f..16cacd6fae8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -75,6 +75,9 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
public static final String CONTAINER_SCHEMA_V3_ENABLED =
"hdds.datanode.container.schema.v3.enabled";
public static final String CONTAINER_CHECKSUM_LOCK_STRIPES_KEY = "hdds.datanode.container.checksum.lock.stripes";
+ public static final String CONTAINER_CLIENT_CACHE_SIZE = "hdds.datanode.container.client.cache.size";
+ public static final String CONTAINER_CLIENT_CACHE_STALE_THRESHOLD =
+ "hdds.datanode.container.client.cache.stale.threshold";
static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;
@@ -111,6 +114,9 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
public static final Boolean
OZONE_DATANODE_CHECK_EMPTY_CONTAINER_DIR_ON_DELETE_DEFAULT = false;
public static final int CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT = 127;
+ public static final int CONTAINER_CLIENT_CACHE_SIZE_DEFAULT = 100;
+ public static final int
+ CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_MILLISECONDS_DEFAULT = 10000;
/**
* Number of threads per volume that Datanode will use for chunk read.
@@ -567,6 +573,25 @@ public void setWaitOnAllFollowers(boolean val) {
)
private int containerChecksumLockStripes = CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT;
+ @Config(key = "container.client.cache.size",
+ type = ConfigType.INT,
+ defaultValue = "100",
+ tags = { DATANODE },
+ description = "The maximum number of clients to be cached by the datanode client manager"
+ )
+ private int containerClientCacheSize = CONTAINER_CLIENT_CACHE_SIZE_DEFAULT;
+
+ @Config(key = "container.client.cache.stale.threshold",
+ type = ConfigType.INT,
+ defaultValue = "10000",
+ tags = { DATANODE },
+ description = "The stale threshold in ms for a client in cache. After this threshold the client " +
+ "is evicted from cache."
+ )
+ private int containerClientCacheStaleThreshold =
+ CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_MILLISECONDS_DEFAULT;
+
+ @SuppressWarnings("checkstyle:MethodLength")
@PostConstruct
public void validate() {
if (containerDeleteThreads < 1) {
@@ -706,6 +731,19 @@ public void validate() {
CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT);
containerChecksumLockStripes = CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT;
}
+
+ if (containerClientCacheSize < 1) {
+ LOG.warn("{} must be at least 1. Defaulting to {}", CONTAINER_CLIENT_CACHE_SIZE,
+ CONTAINER_CLIENT_CACHE_SIZE_DEFAULT);
+ containerClientCacheSize = CONTAINER_CLIENT_CACHE_SIZE_DEFAULT;
+ }
+
+ if (containerClientCacheStaleThreshold < 1) {
+ LOG.warn("{} must be at least 1. Defaulting to {}", CONTAINER_CLIENT_CACHE_STALE_THRESHOLD,
+ CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_MILLISECONDS_DEFAULT);
+ containerClientCacheStaleThreshold =
+ CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_MILLISECONDS_DEFAULT;
+ }
}
public void setContainerDeleteThreads(int containerDeleteThreads) {
@@ -937,4 +975,12 @@ public void setAutoCompactionSmallSstFileNum(int num) {
public int getContainerChecksumLockStripes() {
return containerChecksumLockStripes;
}
+
+ public int getContainerClientCacheSize() {
+ return containerClientCacheSize;
+ }
+
+ public int getContainerClientCacheStaleThreshold() {
+ return containerClientCacheStaleThreshold;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 8fadd19b67d..7e64766b41c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
+import org.apache.hadoop.ozone.container.common.helpers.TokenHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.token.Token;
@@ -76,7 +77,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static org.apache.hadoop.ozone.container.ec.reconstruction.TokenHelper.encode;
+import static org.apache.hadoop.ozone.container.common.helpers.TokenHelper.encode;
/**
* The Coordinator implements the main flow of reconstructing
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index b2034eb7665..c1a623dbb9d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -110,6 +111,7 @@
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse;
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetContainerMerkleTreeResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponseBuilder;
@@ -143,6 +145,7 @@ public class KeyValueHandler extends Handler {
private final boolean validateChunkChecksumData;
// A striped lock that is held during container creation.
private final Striped<Lock> containerCreationLocks;
+ private final ContainerChecksumTreeManager checksumManager;
public KeyValueHandler(ConfigurationSource config,
String datanodeId,
@@ -156,6 +159,7 @@ public KeyValueHandler(ConfigurationSource config,
DatanodeConfiguration.class).isChunkDataValidationCheck();
chunkManager = ChunkManagerFactory.createChunkManager(config, blockManager,
volSet);
+ checksumManager = new ContainerChecksumTreeManager(config);
try {
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
} catch (Exception e) {
@@ -281,6 +285,8 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
return handler.handleGetCommittedBlockLength(request, kvContainer);
case Echo:
return handler.handleEcho(request, kvContainer);
+ case GetContainerMerkleTree:
+ return handler.handleGetContainerMerkleTree(request, kvContainer);
default:
return null;
}
@@ -296,6 +302,11 @@ public BlockManager getBlockManager() {
return this.blockManager;
}
+ @VisibleForTesting
+ public ContainerChecksumTreeManager getChecksumManager() {
+ return this.checksumManager;
+ }
+
ContainerCommandResponseProto handleStreamInit(
ContainerCommandRequestProto request, KeyValueContainer kvContainer,
DispatcherContext dispatcherContext) {
@@ -574,6 +585,33 @@ ContainerCommandResponseProto handleEcho(
return getEchoResponse(request);
}
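+
+ /**
+ * Handles a GetContainerMerkleTree request by reading the container's checksum file and
+ * returning its bytes. A read failure is mapped to an IO_EXCEPTION response.
+ */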
+ ContainerCommandResponseProto handleGetContainerMerkleTree(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasGetContainerMerkleTree()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Malformed Read Container Merkle tree request. trace ID: {}",
+ request.getTraceID());
+ }
+ return malformedRequest(request);
+ }
+
+ KeyValueContainerData containerData = kvContainer.getContainerData();
+ ByteString checksumTree = null;
+ try {
+ checksumTree = checksumManager.getContainerChecksumInfo(containerData);
+ } catch (IOException ex) {
+ return ContainerCommandResponseProto.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setTraceID(request.getTraceID())
+ .setResult(IO_EXCEPTION)
+ .setMessage(ex.getMessage())
+ .build();
+ }
+
+ return getGetContainerMerkleTreeResponse(request, checksumTree);
+ }
+
/**
* Checks if a replicaIndex needs to be checked based on the client version for a request.
* @param request ContainerCommandRequest object.
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 833159c84ec..8e58458d524 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -104,6 +104,7 @@ enum Type {
StreamInit = 19;
StreamWrite = 20;
Echo = 21;
+ GetContainerMerkleTree = 22;
}
@@ -210,6 +211,7 @@ message ContainerCommandRequestProto {
optional string encodedToken = 23;
optional uint32 version = 24;
optional EchoRequestProto echo = 25;
+ optional GetContainerMerkleTreeRequestProto getContainerMerkleTree = 26;
}
message ContainerCommandResponseProto {
@@ -241,6 +243,7 @@ message ContainerCommandResponseProto {
optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;
optional EchoResponseProto echo = 22;
+ optional GetContainerMerkleTreeResponseProto getContainerMerkleTree = 23;
}
message ContainerDataProto {
@@ -387,6 +390,14 @@ message EchoResponseProto {
optional bytes payload = 1;
}
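+
+// GetContainerMerkleTree fetches the merkle tree of a container from a datanode for
+// container reconciliation. The containerMerkleTree bytes carry a serialized
+// ContainerChecksumInfo.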
+message GetContainerMerkleTreeRequestProto {
+ optional int64 containerID = 1;
+}
+
+message GetContainerMerkleTreeResponseProto {
+ optional int64 containerID = 1;
+ optional bytes containerMerkleTree = 2;
+}
// Chunk Operations
message ChunkInfo {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index a1e8e1781f5..fef69f9bf65 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -27,16 +27,23 @@
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientManager.ScmClientConfig;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
import org.apache.hadoop.hdds.security.token.ContainerTokenSecretManager;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.client.SecretKeyTestClient;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.TokenHelper;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
@@ -44,6 +51,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -82,6 +90,7 @@
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getCloseContainer;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getCreateContainerSecureRequest;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getTestContainerID;
+import static org.apache.hadoop.ozone.container.common.helpers.TokenHelper.encode;
import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
import static org.apache.ozone.test.GenericTestUtils.LogCapturer.captureLogs;
import static org.apache.ozone.test.GenericTestUtils.setLogLevel;
@@ -221,6 +230,71 @@ public void downloadContainer(boolean containerTokenEnabled)
if (client != null) {
clientManager.releaseClient(client, true);
}
+ IOUtils.closeQuietly(clientManager);
+ }
+ }
+
+ @ParameterizedTest(name = "Container token enabled: {0}")
+ @ValueSource(booleans = {false, true})
+ public void testDNContainerOperationClient(boolean containerTokenEnabled)
+ throws Exception {
+ conf.setBoolean(HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED,
+ containerTokenEnabled);
+ OzoneContainer container = createAndStartOzoneContainerInstance();
+ ScmClientConfig scmClientConf = conf.getObject(ScmClientConfig.class);
+ XceiverClientManager clientManager =
+ new XceiverClientManager(conf, scmClientConf, aClientTrustManager());
+ XceiverClientSpi client = null;
+ try (DNContainerOperationClient dnClient =
+ new DNContainerOperationClient(conf, caClient, keyClient)) {
+ client = clientManager.acquireClient(pipeline);
+ long containerId = createAndCloseContainer(client, containerTokenEnabled);
+ dnClient.getContainerMerkleTree(containerId, dn);
+ } finally {
+ if (container != null) {
+ container.stop();
+ }
+ if (client != null) {
+ clientManager.releaseClient(client, true);
+ }
+ IOUtils.closeQuietly(clientManager);
+ }
+ }
+
+ @Test
+ public void testGetContainerMerkleTree() throws IOException {
+ conf.setBoolean(HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED, true);
+ OzoneContainer container = createAndStartOzoneContainerInstance();
+ ScmClientConfig scmClientConf = conf.getObject(ScmClientConfig.class);
+ XceiverClientManager clientManager =
+ new XceiverClientManager(conf, scmClientConf, aClientTrustManager());
+ XceiverClientSpi client = null;
+ try {
+ client = clientManager.acquireClient(pipeline);
+ long containerId = createAndCloseContainer(client, true);
+ TokenHelper tokenHelper = new TokenHelper(new SecurityConfig(conf), keyClient);
+ String containerToken = encode(tokenHelper.getContainerToken(
+ ContainerID.valueOf(containerId)));
+ ContainerProtos.GetContainerMerkleTreeResponseProto response =
+ ContainerProtocolCalls.getContainerMerkleTree(client,
+ containerId, containerToken);
+ // Getting container merkle tree with valid container token
+ assertEquals(ByteString.EMPTY, response.getContainerMerkleTree());
+
+ // Getting container merkle tree with invalid container token
+ XceiverClientSpi finalClient = client;
+ StorageContainerException exception = assertThrows(StorageContainerException.class,
+ () -> ContainerProtocolCalls.getContainerMerkleTree(
+ finalClient, containerId, "invalidContainerToken"));
+ assertEquals(ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED, exception.getResult());
+ } finally {
+ if (container != null) {
+ container.stop();
+ }
+ if (client != null) {
+ clientManager.releaseClient(client, true);
+ }
+ IOUtils.closeQuietly(clientManager);
}
}
@@ -286,6 +360,7 @@ public void testLongLivingClientWithCertRenews() throws Exception {
if (container != null) {
container.stop();
}
+ IOUtils.closeQuietly(clientManager);
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
new file mode 100644
index 00000000000..2fd40487e47
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.dn.checksum;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTree;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.assertTreesSortedAndMatch;
+import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildChunk;
+
+/**
+ * This class tests container commands for reconciliation.
+ */
+public class TestContainerCommandReconciliation {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestContainerCommandReconciliation.class);
+ private static MiniOzoneCluster cluster;
+ private static OzoneClient rpcClient;
+ private static ObjectStore store;
+ private static OzoneConfiguration conf;
+ private static DNContainerOperationClient dnClient;
+
+ @TempDir
+ private static File testDir;
+
+ @BeforeAll
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
+ conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ conf.setBoolean(OZONE_SECURITY_ENABLED_KEY,
+ OZONE_SECURITY_ENABLED_DEFAULT);
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(3)
+ .build();
+ cluster.waitForClusterToBeReady();
+ rpcClient = OzoneClientFactory.getRpcClient(conf);
+ store = rpcClient.getObjectStore();
+ dnClient = new DNContainerOperationClient(conf, null, null);
+ }
+
+ @AfterAll
+ public static void stop() throws IOException {
+ if (rpcClient != null) {
+ rpcClient.close();
+ }
+
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testGetContainerMerkleTree() throws Exception {
+ final String volumeName = UUID.randomUUID().toString();
+ final String bucketName = UUID.randomUUID().toString();
+ final String keyName = UUID.randomUUID().toString();
+ byte[] data = "Test content".getBytes(UTF_8);
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ // Write Key
+ try (OzoneOutputStream os = bucket.createKey(keyName, data.length)) {
+ IOUtils.write(data, os);
+ }
+
+ // Close container
+ ContainerInfo container = cluster.getStorageContainerManager()
+ .getContainerManager().getContainers().get(0);
+ closeContainer(container);
+
+ // Write checksum data
+ ContainerMerkleTree tree = buildTestTree();
+ writeChecksumFileToDatanode(container, tree);
+
+ // Verify all the ContainerMerkle Tree matches
+ List<DatanodeDetails> datanodeDetails = cluster.getHddsDatanodes().stream()
+ .map(HddsDatanodeService::getDatanodeDetails).collect(Collectors.toList());
+ for (DatanodeDetails dn: datanodeDetails) {
+ ByteString merkleTree = dnClient.getContainerMerkleTree(container.getContainerID(), dn);
+ ContainerProtos.ContainerChecksumInfo containerChecksumInfo =
+ ContainerProtos.ContainerChecksumInfo.parseFrom(merkleTree);
+ assertTreesSortedAndMatch(tree.toProto(), containerChecksumInfo.getContainerMerkleTree());
+ }
+ }
+
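+ /**
+ * Writes the given merkle tree into the container's checksum file on every datanode,
+ * using each datanode's KeyValueHandler checksum manager.
+ */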
+ public static void writeChecksumFileToDatanode(ContainerInfo container, ContainerMerkleTree tree) throws Exception {
+ // Write Container Merkle Tree
+ for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+ KeyValueHandler keyValueHandler =
+ (KeyValueHandler) dn.getDatanodeStateMachine().getContainer().getDispatcher()
+ .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+ KeyValueContainer keyValueContainer =
+ (KeyValueContainer) dn.getDatanodeStateMachine().getContainer().getController()
+ .getContainer(container.getContainerID());
+ keyValueHandler.getChecksumManager().writeContainerDataTree(
+ keyValueContainer.getContainerData(), tree);
+ }
+ }
+
+ public static void closeContainer(ContainerInfo container) throws Exception {
+ // Close the container first.
+ cluster.getStorageContainerManager().getClientProtocolServer().closeContainer(container.getContainerID());
+ GenericTestUtils.waitFor(() -> checkContainerState(container), 100, 50000);
+ }
+
+ private static boolean checkContainerState(ContainerInfo container) {
+ ContainerInfo containerInfo = null;
+ try {
+ containerInfo = cluster.getStorageContainerManager()
+ .getContainerInfo(container.getContainerID());
+ return containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED;
+ } catch (IOException e) {
+ LOG.error("Error when getting the container info", e);
+ }
+ return false;
+ }
+
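+ /**
+ * Builds a small deterministic merkle tree with three blocks of two chunks each, used to
+ * seed the datanodes' checksum files for this test.
+ */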
+ public static ContainerMerkleTree buildTestTree() throws Exception {
+ final long blockID1 = 1;
+ final long blockID2 = 2;
+ final long blockID3 = 3;
+ ChunkInfo b1c1 = buildChunk(conf, 0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ ChunkInfo b1c2 = buildChunk(conf, 1, ByteBuffer.wrap(new byte[]{4, 5, 6}));
+ ChunkInfo b2c1 = buildChunk(conf, 0, ByteBuffer.wrap(new byte[]{7, 8, 9}));
+ ChunkInfo b2c2 = buildChunk(conf, 1, ByteBuffer.wrap(new byte[]{12, 11, 10}));
+ ChunkInfo b3c1 = buildChunk(conf, 0, ByteBuffer.wrap(new byte[]{13, 14, 15}));
+ ChunkInfo b3c2 = buildChunk(conf, 1, ByteBuffer.wrap(new byte[]{16, 17, 18}));
+
+ ContainerMerkleTree tree = new ContainerMerkleTree();
+ tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
+ tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
+ tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
+
+ return tree;
+ }
+}