diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java
new file mode 100644
index 000000000000..a90e04fe5ad8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCompressedSizePredicator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Allows for defining different compression rate predicates on its implementing classes. Useful
+ * when compression is in place, and we want to define block size based on the compressed size,
+ * rather than the default behaviour that considers the uncompressed size only. Since we don't
+ * actually know the compressed size until we actual apply compression in the block byte buffer, we
+ * need to "predicate" this compression rate and minimize compression execution to avoid excessive
+ * resources usage. Different approaches for predicating the compressed block size can be defined by
+ * implementing classes. The updateLatestBlockSizes
allows for updating uncompressed
+ * and compressed size values, and is called during block finishing (when we finally apply
+ * compression on the block data). Final block size predicate logic is implemented in
+ * shouldFinishBlock
, which is called by the block writer once uncompressed size has
+ * reached the configured BLOCK size, and additional checks should be applied to decide if the block
+ * can be finished.
+ */
+@InterfaceAudience.Private
+public interface BlockCompressedSizePredicator {
+
+ String BLOCK_COMPRESSED_SIZE_PREDICATOR = "hbase.block.compressed.size.predicator";
+
+ String MAX_BLOCK_SIZE_UNCOMPRESSED = "hbase.block.max.size.uncompressed";
+
+ /**
+ * Updates the predicator with both compressed and uncompressed sizes of latest block written. To
+ * be called once the block is finshed and flushed to disk after compression.
+ * @param context the HFileContext containg the configured max block size.
+ * @param uncompressed the uncompressed size of last block written.
+ * @param compressed the compressed size of last block written.
+ */
+ void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed);
+
+ /**
+ * Decides if the block should be finished based on the comparison of its uncompressed size
+ * against an adjusted size based on a predicated compression factor.
+ * @param uncompressed true if the block should be finished. n
+ */
+ boolean shouldFinishBlock(int uncompressed);
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index f68ffffa94ae..8e04580874fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;
import io.opentelemetry.api.common.Attributes;
@@ -64,6 +65,7 @@
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -463,7 +465,7 @@ int getOnDiskSizeWithoutHeader() {
}
/** Returns the uncompressed size of data part (header and checksum excluded). */
- int getUncompressedSizeWithoutHeader() {
+ public int getUncompressedSizeWithoutHeader() {
return uncompressedSizeWithoutHeader;
}
@@ -740,6 +742,10 @@ private enum State {
BLOCK_READY
}
+ private int maxSizeUnCompressed;
+
+ private BlockCompressedSizePredicator compressedSizePredicator;
+
/** Writer state. Used to ensure the correct usage protocol. */
private State state = State.INIT;
@@ -818,11 +824,11 @@ EncodingState getEncodingState() {
*/
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
HFileContext fileContext) {
- this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
+ this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize());
}
public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
- HFileContext fileContext, ByteBuffAllocator allocator) {
+ HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) {
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
+ HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
@@ -845,6 +851,10 @@ public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
// TODO: Why fileContext saved away when we have dataBlockEncoder and/or
// defaultDataBlockEncoder?
this.fileContext = fileContext;
+ this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance(
+ conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class),
+ new Configuration(conf));
+ this.maxSizeUnCompressed = maxSizeUnCompressed;
}
/**
@@ -897,6 +907,15 @@ void ensureBlockReady() throws IOException {
finishBlock();
}
+ public boolean checkBoundariesWithPredicate() {
+ int rawBlockSize = encodedBlockSizeWritten();
+ if (rawBlockSize >= maxSizeUnCompressed) {
+ return true;
+ } else {
+ return compressedSizePredicator.shouldFinishBlock(rawBlockSize);
+ }
+ }
+
/**
* Finish up writing of the block. Flushes the compressing stream (if using compression), fills
* out the header, does any compression/encryption of bytes to flush out to disk, and manages
@@ -911,6 +930,11 @@ private void finishBlock() throws IOException {
userDataStream.flush();
prevOffset = prevOffsetByType[blockType.getId()];
+ // We need to cache the unencoded/uncompressed size before changing the block state
+ int rawBlockSize = 0;
+ if (this.getEncodingState() != null) {
+ rawBlockSize = blockSizeWritten();
+ }
// We need to set state before we can package the block up for cache-on-write. In a way, the
// block is ready, but not yet encoded or compressed.
state = State.BLOCK_READY;
@@ -931,6 +955,10 @@ private void finishBlock() throws IOException {
onDiskBlockBytesWithHeader.reset();
onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
+ // Update raw and compressed sizes in the predicate
+ compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize,
+ onDiskBlockBytesWithHeader.size());
+
// Calculate how many bytes we need for checksum on the tail of the block.
int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
fileContext.getBytesPerChecksum());
@@ -938,6 +966,7 @@ private void finishBlock() throws IOException {
// Put the header for the on disk bytes; header currently is unfilled-out
putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
baosInMemory.size(), onDiskBlockBytesWithHeader.size());
+
if (onDiskChecksum.length != numBytes) {
onDiskChecksum = new byte[numBytes];
}
@@ -1077,7 +1106,7 @@ int getUncompressedSizeWithoutHeader() {
/**
* The uncompressed size of the block data, including header size.
*/
- int getUncompressedSizeWithHeader() {
+ public int getUncompressedSizeWithHeader() {
expectState(State.BLOCK_READY);
return baosInMemory.size();
}
@@ -1101,7 +1130,7 @@ public int encodedBlockSizeWritten() {
* block at the moment. Note that this will return zero in the "block ready" state as well.
* @return the number of bytes written
*/
- int blockSizeWritten() {
+ public int blockSizeWritten() {
return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 80e333050c6b..d58be5fd1ced 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
+
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -292,7 +294,8 @@ protected void finishInit(final Configuration conf) {
throw new IllegalStateException("finishInit called twice");
}
blockWriter =
- new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator());
+ new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
+ conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@@ -319,6 +322,7 @@ protected void checkBlockBoundary() throws IOException {
shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
|| blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
}
+ shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
if (shouldFinishBlock) {
finishBlock();
writeInlineBlocks(false);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java
new file mode 100644
index 000000000000..be0ee3bb9a77
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PreviousBlockCompressionRatePredicator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This BlockCompressedSizePredicator implementation adjusts the block size limit based on the
+ * compression rate of the block contents read so far. For the first block, adjusted size would be
+ * zero, so it performs a compression of current block contents and calculate compression rate and
+ * adjusted size. For subsequent blocks, decision whether the block should be finished or not will
+ * be based on the compression rate calculated for the previous block.
+ */
+@InterfaceAudience.Private
+public class PreviousBlockCompressionRatePredicator implements BlockCompressedSizePredicator {
+
+ private int adjustedBlockSize;
+ private int compressionRatio = 1;
+ private int configuredMaxBlockSize;
+
+ /**
+ * Recalculates compression rate for the last block and adjusts the block size limit as:
+ * BLOCK_SIZE * (uncompressed/compressed).
+ * @param context HFIleContext containing the configured max block size.
+ * @param uncompressed the uncompressed size of last block written.
+ * @param compressed the compressed size of last block written.
+ */
+ @Override
+ public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
+ configuredMaxBlockSize = context.getBlocksize();
+ compressionRatio = uncompressed / compressed;
+ adjustedBlockSize = context.getBlocksize() * compressionRatio;
+ }
+
+ /**
+ * Returns true if the passed uncompressed size is larger than the limit calculated by
+ * updateLatestBlockSizes
.
+ * @param uncompressed true if the block should be finished. n
+ */
+ @Override
+ public boolean shouldFinishBlock(int uncompressed) {
+ if (uncompressed >= configuredMaxBlockSize) {
+ return uncompressed >= adjustedBlockSize;
+ }
+ return false;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java
new file mode 100644
index 000000000000..c259375a97de
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/UncompressedBlockSizePredicator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This BlockCompressedSizePredicator implementation doesn't actually performs any predicate and
+ * simply returns true on shouldFinishBlock
. This is the default implementation
+ * if hbase.block.compressed.size.predicator property is not defined.
+ */
+@InterfaceAudience.Private
+public class UncompressedBlockSizePredicator implements BlockCompressedSizePredicator {
+
+ /**
+ * Empty implementation. Does nothing.
+ * @param uncompressed the uncompressed size of last block written.
+ * @param compressed the compressed size of last block written.
+ */
+ @Override
+ public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) {
+ }
+
+ /**
+ * Dummy implementation that always returns true. This means, we will be only considering the
+ * block uncompressed size for deciding when to finish a block.
+ * @param uncompressed true if the block should be finished. n
+ */
+ @Override
+ public boolean shouldFinishBlock(int uncompressed) {
+ return true;
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
index 7eff766c0b25..d71b33e82d5a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -39,6 +40,7 @@
import java.util.OptionalLong;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -61,6 +63,7 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
@@ -74,8 +77,10 @@
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -189,6 +194,24 @@ public static void writeStoreFile(final StoreFileWriter writer, byte[] fam, byte
}
}
+ public static void writeLargeStoreFile(final StoreFileWriter writer, byte[] fam, byte[] qualifier,
+ int rounds) throws IOException {
+ long now = EnvironmentEdgeManager.currentTime();
+ try {
+ for (int i = 0; i < rounds; i++) {
+ for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
+ for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
+ byte[] b = new byte[] { (byte) d, (byte) e };
+ byte[] key = new byte[] { (byte) i };
+ writer.append(new KeyValue(key, fam, qualifier, now, b));
+ }
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ }
+
/**
* Test that our mechanism of writing store files in one region to reference store files in other
* regions works.
@@ -1193,4 +1216,55 @@ public void testDataBlockSizeEncoded() throws Exception {
}
}
+ @Test
+ public void testDataBlockSizeCompressed() throws Exception {
+ conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR,
+ PreviousBlockCompressionRatePredicator.class.getName());
+ testDataBlockSizeWithCompressionRatePredicator(11,
+ (s, c) -> (c > 1 && c < 11) ? s >= BLOCKSIZE_SMALL * 10 : true);
+ }
+
+ @Test
+ public void testDataBlockSizeUnCompressed() throws Exception {
+ conf.set(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class.getName());
+ testDataBlockSizeWithCompressionRatePredicator(200, (s, c) -> s < BLOCKSIZE_SMALL * 10);
+ }
+
+ private void testDataBlockSizeWithCompressionRatePredicator(int expectedBlockCount,
+ BiFunction validation) throws Exception {
+ Path dir = new Path(new Path(this.testDir, "7e0102"), "familyname");
+ Path path = new Path(dir, "1234567890");
+ DataBlockEncoding dataBlockEncoderAlgo = DataBlockEncoding.FAST_DIFF;
+ cacheConf = new CacheConfig(conf);
+ HFileContext meta =
+ new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).withChecksumType(CKTYPE)
+ .withBytesPerCheckSum(CKBYTES).withDataBlockEncoding(dataBlockEncoderAlgo)
+ .withCompression(Compression.Algorithm.GZ).build();
+ // Make a store file and write data to it.
+ StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
+ .withFilePath(path).withMaxKeyCount(2000).withFileContext(meta).build();
+ writeLargeStoreFile(writer, Bytes.toBytes(name.getMethodName()),
+ Bytes.toBytes(name.getMethodName()), 200);
+ writer.close();
+ HStoreFile storeFile =
+ new HStoreFile(fs, writer.getPath(), conf, cacheConf, BloomType.NONE, true);
+ storeFile.initReader();
+ HFile.Reader fReader =
+ HFile.createReader(fs, writer.getPath(), storeFile.getCacheConf(), true, conf);
+ FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, writer.getPath());
+ long fileSize = fs.getFileStatus(writer.getPath()).getLen();
+ FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
+ long offset = trailer.getFirstDataBlockOffset(), max = trailer.getLastDataBlockOffset();
+ HFileBlock block;
+ int blockCount = 0;
+ while (offset <= max) {
+ block = fReader.readBlock(offset, -1, /* cacheBlock */ false, /* pread */ false,
+ /* isCompaction */ false, /* updateCacheMetrics */ false, null, null);
+ offset += block.getOnDiskSizeWithHeader();
+ blockCount++;
+ assertTrue(validation.apply(block.getUncompressedSizeWithoutHeader(), blockCount));
+ }
+ assertEquals(expectedBlockCount, blockCount);
+ }
+
}