Skip to content

Commit

Permalink
Tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanzlenko committed Sep 23, 2024
1 parent a82ff70 commit e26a11a
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.VerifyBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.VerifyBlockResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoRequestProto;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class KeyValueContainerCheck {
private final String metadataPath;
private final HddsVolume volume;
private final KeyValueContainer container;
private static final DirectBufferPool BUFFER_POOL = new DirectBufferPool();
public static final DirectBufferPool BUFFER_POOL = new DirectBufferPool();

public KeyValueContainerCheck(String metadataPath, ConfigurationSource conf, long containerID, HddsVolume volume,
KeyValueContainer container) {
Expand Down Expand Up @@ -336,7 +336,7 @@ private ScanResult scanBlock(BlockData block, DataTransferThrottler throttler, C
return ScanResult.healthy();
}

private static ScanResult verifyChecksum(BlockData block, ContainerProtos.ChunkInfo chunk, File chunkFile,
public static ScanResult verifyChecksum(BlockData block, ContainerProtos.ChunkInfo chunk, File chunkFile,
ContainerLayoutVersion layout, ByteBuffer buffer, DataTransferThrottler throttler, Canceler canceler) {
ChecksumData checksumData = ChecksumData.getFromProtoBuf(chunk.getChecksumData());
int checksumCount = checksumData.getChecksums().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@

package org.apache.hadoop.ozone.container.keyvalue;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
Expand Down Expand Up @@ -56,6 +52,8 @@
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChunkBuffer;
Expand All @@ -65,10 +63,7 @@
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.*;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
Expand All @@ -89,6 +84,7 @@
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -125,6 +121,10 @@
import static org.apache.hadoop.ozone.ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST;
import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult.FailureType.*;
import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult.unhealthy;
import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerCheck.BUFFER_POOL;
import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerCheck.verifyChecksum;

import org.apache.hadoop.util.Time;
import org.apache.ratis.statemachine.StateMachine;
Expand Down Expand Up @@ -270,7 +270,7 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler, Co
case Echo:
return handler.handleEcho(request);
case VerifyBlock:
return handler.handleVerifyBlock(request);
return handler.handleVerifyBlock(request, kvContainer);
default:
return null;
}
Expand Down Expand Up @@ -1366,14 +1366,78 @@ private ContainerCommandResponseProto checkFaultInjector(ContainerCommandRequest
return null;
}

private ContainerCommandResponseProto handleVerifyBlock(ContainerCommandRequestProto request) {
private ContainerCommandResponseProto handleVerifyBlock(ContainerCommandRequestProto request,
KeyValueContainer kvContainer) {
if (!request.hasVerifyBlock()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Malformed Verify Block request. trace ID: {}", request.getTraceID());
}
LOG.debug("Malformed Verify Block request. trace ID: {}", request.getTraceID());
return malformedRequest(request);
}

try {
BlockID blockID = BlockID.getFromProtobuf(request.getGetBlock().getBlockID());
if (replicaIndexCheckRequired(request)) {
BlockUtils.verifyReplicaIdx(kvContainer, blockID);
}
BlockData block = BlockData.getFromProtoBuf(blockManager.getBlock(kvContainer, blockID).getProtoBufMessage());

KeyValueContainerData onDiskContainerData;

String metadataPath = kvContainer.getContainerData().getMetadataPath();

long containerID = kvContainer.getContainerData().getContainerID();

File containerFile = KeyValueContainer.getContainerFile(metadataPath, containerID);

HddsVolume volume = kvContainer.getContainerData().getVolume();

try {
onDiskContainerData = (KeyValueContainerData) ContainerDataYaml.readContainerFile(containerFile);
onDiskContainerData.setVolume(volume);
} catch (FileNotFoundException ex) {
return unhealthy(MISSING_CONTAINER_FILE, containerFile, ex);
} catch (IOException ex) {
return unhealthy(CORRUPT_CONTAINER_FILE, containerFile, ex);
}

ContainerLayoutVersion layout = onDiskContainerData.getLayoutVersion();

for (ContainerProtos.ChunkInfo chunk : block.getChunks()) {
File chunkFile;
try {
chunkFile = layout.getChunkFile(onDiskContainerData, block.getBlockID(), chunk.getChunkName());
} catch (IOException ex) {
return unhealthy(MISSING_CHUNK_FILE, new File(onDiskContainerData.getChunksPath()), ex);
}

if (!chunkFile.exists()) {
// In EC, a client may write empty putBlock in padding block nodes.
// So, we need to make sure, chunk length > 0, before declaring the missing chunk file.
if (!block.getChunks().isEmpty() && block.getChunks().get(0).getLen() > 0) {
return unhealthy(MISSING_CHUNK_FILE, chunkFile,
new IOException("Missing chunk file " + chunkFile.getAbsolutePath()));
}
} else if (chunk.getChecksumData().getType() != ContainerProtos.ChecksumType.NONE) {
int bytesPerChecksum = chunk.getChecksumData().getBytesPerChecksum();
ByteBuffer buffer = BUFFER_POOL.getBuffer(bytesPerChecksum);

ContainerScannerConfiguration containerScannerConfiguration = conf.getObject(ContainerScannerConfiguration.class);

DataTransferThrottler throttler = new DataTransferThrottler(containerScannerConfiguration.getBandwidthPerVolume());

Canceler canceler = new Canceler();

ScanResult result = verifyChecksum(block, chunk, chunkFile, layout, buffer, throttler, canceler);
buffer.clear();
BUFFER_POOL.returnBuffer(buffer);
if (!result.isHealthy()) {
return result;
}
}
}
} catch (Exception e) {
LOG.error("OOPS", e);
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ protected void execute(OzoneClient client, OzoneAddress address) throws IOExcept
writer.write(e.getMessage());
throw new RuntimeException(e);
} catch (Error e) {
writer.write(e.getMessage());
throw e;
} finally {
writer.flush();
Expand Down

0 comments on commit e26a11a

Please sign in to comment.