HBASE-27264 Add options to consider compressed size when delimiting blocks during hfile writes #4675

Merged: 11 commits, Aug 15, 2022
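The patch adds two configuration properties: hbase.block.size.limit.compressed, which opts HFile writes into delimiting blocks by their estimated compressed size, and hbase.block.size.max.compressed, which caps how large the uncompressed block may grow while doing so. A minimal sketch of how a client might enable them (the 320 KB cap is an illustrative value; per the HFileWriterImpl change below, the feature is off by default and the cap defaults to ten times the block size):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();
// opt in to considering the compressed size when delimiting blocks
conf.setBoolean("hbase.block.size.limit.compressed", true);
// illustrative cap on the uncompressed block size while chasing the compressed limit
conf.setInt("hbase.block.size.max.compressed", 320 * 1024);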
HFileBlock.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
import org.apache.hadoop.hbase.io.encoding.EncodingState;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
@@ -239,6 +240,10 @@ static class Header {
static final byte[] DUMMY_HEADER_NO_CHECKSUM =
new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];

public static final String BLOCK_SIZE_LIMIT_COMPRESSED = "hbase.block.size.limit.compressed";

public static final String MAX_BLOCK_SIZE_COMPRESSED = "hbase.block.size.max.compressed";

/**
* Used deserializing blocks from Cache. <code>
* ++++++++++++++
@@ -454,7 +459,7 @@ int getOnDiskSizeWithoutHeader() {
}

/** Returns the uncompressed size of data part (header and checksum excluded). */
int getUncompressedSizeWithoutHeader() {
public int getUncompressedSizeWithoutHeader() {
return uncompressedSizeWithoutHeader;
}

@@ -729,6 +734,16 @@ private enum State {
BLOCK_READY
}

public boolean isSizeLimitCompressed() {
return sizeLimitCompressed;
}

private boolean sizeLimitCompressed;

private int maxSizeCompressed;

private int adjustedBlockSize;

/** Writer state. Used to ensure the correct usage protocol. */
private State state = State.INIT;

@@ -807,11 +822,13 @@ EncodingState getEncodingState() {
*/
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext) {
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, false,
fileContext.getBlocksize());
}

public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext, ByteBuffAllocator allocator) {
HFileContext fileContext, ByteBuffAllocator allocator, boolean sizeLimitCompressed,
int maxSizeCompressed) {
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
+ HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
@@ -834,6 +851,8 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
// TODO: Why fileContext saved away when we have dataBlockEncoder and/or
// defaultDataBlockEncoder?
this.fileContext = fileContext;
this.sizeLimitCompressed = sizeLimitCompressed;
this.maxSizeCompressed = maxSizeCompressed;
}

/**
@@ -886,6 +905,27 @@ void ensureBlockReady() throws IOException {
finishBlock();
}

public boolean shouldFinishBlock() throws IOException {
Contributor: Is there a reason to have this logic here vs in HFileWriterImpl with the rest of the shouldFinish logic?

Contributor Author: This method involves dealing with some block specifics, like compression, the block content byte array buffer, and what to do with the compressed size when deciding the block limit. Moving it to HFileWriterImpl would spill some block-specific variables and logic into the file writer logic. It just feels more cohesive to me to keep it here.

int uncompressedBlockSize = blockSizeWritten();
if (uncompressedBlockSize >= fileContext.getBlocksize()) {
if (sizeLimitCompressed && uncompressedBlockSize < maxSizeCompressed) {
// In order to avoid excessive compression size calculations, we do it only once when
// the uncompressed size has reached BLOCKSIZE. We then use this compressed size to
// calculate the compression ratio, and adjust the block size limit by that ratio.
if (adjustedBlockSize == 0 || uncompressedBlockSize >= adjustedBlockSize) {
int compressedSize = EncodedDataBlock.getCompressedSize(fileContext.getCompression(),
fileContext.getCompression().getCompressor(), baosInMemory.getBuffer(), 0,
baosInMemory.size());
adjustedBlockSize = uncompressedBlockSize / compressedSize;
adjustedBlockSize *= fileContext.getBlocksize();
}
return uncompressedBlockSize >= adjustedBlockSize;
}
return true;
}
return false;
}

/**
* Finish up writing of the block. Flushes the compressing stream (if using compression), fills
* out the header, does any compression/encryption of bytes to flush out to disk, and manages
@@ -1066,7 +1106,7 @@ int getUncompressedSizeWithoutHeader() {
/**
* The uncompressed size of the block data, including header size.
*/
int getUncompressedSizeWithHeader() {
public int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
return baosInMemory.size();
}
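To make the ratio adjustment in HFileBlock.Writer.shouldFinishBlock() above concrete, here is a standalone sketch of the same integer arithmetic; the 64 KB block size and 4:1 GZ compression ratio are illustrative assumptions, not values from the patch:

// illustrative values only
int blockSize = 64 * 1024;              // configured block size (fileContext.getBlocksize())
int uncompressedBlockSize = 64 * 1024;  // blockSizeWritten() has just reached the block size
int compressedSize = 16 * 1024;         // estimated via EncodedDataBlock.getCompressedSize()
int adjustedBlockSize = uncompressedBlockSize / compressedSize; // 4, integer division
adjustedBlockSize *= blockSize;                                 // 256 KB
// The block is only cut once blockSizeWritten() reaches this adjusted 256 KB limit
// (or exceeds the hbase.block.size.max.compressed cap), which is why the raw-size check
// in checkBlockBoundary() can be true while blockWriter.shouldFinishBlock() returns false.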
HFileWriterImpl.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.HFileBlock.BLOCK_SIZE_LIMIT_COMPRESSED;
import static org.apache.hadoop.hbase.io.hfile.HFileBlock.MAX_BLOCK_SIZE_COMPRESSED;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -291,8 +294,9 @@ protected void finishInit(final Configuration conf) {
if (blockWriter != null) {
throw new IllegalStateException("finishInit called twice");
}
blockWriter =
new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator());
blockWriter = new HFileBlock.Writer(conf, blockEncoder, hFileContext,
cacheConf.getByteBuffAllocator(), conf.getBoolean(BLOCK_SIZE_LIMIT_COMPRESSED, false),
conf.getInt(MAX_BLOCK_SIZE_COMPRESSED, hFileContext.getBlocksize() * 10));
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@@ -319,6 +323,9 @@ protected void checkBlockBoundary() throws IOException {
shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
|| blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
}
if (blockWriter.isSizeLimitCompressed()) {
Contributor: Let's change this to if (blockWriter.isSizeLimitCompressed() && !shouldFinishBlock)? Just noting your comment on the calculation of the compression ratio in the other file: we could further avoid that cost if we already know we need to finish the block for other reasons.

Contributor Author: We actually want to enter here if shouldFinishBlock is true, because that means the raw/encoded uncompressed size is larger than BLOCK_SIZE. But we don't know whether the compressed size is smaller than BLOCK_SIZE, so we call blockWriter.shouldFinishBlock() to find that out.

And in the case where shouldFinishBlock is false, we won't actually calculate the compressed size inside blockWriter.shouldFinishBlock(), because we don't go beyond this point.

shouldFinishBlock &= blockWriter.shouldFinishBlock();
Contributor: Related to the above comment by @bbeaudreault: in what situation would shouldFinishBlock be true and blockWriter.shouldFinishBlock() be false? Is that possible?

Contributor Author: Yes, shouldFinishBlock could be true at this point because so far we have only checked the "raw" uncompressed size or the encoded uncompressed size against BLOCK_SIZE. It is possible that these sizes are higher than BLOCK_SIZE while the compressed size is still less than BLOCK_SIZE.

}
if (shouldFinishBlock) {
finishBlock();
writeInlineBlocks(false);
TestHStoreFile.java
@@ -61,6 +61,7 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
@@ -189,6 +190,24 @@ public static void writeStoreFile(final StoreFileWriter writer, byte[] fam, byte
}
}

public static void writeLargeStoreFile(final StoreFileWriter writer, byte[] fam, byte[] qualifier,
int rounds) throws IOException {
long now = EnvironmentEdgeManager.currentTime();
try {
for (int i = 0; i < rounds; i++) {
for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
byte[] b = new byte[] { (byte) d, (byte) e };
byte[] key = new byte[] { (byte) i };
writer.append(new KeyValue(key, fam, qualifier, now, b));
}
}
}
} finally {
writer.close();
}
}

/**
* Test that our mechanism of writing store files in one region to reference store files in other
* regions works.
@@ -1193,4 +1212,42 @@ public void testDataBlockSizeEncoded() throws Exception {
}
}

@Test
public void testDataBlockSizeCompressed() throws Exception {
Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname");
Path path = new Path(dir, "1234567890");
DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF;
conf.setBoolean("hbase.block.size.limit.compressed", true);
cacheConf = new CacheConfig(conf);
HFileContext meta =
new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).withChecksumType(CKTYPE)
.withBytesPerCheckSum(CKBYTES).withDataBlockEncoding(dataBlockEncoderAlgo)
.withCompression(Compression.Algorithm.GZ).build();
// Make a store file and write data to it.
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(path).withMaxKeyCount(2000).withFileContext(meta).build();
writeLargeStoreFile(writer, Bytes.toBytes(name.getMethodName()),
Bytes.toBytes(name.getMethodName()), 200);
writer.close();
HStoreFile storeFile =
new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
storeFile.initReader();
HFile.Reader fReader =
HFile.createReader(fs, writer.getPath(), storeFile.getCacheConf(), true, conf);
FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, writer.getPath());
long fileSize = fs.getFileStatus(writer.getPath()).getLen();
FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
long offset = trailer.getFirstDataBlockOffset(), max = trailer.getLastDataBlockOffset();
HFileBlock block;
int blockCount = 0;
while (offset <= max) {
block = fReader.readBlock(offset, -1, /* cacheBlock */ false, /* pread */ false,
/* isCompaction */ false, /* updateCacheMetrics */ false, null, null);
offset += block.getOnDiskSizeWithHeader();
blockCount += 1;
assertTrue(block.getUncompressedSizeWithoutHeader() >= BLOCKSIZE_SMALL);
}
assertEquals(blockCount, 100);
}

}