HBASE-26959 Brotli compression support (#4353)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
apurtell committed Apr 22, 2022
1 parent 8d17241 commit 0f9f6f2
Showing 32 changed files with 1,012 additions and 47 deletions.
4 changes: 4 additions & 0 deletions hbase-assembly/pom.xml
@@ -314,6 +314,10 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-compression-aircompressor</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-compression-brotli</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-compression-lz4</artifactId>
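With hbase-compression-brotli bundled in the assembly, Brotli can be selected per column family like any other algorithm. A minimal sketch, assuming the module jar is on the cluster classpath; the table and family names are hypothetical:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

public class BrotliTableSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      // Create a hypothetical table "t1" whose family "f1" uses the new BROTLI algorithm.
      admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f1"))
          .setCompressionType(Compression.Algorithm.BROTLI)
          .build())
        .build());
    }
  }
}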
@@ -45,7 +45,6 @@
public final class Compression {
private static final Logger LOG = LoggerFactory.getLogger(Compression.class);


// LZO

public static final String LZO_CODEC_CLASS_KEY =
@@ -97,6 +96,13 @@ public final class Compression {
public static final String LZMA_CODEC_CLASS_DEFAULT =
"org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";

// Brotli

public static final String BROTLI_CODEC_CLASS_KEY =
"hbase.io.compress.brotli.codec";
public static final String BROTLI_CODEC_CLASS_DEFAULT =
"org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";

/**
* Prevent the instantiation of class.
*/
@@ -148,6 +154,7 @@ private static ClassLoader getClassLoaderForCodec() {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="SE_TRANSIENT_FIELD_NOT_RESTORED",
justification="We are not serializing so doesn't apply (not sure why transient though)")
@SuppressWarnings("ImmutableEnumChecker")
@InterfaceAudience.Public
public static enum Algorithm {
// LZO is GPL and requires extra install to setup. See
@@ -352,6 +359,31 @@ public CompressionCodec reload(Configuration conf) {
return lzmaCodec;
}
}
},

BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
// Use base type to avoid compile-time dependencies.
private volatile transient CompressionCodec brotliCodec;
private final transient Object lock = new Object();
@Override
CompressionCodec getCodec(Configuration conf) {
if (brotliCodec == null) {
synchronized (lock) {
if (brotliCodec == null) {
brotliCodec = buildCodec(conf, this);
}
}
}
return brotliCodec;
}
@Override
public CompressionCodec reload(Configuration conf) {
synchronized (lock) {
brotliCodec = buildCodec(conf, this);
LOG.warn("Reloaded configuration for {}", name());
return brotliCodec;
}
}
};
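The new BROTLI entry follows the same lazy-initialization pattern as the LZMA entry above it: the codec field is volatile and assigned under double-checked locking, so the Brotli classes are loaded reflectively only on first use and a missing codec jar fails late rather than at class-load time. A minimal usage sketch through the enum's public API, assuming the jar is present:

// Resolve the algorithm by name, then borrow and return a pooled compressor.
Compression.Algorithm algo = Compression.getCompressionAlgorithmByName("brotli");
org.apache.hadoop.io.compress.Compressor compressor = algo.getCompressor();
try {
  // ... hand the compressor to a compression stream or HFile writer ...
} finally {
  algo.returnCompressor(compressor); // pooled; must be returned
}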

private final Configuration conf;
@@ -35,4 +35,16 @@ public static int roundInt2(int v) {
return v;
}

/**
* Most compression algorithms can be presented with pathological input that causes an
* expansion rather than a compression. Hadoop's compression API requires that we calculate
* additional buffer space required for the worst case. There is a formula developed for
* gzip that applies as a ballpark to all LZ variants. It should be good enough for now and
* has been tested as such with a range of different inputs.
*/
public static int compressionOverhead(int bufferSize) {
// Given an input buffer of 'bufferSize' bytes we presume a worst-case expansion of
// 32 bytes (block header) plus an additional 1/6th of the input size.
return (bufferSize / 6) + 32;
}
}
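As a spot check of the formula (a hypothetical walk-through, not part of the patch): with the 256 KiB buffer these codecs default to, the reserved overhead works out as follows.

int bufferSize = 256 * 1024;                                    // 262144 bytes
int overhead = CompressionUtil.compressionOverhead(bufferSize); // (262144 / 6) + 32 = 43722
// BlockCompressorStream uses this to cap each block's uncompressed input
// so that worst-case compressed output still fits in the buffer.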
@@ -57,7 +57,7 @@ public int compress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("compress: {} bytes from outBuf", n);
LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
@@ -51,7 +51,7 @@ public int decompress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("decompress: {} bytes from outBuf", n);
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
int compressionOverhead = (bufferSize / 6) + 32;
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}

@Override
@@ -149,10 +150,9 @@ public class HadoopLz4Decompressor extends HadoopDecompressor<Lz4Decompressor> {
// Package private

static int getBufferSize(Configuration conf) {
int size = conf.getInt(LZ4_BUFFER_SIZE_KEY,
return conf.getInt(LZ4_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
return size > 0 ? size : 256 * 1024; // Don't change this default
}

}
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
int compressionOverhead = (bufferSize / 6) + 32;
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}

@Override
@@ -149,10 +150,9 @@ public class HadoopLzoDecompressor extends HadoopDecompressor<LzoDecompressor> {
// Package private

static int getBufferSize(Configuration conf) {
int size = conf.getInt(LZO_BUFFER_SIZE_KEY,
return conf.getInt(LZO_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT));
return size > 0 ? size : 256 * 1024; // Don't change this default
}

}
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
int compressionOverhead = (bufferSize / 6) + 32;
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}

@Override
@@ -149,10 +150,9 @@ public class HadoopSnappyDecompressor extends HadoopDecompressor<SnappyDecompres
// Package private

static int getBufferSize(Configuration conf) {
int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
return conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
return size > 0 ? size : 256 * 1024; // Don't change this default
}

}
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -52,6 +53,7 @@
public class ZstdCodec implements Configurable, CompressionCodec {

public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;

private Configuration conf;

@@ -99,8 +101,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
int compressionOverhead = (bufferSize / 6) + 32;
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}

@Override
@@ -157,10 +159,10 @@ public class HadoopZstdDecompressor extends HadoopDecompressor<ZstdDecompressor>
// Package private

static int getBufferSize(Configuration conf) {
int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
return size > 0 ? size : 256 * 1024; // Don't change this default
// IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
ZSTD_BUFFER_SIZE_DEFAULT));
}

}
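Note why ZstdCodec gains its own ZSTD_BUFFER_SIZE_DEFAULT while the other codecs simply chain to Hadoop's defaults: Hadoop's IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0, which Hadoop interprets as "use the zstd library default" but which would be an unusable buffer size here, so a 256 KiB fallback is supplied. A hypothetical override via configuration:

Configuration conf = HBaseConfiguration.create();
// Highest precedence: the HBase-specific key shown above.
conf.setInt(ZstdCodec.ZSTD_BUFFER_SIZE_KEY, 1024 * 1024); // "hbase.io.compress.zstd.buffersize"
// Otherwise Hadoop's io.compression.codec.zstd.buffersize is consulted,
// and finally ZSTD_BUFFER_SIZE_DEFAULT (256 * 1024) when neither is set.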