HBASE-27264 Add options to consider compressed size when delimiting blocks during hfile writes (apache#4675)

Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
Signed-off-by: Ankit Singhal <ankit@apache.org>

Change-Id: I0d969c83f9bb6a2a8d24e60e6b76b412b659e2b0
wchevreuil authored and Wellington Chevreuil committed Aug 16, 2022
1 parent 240b5e6 commit cd2de1f
Showing 6 changed files with 302 additions and 13 deletions.
BlockCompressedSizePredicator.java (new file)
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Allows implementing classes to define different compression-rate predicates. Useful when
* compression is in place and we want to define the block size based on the compressed size,
* rather than the default behaviour that considers the uncompressed size only. Since we don't
* actually know the compressed size until we actually apply compression in the block byte buffer,
* we need to "predict" this compression rate and minimize compression execution to avoid
* excessive resource usage. Different approaches for predicting the compressed block size can be
* defined by implementing classes. The <code>updateLatestBlockSizes</code> method allows for
* updating the uncompressed and compressed size values, and is called during block finishing
* (when we finally apply compression on the block data). The final block size predicate logic is
* implemented in <code>shouldFinishBlock</code>, which is called by the block writer once the
* uncompressed size has reached the configured block size, and additional checks should be
* applied to decide if the block can be finished.
*/
@InterfaceAudience.Private
public interface BlockCompressedSizePredicator {

String BLOCK_COMPRESSED_SIZE_PREDICATOR = "hbase.block.compressed.size.predicator";

String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed";

/**
* Updates the predicator with both the compressed and uncompressed sizes of the latest block
* written. To be called once the block is finished and flushed to disk after compression.
* @param context the HFileContext containing the configured max block size.
* @param uncompressed the uncompressed size of the last block written.
* @param compressed the compressed size of the last block written.
*/
void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed);

/**
* Decides if the block should be finished based on the comparison of its uncompressed size
* against an adjusted size based on a predicted compression factor.
* @param uncompressed the uncompressed size of the block written so far.
* @return true if the block should be finished.
*/
boolean shouldFinishBlock(int uncompressed);

}
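
A minimal configuration sketch (not part of this commit; property values are illustrative) showing how the two keys defined above could be set to enable the compression-aware predicator added later in this change:

import org.apache.hadoop.conf.Configuration;

public class PredicatorConfigSketch {
  public static Configuration withCompressionAwarePredicator(Configuration conf) {
    // Plug in the implementation that uses the previous block's compression ratio.
    conf.set("hbase.block.compressed.size.predicator",
      "org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator");
    // Hard cap on the uncompressed block size; the writer defaults this to 10x the block size.
    conf.setInt("hbase.block.max.size.uncompressed", 10 * 64 * 1024);
    return conf;
  }
}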
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;

import java.io.DataInputStream;
import java.io.DataOutput;
@@ -29,6 +30,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.Cell;
@@ -53,6 +56,8 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -451,7 +456,7 @@ int getOnDiskSizeWithoutHeader() {
}

/** Returns the uncompressed size of data part (header and checksum excluded). */
int getUncompressedSizeWithoutHeader() {
public int getUncompressedSizeWithoutHeader() {
return uncompressedSizeWithoutHeader;
}

@@ -726,6 +731,10 @@ private enum State {
BLOCK_READY
};

private int maxSizeUnCompressed;

private BlockCompressedSizePredicator compressedSizePredicator;

/** Writer state. Used to ensure the correct usage protocol. */
private State state = State.INIT;

@@ -803,11 +812,23 @@ EncodingState getEncodingState() {
* @param dataBlockEncoder data block encoding algorithm to use
*/
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
this(new Configuration(), dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP,
fileContext.getBlocksize());
}

public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext) {
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize());
}

public Writer(HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext, ByteBuffAllocator allocator) {
this(new Configuration(), dataBlockEncoder, fileContext, allocator,
fileContext.getBlocksize());
}

public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
ByteBuffAllocator allocator) {
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) {
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
+ HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
@@ -830,6 +851,10 @@ public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
// TODO: Why fileContext saved away when we have dataBlockEncoder and/or
// defaultDataBlockEncoder?
this.fileContext = fileContext;
this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance(
conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class),
new Configuration(conf));
this.maxSizeUnCompressed = maxSizeUnCompressed;
}

/**
@@ -882,6 +907,15 @@ void ensureBlockReady() throws IOException {
finishBlock();
}

public boolean checkBoundariesWithPredicate() {
int rawBlockSize = encodedBlockSizeWritten();
if (rawBlockSize >= maxSizeUnCompressed) {
return true;
} else {
return compressedSizePredicator.shouldFinishBlock(rawBlockSize);
}
}

/**
* Finish up writing of the block. Flushes the compressing stream (if using compression), fills
* out the header, does any compression/encryption of bytes to flush out to disk, and manages
@@ -896,6 +930,11 @@ private void finishBlock() throws IOException {
userDataStream.flush();
prevOffset = prevOffsetByType[blockType.getId()];

// We need to cache the unencoded/uncompressed size before changing the block state
int rawBlockSize = 0;
if (this.getEncodingState() != null) {
rawBlockSize = blockSizeWritten();
}
// We need to set state before we can package the block up for cache-on-write. In a way, the
// block is ready, but not yet encoded or compressed.
state = State.BLOCK_READY;
@@ -916,13 +955,18 @@ private void finishBlock() throws IOException {
onDiskBlockBytesWithHeader.reset();
onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
// Update raw and compressed sizes in the predicator
compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize,
onDiskBlockBytesWithHeader.size());

// Calculate how many bytes we need for checksum on the tail of the block.
int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
fileContext.getBytesPerChecksum());

// Put the header for the on disk bytes; header currently is unfilled-out
putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
baosInMemory.size(), onDiskBlockBytesWithHeader.size());

if (onDiskChecksum.length != numBytes) {
onDiskChecksum = new byte[numBytes];
}
@@ -1062,7 +1106,7 @@ int getUncompressedSizeWithoutHeader() {
/**
* The uncompressed size of the block data, including header size.
*/
int getUncompressedSizeWithHeader() {
public int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
return baosInMemory.size();
}
@@ -1086,7 +1130,7 @@ public int encodedBlockSizeWritten() {
* block at the moment. Note that this will return zero in the "block ready" state as well.
* @return the number of bytes written
*/
int blockSizeWritten() {
public int blockSizeWritten() {
return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
}

HFileBlock.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -292,7 +294,8 @@ protected void finishInit(final Configuration conf) {
throw new IllegalStateException("finishInit called twice");
}
blockWriter =
new HFileBlock.Writer(blockEncoder, hFileContext, cacheConf.getByteBuffAllocator());
new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@@ -319,6 +322,7 @@ protected void checkBlockBoundary() throws IOException {
shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
|| blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
}
shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
if (shouldFinishBlock) {
finishBlock();
writeInlineBlocks(false);
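
Condensed into a self-contained sketch (a hypothetical helper, not part of the commit; the two raw-size checks in the diff are simplified to a single value), the combined boundary decision above reads: finish a block only once the raw size reaches the configured block size, and then only if the uncompressed hard cap is hit or the configured predicator agrees.

public final class BoundaryDecisionSketch {

  static boolean shouldFinishBlock(int rawBlockSize, int configuredBlockSize,
    int maxSizeUncompressed, boolean predicatorSaysFinish) {
    // Mirrors checkBlockBoundary(): the raw size must first reach the block size.
    boolean sizeLimitReached = rawBlockSize >= configuredBlockSize;
    // Mirrors checkBoundariesWithPredicate(): the hard cap overrides the predicator.
    boolean boundaryCheck = rawBlockSize >= maxSizeUncompressed || predicatorSaysFinish;
    return sizeLimitReached && boundaryCheck;
  }

  public static void main(String[] args) {
    // Assuming a 64 KB block size and the default 640 KB (10x) uncompressed cap.
    System.out.println(shouldFinishBlock(64 * 1024, 64 * 1024, 640 * 1024, false));  // false
    System.out.println(shouldFinishBlock(640 * 1024, 64 * 1024, 640 * 1024, false)); // true
    System.out.println(shouldFinishBlock(64 * 1024, 64 * 1024, 640 * 1024, true));   // true
  }
}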
PreviousBlockCompressionRatePredicator.java (new file)
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import org.apache.yetus.audience.InterfaceAudience;

/**
* This BlockCompressedSizePredicator implementation adjusts the block size limit based on the
* compression rate of the block contents written so far. For the first block, the adjusted size
* would be zero, so it performs a compression of the current block contents and calculates the
* compression rate and adjusted size. For subsequent blocks, the decision on whether the block
* should be finished or not is based on the compression rate calculated for the previous block.
*/
@InterfaceAudience.Private
public class PreviousBlockCompressionRatePredicator implements BlockCompressedSizePredicator {

private int adjustedBlockSize;
private int compressionRatio = 1;
private int configuredMaxBlockSize;

/**
* Recalculates compression rate for the last block and adjusts the block size limit as:
* BLOCK_SIZE * (uncompressed/compressed).
* @param context the HFileContext containing the configured max block size.
* @param uncompressed the uncompressed size of last block written.
* @param compressed the compressed size of last block written.
*/
@Override
public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
configuredMaxBlockSize = context.getBlocksize();
compressionRatio = uncompressed / compressed;
adjustedBlockSize = context.getBlocksize() * compressionRatio;
}

/**
* Returns <b>true</b> if the passed uncompressed size is larger than the limit calculated by
* <code>updateLatestBlockSizes</code>.
* @param uncompressed the uncompressed size of the block written so far.
* @return true if the block should be finished.
*/
@Override
public boolean shouldFinishBlock(int uncompressed) {
if (uncompressed >= configuredMaxBlockSize) {
return uncompressed >= adjustedBlockSize;
}
return false;
}
}
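
A small, hypothetical usage sketch (assuming a 64 KB block size; values are illustrative) showing how this predicator relaxes the finish threshold once a block has compressed well:

import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator;

public class CompressionRatePredicatorSketch {
  public static void main(String[] args) {
    HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
    PreviousBlockCompressionRatePredicator predicator =
      new PreviousBlockCompressionRatePredicator();

    // Last block: 64 KB of raw data compressed down to 16 KB -> ratio 4,
    // so the adjusted limit becomes 4 * 64 KB = 256 KB.
    predicator.updateLatestBlockSizes(context, 64 * 1024, 16 * 1024);

    System.out.println(predicator.shouldFinishBlock(128 * 1024)); // false, below the adjusted limit
    System.out.println(predicator.shouldFinishBlock(256 * 1024)); // true, adjusted limit reached
  }
}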
UncompressedBlockSizePredicator.java (new file)
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import org.apache.yetus.audience.InterfaceAudience;

/**
* This BlockCompressedSizePredicator implementation doesn't actually perform any prediction and
* simply returns <b>true</b> from <code>shouldFinishBlock</code>. This is the default
* implementation used when the <b>hbase.block.compressed.size.predicator</b> property is not
* defined.
*/
@InterfaceAudience.Private
public class UncompressedBlockSizePredicator implements BlockCompressedSizePredicator {

/**
* Empty implementation. Does nothing.
* @param context the HFileContext containing the configured max block size.
* @param uncompressed the uncompressed size of the last block written.
* @param compressed the compressed size of the last block written.
*/
@Override
public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
}

/**
* Dummy implementation that always returns true. This means only the uncompressed block size is
* considered when deciding when to finish a block.
* @param uncompressed the uncompressed size of the block written so far.
* @return always true.
*/
@Override
public boolean shouldFinishBlock(int uncompressed) {
return true;
}

}