HBASE-27264 Add options to consider compressed size when delimiting blocks during hfile writes #4675

Merged · 11 commits · Aug 15, 2022
BlockCompressedSizePredicator.java (new file)
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Allows for defining different compression rate predicates in its implementing classes. Useful
* when compression is in place and we want to define the block size based on the compressed size,
* rather than the default behaviour that considers the uncompressed size only. Since we don't
* actually know the compressed size until we actually apply compression in the block byte buffer,
* we need to "predict" this compression rate and minimize the number of compression executions to
* avoid excessive resource usage. Different approaches for predicting the compressed block size
* can be defined by implementing classes. The <code>updateLatestBlockSizes</code> method allows
* for updating the uncompressed and compressed size values, and is called during block finishing
* (when we finally apply compression on the block data). The final block size predicate logic is
* implemented in <code>shouldFinishBlock</code>, which is called by the block writer once the
* uncompressed size has reached the configured BLOCK size, so that additional checks can be
* applied to decide if the block can be finished.
*/
@InterfaceAudience.Private
public interface BlockCompressedSizePredicator {

String BLOCK_COMPRESSED_SIZE_PREDICATOR = "hbase.block.compressed.size.predicator";

String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed";

/**
* Updates the predicator with both compressed and uncompressed sizes of the latest block written.
* To be called once the block is finished and flushed to disk after compression.
* @param context the HFileContext containing the configured max block size.
* @param uncompressed the uncompressed size of the last block written.
* @param compressed the compressed size of the last block written.
*/
void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed);

/**
* Decides if the block should be finished based on the comparison of its uncompressed size
* against an adjusted size derived from a predicated compression factor.
* @param uncompressed the uncompressed size written for the block so far.
* @return true if the block should be finished.
*/
boolean shouldFinishBlock(int uncompressed);

}
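For illustration, a minimal sketch of a custom implementation of this interface follows. The class name and the fixed 2:1 ratio are assumptions made for the example only, not part of this change; the implementations shipped here are PreviousBlockCompressionRatePredicator and UncompressedBlockSizePredicator, shown further below.

package org.apache.hadoop.hbase.io.hfile;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * Example only: assumes a fixed 2:1 compression ratio instead of measuring it, so a block is only
 * finished once its uncompressed size reaches twice the configured block size.
 */
@InterfaceAudience.Private
public class FixedRatioBlockSizePredicator implements BlockCompressedSizePredicator {

  /** Zero until the first block is finished, so the first block falls back to the plain check. */
  private int adjustedBlockSize = 0;

  @Override
  public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
    // Ignore the observed sizes and always target twice the configured block size.
    adjustedBlockSize = context.getBlocksize() * 2;
  }

  @Override
  public boolean shouldFinishBlock(int uncompressed) {
    return uncompressed >= adjustedBlockSize;
  }
}

Such a class would be picked up by the writer through the hbase.block.compressed.size.predicator property, via the reflection-based instantiation shown further down in HFileBlock.Writer.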
HFileBlock.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;

import io.opentelemetry.api.common.Attributes;
@@ -64,6 +65,7 @@
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -463,7 +465,7 @@ int getOnDiskSizeWithoutHeader() {
}

/** Returns the uncompressed size of data part (header and checksum excluded). */
int getUncompressedSizeWithoutHeader() {
public int getUncompressedSizeWithoutHeader() {
return uncompressedSizeWithoutHeader;
}

@@ -740,6 +742,10 @@ private enum State {
BLOCK_READY
}

private int maxSizeUnCompressed;

private BlockCompressedSizePredicator compressedSizePredicator;

/** Writer state. Used to ensure the correct usage protocol. */
private State state = State.INIT;

@@ -818,11 +824,11 @@ EncodingState getEncodingState() {
*/
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext) {
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize());
}

public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext, ByteBuffAllocator allocator) {
HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) {
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
+ HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
@@ -845,6 +851,10 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
// TODO: Why fileContext saved away when we have dataBlockEncoder and/or
// defaultDataBlockEncoder?
this.fileContext = fileContext;
this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance(
conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class),
new Configuration(conf));
this.maxSizeUnCompressed = maxSizeUnCompressed;
}

/**
@@ -897,6 +907,15 @@ void ensureBlockReady() throws IOException {
finishBlock();
}

public boolean checkBoundariesWithPredicate() {
int rawBlockSize = encodedBlockSizeWritten();
if (rawBlockSize >= maxSizeUnCompressed) {
return true;
} else {
return compressedSizePredicator.shouldFinishBlock(rawBlockSize);
}
}

/**
* Finish up writing of the block. Flushes the compressing stream (if using compression), fills
* out the header, does any compression/encryption of bytes to flush out to disk, and manages
@@ -911,6 +930,11 @@ private void finishBlock() throws IOException {
userDataStream.flush();
prevOffset = prevOffsetByType[blockType.getId()];

// We need to cache the unencoded/uncompressed size before changing the block state
int rawBlockSize = 0;
if (this.getEncodingState() != null) {
rawBlockSize = blockSizeWritten();
}
// We need to set state before we can package the block up for cache-on-write. In a way, the
// block is ready, but not yet encoded or compressed.
state = State.BLOCK_READY;
@@ -931,13 +955,18 @@ private void finishBlock() throws IOException {
onDiskBlockBytesWithHeader.reset();
onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
// Update raw and compressed sizes in the predicator
compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize,
onDiskBlockBytesWithHeader.size());

// Calculate how many bytes we need for checksum on the tail of the block.
int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
fileContext.getBytesPerChecksum());

// Put the header for the on disk bytes; header currently is unfilled-out
putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
baosInMemory.size(), onDiskBlockBytesWithHeader.size());

if (onDiskChecksum.length != numBytes) {
onDiskChecksum = new byte[numBytes];
}
@@ -1077,7 +1106,7 @@ int getUncompressedSizeWithoutHeader() {
/**
* The uncompressed size of the block data, including header size.
*/
int getUncompressedSizeWithHeader() {
public int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
return baosInMemory.size();
}
@@ -1101,7 +1130,7 @@ public int encodedBlockSizeWritten() {
* block at the moment. Note that this will return zero in the "block ready" state as well.
* @return the number of bytes written
*/
int blockSizeWritten() {
public int blockSizeWritten() {
return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
}

HFileWriterImpl.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -292,7 +294,8 @@ protected void finishInit(final Configuration conf) {
throw new IllegalStateException("finishInit called twice");
}
blockWriter =
new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator());
new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@@ -319,6 +322,7 @@ protected void checkBlockBoundary() throws IOException {
shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
|| blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
}
shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
if (shouldFinishBlock) {
finishBlock();
writeInlineBlocks(false);
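With the &= above, a data block is now finished only when the usual block size threshold is met and the boundary check passes (either the hard uncompressed cap is reached, or the configured predicator agrees). Assuming the property names introduced in this change, enabling the compression-aware behaviour could look like the sketch below; the values are illustrative, and when hbase.block.max.size.uncompressed is left unset, finishInit above falls back to 10x the configured block size.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PredicatorConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Replace the default UncompressedBlockSizePredicator with the ratio-based implementation.
    conf.set("hbase.block.compressed.size.predicator",
      "org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator");
    // Hard cap on the uncompressed block size: blocks are always finished at this point,
    // whatever the predicator says (see checkBoundariesWithPredicate above).
    conf.setInt("hbase.block.max.size.uncompressed", 640 * 1024);
    System.out.println("predicator = " + conf.get("hbase.block.compressed.size.predicator"));
  }
}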
PreviousBlockCompressionRatePredicator.java (new file)
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import org.apache.yetus.audience.InterfaceAudience;

/**
* This BlockCompressedSizePredicator implementation adjusts the block size limit based on the
* compression rate of the block contents written so far. For the first block, the adjusted size
* would be zero, so it simply compresses the block contents once the block is finished and then
* calculates the compression rate and the adjusted size. For subsequent blocks, the decision on
* whether the block should be finished is based on the compression rate calculated for the
* previous block.
*/
@InterfaceAudience.Private
public class PreviousBlockCompressionRatePredicator implements BlockCompressedSizePredicator {

private int adjustedBlockSize;
private int compressionRatio = 1;
private int configuredMaxBlockSize;

/**
* Recalculates the compression rate for the last block and adjusts the block size limit as:
* BLOCK_SIZE * (uncompressed/compressed).
* @param context the HFileContext containing the configured max block size.
* @param uncompressed the uncompressed size of the last block written.
* @param compressed the compressed size of the last block written.
*/
@Override
public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
configuredMaxBlockSize = context.getBlocksize();
compressionRatio = uncompressed / compressed;
adjustedBlockSize = context.getBlocksize() * compressionRatio;
}

/**
* Returns <b>true</b> if the passed uncompressed size is larger than the limit calculated by
* <code>updateLatestBlockSizes</code>.
* @param uncompressed the uncompressed size written for the block so far.
* @return true if the block should be finished.
*/
@Override
public boolean shouldFinishBlock(int uncompressed) {
if (uncompressed >= configuredMaxBlockSize) {
return uncompressed >= adjustedBlockSize;
}
return false;
}
}
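A worked example of the adjustment above, using assumed numbers rather than anything from the change itself:

public class CompressionRatioExample {
  public static void main(String[] args) {
    int blockSize = 64 * 1024;          // configured BLOCKSIZE
    int lastUncompressed = 192 * 1024;  // raw bytes written in the previous block
    int lastCompressed = 48 * 1024;     // on-disk size of the previous block
    int compressionRatio = lastUncompressed / lastCompressed; // integer division -> 4
    int adjustedBlockSize = blockSize * compressionRatio;     // 262144 bytes (256 KB)
    // shouldFinishBlock only returns true once the uncompressed size reaches 256 KB,
    // which should compress back down to roughly the 64 KB target on disk.
    System.out.println("adjusted block size = " + adjustedBlockSize + " bytes");
  }
}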
UncompressedBlockSizePredicator.java (new file)
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import org.apache.yetus.audience.InterfaceAudience;

/**
* This BlockCompressedSizePredicator implementation doesn't actually perform any prediction and
* simply returns <b>true</b> on <code>shouldFinishBlock</code>. This is the default implementation
* used if the <b>hbase.block.compressed.size.predicator</b> property is not defined.
*/
@InterfaceAudience.Private
public class UncompressedBlockSizePredicator implements BlockCompressedSizePredicator {

/**
* Empty implementation. Does nothing.
* @param context the HFileContext containing the configured max block size.
* @param uncompressed the uncompressed size of the last block written.
* @param compressed the compressed size of the last block written.
*/
@Override
public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
}

/**
* Dummy implementation that always returns true. This means only the block's uncompressed size
* is considered when deciding when to finish a block.
* @param uncompressed the uncompressed size written for the block so far.
* @return true, always.
*/
@Override
public boolean shouldFinishBlock(int uncompressed) {
return true;
}

}
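Because this default implementation always answers true, the writer's combined decision reduces to the plain uncompressed-size comparison, preserving the pre-existing behaviour. A small standalone sketch of that reduction, with sizes assumed for the example:

package org.apache.hadoop.hbase.io.hfile;

public class DefaultPredicatorExample {
  public static void main(String[] args) {
    BlockCompressedSizePredicator predicator = new UncompressedBlockSizePredicator();
    int blockSize = 64 * 1024;       // configured block size
    int written = blockSize + 512;   // uncompressed bytes written so far
    // Simplified mirror of the writer's decision: size threshold AND predicator,
    // where the default predicator always returns true, so only the size threshold matters.
    boolean finish = written >= blockSize && predicator.shouldFinishBlock(written);
    System.out.println("finish block? " + finish); // prints: finish block? true
  }
}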