Skip to content

Commit

Permalink
HBASE-25869 WAL value compression (#3244)
Browse files Browse the repository at this point in the history
WAL storage can be expensive, especially if the cell values
represented in the edits are large, consisting of blobs or
significant lengths of text. Such WALs might need to be kept around
for a fairly long time to satisfy replication constraints on a space
limited (or space-contended) filesystem.

We have a custom dictionary compression scheme for cell metadata that
is engaged when WAL compression is enabled in site configuration.
This is fine for that application, where we can expect the universe
of values and their lengths in the custom dictionaries to be
constrained. For arbitrary cell values it is better to use one of the
available compression codecs, which are suitable for arbitrary, albeit
compressible, data.

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
  • Loading branch information
apurtell authored May 21, 2021
1 parent dfa88e1 commit 8ec6fd9
Show file tree
Hide file tree
Showing 14 changed files with 847 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@
*/
@InterfaceAudience.Private
public class KeyValue implements ExtendedCell, Cloneable {
private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<>();

private static final Logger LOG = LoggerFactory.getLogger(KeyValue.class);

public static final int FIXED_OVERHEAD = ClassSize.OBJECT + // the KeyValue object itself
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.io;

import java.io.IOException;
import java.io.InputStream;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * This is a stream that will only supply bytes from its delegate up to a certain limit.
 * When there is an attempt to read past that limit it will signal that the input
 * is finished. The delegate, together with a new limit, can be swapped in via
 * {@link #setDelegate(InputStream, long)}.
 */
@InterfaceAudience.Private
public class BoundedDelegatingInputStream extends DelegatingInputStream {

  /** Maximum number of bytes this stream will supply from the delegate. */
  protected long limit;
  /** Number of bytes consumed so far; invariant: 0 &lt;= pos &lt;= limit. */
  protected long pos;

  /**
   * @param in the delegate stream to read from
   * @param limit maximum number of bytes to supply from the delegate
   */
  public BoundedDelegatingInputStream(InputStream in, long limit) {
    super(in);
    this.limit = limit;
    this.pos = 0;
  }

  /**
   * Replace the delegate and reset the bound.
   * @param in the new delegate stream
   * @param limit maximum number of bytes to supply from the new delegate
   */
  public void setDelegate(InputStream in, long limit) {
    this.in = in;
    this.limit = limit;
    this.pos = 0;
  }

  /**
   * Call the delegate's {@code read()} method if the current position is less than the limit.
   * @return the byte read or -1 if the end of stream or the limit has been reached.
   */
  @Override
  public int read() throws IOException {
    if (pos >= limit) {
      return -1;
    }
    int result = in.read();
    if (result < 0) {
      // The delegate hit EOF before the limit was reached. Do not advance pos,
      // because no byte was actually consumed; otherwise pos would drift past
      // the true number of bytes read.
      return -1;
    }
    pos++;
    return result;
  }

  /**
   * Call the delegate's {@code read(byte[], int, int)} method if the current position is less
   * than the limit.
   * @param b read buffer
   * @param off Start offset
   * @param len The number of bytes to read
   * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
   */
  @Override
  public int read(final byte[] b, final int off, final int len) throws IOException {
    if (pos >= limit) {
      return -1;
    }
    // Clamp the request so we never consume past the limit.
    long readLen = Math.min(len, limit - pos);
    int read = in.read(b, off, (int) readLen);
    if (read < 0) {
      return -1;
    }
    pos += read;
    return read;
  }

  /**
   * Call the delegate's {@code skip(long)} method, never advancing beyond the limit.
   * @param len the number of bytes to skip
   * @return the actual number of bytes skipped
   */
  @Override
  public long skip(final long len) throws IOException {
    // If pos == limit the argument is 0 and InputStream.skip returns 0, so no
    // explicit guard is needed here.
    long skipped = in.skip(Math.min(len, limit - pos));
    pos += skipped;
    return skipped;
  }

  /**
   * Call the delegate's {@code available()} method.
   * @return the delegate's available bytes, capped at the number of bytes remaining
   *         before the limit, or 0 if the limit has been reached.
   */
  @Override
  public int available() throws IOException {
    if (pos >= limit) {
      return 0;
    }
    int available = in.available();
    return (int) Math.min(available, limit - pos);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.io;

import java.io.FilterInputStream;
import java.io.InputStream;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * An input stream that forwards every operation to a replaceable delegate stream.
 * The delegate may be swapped out for another at any time, but to minimize the
 * risk of violating the {@link InputStream} contract the swap should happen only
 * after the current delegate has been fully consumed. <p> For example, a
 * ByteArrayInputStream, which is implicitly bounded by the size of the underlying
 * byte array, can be turned into an unbounded stream fed by multiple instances
 * of ByteArrayInputStream, swapped in one after another in sequence.
 * <p>
 * Multithreaded access is permitted, but callers must order operations on this
 * stream and delegate swaps so that readers always observe a consistent view of
 * the stream contents.
 */
@InterfaceAudience.Private
public class DelegatingInputStream extends FilterInputStream {

  /**
   * Create a stream that forwards all calls to {@code in}.
   * @param in the initial delegate
   */
  public DelegatingInputStream(InputStream in) {
    super(in);
  }

  /**
   * @return the stream currently being delegated to
   */
  public InputStream getDelegate() {
    return this.in;
  }

  /**
   * Swap in a new delegate; subsequent operations are forwarded to it.
   * @param in the new delegate
   */
  public void setDelegate(InputStream in) {
    this.in = in;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ message WALHeader {
optional bool has_tag_compression = 3;
optional string writer_cls_name = 4;
optional string cell_codec_cls_name = 5;
optional bool has_value_compression = 6;
optional uint32 value_compression_algorithm = 7;
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
Expand Down Expand Up @@ -144,9 +145,22 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro
boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
if (doCompress) {
try {
final boolean useTagCompression =
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
final boolean useValueCompression =
conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
final Compression.Algorithm valueCompressionType =
useValueCompression ? CompressionContext.getValueCompressionAlgorithm(conf) :
Compression.Algorithm.NONE;
if (LOG.isTraceEnabled()) {
LOG.trace("Initializing compression context for {}: isRecoveredEdits={}" +
", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", path,
CommonFSUtils.isRecoveredEdits(path), useTagCompression, useValueCompression,
valueCompressionType);
}
this.compressionContext =
new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
useTagCompression, useValueCompression, valueCompressionType);
} catch (Exception e) {
throw new IOException("Failed to initiate CompressionContext", e);
}
Expand All @@ -165,17 +179,29 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita

initOutput(fs, path, overwritable, bufferSize, replication, blocksize);

boolean doTagCompress = doCompress
&& conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));
boolean doTagCompress = doCompress &&
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
boolean doValueCompress = doCompress &&
conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
WALHeader.Builder headerBuilder = WALHeader.newBuilder()
.setHasCompression(doCompress)
.setHasTagCompression(doTagCompress)
.setHasValueCompression(doValueCompress);
if (doValueCompress) {
headerBuilder.setValueCompressionAlgorithm(
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
}
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
buildWALHeader(conf, headerBuilder)));

initAfterHeader(doCompress);

// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();

if (LOG.isTraceEnabled()) {
LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" +
", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
}
}

Expand Down
Loading

0 comments on commit 8ec6fd9

Please sign in to comment.