HDDS-11482. EC Checksum throws IllegalArgumentException because the buffer limit is negative (apache#7230)

(cherry picked from commit 7ef7de2)
aswinshakil authored Oct 10, 2024
1 parent 023799d commit a353f21
Showing 3 changed files with 59 additions and 69 deletions.
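
Why the limit went negative: before this patch both checksum paths sized parityBytes from the first chunk's length and applied it to every stripe checksum, so a final stripe whose checksum array is shorter makes checksumSize - parityBytes negative, and ByteBuffer.limit(int) rejects negative limits. A minimal standalone sketch of just that ByteBuffer behaviour (class name and values are illustrative, not taken from the patch):

import java.nio.ByteBuffer;

public class NegativeLimitSketch {
  public static void main(String[] args) {
    // Stripe checksum of a short final stripe: say 8 CRCs at 4 bytes each.
    ByteBuffer stripeChecksum = ByteBuffer.allocate(32);
    // Parity bytes sized from the first chunk's full length can exceed the
    // checksum bytes actually present for a truncated last stripe.
    int parityBytes = 48;
    // Throws IllegalArgumentException: the requested limit (-16) is negative.
    stripeChecksum.limit(stripeChecksum.capacity() - parityBytes);
  }
}

The patch avoids this by recomputing chunkSize and parityBytes per chunk, as the hunks below show.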
@@ -45,12 +45,14 @@ public class ECBlockChecksumComputer extends AbstractBlockChecksumComputer {
 
   private final List<ContainerProtos.ChunkInfo> chunkInfoList;
   private final OmKeyInfo keyInfo;
+  private final long blockLength;
 
 
   public ECBlockChecksumComputer(
-      List<ContainerProtos.ChunkInfo> chunkInfoList, OmKeyInfo keyInfo) {
+      List<ContainerProtos.ChunkInfo> chunkInfoList, OmKeyInfo keyInfo, long blockLength) {
     this.chunkInfoList = chunkInfoList;
     this.keyInfo = keyInfo;
+    this.blockLength = blockLength;
   }
 
   @Override
@@ -72,15 +74,13 @@ public void compute(OzoneClientConfig.ChecksumCombineMode combineMode)
   private void computeMd5Crc() {
     Preconditions.checkArgument(chunkInfoList.size() > 0);
 
-    final ContainerProtos.ChunkInfo firstChunkInfo = chunkInfoList.get(0);
-    long chunkSize = firstChunkInfo.getLen();
-    long bytesPerCrc = firstChunkInfo.getChecksumData().getBytesPerChecksum();
-    // Total parity checksum bytes per stripe to remove
-    int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
-
     final MessageDigest digester = MD5Hash.getDigester();
 
     for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
+      long chunkSize = chunkInfo.getLen();
+      long bytesPerCrc = chunkInfo.getChecksumData().getBytesPerChecksum();
+      // Total parity checksum bytes per stripe to remove
+      int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
       ByteString stripeChecksum = chunkInfo.getStripeChecksum();
 
       Preconditions.checkNotNull(stripeChecksum);
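
getParityBytes(chunkSize, bytesPerCrc) itself is inherited from AbstractBlockChecksumComputer and is not part of this diff. A rough sketch of the arithmetic it stands for, assuming 4-byte CRCs and a fixed number of parity chunks per stripe; this is a hypothetical helper for illustration, not the real implementation:

// Hypothetical standalone version of the parity-stripping arithmetic; the
// real helper lives in AbstractBlockChecksumComputer and may differ in detail.
static int parityBytes(long chunkSize, long bytesPerCrc, int parityChunks) {
  // CRCs recorded per chunk, e.g. a 2 MB chunk with 1 MB bytes-per-checksum
  // carries two CRCs.
  int checksumsPerChunk = (int) Math.ceil((double) chunkSize / bytesPerCrc);
  // Each parity chunk in the stripe carries the same number of 4-byte CRCs,
  // and none of them contribute to the file checksum, so they are trimmed.
  return parityChunks * checksumsPerChunk * 4;
}

Computing this per chunk, as both methods now do, matters because the last chunk of a block can be shorter than the first.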
@@ -121,66 +121,40 @@ private void computeCompositeCrc() throws IOException {
 
     // Bytes required to create a CRC
     long bytesPerCrc = firstChunkInfo.getChecksumData().getBytesPerChecksum();
-    long chunkSize = firstChunkInfo.getLen();
-
-    //When EC chunk size is not a multiple of ozone.client.bytes.per.checksum
-    // (default = 16KB) the last checksum in an EC chunk is only generated for
-    // offset.
-    long bytesPerCrcOffset = chunkSize % bytesPerCrc;
-
-    long keySize = keyInfo.getDataSize();
-    // Total parity checksum bytes per stripe to remove
-    int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
-
-    // Number of checksum per chunk, Eg: 2MB EC chunk will
-    // have 2 checksum per chunk.
-    int numChecksumPerChunk = (int)
-        (Math.ceil((double) chunkSize / bytesPerCrc));
+    long blockSize = blockLength;
 
     CrcComposer blockCrcComposer =
         CrcComposer.newCrcComposer(dataChecksumType, bytesPerCrc);
 
     for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
       ByteString stripeChecksum = chunkInfo.getStripeChecksum();
+      long chunkSize = chunkInfo.getLen();
+
+      // Total parity checksum bytes per stripe to remove
+      int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
 
       Preconditions.checkNotNull(stripeChecksum);
       final int checksumSize = stripeChecksum.size();
       Preconditions.checkArgument(checksumSize % 4 == 0,
           "Checksum Bytes size does not match");
-      CrcComposer chunkCrcComposer =
-          CrcComposer.newCrcComposer(dataChecksumType, bytesPerCrc);
 
       // Limit parity bytes as they do not contribute to fileChecksum
       final ByteBuffer byteWrap = stripeChecksum.asReadOnlyByteBuffer();
       byteWrap.limit(checksumSize - parityBytes);
 
-      long chunkOffsetIndex = 1;
-      while (byteWrap.hasRemaining()) {
-
-        /*
-        When chunk size is not a multiple of bytes.per.crc we get an offset.
-        For eg, RS-3-2-1524k is not a multiple of 1MB. So two checksums are
-        generated 1st checksum for 1024k bytes and 2nd checksum for 500k bytes.
-        When we reach the 2nd Checksum we need to modify the bytesPerCrc as in
-        this case 500k is the bytes for which the checksum is generated.
-        */
-        long currentChunkOffset = Long.MAX_VALUE;
-        if ((chunkOffsetIndex % numChecksumPerChunk == 0)
-            && (bytesPerCrcOffset > 0)) {
-          currentChunkOffset = bytesPerCrcOffset;
-        }
-
-        final int checksumDataCrc = byteWrap.getInt();
-        //To handle last chunk when it size is lower than 1524K in the case
-        // of rs-3-2-1524k.
-        long chunkSizePerChecksum = Math.min(Math.min(keySize, bytesPerCrc),
-            currentChunkOffset);
-        chunkCrcComposer.update(checksumDataCrc, chunkSizePerChecksum);
-
-        int chunkChecksumCrc = CrcUtil.readInt(chunkCrcComposer.digest(), 0);
-        blockCrcComposer.update(chunkChecksumCrc, chunkSizePerChecksum);
-        keySize -= Math.min(bytesPerCrc, currentChunkOffset);
-        ++chunkOffsetIndex;
+      // Here Math.min is mainly required for the last stripe's last chunk. That chunk can be
+      // smaller than chunkSize, which is only taken from each stripe's first chunk. This is fine
+      // for the rest of the stripe because all of its chunks are the same size, but for the last
+      // stripe we don't know the exact size of the last chunk, so we derive it from the remaining
+      // blockSize. If the remaining block size is smaller than the chunk size, this is the last stripe's last chunk.
+      long remainingChunkSize = Math.min(blockSize, chunkSize);
+      while (byteWrap.hasRemaining() && remainingChunkSize > 0) {
+        final int checksumData = byteWrap.getInt();
+        blockCrcComposer.update(checksumData, Math.min(bytesPerCrc, remainingChunkSize));
+        remainingChunkSize -= bytesPerCrc;
       }
+      blockSize -= chunkSize;
     }

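A worked example of the new per-chunk sizing above, under assumed values (2 MB chunks, 1 MB bytes-per-checksum, and a block that ends 1.5 MB into its last chunk; the class name and numbers are illustrative only):

public class LastChunkSizingSketch {
  public static void main(String[] args) {
    long chunkSize = 2L * 1024 * 1024;  // from this chunk's ChunkInfo
    long bytesPerCrc = 1024 * 1024;     // ozone.client.bytes.per.checksum
    long blockSize = 1536 * 1024;       // block bytes remaining at this chunk

    // 1.5 MB rather than 2 MB: the block ends inside this chunk.
    long remainingChunkSize = Math.min(blockSize, chunkSize);
    while (remainingChunkSize > 0) {
      // The first CRC covers a full 1 MB; the last covers only the 512 KB left.
      long covered = Math.min(bytesPerCrc, remainingChunkSize);
      System.out.println("update CRC composer over " + covered + " bytes");
      remainingChunkSize -= bytesPerCrc;
    }
  }
}

The remainder goes negative after the final update, ending the loop so that no CRC is attributed to bytes past the end of the block.
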
@@ -102,7 +102,7 @@ private boolean checksumBlock(OmKeyLocationInfo keyLocationInfo)
     setBytesPerCRC(bytesPerChecksum);
 
     ByteBuffer blockChecksumByteBuffer =
-        getBlockChecksumFromChunkChecksums(chunkInfos);
+        getBlockChecksumFromChunkChecksums(chunkInfos, keyLocationInfo.getLength());
     String blockChecksumForDebug =
         populateBlockChecksumBuf(blockChecksumByteBuffer);
 
@@ -140,10 +140,11 @@ private String populateBlockChecksumBuf(
   }
 
   private ByteBuffer getBlockChecksumFromChunkChecksums(
-      List<ContainerProtos.ChunkInfo> chunkInfos) throws IOException {
+      List<ContainerProtos.ChunkInfo> chunkInfos,
+      long blockLength) throws IOException {
 
     AbstractBlockChecksumComputer blockChecksumComputer =
-        new ECBlockChecksumComputer(chunkInfos, getKeyInfo());
+        new ECBlockChecksumComputer(chunkInfos, getKeyInfo(), blockLength);
     blockChecksumComputer.compute(getCombineMode());
 
     return blockChecksumComputer.getOutByteBuffer();
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.ozone;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +40,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
@@ -53,10 +55,13 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.TestDataUtil.createBucket;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /**
  * Test FileChecksum API.
@@ -68,10 +73,16 @@ public class TestOzoneFileChecksum {
       true, false
   };
 
-  private static final int[] DATA_SIZES = DoubleStream.of(0.5, 1, 1.5, 2, 7, 8)
-      .mapToInt(mb -> (int) (1024 * 1024 * mb))
+  private static final int[] DATA_SIZES_1 = DoubleStream.of(0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 5, 6, 7, 8, 9, 10)
+      .mapToInt(mb -> (int) (1024 * 1024 * mb) + 510000)
       .toArray();
 
+  private static final int[] DATA_SIZES_2 = DoubleStream.of(0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 5, 6, 7, 8, 9, 10)
+      .mapToInt(mb -> (int) (1024 * 1024 * mb) + 820000)
+      .toArray();
+
+  private int[] dataSizes = new int[DATA_SIZES_1.length + DATA_SIZES_2.length];
+
   private OzoneConfiguration conf;
   private MiniOzoneCluster cluster = null;
   private FileSystem fs;
@@ -84,6 +95,8 @@ public class TestOzoneFileChecksum {
   void setup() throws IOException,
       InterruptedException, TimeoutException {
     conf = new OzoneConfiguration();
+    conf.setStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 1024 * 1024, StorageUnit.BYTES);
+    conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 2 * 1024 * 1024, StorageUnit.BYTES);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(5)
         .build();
@@ -95,9 +108,8 @@ void setup() throws IOException,
         OzoneConsts.OZONE_OFS_URI_SCHEME);
     conf.setBoolean(disableCache, true);
     conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
-    fs = FileSystem.get(conf);
-    ofs = (RootedOzoneFileSystem) fs;
-    adapter = (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
+    System.arraycopy(DATA_SIZES_1, 0, dataSizes, 0, DATA_SIZES_1.length);
+    System.arraycopy(DATA_SIZES_2, 0, dataSizes, DATA_SIZES_1.length, DATA_SIZES_2.length);
   }
 
   @AfterEach
@@ -112,9 +124,13 @@ void teardown() {
   /**
    * Test EC checksum with Replicated checksum.
    */
   @ParameterizedTest
-  @MethodSource("missingIndexes")
-  void testEcFileChecksum(List<Integer> missingIndexes) throws IOException {
+  @MethodSource("missingIndexesAndChecksumSize")
+  void testEcFileChecksum(List<Integer> missingIndexes, double checksumSizeInMB) throws IOException {
 
+    conf.setInt("ozone.client.bytes.per.checksum", (int) (checksumSizeInMB * 1024 * 1024));
+    fs = FileSystem.get(conf);
+    ofs = (RootedOzoneFileSystem) fs;
+    adapter = (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
     String volumeName = UUID.randomUUID().toString();
     String legacyBucket = UUID.randomUUID().toString();
     String ecBucketName = UUID.randomUUID().toString();
@@ -139,7 +155,7 @@ void testEcFileChecksum(List<Integer> missingIndexes) throws IOException {
 
     Map<Integer, String> replicatedChecksums = new HashMap<>();
 
-    for (int dataLen : DATA_SIZES) {
+    for (int dataLen : dataSizes) {
       byte[] data = randomAlphabetic(dataLen).getBytes(UTF_8);
 
       try (OutputStream file = adapter.createFile(volumeName + "/"
@@ -170,7 +186,7 @@ void testEcFileChecksum(List<Integer> missingIndexes) throws IOException {
       clientConf.setBoolean(OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
           topologyAware);
       try (FileSystem fsForRead = FileSystem.get(clientConf)) {
-        for (int dataLen : DATA_SIZES) {
+        for (int dataLen : dataSizes) {
           // Compute checksum after failed DNs
           Path parent = new Path("/" + volumeName + "/" + ecBucketName + "/");
           Path ecKey = new Path(parent, "test" + dataLen);
@@ -187,14 +203,13 @@ void testEcFileChecksum(List<Integer> missingIndexes) throws IOException {
     }
   }
 
-  static Stream<List<Integer>> missingIndexes() {
+  static Stream<Arguments> missingIndexesAndChecksumSize() {
     return Stream.of(
-        ImmutableList.of(0, 1),
-        ImmutableList.of(1, 2),
-        ImmutableList.of(2, 3),
-        ImmutableList.of(3, 4),
-        ImmutableList.of(0, 3),
-        ImmutableList.of(0, 4)
-    );
+        arguments(ImmutableList.of(0, 1), 0.001),
+        arguments(ImmutableList.of(1, 2), 0.01),
+        arguments(ImmutableList.of(2, 3), 0.1),
+        arguments(ImmutableList.of(3, 4), 0.5),
+        arguments(ImmutableList.of(0, 3), 1),
+        arguments(ImmutableList.of(0, 4), 2));
   }
 }
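
One note on the enlarged data-size matrix: with the 1 MB chunk and 2 MB block configured in setup(), the 510000- and 820000-byte offsets keep every file size off the configured chunk and block boundaries, which is the partially filled last chunk that triggered the negative buffer limit. A quick check of one entry (the 1.5 MB element of DATA_SIZES_1; the class name is hypothetical):

public class DataSizeSketch {
  public static void main(String[] args) {
    int dataLen = (int) (1024 * 1024 * 1.5) + 510000;  // 2082864 bytes
    int chunkSize = 1024 * 1024;                       // matches setup()
    // Non-zero remainder: the file's last chunk is only partially filled.
    System.out.println(dataLen % chunkSize);           // prints 1034288
  }
}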
