Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-25869 WAL value compression #3244

Merged
merged 12 commits into from
May 21, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@
*/
@InterfaceAudience.Private
public class KeyValue implements ExtendedCell, Cloneable {
private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<>();

private static final Logger LOG = LoggerFactory.getLogger(KeyValue.class);

public static final int FIXED_OVERHEAD = ClassSize.OBJECT + // the KeyValue object itself
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.io;

import java.io.IOException;
import java.io.InputStream;

import org.apache.yetus.audience.InterfaceAudience;

/**
* This is a stream that will only supply bytes from its delegate up to a certain limit.
* When there is an attempt to set the position beyond that it will signal that the input
* is finished.
*/
@InterfaceAudience.Private
public class BoundedDelegatingInputStream extends DelegatingInputStream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge this and DelegatingInputStream? Don't think we need both of them separately.

public class BoundedDelegatingInputStream extends FilterInputStream {
 ....   < bounding methods>...
 setDelegate() {}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we could keep them both.


protected long limit;
protected long pos;

public BoundedDelegatingInputStream(InputStream in, long limit) {
super(in);
this.limit = limit;
this.pos = 0;
}

public void setDelegate(InputStream in, long limit) {
this.in = in;
this.limit = limit;
this.pos = 0;
}

/**
* Call the delegate's {@code read()} method if the current position is less than the limit.
* @return the byte read or -1 if the end of stream or the limit has been reached.
*/
@Override
public int read() throws IOException {
if (pos >= limit) {
return -1;
}
int result = in.read();
pos++;
return result;
}

/**
* Call the delegate's {@code read(byte[], int, int)} method if the current position is less
* than the limit.
* @param b read buffer
* @param off Start offset
* @param len The number of bytes to read
* @return the number of bytes read or -1 if the end of stream or the limit has been reached.
*/
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
if (pos >= limit) {
return -1;
}
long readLen = Math.min(len, limit - pos);
int read = in.read(b, off, (int)readLen);
if (read < 0) {
return -1;
}
pos += read;
return read;
}

/**
* Call the delegate's {@code skip(long)} method.
* @param len the number of bytes to skip
* @return the actual number of bytes skipped
*/
@Override
public long skip(final long len) throws IOException {
long skipped = in.skip(Math.min(len, limit - pos));
pos += skipped;
return skipped;
}

/**
* Call the delegate's {@code available()} method.
* @return the delegate's available bytes if the current position is less than the
* limit, or 0 otherwise.
*/
@Override
public int available() throws IOException {
if (pos >= limit) {
return 0;
}
int available = in.available();
return (int) Math.min(available, limit - pos);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.io;

import java.io.FilterInputStream;
import java.io.InputStream;

import org.apache.yetus.audience.InterfaceAudience;

/**
* An input stream that delegates all operations to another input stream.
* The delegate can be switched out for another at any time but to minimize the
* possibility of violating the InputStream contract it would be best to replace
* the delegate only once it has been fully consumed. <p> For example, a
* ByteArrayInputStream, which is implicitly bounded by the size of the underlying
* byte array can be converted into an unbounded stream fed by multiple instances
* of ByteArrayInputStream, switched out one for the other in sequence.
* <p>
* Although multithreaded access is allowed, users of this class will want to take
* care to order operations on this stream and the swap out of one delegate for
* another in a way that provides a valid view of stream contents.
*/
@InterfaceAudience.Private
public class DelegatingInputStream extends FilterInputStream {

public DelegatingInputStream(InputStream in) {
super(in);
}

public InputStream getDelegate() {
return this.in;
}

public void setDelegate(InputStream in) {
this.in = in;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ message WALHeader {
optional bool has_tag_compression = 3;
optional string writer_cls_name = 4;
optional string cell_codec_cls_name = 5;
optional bool has_value_compression = 6;
apurtell marked this conversation as resolved.
Show resolved Hide resolved
optional uint32 value_compression_algorithm = 7;
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
Expand Down Expand Up @@ -144,9 +145,22 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro
boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
if (doCompress) {
try {
final boolean useTagCompression =
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
final boolean useValueCompression =
conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
final Compression.Algorithm valueCompressionType =
useValueCompression ? CompressionContext.getValueCompressionAlgorithm(conf) :
Compression.Algorithm.NONE;
if (LOG.isTraceEnabled()) {
LOG.trace("Initializing compression context for {}: isRecoveredEdits={}" +
", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", path,
CommonFSUtils.isRecoveredEdits(path), useTagCompression, useValueCompression,
valueCompressionType);
}
this.compressionContext =
new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
useTagCompression, useValueCompression, valueCompressionType);
} catch (Exception e) {
throw new IOException("Failed to initiate CompressionContext", e);
}
Expand All @@ -165,17 +179,29 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita

initOutput(fs, path, overwritable, bufferSize, replication, blocksize);

boolean doTagCompress = doCompress
&& conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));
boolean doTagCompress = doCompress &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Duplication of config parsing between init andinitializeCompressionContext is bizarre ; we literally just did this work 8 lines earlier. for future cleanup, I suppose.

conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
boolean doValueCompress = doCompress &&
conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
WALHeader.Builder headerBuilder = WALHeader.newBuilder()
.setHasCompression(doCompress)
.setHasTagCompression(doTagCompress)
.setHasValueCompression(doValueCompress);
if (doValueCompress) {
headerBuilder.setValueCompressionAlgorithm(
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
}
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
buildWALHeader(conf, headerBuilder)));

initAfterHeader(doCompress);

// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();

if (LOG.isTraceEnabled()) {
LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" +
", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
}
}

Expand Down
Loading