Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-10376. Add a Datanode API to supply a merkle tree for a given container. #6945

Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,17 @@ public static ContainerCommandResponseProto getEchoResponse(
.build();
}

/**
 * Builds a success response carrying the serialized container merkle tree.
 *
 * @param request the originating container command request; its container ID
 *                is echoed back in the response
 * @param checksumTree serialized merkle tree bytes to return to the caller
 * @return a success {@link ContainerCommandResponseProto} with the merkle tree payload
 */
public static ContainerCommandResponseProto getGetContainerMerkleTreeResponse(
    ContainerCommandRequestProto request, ByteString checksumTree) {
  ContainerProtos.GetContainerMerkleTreeResponseProto.Builder treeResponse =
      ContainerProtos.GetContainerMerkleTreeResponseProto.newBuilder();
  treeResponse.setContainerID(request.getContainerID());
  treeResponse.setContainerMerkleTree(checksumTree);
  return getSuccessResponseBuilder(request)
      .setGetContainerMerkleTree(treeResponse)
      .build();
}

// Utility class holding only static factory methods; never instantiated.
private ContainerCommandResponseBuilders() {
  throw new UnsupportedOperationException("no instances");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,40 @@ public static EchoResponseProto echo(XceiverClientSpi client, String encodedCont
return response.getEcho();
}

/**
 * Gets the container merkle tree for a container from a datanode.
 *
 * @param client client that communicates with the container's datanode
 * @param containerID ID of the container whose merkle tree is requested
 * @param encodedContainerID encoded container token if security is enabled, else null
 * @return the response proto containing the serialized container merkle tree
 * @throws IOException if the datanode call fails or returns a non-success code
 */
public static ContainerProtos.GetContainerMerkleTreeResponseProto getContainerMerkleTree(
    XceiverClientSpi client, long containerID, String encodedContainerID) throws IOException {
  ContainerProtos.GetContainerMerkleTreeRequestProto treeRequest =
      ContainerProtos.GetContainerMerkleTreeRequestProto
          .newBuilder()
          .setContainerID(containerID)
          .build();

  ContainerCommandRequestProto.Builder requestBuilder = ContainerCommandRequestProto
      .newBuilder()
      .setCmdType(Type.GetContainerMerkleTree)
      .setContainerID(containerID)
      // Target the closest node of the pipeline, matching other datanode calls here.
      .setDatanodeUuid(client.getPipeline().getClosestNode().getUuidString())
      .setGetContainerMerkleTree(treeRequest);
  if (encodedContainerID != null) {
    requestBuilder.setEncodedToken(encodedContainerID);
  }
  String traceId = TracingUtil.exportCurrentSpan();
  if (traceId != null) {
    requestBuilder.setTraceID(traceId);
  }

  ContainerCommandResponseProto response =
      client.sendCommand(requestBuilder.build(), getValidatorList());
  return response.getGetContainerMerkleTree();
}

/**
* Validates a response from a container protocol call. Any non-successful
* return code is mapped to a corresponding exception and thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public enum DNAction implements AuditAction {
CLOSE_CONTAINER,
GET_COMMITTED_BLOCK_LENGTH,
STREAM_INIT,
ECHO;
ECHO,
GET_CONTAINER_MERKLE_TREE;

@Override
public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.SortedSet;
Expand All @@ -32,6 +33,7 @@

import com.google.common.util.concurrent.Striped;
import org.apache.hadoop.hdds.utils.SimpleStriped;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,7 +107,7 @@ public void markBlocksAsDeleted(KeyValueContainerData data, SortedSet<Long> dele
}
}

public ContainerDiff diff(KeyValueContainerData thisContainer, ContainerProtos.ContainerChecksumInfo otherInfo)
public ContainerDiff diff(KeyValueContainerData thisContainer, File otherContainerTree)
throws IOException {
// TODO HDDS-10928 compare the checksum info of the two containers and return a summary.
// Callers can act on this summary to repair their container replica using the peer's replica.
Expand Down Expand Up @@ -166,6 +168,31 @@ private void write(KeyValueContainerData data, ContainerProtos.ContainerChecksum
}
}

public ByteString getContainerChecksumInfo(KeyValueContainerData data)
throws IOException {
long containerID = data.getContainerID();
Lock readLock = getReadLock(containerID);
readLock.lock();
try {
File checksumFile = getContainerChecksumFile(data);

try (FileInputStream inStream = new FileInputStream(checksumFile)) {
return ByteString.readFrom(inStream);
} catch (FileNotFoundException ex) {
// TODO: Build the container checksum tree when it doesn't exist.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not build it here, also this can be a WARN as post upgrade all containers won't have a checksum tree file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, building the tree would be handled in the layer above based on the result of this method. We can update the comment. I'm actually thinking we should debug log here because this will get printed for every container the scanner encounters in the first run.

LOG.error("No checksum file currently exists for container {} at the path {}. Returning an empty instance.",
aswinshakil marked this conversation as resolved.
Show resolved Hide resolved
containerID, checksumFile, ex);
} catch (IOException ex) {
throw new IOException("Error occured when reading checksum file for container " + containerID +
" at the path " + checksumFile, ex);

aswinshakil marked this conversation as resolved.
Show resolved Hide resolved
}
return null;
} finally {
readLock.unlock();
}
}

// Resolves the on-disk location of the container's checksum tree file:
// <container metadata dir>/<containerID>.tree
public File getContainerChecksumFile(KeyValueContainerData data) {
  String treeFileName = data.getContainerID() + ".tree";
  return new File(data.getMetadataPath(), treeFileName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static ContainerMerkleTreeMetrics create() {
new ContainerMerkleTreeMetrics());
}

public void unregister() {
public static void unregister() {
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(METRICS_SOURCE_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.checksum;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import jakarta.annotation.Nonnull;
import org.apache.hadoop.ozone.container.common.helpers.TokenHelper;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static org.apache.hadoop.ozone.container.common.helpers.TokenHelper.encode;

/**
* This class wraps necessary container-level rpc calls for container reconciliation.
aswinshakil marked this conversation as resolved.
Show resolved Hide resolved
* - GetContainerMerkleTree
*/
public class DNContainerOperationClient implements AutoCloseable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public class DNContainerOperationClient implements AutoCloseable {
public class DNContainerReconciliationOperationClient implements AutoCloseable {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to keep this generic so that we can merge this with ECContainerOperationClient

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can rename it then.


private static final Logger LOG =
LoggerFactory.getLogger(DNContainerOperationClient.class);
private final TokenHelper tokenHelper;
private final XceiverClientManager xceiverClientManager;

public DNContainerOperationClient(ConfigurationSource conf,
CertificateClient certificateClient,
SecretKeySignerClient secretKeyClient) throws IOException {
this.tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
this.xceiverClientManager = createClientManager(conf, certificateClient);
}

@Nonnull
private static XceiverClientManager createClientManager(
ConfigurationSource conf, CertificateClient certificateClient)
throws IOException {
ClientTrustManager trustManager = null;
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
CACertificateProvider localCaCerts =
() -> HAUtils.buildCAX509List(certificateClient, conf);
CACertificateProvider remoteCacerts =
() -> HAUtils.buildCAX509List(null, conf);
trustManager = new ClientTrustManager(remoteCacerts, localCaCerts);
}
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
return new XceiverClientManager(conf,
new XceiverClientManager.XceiverClientManagerConfigBuilder()
.setMaxCacheSize(dnConf.getContainerClientCacheSize())
.setStaleThresholdMs(dnConf.getContainerClientCacheStaleThreshold())
Comment on lines +83 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the expected behavior to cache clients ? Is the client per Datanode? We do not expect much concurrency on the client, so should the threshold be longer ?

.build(), trustManager);
}

public XceiverClientManager getXceiverClientManager() {
return xceiverClientManager;
aswinshakil marked this conversation as resolved.
Show resolved Hide resolved
}

public ByteString getContainerMerkleTree(long containerId, DatanodeDetails dn)
throws IOException {
XceiverClientSpi xceiverClient = this.xceiverClientManager.acquireClient(createSingleNodePipeline(dn));
try {
String containerToken = encode(tokenHelper.getContainerToken(
ContainerID.valueOf(containerId)));
ContainerProtos.GetContainerMerkleTreeResponseProto response =
ContainerProtocolCalls.getContainerMerkleTree(xceiverClient,
containerId, containerToken);
return response.getContainerMerkleTree();
} finally {
this.xceiverClientManager.releaseClient(xceiverClient, false);
}
}

private Pipeline createSingleNodePipeline(DatanodeDetails dn) {
return Pipeline.newBuilder()
.setNodes(ImmutableList.of(dn))
.setId(PipelineID.valueOf(dn.getUuid()))
.setState(Pipeline.PipelineState.CLOSED)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE)).build();
}

@Override
public void close() throws IOException {
if (xceiverClientManager != null) {
xceiverClientManager.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ec.reconstruction;
package org.apache.hadoop.ozone.container.common.helpers;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
Expand All @@ -40,16 +40,16 @@
/**
* Wraps block and container token managers for datanode.
*/
class TokenHelper {
public class TokenHelper {

private final OzoneBlockTokenSecretManager blockTokenMgr;
private final ContainerTokenSecretManager containerTokenMgr;
private final String user;
private static final Set<AccessModeProto> MODES =
EnumSet.of(READ, WRITE, DELETE);

TokenHelper(SecurityConfig securityConfig,
SecretKeySignerClient secretKeyClient) throws IOException {
public TokenHelper(SecurityConfig securityConfig,
SecretKeySignerClient secretKeyClient) throws IOException {

boolean blockTokenEnabled = securityConfig.isBlockTokenEnabled();
boolean containerTokenEnabled = securityConfig.isContainerTokenEnabled();
Expand Down Expand Up @@ -83,19 +83,19 @@ class TokenHelper {
}
}

Token<OzoneBlockTokenIdentifier> getBlockToken(BlockID blockID, long length) {
public Token<OzoneBlockTokenIdentifier> getBlockToken(BlockID blockID, long length) {
return blockTokenMgr != null
? blockTokenMgr.generateToken(user, blockID, MODES, length)
: null;
}

Token<ContainerTokenIdentifier> getContainerToken(ContainerID containerID) {
public Token<ContainerTokenIdentifier> getContainerToken(ContainerID containerID) {
return containerTokenMgr != null
? containerTokenMgr.generateToken(user, containerID)
: null;
}

static String encode(Token<?> token) throws IOException {
public static String encode(Token<?> token) throws IOException {
return token != null ? token.encodeToUrlString() : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ private static DNAction getAuditAction(Type cmdType) {
case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
case StreamInit : return DNAction.STREAM_INIT;
case Echo : return DNAction.ECHO;
case GetContainerMerkleTree : return DNAction.GET_CONTAINER_MERKLE_TREE;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
public static final String CONTAINER_SCHEMA_V3_ENABLED =
"hdds.datanode.container.schema.v3.enabled";
public static final String CONTAINER_CHECKSUM_LOCK_STRIPES_KEY = "hdds.datanode.container.checksum.lock.stripes";
public static final String CONTAINER_CLIENT_CACHE_SIZE = "hdds.datanode.container.client.cache.size";
public static final String CONTAINER_CLIENT_CACHE_STALE_THRESHOLD =
"hdds.datanode.container.client.cache.stale.threshold";

static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;

Expand Down Expand Up @@ -111,6 +114,8 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
public static final Boolean
OZONE_DATANODE_CHECK_EMPTY_CONTAINER_DIR_ON_DELETE_DEFAULT = false;
public static final int CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT = 127;
public static final int CONTAINER_CLIENT_CACHE_SIZE_DEFAULT = 100;
public static final int CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_DEFAULT = 10000;
aswinshakil marked this conversation as resolved.
Show resolved Hide resolved

/**
* Number of threads per volume that Datanode will use for chunk read.
Expand Down Expand Up @@ -567,6 +572,24 @@ public void setWaitOnAllFollowers(boolean val) {
)
private int containerChecksumLockStripes = CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT;

@Config(key = "container.client.cache.size",
type = ConfigType.INT,
defaultValue = "100",
tags = { DATANODE },
description = "The maximum number of clients to be cached by the datanode client manager"
)
private int containerClientCacheSize = CONTAINER_CLIENT_CACHE_SIZE_DEFAULT;

@Config(key = "container.client.cache.stale.threshold",
type = ConfigType.INT,
defaultValue = "10000",
tags = { DATANODE },
description = "The stale threshold in ms for a client in cache. After this threshold the client " +
"is evicted from cache."
)
private int containerClientCacheStaleThreshold = CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_DEFAULT;

@SuppressWarnings("checkstyle:MethodLength")
@PostConstruct
public void validate() {
if (containerDeleteThreads < 1) {
Expand Down Expand Up @@ -706,6 +729,18 @@ public void validate() {
CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT);
containerChecksumLockStripes = CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT;
}

if (containerClientCacheSize < 1) {
LOG.warn("{} must be at least 1. Defaulting to {}", CONTAINER_CLIENT_CACHE_SIZE,
CONTAINER_CLIENT_CACHE_SIZE_DEFAULT);
containerClientCacheSize = CONTAINER_CLIENT_CACHE_SIZE_DEFAULT;
}

if (containerClientCacheStaleThreshold < 1) {
LOG.warn("{} must be at least 1. Defaulting to {}", CONTAINER_CLIENT_CACHE_STALE_THRESHOLD,
CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_DEFAULT);
containerClientCacheStaleThreshold = CONTAINER_CLIENT_CACHE_STALE_THRESHOLD_DEFAULT;
}
}

public void setContainerDeleteThreads(int containerDeleteThreads) {
Expand Down Expand Up @@ -937,4 +972,12 @@ public void setAutoCompactionSmallSstFileNum(int num) {
public int getContainerChecksumLockStripes() {
return containerChecksumLockStripes;
}

/** @return the maximum number of clients cached by the datanode container client manager. */
public int getContainerClientCacheSize() {
  return containerClientCacheSize;
}

/** @return the staleness threshold in milliseconds after which a cached client is evicted. */
public int getContainerClientCacheStaleThreshold() {
  return containerClientCacheStaleThreshold;
}
}
Loading