Skip to content

Commit

Permalink
HDDS-10411. Support incremental ChunkBuffer checksum calculation (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
smengcl authored Nov 25, 2024
1 parent 579a38e commit 9bc9145
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public enum ChecksumCombineMode {
description =
"Indicates the time duration in seconds a client will wait "
+ "before retrying a read key request on encountering "
+ "a connectivity excepetion from Datanodes . "
+ "a connectivity exception from Datanodes. "
+ "By default the interval is 1 second",
tags = ConfigTag.CLIENT)
private int readRetryInterval = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,7 @@ public BlockOutputStream(
writtenDataLength = 0;
failedServers = new ArrayList<>(0);
ioException = new AtomicReference<>(null);
checksum = new Checksum(config.getChecksumType(),
config.getBytesPerChecksum());
this.checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum(), true);
this.clientMetrics = clientMetrics;
this.streamBufferArgs = streamBufferArgs;
this.allowPutBlockPiggybacking = canEnablePutblockPiggybacking();
Expand Down Expand Up @@ -587,6 +586,7 @@ CompletableFuture<PutBlockResult> executePutBlock(boolean close,
final CompletableFuture<ContainerCommandResponseProto> flushFuture;
final XceiverClientReply asyncReply;
try {
// Note: checksum was previously appended to containerBlockData by WriteChunk
BlockData blockData = containerBlockData.build();
LOG.debug("sending PutBlock {} flushPos {}", blockData, flushPos);

Expand Down Expand Up @@ -854,6 +854,8 @@ public synchronized void cleanup(boolean invalidateClient) {
if (lastChunkBuffer != null) {
DIRECT_BUFFER_POOL.returnBuffer(lastChunkBuffer);
lastChunkBuffer = null;
// Clear checksum cache
checksum.clearChecksumCache();
}
}

Expand Down Expand Up @@ -903,7 +905,10 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(
final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
final ByteString data = chunk.toByteString(
bufferPool.byteStringConversion());
ChecksumData checksumData = checksum.computeChecksum(chunk);
// chunk is incremental, don't cache its checksum
ChecksumData checksumData = checksum.computeChecksum(chunk, false);
// side note: checksum object is shared with PutBlock's (blockData) checksum calc,
// current impl does not support caching both
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
.setOffset(offset)
Expand Down Expand Up @@ -1053,6 +1058,7 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk)
lastChunkBuffer.capacity() - lastChunkBuffer.position();
appendLastChunkBuffer(chunk, 0, remainingBufferSize);
updateBlockDataWithLastChunkBuffer();
// TODO: Optional refactoring: Can attach ChecksumCache to lastChunkBuffer rather than Checksum
appendLastChunkBuffer(chunk, remainingBufferSize,
chunk.remaining() - remainingBufferSize);
}
Expand All @@ -1069,10 +1075,13 @@ private void updateBlockDataWithLastChunkBuffer()
LOG.debug("lastChunkInfo = {}", lastChunkInfo);
long lastChunkSize = lastChunkInfo.getLen();
addToBlockData(lastChunkInfo);

// Set ByteBuffer limit to capacity, pos to 0. Does not erase data
lastChunkBuffer.clear();

if (lastChunkSize == config.getStreamBufferSize()) {
lastChunkOffset += config.getStreamBufferSize();
// Reached stream buffer size (chunk size), starting new chunk, need to clear checksum cache
checksum.clearChecksumCache();
} else {
lastChunkBuffer.position((int) lastChunkSize);
}
Expand Down Expand Up @@ -1136,8 +1145,9 @@ private ChunkInfo createChunkInfo(long lastPartialChunkOffset)
lastChunkBuffer.flip();
int revisedChunkSize = lastChunkBuffer.remaining();
// create the chunk info to be sent in PutBlock.
ChecksumData revisedChecksumData =
checksum.computeChecksum(lastChunkBuffer);
// checksum cache is utilized for this computation
// this checksum is stored in blockData and later transferred in PutBlock
ChecksumData revisedChecksumData = checksum.computeChecksum(lastChunkBuffer, true);

long chunkID = lastPartialChunkOffset / config.getStreamBufferSize();
ChunkInfo.Builder revisedChunkInfo = ChunkInfo.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class to compute and verify checksums for chunks.
*
* This class is not thread safe.
*/
public class Checksum {
public static final Logger LOG = LoggerFactory.getLogger(Checksum.class);

private static Function<ByteBuffer, ByteString> newMessageDigestFunction(
String algorithm) {
final MessageDigest md;
Expand All @@ -63,7 +67,7 @@ public static ByteString int2ByteString(int n) {
private static Function<ByteBuffer, ByteString> newChecksumByteBufferFunction(
Supplier<ChecksumByteBuffer> constructor) {
final ChecksumByteBuffer algorithm = constructor.get();
return data -> {
return data -> {
algorithm.reset();
algorithm.update(data);
return int2ByteString((int)algorithm.getValue());
Expand Down Expand Up @@ -97,6 +101,23 @@ Function<ByteBuffer, ByteString> newChecksumFunction() {

private final ChecksumType checksumType;
private final int bytesPerChecksum;
/**
* Caches computeChecksum() result when requested.
* This must be manually cleared when a new block chunk has been started.
*/
private final ChecksumCache checksumCache;

/**
 * Discards every checksum currently held in the checksum cache, if this
 * instance was created with one.
 * BlockOutputStream needs to invoke this whenever a block chunk has been
 * established, so that stale checksums are not carried into the next chunk.
 *
 * @return true if a cache is present and has been cleared,
 *         false if this instance has no cache
 */
public boolean clearChecksumCache() {
  // No cache was configured for this instance: nothing to clear.
  if (checksumCache == null) {
    return false;
  }
  checksumCache.clear();
  return true;
}

/**
* Constructs a Checksum object.
Expand All @@ -106,6 +127,24 @@ Function<ByteBuffer, ByteString> newChecksumFunction() {
public Checksum(ChecksumType type, int bytesPerChecksum) {
  // Instances built through this constructor never get a checksum cache.
  this.checksumCache = null;
  this.bytesPerChecksum = bytesPerChecksum;
  this.checksumType = type;
}

/**
 * Constructs a Checksum object, optionally equipped with a checksum cache.
 * @param type type of Checksum
 * @param bytesPerChecksum number of bytes of data covered by each checksum
 * @param allowChecksumCache true to create a ChecksumCache for this instance,
 *                           false to leave the cache unset
 */
public Checksum(ChecksumType type, int bytesPerChecksum, boolean allowChecksumCache) {
  this.checksumType = type;
  this.bytesPerChecksum = bytesPerChecksum;
  LOG.debug("allowChecksumCache = {}", allowChecksumCache);
  // The cache field stays null when caching is not allowed; users of the field check for null.
  this.checksumCache = allowChecksumCache ? new ChecksumCache(bytesPerChecksum) : null;
}

/**
Expand All @@ -128,13 +167,25 @@ public ChecksumData computeChecksum(byte[] data)
return computeChecksum(ByteBuffer.wrap(data));
}

/**
 * Computes the checksum of a ByteBuffer without consulting the checksum cache,
 * regardless of whether this instance was created with one.
 * Equivalent to calling the two-argument overload with the cache flag off.
 * @param data ByteBuffer holding the input data
 * @return ChecksumData computed for the input data
 * @throws OzoneChecksumException propagated from the two-argument overload
 */
public ChecksumData computeChecksum(ByteBuffer data)
    throws OzoneChecksumException {
  // Cache use is opt-in; this overload always opts out.
  final boolean useChecksumCache = false;
  return computeChecksum(data, useChecksumCache);
}

/**
* Computes checksum for given data.
* @param data input data.
* @return ChecksumData computed for input data.
* @throws OzoneChecksumException thrown when ChecksumType is not recognized
*/
public ChecksumData computeChecksum(ByteBuffer data)
public ChecksumData computeChecksum(ByteBuffer data, boolean useChecksumCache)
throws OzoneChecksumException {
// If type is set to NONE, we do not need to compute the checksums. We also
// need to avoid unnecessary conversions.
Expand All @@ -144,7 +195,7 @@ public ChecksumData computeChecksum(ByteBuffer data)
if (!data.isReadOnly()) {
data = data.asReadOnlyBuffer();
}
return computeChecksum(ChunkBuffer.wrap(data));
return computeChecksum(ChunkBuffer.wrap(data), useChecksumCache);
}

public ChecksumData computeChecksum(List<ByteString> byteStrings)
Expand All @@ -154,8 +205,20 @@ public ChecksumData computeChecksum(List<ByteString> byteStrings)
return computeChecksum(ChunkBuffer.wrap(buffers));
}

/**
 * Computes the checksum of a ChunkBuffer without consulting the checksum cache,
 * regardless of whether this instance was created with one.
 * Equivalent to calling the two-argument overload with the cache flag off.
 * @param data ChunkBuffer holding the input data
 * @return ChecksumData computed for the input data
 * @throws OzoneChecksumException propagated from the two-argument overload
 */
public ChecksumData computeChecksum(ChunkBuffer data)
    throws OzoneChecksumException {
  // Cache use is opt-in; this overload always opts out.
  final boolean useCache = false;
  return computeChecksum(data, useCache);
}

public ChecksumData computeChecksum(ChunkBuffer data, boolean useCache)
throws OzoneChecksumException {
if (checksumType == ChecksumType.NONE) {
// Since type is set to NONE, we do not need to compute the checksums
return new ChecksumData(checksumType, bytesPerChecksum);
Expand All @@ -168,12 +231,20 @@ public ChecksumData computeChecksum(ChunkBuffer data)
throw new OzoneChecksumException(checksumType);
}

// Checksum is computed for each bytesPerChecksum number of bytes of data
// starting at offset 0. The last checksum might be computed for the
// remaining data with length less than bytesPerChecksum.
final List<ByteString> checksumList = new ArrayList<>();
for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
checksumList.add(computeChecksum(b, function, bytesPerChecksum));
final List<ByteString> checksumList;
if (checksumCache == null || !useCache) {
// When checksumCache is not enabled:
// Checksum is computed for each bytesPerChecksum number of bytes of data
// starting at offset 0. The last checksum might be computed for the
// remaining data with length less than bytesPerChecksum.
checksumList = new ArrayList<>();
for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
checksumList.add(computeChecksum(b, function, bytesPerChecksum)); // merge this?
}
} else {
// When checksumCache is enabled:
// We only need to update the last checksum in the cache, then pass it along.
checksumList = checksumCache.computeChecksum(data, function);
}
return new ChecksumData(checksumType, bytesPerChecksum, checksumList);
}
Expand All @@ -185,7 +256,7 @@ public ChecksumData computeChecksum(ChunkBuffer data)
* @param maxLength the max length of data
* @return computed checksum ByteString
*/
private static ByteString computeChecksum(ByteBuffer data,
protected static ByteString computeChecksum(ByteBuffer data,
Function<ByteBuffer, ByteString> function, int maxLength) {
final int limit = data.limit();
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.common;

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Cache previous checksums to avoid recomputing them.
 * This is a stop-gap solution to reduce checksum calc overhead inside critical section
 * without having to do a major refactoring/overhaul over protobuf and interfaces.
 * This is only supposed to be used by BlockOutputStream, for now.
 * <p>
 * Each BlockOutputStream has its own Checksum instance.
 * Each block chunk (4 MB default) is divided into 16 KB (default) each for checksum calculation.
 * For CRC32/CRC32C, each checksum takes 4 bytes. Thus each block chunk has 4 MB / 16 KB * 4 B = 1 KB of checksum data.
 * <p>
 * No synchronization is performed by this class.
 */
public class ChecksumCache {
  public static final Logger LOG = LoggerFactory.getLogger(ChecksumCache.class);

  // This only serves as a hint for array list initial allocation. The array list will still grow as needed.
  private static final int BLOCK_CHUNK_SIZE = 4 * 1024 * 1024; // 4 MB

  // Number of data bytes covered by each cached checksum
  private final int bytesPerChecksum;
  // Cached checksums of the current chunk, one entry per bytesPerChecksum-sized slice
  private final List<ByteString> checksums;
  // Chunk length last time the checksum is computed
  private int prevChunkLength;

  /**
   * Creates an empty cache.
   *
   * @param bytesPerChecksum number of data bytes covered by each checksum, must be positive
   * @throws IllegalArgumentException if bytesPerChecksum is zero or negative
   */
  public ChecksumCache(int bytesPerChecksum) {
    // bytesPerChecksum is used as a divisor below and in computeChecksum(); reject bad values
    // here with a clear message rather than failing later with an ArithmeticException.
    if (bytesPerChecksum <= 0) {
      throw new IllegalArgumentException("bytesPerChecksum must be positive, got " + bytesPerChecksum);
    }
    // Logged at debug level: a cache is created per Checksum instance, so info level is too noisy.
    LOG.debug("Initializing ChecksumCache with bytesPerChecksum = {}", bytesPerChecksum);
    this.prevChunkLength = 0;
    this.bytesPerChecksum = bytesPerChecksum;
    // Set initialCapacity to avoid costly resizes
    this.checksums = new ArrayList<>(BLOCK_CHUNK_SIZE / bytesPerChecksum);
  }

  /**
   * Clear cached checksums. And reset the written index.
   */
  public void clear() {
    prevChunkLength = 0;
    checksums.clear();
  }

  /**
   * Returns the cached checksums.
   *
   * @return the live internal list (not a copy); later calls to {@link #computeChecksum} or
   *         {@link #clear()} change its content
   */
  public List<ByteString> getChecksums() {
    return checksums;
  }

  /**
   * Brings the cached checksum list up to date with the given chunk data and returns it.
   * Only the checksum slices starting from the one that contains the previously seen chunk length
   * are (re)computed; earlier slices are reused from the cache.
   * <p>
   * The chunk length is taken from {@code data.limit()}
   * (NOTE(review): this presumably expects the buffer position to be 0 -- confirm with callers).
   *
   * @param data chunk buffer holding all data of the current chunk written so far
   * @param function checksum function applied to each slice of at most bytesPerChecksum bytes
   * @return the live internal checksum list (not a copy)
   * @throws IllegalArgumentException if the data limit is smaller than in the previous call,
   *         i.e. the cache was not cleared before a new chunk was started
   * @throws IllegalStateException if the number of slices iterated does not match the number
   *         expected from the data limit
   */
  public List<ByteString> computeChecksum(ChunkBuffer data, Function<ByteBuffer, ByteString> function) {
    // Indicates how much data the current chunk buffer holds
    final int currChunkLength = data.limit();

    if (currChunkLength == prevChunkLength) {
      LOG.debug("ChunkBuffer data limit same as last time ({}). No new checksums need to be computed", prevChunkLength);
      return checksums;
    }

    // Sanity check
    if (currChunkLength < prevChunkLength) {
      // If currChunkLength < prevChunkLength, it indicates a bug that needs to be addressed.
      // It means BOS has not properly clear()ed the cache when a new chunk is started in that code path.
      throw new IllegalArgumentException("ChunkBuffer data limit (" + currChunkLength + ")" +
          " must not be smaller than last time (" + prevChunkLength + ")");
    }

    // One or more checksums need to be computed

    // Start of the checksum index that need to be (re)computed
    final int ciStart = prevChunkLength / bytesPerChecksum;
    // One past the last checksum index: number of slices needed to cover currChunkLength, rounded up
    final int ciEnd = currChunkLength / bytesPerChecksum + (currChunkLength % bytesPerChecksum == 0 ? 0 : 1);
    int i = 0;
    for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
      // Slices before ciStart were complete last time; their cached checksums are still valid
      if (i < ciStart) {
        i++;
        continue;
      }

      // variable i can either point to:
      // 1. the last element in the list -- in which case the checksum needs to be updated
      // 2. one after the last element -- in which case a new checksum needs to be added
      assert i == checksums.size() - 1 || i == checksums.size();

      // TODO: Furthermore for CRC32/CRC32C, it can be even more efficient by updating the last checksum byte-by-byte.
      final ByteString checksum = Checksum.computeChecksum(b, function, bytesPerChecksum);
      if (i == checksums.size()) {
        checksums.add(checksum);
      } else {
        checksums.set(i, checksum);
      }

      i++;
    }

    // Sanity check
    if (i != ciEnd) {
      throw new IllegalStateException("ChecksumCache: Checksum index end does not match expectation");
    }

    // Update last written index
    prevChunkLength = currChunkLength;
    return checksums;
  }
}
Loading

0 comments on commit 9bc9145

Please sign in to comment.