
HBASE-27264 Add options to consider compressed size when delimiting blocks during hfile writes #4675

Merged (11 commits) on Aug 15, 2022
BlockCompressedSizePredicator.java
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import org.apache.yetus.audience.InterfaceAudience;

import java.io.IOException;

/**
* Allows for defining different compression rate predicates in implementing classes. Useful
* when compression is in place, and we want to define the block size based on the compressed
* size, rather than the default behaviour that considers the uncompressed size only.
*
* Since we don't actually know the compressed size until we actually apply compression in the
* block byte buffer, we need to "predict" this compression rate and minimize compression
* executions to avoid excessive resource usage.
*/
@InterfaceAudience.Private
public interface BlockCompressedSizePredicator {

String BLOCK_COMPRESSED_SIZE_PREDICATOR = "hbase.block.compressed.size.predicator";

String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed";

/**
* Calculates an adjusted block size limit based on a compression rate predicate.
* @param context the meta file information for the current file.
* @param uncompressedBlockSize the total uncompressed size read for the block so far.
* @return the adjusted block size limit based on a compression rate predicate.
* @throws IOException
*/
int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize)
throws IOException;

/**
* Updates the predicator with both compressed and uncompressed sizes of the latest block written.
* To be called once the block is finished and flushed to disk after compression.
* @param uncompressed the uncompressed size of last block written.
* @param compressed the compressed size of last block written.
*/
void updateLatestBlockSizes(int uncompressed, int compressed);
}
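For reference, here is a minimal configuration sketch (not part of the patch) showing how these two properties could be wired through a Hadoop Configuration. The class name and the numeric value below are illustrative assumptions, not defaults introduced by this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator;
import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator;

public class PredicatorConfigSketch {
  public static Configuration withCompressionAwareBlocks(Configuration base) {
    Configuration conf = new Configuration(base);
    // Resolve the predicator implementation via hbase.block.compressed.size.predicator.
    conf.setClass("hbase.block.compressed.size.predicator",
      PreviousBlockCompressionRatePredicator.class, BlockCompressedSizePredicator.class);
    // Cap how far a block may grow uncompressed; 640 KB is only an example value
    // (the writer falls back to 10x the block size when this is unset).
    conf.setInt("hbase.block.max.size.uncompressed", 640 * 1024);
    return conf;
  }
}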
HFileBlock.java
@@ -18,8 +18,12 @@
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
@@ -64,6 +68,7 @@
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -463,7 +468,7 @@ int getOnDiskSizeWithoutHeader() {
}

/** Returns the uncompressed size of data part (header and checksum excluded). */
int getUncompressedSizeWithoutHeader() {
public int getUncompressedSizeWithoutHeader() {
return uncompressedSizeWithoutHeader;
}

@@ -740,6 +745,10 @@ private enum State {
BLOCK_READY
}

private int maxSizeUnCompressed;

private BlockCompressedSizePredicator compressedSizePredicator;

/** Writer state. Used to ensure the correct usage protocol. */
private State state = State.INIT;

@@ -818,11 +827,12 @@ EncodingState getEncodingState() {
*/
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext) {
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize());
}

public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext, ByteBuffAllocator allocator) {
HFileContext fileContext, ByteBuffAllocator allocator,
int maxSizeUnCompressed) {
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
+ HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
@@ -845,6 +855,10 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
// TODO: Why fileContext saved away when we have dataBlockEncoder and/or
// defaultDataBlockEncoder?
this.fileContext = fileContext;
this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.
newInstance(conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR,
UncompressedBlockSizePredicator.class), new Configuration(conf));
this.maxSizeUnCompressed = maxSizeUnCompressed;
}

/**
@@ -897,6 +911,20 @@ void ensureBlockReady() throws IOException {
finishBlock();
}

public boolean shouldFinishBlock() throws IOException {
Contributor:

Is there a reason to have this logic here vs in HFileWriterImpl with the rest of the shouldFinish logic?

Contributor Author:

This method deals with block specifics, like compression, the block content byte array buffer, and how to use the compressed size when deciding the block limit. Moving it to HFileWriterImpl would spill block-specific variables and logic into the file writer logic. Keeping it here just feels more cohesive to me.

// int uncompressedBlockSize = blockSizeWritten();
int uncompressedBlockSize = baosInMemory.size();
if (uncompressedBlockSize >= fileContext.getBlocksize()) {
if (uncompressedBlockSize < maxSizeUnCompressed) {
int adjustedBlockSize = compressedSizePredicator.
calculateCompressionSizeLimit(fileContext, uncompressedBlockSize);
return uncompressedBlockSize >= adjustedBlockSize;
}
return true;
}
return false;
}

Contributor:

I'll also move this to the Predicator and generalize this method.

Suggested change:
public boolean shouldFinishBlock() throws IOException {
// int uncompressedBlockSize = blockSizeWritten();
int uncompressedBlockSize = baosInMemory.size();
if (uncompressedBlockSize >= fileContext.getBlocksize()) {
if (uncompressedBlockSize < maxSizeUnCompressed) {
int adjustedBlockSize = compressedSizePredicator.
calculateCompressionSizeLimit(fileContext, uncompressedBlockSize);
return uncompressedBlockSize >= adjustedBlockSize;
}
return true;
}
return false;
}
public boolean checkBoundariesWithPredicate() throws IOException {
if (predicator == null) {
throw new IllegalArgumentException("Expected at least the default BoundariesCheckPredicate");
}
return predicator.shouldFinishBlock(fileContext, uncompressedBlockSize);
}
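To make the suggestion above concrete, a rough sketch follows of what generalizing the check into the predicator could look like, for example as a default method on BlockCompressedSizePredicator. The method name and placement are assumptions about the proposed refactor, not the merged code, and the uncompressed-size cap would still need to be applied by the writer.

  // Hypothetical default method on BlockCompressedSizePredicator: the predicator decides when
  // a block should be finished, so HFileBlock.Writer only delegates to it.
  default boolean shouldFinishBlock(HFileContext context, int uncompressedBlockSize)
    throws IOException {
    if (uncompressedBlockSize >= context.getBlocksize()) {
      // Let the block grow up to the compression-adjusted limit before closing it.
      return uncompressedBlockSize >= calculateCompressionSizeLimit(context, uncompressedBlockSize);
    }
    return false;
  }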

/**
* Finish up writing of the block. Flushes the compressing stream (if using compression), fills
* out the header, does any compression/encryption of bytes to flush out to disk, and manages
@@ -980,6 +1008,7 @@ private void putHeader(ByteBuff buff, int onDiskSize, int uncompressedSize,
private void putHeader(ByteArrayOutputStream dest, int onDiskSize, int uncompressedSize,
int onDiskDataSize) {
putHeader(dest.getBuffer(), 0, onDiskSize, uncompressedSize, onDiskDataSize);
compressedSizePredicator.updateLatestBlockSizes(uncompressedSize, onDiskSize);
}

/**
@@ -1077,7 +1106,7 @@ int getUncompressedSizeWithoutHeader() {
/**
* The uncompressed size of the block data, including header size.
*/
int getUncompressedSizeWithHeader() {
public int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
return baosInMemory.size();
}
HFileWriterImpl.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -291,8 +293,9 @@ protected void finishInit(final Configuration conf) {
if (blockWriter != null) {
throw new IllegalStateException("finishInit called twice");
}
blockWriter =
new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator());
blockWriter = new HFileBlock.Writer(conf, blockEncoder, hFileContext,
cacheConf.getByteBuffAllocator(),
conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
Expand All @@ -319,6 +322,7 @@ protected void checkBlockBoundary() throws IOException {
shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
|| blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
}
shouldFinishBlock &= blockWriter.shouldFinishBlock();
if (shouldFinishBlock) {
finishBlock();
writeInlineBlocks(false);
PreviousBlockCompressionRatePredicator.java
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import org.apache.yetus.audience.InterfaceAudience;

import java.io.IOException;

/**
* This BlockCompressedSizePredicator implementation adjusts the block size limit based on the
* compression rate of the block contents read so far. For the first block, the adjusted size
* would be zero, so it performs a compression of the current block contents and calculates the
* compression rate and adjusted size. For subsequent blocks, it only performs this calculation
* once the previous block's adjusted size has been reached and the block is about to be closed.
*/
@InterfaceAudience.Private
public class PreviousBlockCompressionRatePredicator implements BlockCompressedSizePredicator {

private int adjustedBlockSize;
private int compressionRatio = 1;

/**
* Calculates an adjusted block size limit based on the compression rate of the current block
* contents. This calculation is only performed if this is the first block, or once the
* adjusted size from the previous block has been reached by the current one.
* @param context the meta file information for the current file.
* @param uncompressedBlockSize the total uncompressed size read for the block so far.
* @return the adjusted block size limit based on block compression rate.
* @throws IOException
*/
@Override
public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize)
throws IOException {
// In order to avoid excessive compression size calculations, we do it only once when
// the uncompressed size has reached BLOCKSIZE. We then use this compression size to
// calculate the compression rate, and adjust the block size limit by this ratio.
if (uncompressedBlockSize >= adjustedBlockSize) {
adjustedBlockSize = context.getBlocksize() * compressionRatio;
}
Contributor:

I'll remove the (uncompressedBlockSize >= adjustedBlockSize) check so that we adjust the size based on the previous block's compression every time, by calculating adjustedBlockSize in updateLatestBlockSizes itself and only returning it from this method.

return adjustedBlockSize;
}
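A possible shape for the change proposed in the comment above, sketched here only for illustration: compute the adjusted size when the previous block's sizes are reported and turn this method into a plain accessor. It assumes the block size is made available at update time, for example by also passing the HFileContext to updateLatestBlockSizes, which the interface as written does not do.

  // Sketch only; passing HFileContext here is an assumed interface change, not the merged API.
  public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
    compressionRatio = uncompressed / compressed;
    adjustedBlockSize = context.getBlocksize() * compressionRatio;
  }

  public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize) {
    // Before any block has been flushed, fall back to the configured block size.
    return adjustedBlockSize > 0 ? adjustedBlockSize : context.getBlocksize();
  }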

/**
* Recalculates compression rate for the last block.
* @param uncompressed the uncompressed size of last block written.
* @param compressed the compressed size of last block written.
*/
@Override
public void updateLatestBlockSizes(int uncompressed, int compressed) {
compressionRatio = uncompressed/compressed;
Contributor:

nit:

Suggested change:
compressionRatio = uncompressed/compressed;
compressionFactor = uncompressed/compressed;

}
}
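As a quick illustration of the behaviour above, the following driver traces the adjusted limit across two blocks. The sizes are invented, and it assumes an HFileContext built through HFileContextBuilder with a 64 KB block size; in the actual writer this growth is further capped by hbase.block.max.size.uncompressed (10x the block size by default).

import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator;

public class PredicatorTraceSketch {
  public static void main(String[] args) throws Exception {
    HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
    PreviousBlockCompressionRatePredicator predicator = new PreviousBlockCompressionRatePredicator();
    // First block: no compression history yet, so the limit is the plain block size (65536).
    System.out.println(predicator.calculateCompressionSizeLimit(context, 64 * 1024));
    // The first block flushed 64 KB uncompressed as 16 KB on disk: a 4:1 ratio.
    predicator.updateLatestBlockSizes(64 * 1024, 16 * 1024);
    // Next block: once 64 KB has accumulated, the limit stretches to 4 x 64 KB = 262144,
    // aiming for roughly one block-size worth of compressed data on disk.
    System.out.println(predicator.calculateCompressionSizeLimit(context, 64 * 1024));
  }
}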
UncompressedBlockSizePredicator.java
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import org.apache.yetus.audience.InterfaceAudience;

import java.io.IOException;

/**
* This BlockCompressedSizePredicator implementation doesn't actually perform any prediction
* and simply returns the configured BLOCK_SIZE value, without any adjustments. This is the
* default implementation if the <b>hbase.block.compressed.size.predicator</b> property is not
* defined.
*/
@InterfaceAudience.Private
public class UncompressedBlockSizePredicator implements BlockCompressedSizePredicator {

/**
* Returns the configured BLOCK_SIZE as the block size limit, without applying any compression
* rate adjustments.
* @param context the meta file information for the current file.
* @param uncompressedBlockSize the total uncompressed size read for the block so far.
* @return the configured BLOCK_SIZE as the block size limit, without applying any compression
* rate adjustments.
* @throws IOException
*/
@Override
public int calculateCompressionSizeLimit(HFileContext context, int uncompressedBlockSize)
throws IOException {
return context.getBlocksize();
}

/**
* Empty implementation. Does nothing.
* @param uncompressed the uncompressed size of last block written.
* @param compressed the compressed size of last block written.
*/
@Override
public void updateLatestBlockSizes(int uncompressed, int compressed) {}

}