Skip to content

Commit

Permalink
HBASE-25869 WAL value compression
Browse files Browse the repository at this point in the history
WAL storage can be expensive, especially if the cell values
represented in the edits are large, consisting of blobs or
significant lengths of text. Such WALs might need to be kept around
for a fairly long time to satisfy replication constraints on a space
limited (or space-contended) filesystem.

We have a custom dictionary compression scheme for cell metadata that
is engaged when WAL compression is enabled in site configuration. This
is fine for that application, where we can expect the universe of values
and their lengths in the custom dictionaries to be constrained. For
arbitrary cell values it is better to use Deflate compression, which is
which is a complete LZ-class algorithm suitable for arbitrary albeit
compressible data, is reasonably fast, certainly fast enough for WALs,
compresses well, and is universally available as part of the Java
runtime.

With a trick that encodes whether or not the cell value is compressed in
the high order bit of the type byte, this can be done in a backwards
compatible manner.
  • Loading branch information
apurtell committed May 8, 2021
1 parent 02b018c commit bf89426
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ public static enum Type {
DeleteColumn((byte)12),
DeleteFamily((byte)14),

// Effective maximum is 127 (Byte.MAX_VALUE). We set the high order bit of the
// type byte in the WAL codecs to indicate, in a backwards compatible way, if the
// value is compressed there.
EffectiveMaximum((byte)Byte.MAX_VALUE),

// Maximum is used when searching; you look from maximum on down.
Maximum((byte)255);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ message WALHeader {
optional bool has_tag_compression = 3;
optional string writer_cls_name = 4;
optional string cell_codec_cls_name = 5;
optional bool has_value_compression = 6;
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro
try {
this.compressionContext =
new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true),
conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false));
} catch (Exception e) {
throw new IOException("Failed to initiate CompressionContext", e);
}
Expand All @@ -167,8 +168,13 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita

boolean doTagCompress = doCompress
&& conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));
boolean doValueCompress = doCompress
&& conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
buildWALHeader(conf, WALHeader.newBuilder()
.setHasCompression(doCompress)
.setHasTagCompression(doTagCompress)
.setHasValueCompression(doValueCompress))));

initAfterHeader(doCompress);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.reflect.InvocationTargetException;
import java.util.EnumMap;
import java.util.Map;
import java.util.zip.Deflater;

import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -37,6 +38,9 @@ public class CompressionContext {
static final String ENABLE_WAL_TAGS_COMPRESSION =
"hbase.regionserver.wal.tags.enablecompression";

static final String ENABLE_WAL_VALUE_COMPRESSION =
"hbase.regionserver.wal.value.enablecompression";

public enum DictionaryIndex {
REGION, TABLE, FAMILY, QUALIFIER, ROW
}
Expand All @@ -45,10 +49,12 @@ public enum DictionaryIndex {
new EnumMap<>(DictionaryIndex.class);
// Context used for compressing tags
TagCompressionContext tagCompressionContext = null;
Deflater valueCompressor = null;

public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
boolean hasTagCompression) throws SecurityException, NoSuchMethodException,
InstantiationException, IllegalAccessException, InvocationTargetException {
boolean hasTagCompression, boolean hasValueCompression)
throws SecurityException, NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException {
Constructor<? extends Dictionary> dictConstructor =
dictType.getConstructor();
for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) {
Expand All @@ -70,18 +76,38 @@ public CompressionContext(Class<? extends Dictionary> dictType, boolean recovere
if (hasTagCompression) {
tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
}
if (hasValueCompression) {
valueCompressor = new Deflater();
// Optimize for encoding speed
valueCompressor.setLevel(Deflater.BEST_SPEED);
}
}

public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
boolean hasTagCompression)
throws SecurityException, NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException {
this(dictType, recoveredEdits, hasTagCompression, false);
}

public Dictionary getDictionary(Enum dictIndex) {
return dictionaries.get(dictIndex);
}

public Deflater getValueCompressor() {
return valueCompressor;
}

void clear() {
for(Dictionary dictionary : dictionaries.values()){
dictionary.clear();
}
if (tagCompressionContext != null) {
tagCompressionContext.clear();
}
if (valueCompressor != null) {
valueCompressor.reset();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class ProtobufLogReader extends ReaderBase {
protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
protected boolean hasCompression = false;
protected boolean hasTagCompression = false;
protected boolean hasValueCompression = false;
// walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit
// entry in the wal, the inputstream's position is equal to walEditsStopOffset.
private long walEditsStopOffset;
Expand Down Expand Up @@ -227,6 +228,7 @@ private String initInternal(FSDataInputStream stream, boolean isFirst)
WALProtos.WALHeader header = builder.build();
this.hasCompression = header.hasHasCompression() && header.getHasCompression();
this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression();
}
this.inputStream = stream;
this.walEditsStopOffset = this.fileLength;
Expand Down Expand Up @@ -327,6 +329,11 @@ protected boolean hasTagCompression() {
return this.hasTagCompression;
}

@Override
protected boolean hasValueCompression() {
return this.hasValueCompression;
}

@Override
protected boolean readNext(Entry entry) throws IOException {
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream
try {
if (compressionContext == null) {
compressionContext = new CompressionContext(LRUDictionary.class,
CommonFSUtils.isRecoveredEdits(path), hasTagCompression());
CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), hasValueCompression());
} else {
compressionContext.clear();
}
Expand Down Expand Up @@ -151,6 +151,11 @@ public void seek(long pos) throws IOException {
*/
protected abstract boolean hasTagCompression();

/**
* @return Whether value compression is enabled for this log.
*/
protected abstract boolean hasValueCompression();

/**
* Read next entry.
* @param e The entry to read into.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
Expand All @@ -43,7 +46,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

Expand Down Expand Up @@ -220,6 +223,8 @@ private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throw
}
}

static final int VALUE_COMPRESS_THRESHOLD = 100;

static class CompressedKvEncoder extends BaseEncoder {
private final CompressionContext compression;
public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
Expand All @@ -241,10 +246,19 @@ public void write(Cell cell) throws IOException {
compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
PrivateCellUtil.compressQualifier(out, cell,
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
// Write timestamp, type and value as uncompressed.
// Write timestamp, type and value.
StreamUtils.writeLong(out, cell.getTimestamp());
out.write(cell.getTypeByte());
PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
byte type = cell.getTypeByte();
if (compression.getValueCompressor() != null &&
cell.getValueLength() > VALUE_COMPRESS_THRESHOLD) {
// Set the high bit of type to indicate the value is compressed
out.write((byte)(type|0x80));
writeCompressedValue(out, cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength(), compression.getValueCompressor());
} else {
out.write(type);
PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
}
if (tagsLength > 0) {
if (compression.tagCompressionContext != null) {
// Write tags using Dictionary compression
Expand All @@ -256,6 +270,28 @@ public void write(Cell cell) throws IOException {
}
}
}

public static void writeCompressedValue(OutputStream out, byte[] valueArray, int offset,
int vlength, Deflater deflater) throws IOException {
byte[] buffer = new byte[4096];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
deflater.reset();
deflater.setInput(valueArray, offset, vlength);
boolean finished = false;
do {
int bytesOut = deflater.deflate(buffer);
if (bytesOut > 0) {
baos.write(buffer, 0, bytesOut);
} else {
bytesOut = deflater.deflate(buffer, 0, buffer.length, Deflater.FULL_FLUSH);
baos.write(buffer, 0, bytesOut);
finished = true;
}
} while (!finished);
StreamUtils.writeRawVInt32(out, baos.size());
out.write(baos.toByteArray());
}

}

static class CompressedKvDecoder extends BaseDecoder {
Expand All @@ -269,7 +305,6 @@ public CompressedKvDecoder(InputStream in, CompressionContext compression) {
protected Cell parseCell() throws IOException {
int keylength = StreamUtils.readRawVarint32(in);
int vlength = StreamUtils.readRawVarint32(in);

int tagsLength = StreamUtils.readRawVarint32(in);
int length = 0;
if(tagsLength == 0) {
Expand Down Expand Up @@ -302,14 +337,28 @@ protected Cell parseCell() throws IOException {
compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
pos += elemLen;

// timestamp, type and value
int tsTypeValLen = length - pos;
// timestamp
long ts = StreamUtils.readLong(in);
pos = Bytes.putLong(backingArray, pos, ts);
// type and value
int typeValLen = length - pos;
if (tagsLength > 0) {
tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
}
// high bit of type byte is 1 if value is compressed
byte type = (byte)in.read();
if ((type & 0x80) == 0x80) {
type = (byte)(type & 0x7f);
pos = Bytes.putByte(backingArray, pos, type);
int valLen = typeValLen - 1;
readCompressedValue(in, backingArray, pos, valLen);
pos += valLen;
} else {
pos = Bytes.putByte(backingArray, pos, type);
int valLen = typeValLen - 1;
IOUtils.readFully(in, backingArray, pos, valLen);
pos += valLen;
}
IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
pos += tsTypeValLen;

// tags
if (tagsLength > 0) {
pos = Bytes.putAsShort(backingArray, pos, tagsLength);
Expand Down Expand Up @@ -349,6 +398,27 @@ private static void checkLength(int len, int max) throws IOException {
throw new IOException("Invalid length for compresesed portion of keyvalue: " + len);
}
}

public static void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
int expectedLength) throws IOException {
int compressedLength = StreamUtils.readRawVarint32(in);
byte[] buffer = new byte[compressedLength];
IOUtils.readFully(in, buffer, 0, compressedLength);
Inflater inflater = new Inflater();
inflater.setInput(buffer);
int remaining = expectedLength;
do {
try {
int inflatedBytes = inflater.inflate(outArray, outOffset, remaining);
Preconditions.checkState(inflatedBytes > 0, "Inflater state error");
outOffset += inflatedBytes;
remaining -= inflatedBytes;
} catch (DataFormatException e) {
throw new IOException(e);
}
} while (remaining > 0);
}

}

public static class EnsureKvEncoder extends BaseEncoder {
Expand Down
Loading

0 comments on commit bf89426

Please sign in to comment.