diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index bc520b62593d..5dafa76feb05 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -76,8 +76,6 @@ */ @InterfaceAudience.Private public class KeyValue implements ExtendedCell, Cloneable { - private static final ArrayList EMPTY_ARRAY_LIST = new ArrayList<>(); - private static final Logger LOG = LoggerFactory.getLogger(KeyValue.class); public static final int FIXED_OVERHEAD = ClassSize.OBJECT + // the KeyValue object itself diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java new file mode 100644 index 000000000000..c7002114099b --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * This is a stream that will only supply bytes from its delegate up to a certain limit. + * When there is an attempt to set the position beyond that it will signal that the input + * is finished. + */ +@InterfaceAudience.Private +public class BoundedDelegatingInputStream extends DelegatingInputStream { + + protected long limit; + protected long pos; + + public BoundedDelegatingInputStream(InputStream in, long limit) { + super(in); + this.limit = limit; + this.pos = 0; + } + + public void setDelegate(InputStream in, long limit) { + this.in = in; + this.limit = limit; + this.pos = 0; + } + + /** + * Call the delegate's {@code read()} method if the current position is less than the limit. + * @return the byte read or -1 if the end of stream or the limit has been reached. + */ + @Override + public int read() throws IOException { + if (pos >= limit) { + return -1; + } + int result = in.read(); + pos++; + return result; + } + + /** + * Call the delegate's {@code read(byte[], int, int)} method if the current position is less + * than the limit. + * @param b read buffer + * @param off Start offset + * @param len The number of bytes to read + * @return the number of bytes read or -1 if the end of stream or the limit has been reached. + */ + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + if (pos >= limit) { + return -1; + } + long readLen = Math.min(len, limit - pos); + int read = in.read(b, off, (int)readLen); + if (read < 0) { + return -1; + } + pos += read; + return read; + } + + /** + * Call the delegate's {@code skip(long)} method. + * @param len the number of bytes to skip + * @return the actual number of bytes skipped + */ + @Override + public long skip(final long len) throws IOException { + long skipped = in.skip(Math.min(len, limit - pos)); + pos += skipped; + return skipped; + } + + /** + * Call the delegate's {@code available()} method. + * @return the delegate's available bytes if the current position is less than the + * limit, or 0 otherwise. + */ + @Override + public int available() throws IOException { + if (pos >= limit) { + return 0; + } + int available = in.available(); + return (int) Math.min(available, limit - pos); + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java new file mode 100644 index 000000000000..6bd82ae10641 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.io.FilterInputStream; +import java.io.InputStream; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * An input stream that delegates all operations to another input stream. + * The delegate can be switched out for another at any time but to minimize the + * possibility of violating the InputStream contract it would be best to replace + * the delegate only once it has been fully consumed.

For example, a + * ByteArrayInputStream, which is implicitly bounded by the size of the underlying + * byte array can be converted into an unbounded stream fed by multiple instances + * of ByteArrayInputStream, switched out one for the other in sequence. + *

+ * Although multithreaded access is allowed, users of this class will want to take + * care to order operations on this stream and the swap out of one delegate for + * another in a way that provides a valid view of stream contents. + */ +@InterfaceAudience.Private +public class DelegatingInputStream extends FilterInputStream { + + public DelegatingInputStream(InputStream in) { + super(in); + } + + public InputStream getDelegate() { + return this.in; + } + + public void setDelegate(InputStream in) { + this.in = in; + } + +} diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto index fd622cfc5ba1..48a108bb8a79 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto @@ -32,6 +32,8 @@ message WALHeader { optional bool has_tag_compression = 3; optional string writer_cls_name = 4; optional string cell_codec_cls_name = 5; + optional bool has_value_compression = 6; + optional uint32 value_compression_algorithm = 7; } /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index a94e5c8f02e5..3b84488d1dc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.crypto.Encryptor; @@ -144,9 +145,22 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); if (doCompress) { try { + final boolean useTagCompression = + conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); + final boolean useValueCompression = + conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); + final Compression.Algorithm valueCompressionType = + useValueCompression ? CompressionContext.getValueCompressionAlgorithm(conf) : + Compression.Algorithm.NONE; + if (LOG.isTraceEnabled()) { + LOG.trace("Initializing compression context for {}: isRecoveredEdits={}" + + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", path, + CommonFSUtils.isRecoveredEdits(path), useTagCompression, useValueCompression, + valueCompressionType); + } this.compressionContext = new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path), - conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true)); + useTagCompression, useValueCompression, valueCompressionType); } catch (Exception e) { throw new IOException("Failed to initiate CompressionContext", e); } @@ -165,17 +179,29 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita initOutput(fs, path, overwritable, bufferSize, replication, blocksize); - boolean doTagCompress = doCompress - && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); - length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf, - WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)))); + boolean doTagCompress = doCompress && + conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); + boolean doValueCompress = doCompress && + conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); + WALHeader.Builder headerBuilder = WALHeader.newBuilder() + .setHasCompression(doCompress) + .setHasTagCompression(doTagCompress) + .setHasValueCompression(doValueCompress); + if (doValueCompress) { + headerBuilder.setValueCompressionAlgorithm( + CompressionContext.getValueCompressionAlgorithm(conf).ordinal()); + } + length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, + buildWALHeader(conf, headerBuilder))); initAfterHeader(doCompress); // instantiate trailer to default value. trailer = WALTrailer.newBuilder().build(); + if (LOG.isTraceEnabled()) { - LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress); + LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" + + ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 16866e18a7cf..82bad934393c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -18,37 +18,155 @@ package org.apache.hadoop.hbase.regionserver.wal; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.EnumMap; import java.util.Map; - +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream; import org.apache.hadoop.hbase.io.TagCompressionContext; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.util.Dictionary; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Context that holds the various dictionaries for compression in WAL. + *

+ * CompressionContexts are not expected to be shared among threads. Multithreaded use may + * produce unexpected results. */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) public class CompressionContext { - static final String ENABLE_WAL_TAGS_COMPRESSION = - "hbase.regionserver.wal.tags.enablecompression"; + private static final Logger LOG = LoggerFactory.getLogger(CompressionContext.class); + + public static final String ENABLE_WAL_TAGS_COMPRESSION = + "hbase.regionserver.wal.tags.enablecompression"; + + public static final String ENABLE_WAL_VALUE_COMPRESSION = + "hbase.regionserver.wal.value.enablecompression"; + + public static final String WAL_VALUE_COMPRESSION_TYPE = + "hbase.regionserver.wal.value.compression.type"; public enum DictionaryIndex { REGION, TABLE, FAMILY, QUALIFIER, ROW } + /** + * Encapsulates the compression algorithm and its streams that we will use for value + * compression in this WAL. + */ + static class ValueCompressor { + + static final int IO_BUFFER_SIZE = 4096; + + private final Compression.Algorithm algorithm; + private BoundedDelegatingInputStream lowerIn; + private ByteArrayOutputStream lowerOut; + private InputStream compressedIn; + private OutputStream compressedOut; + + public ValueCompressor(Compression.Algorithm algorithm) { + this.algorithm = algorithm; + } + + public Compression.Algorithm getAlgorithm() { + return algorithm; + } + + public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) + throws IOException { + if (compressedOut == null) { + // Create the output streams here the first time around. + lowerOut = new ByteArrayOutputStream(); + compressedOut = algorithm.createCompressionStream(lowerOut, algorithm.getCompressor(), + IO_BUFFER_SIZE); + } else { + lowerOut.reset(); + } + compressedOut.write(valueArray, valueOffset, valueLength); + compressedOut.flush(); + return lowerOut.toByteArray(); + } + + public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset, + int outLength) throws IOException { + + // Our input is a sequence of bounded byte ranges (call them segments), with + // BoundedDelegatingInputStream providing a way to switch in a new segment when the + // previous segment has been fully consumed. + + // Create the input streams here the first time around. + if (compressedIn == null) { + lowerIn = new BoundedDelegatingInputStream(in, inLength); + compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(), + IO_BUFFER_SIZE); + } else { + lowerIn.setDelegate(in, inLength); + } + + // Caller must handle short reads. + // With current Hadoop compression codecs all 'outLength' bytes are read in here, so not + // an issue for now. + return compressedIn.read(outArray, outOffset, outLength); + } + + public void clear() { + if (compressedOut != null) { + try { + compressedOut.close(); + } catch (IOException e) { + LOG.warn("Exception closing compressed output stream", e); + } + } + compressedOut = null; + if (lowerOut != null) { + try { + lowerOut.close(); + } catch (IOException e) { + LOG.warn("Exception closing lower output stream", e); + } + } + lowerOut = null; + if (compressedIn != null) { + try { + compressedIn.close(); + } catch (IOException e) { + LOG.warn("Exception closing compressed input stream", e); + } + } + compressedIn = null; + if (lowerIn != null) { + try { + lowerIn.close(); + } catch (IOException e) { + LOG.warn("Exception closing lower input stream", e); + } + } + lowerIn = null; + } + + } + private final Map dictionaries = new EnumMap<>(DictionaryIndex.class); // Context used for compressing tags TagCompressionContext tagCompressionContext = null; + ValueCompressor valueCompressor = null; - public CompressionContext(Class dictType, boolean recoveredEdits, - boolean hasTagCompression) throws SecurityException, NoSuchMethodException, - InstantiationException, IllegalAccessException, InvocationTargetException { + public CompressionContext(Class dictType, + boolean recoveredEdits, boolean hasTagCompression, boolean hasValueCompression, + Compression.Algorithm valueCompressionType) + throws SecurityException, NoSuchMethodException, InstantiationException, + IllegalAccessException, InvocationTargetException, IOException { Constructor dictConstructor = dictType.getConstructor(); for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) { @@ -70,12 +188,34 @@ public CompressionContext(Class dictType, boolean recovere if (hasTagCompression) { tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE); } + if (hasValueCompression && valueCompressionType != null) { + valueCompressor = new ValueCompressor(valueCompressionType); + } + } + + public CompressionContext(Class dictType, boolean recoveredEdits, + boolean hasTagCompression) + throws SecurityException, NoSuchMethodException, InstantiationException, + IllegalAccessException, InvocationTargetException, IOException { + this(dictType, recoveredEdits, hasTagCompression, false, null); + } + + public boolean hasTagCompression() { + return tagCompressionContext != null; + } + + public boolean hasValueCompression() { + return valueCompressor != null; } public Dictionary getDictionary(Enum dictIndex) { return dictionaries.get(dictIndex); } + public ValueCompressor getValueCompressor() { + return valueCompressor; + } + void clear() { for(Dictionary dictionary : dictionaries.values()){ dictionary.clear(); @@ -83,5 +223,20 @@ void clear() { if (tagCompressionContext != null) { tagCompressionContext.clear(); } + if (valueCompressor != null) { + valueCompressor.clear(); + } } + + public static Compression.Algorithm getValueCompressionAlgorithm(Configuration conf) { + if (conf.getBoolean(ENABLE_WAL_VALUE_COMPRESSION, true)) { + String compressionType = conf.get(WAL_VALUE_COMPRESSION_TYPE); + if (compressionType != null) { + return Compression.getCompressionAlgorithmByName(compressionType); + } + return Compression.Algorithm.GZ; + } + return Compression.Algorithm.NONE; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 9cd48c056b7a..c86dd4d130bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @@ -81,6 +82,8 @@ public class ProtobufLogReader extends ReaderBase { protected WALCellCodec.ByteStringUncompressor byteStringUncompressor; protected boolean hasCompression = false; protected boolean hasTagCompression = false; + protected boolean hasValueCompression = false; + protected Compression.Algorithm valueCompressionType = null; // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit // entry in the wal, the inputstream's position is equal to walEditsStopOffset. private long walEditsStopOffset; @@ -227,6 +230,16 @@ private String initInternal(FSDataInputStream stream, boolean isFirst) WALProtos.WALHeader header = builder.build(); this.hasCompression = header.hasHasCompression() && header.getHasCompression(); this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression(); + this.hasValueCompression = header.hasHasValueCompression() && + header.getHasValueCompression(); + if (header.hasValueCompressionAlgorithm()) { + try { + this.valueCompressionType = + Compression.Algorithm.values()[header.getValueCompressionAlgorithm()]; + } catch (ArrayIndexOutOfBoundsException e) { + throw new IOException("Invalid compression type", e); + } + } } this.inputStream = stream; this.walEditsStopOffset = this.fileLength; @@ -235,7 +248,9 @@ private String initInternal(FSDataInputStream stream, boolean isFirst) this.seekOnFs(currentPosition); if (LOG.isTraceEnabled()) { LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset - + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") + ", currentPosition: " + currentPosition); + + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") + + ", currentPosition: " + currentPosition); } codecClsName = hdrCtxt.getCellCodecClsName(); @@ -327,6 +342,16 @@ protected boolean hasTagCompression() { return this.hasTagCompression; } + @Override + protected boolean hasValueCompression() { + return this.hasValueCompression; + } + + @Override + protected Compression.Algorithm getValueCompressionAlgorithm() { + return this.valueCompressionType; + } + @Override protected boolean readNext(Entry entry) throws IOException { // OriginalPosition might be < 0 on local fs; if so, it is useless to us. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index 9b6d69a8ef19..90a1653a5140 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -68,8 +69,15 @@ public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream // If compression is enabled, new dictionaries are created here. try { if (compressionContext == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Initializing compression context for {}: isRecoveredEdits={}" + + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", path, + CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), hasValueCompression(), + getValueCompressionAlgorithm()); + } compressionContext = new CompressionContext(LRUDictionary.class, - CommonFSUtils.isRecoveredEdits(path), hasTagCompression()); + CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), + hasValueCompression(), getValueCompressionAlgorithm()); } else { compressionContext.clear(); } @@ -151,6 +159,16 @@ public void seek(long pos) throws IOException { */ protected abstract boolean hasTagCompression(); + /** + * @return Whether value compression is enabled for this log. + */ + protected abstract boolean hasValueCompression(); + + /** + * @return Value compression algorithm for this log. + */ + protected abstract Compression.Algorithm getValueCompressionAlgorithm(); + /** * Read next entry. * @param e The entry to read into. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 5aa943f1d84a..31eccc7a18af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -222,9 +222,13 @@ private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throw static class CompressedKvEncoder extends BaseEncoder { private final CompressionContext compression; + private final boolean hasValueCompression; + private final boolean hasTagCompression; public CompressedKvEncoder(OutputStream out, CompressionContext compression) { super(out); this.compression = compression; + this.hasValueCompression = compression.hasValueCompression(); + this.hasTagCompression = compression.hasTagCompression(); } @Override @@ -241,12 +245,16 @@ public void write(Cell cell) throws IOException { compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY)); PrivateCellUtil.compressQualifier(out, cell, compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER)); - // Write timestamp, type and value as uncompressed. + // Write timestamp, type and value. StreamUtils.writeLong(out, cell.getTimestamp()); out.write(cell.getTypeByte()); - PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); + if (hasValueCompression) { + writeCompressedValue(out, cell); + } else { + PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); + } if (tagsLength > 0) { - if (compression.tagCompressionContext != null) { + if (hasTagCompression) { // Write tags using Dictionary compression PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext); } else { @@ -256,20 +264,31 @@ public void write(Cell cell) throws IOException { } } } + + private void writeCompressedValue(OutputStream out, Cell cell) throws IOException { + byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength()); + StreamUtils.writeRawVInt32(out, compressed.length); + out.write(compressed); + } + } static class CompressedKvDecoder extends BaseDecoder { private final CompressionContext compression; + private final boolean hasValueCompression; + private final boolean hasTagCompression; public CompressedKvDecoder(InputStream in, CompressionContext compression) { super(in); this.compression = compression; + this.hasValueCompression = compression.hasValueCompression(); + this.hasTagCompression = compression.hasTagCompression(); } @Override protected Cell parseCell() throws IOException { int keylength = StreamUtils.readRawVarint32(in); int vlength = StreamUtils.readRawVarint32(in); - int tagsLength = StreamUtils.readRawVarint32(in); int length = 0; if(tagsLength == 0) { @@ -302,18 +321,27 @@ protected Cell parseCell() throws IOException { compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER)); pos += elemLen; - // timestamp, type and value - int tsTypeValLen = length - pos; + // timestamp + long ts = StreamUtils.readLong(in); + pos = Bytes.putLong(backingArray, pos, ts); + // type and value + int typeValLen = length - pos; if (tagsLength > 0) { - tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE; + typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE; + } + pos = Bytes.putByte(backingArray, pos, (byte)in.read()); + int valLen = typeValLen - 1; + if (hasValueCompression) { + readCompressedValue(in, backingArray, pos, valLen); + pos += valLen; + } else { + IOUtils.readFully(in, backingArray, pos, valLen); + pos += valLen; } - IOUtils.readFully(in, backingArray, pos, tsTypeValLen); - pos += tsTypeValLen; - // tags if (tagsLength > 0) { pos = Bytes.putAsShort(backingArray, pos, tagsLength); - if (compression.tagCompressionContext != null) { + if (hasTagCompression) { compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength); } else { IOUtils.readFully(in, backingArray, pos, tagsLength); @@ -349,6 +377,17 @@ private static void checkLength(int len, int max) throws IOException { throw new IOException("Invalid length for compresesed portion of keyvalue: " + len); } } + + private void readCompressedValue(InputStream in, byte[] outArray, int outOffset, + int expectedLength) throws IOException { + int compressedLen = StreamUtils.readRawVarint32(in); + int read = compression.getValueCompressor().decompress(in, compressedLen, outArray, + outOffset, expectedLength); + if (read != expectedLength) { + throw new IOException("ValueCompressor state error: short read"); + } + } + } public static class EnsureKvEncoder extends BaseEncoder { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayValueCompression.java new file mode 100644 index 000000000000..cbe1faa65d48 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayValueCompression.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestAsyncWALReplayValueCompression extends TestAsyncWALReplay { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncWALReplayValueCompression.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true); + TestAsyncWALReplay.setUpBeforeClass(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java index dd9ee697867f..4e091ca34964 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -25,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.ByteBufferKeyValue; @@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.codec.Codec.Decoder; import org.apache.hadoop.hbase.codec.Codec.Encoder; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -62,24 +65,108 @@ public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception { @Test public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception { - doTest(true, true); + doTest(true, false); + } + + @Test + public void testValueCompressionEnabled() throws Exception { + doTest(false, true); + } + + @Test + public void testValueCompression() throws Exception { + final byte[] row_1 = Bytes.toBytes("row_1"); + final byte[] value_1 = new byte[20]; + Bytes.zero(value_1); + final byte[] row_2 = Bytes.toBytes("row_2"); + final byte[] value_2 = new byte[Bytes.SIZEOF_LONG]; + Bytes.random(value_2); + final byte[] row_3 = Bytes.toBytes("row_3"); + final byte[] value_3 = new byte[100]; + Bytes.random(value_3); + final byte[] row_4 = Bytes.toBytes("row_4"); + final byte[] value_4 = new byte[128]; + fillBytes(value_4, Bytes.toBytes("DEADBEEF")); + final byte[] row_5 = Bytes.toBytes("row_5"); + final byte[] value_5 = new byte[64]; + fillBytes(value_5, Bytes.toBytes("CAFEBABE")); + + Configuration conf = new Configuration(false); + WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, + false, true, true, Compression.Algorithm.GZ)); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Encoder encoder = codec.getEncoder(bos); + encoder.write(createKV(row_1, value_1, 0)); + encoder.write(createOffheapKV(row_2, value_2, 0)); + encoder.write(createKV(row_3, value_3, 0)); + encoder.write(createOffheapKV(row_4, value_4, 0)); + encoder.write(createKV(row_5, value_5, 0)); + encoder.flush(); + + try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) { + Decoder decoder = codec.getDecoder(is); + decoder.advance(); + KeyValue kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_1, 0, row_1.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_1, 0, value_1.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + decoder.advance(); + kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_2, 0, row_2.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_2, 0, value_2.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + decoder.advance(); + kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_3, 0, row_3.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_3, 0, value_3.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + decoder.advance(); + kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_4, 0, row_4.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_4, 0, value_4.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + decoder.advance(); + kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_5, 0, row_5.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_5, 0, value_5.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + } + } + + static void fillBytes(byte[] buffer, byte[] fill) { + int offset = 0; + int remaining = buffer.length; + while (remaining > 0) { + int len = remaining < fill.length ? remaining : fill.length; + System.arraycopy(fill, 0, buffer, offset, len); + offset += len; + remaining -= len; + } } - private void doTest(boolean compressTags, boolean offheapKV) throws Exception { + private void doTest(boolean compressTags, boolean offheapKV) + throws Exception { + final byte[] key = Bytes.toBytes("myRow"); + final byte[] value = Bytes.toBytes("myValue"); Configuration conf = new Configuration(false); conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags); - WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false, - compressTags)); + WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, + false, compressTags)); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); Encoder encoder = codec.getEncoder(bos); if (offheapKV) { - encoder.write(createOffheapKV(1)); - encoder.write(createOffheapKV(0)); - encoder.write(createOffheapKV(2)); + encoder.write(createOffheapKV(key, value, 1)); + encoder.write(createOffheapKV(key, value, 0)); + encoder.write(createOffheapKV(key, value, 2)); } else { - encoder.write(createKV(1)); - encoder.write(createKV(0)); - encoder.write(createKV(2)); + encoder.write(createKV(key, value, 1)); + encoder.write(createKV(key, value, 0)); + encoder.write(createKV(key, value, 2)); } InputStream is = new ByteArrayInputStream(bos.toByteArray()); @@ -101,11 +188,9 @@ private void doTest(boolean compressTags, boolean offheapKV) throws Exception { assertEquals("tagValue2", Bytes.toString(Tag.cloneValue(tags.get(1)))); } - private KeyValue createKV(int noOfTags) { - byte[] row = Bytes.toBytes("myRow"); + private KeyValue createKV(byte[] row, byte[] value, int noOfTags) { byte[] cf = Bytes.toBytes("myCF"); byte[] q = Bytes.toBytes("myQualifier"); - byte[] value = Bytes.toBytes("myValue"); List tags = new ArrayList<>(noOfTags); for (int i = 1; i <= noOfTags; i++) { tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i))); @@ -113,11 +198,9 @@ private KeyValue createKV(int noOfTags) { return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags); } - private ByteBufferKeyValue createOffheapKV(int noOfTags) { - byte[] row = Bytes.toBytes("myRow"); + private ByteBufferKeyValue createOffheapKV(byte[] row, byte[] value, int noOfTags) { byte[] cf = Bytes.toBytes("myCF"); byte[] q = Bytes.toBytes("myQualifier"); - byte[] value = Bytes.toBytes("myValue"); List tags = new ArrayList<>(noOfTags); for (int i = 1; i <= noOfTags; i++) { tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java new file mode 100644 index 000000000000..d10cc9c73540 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +/** + * Enables compression and runs the TestWALReplay tests. + */ +@Category({ RegionServerTests.class, LargeTests.class }) +public class TestWALReplayValueCompression extends TestWALReplay { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALReplayValueCompression.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true); + TestWALReplay.setUpBeforeClass(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java new file mode 100644 index 000000000000..6df0d1d20ea2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestCompressedWAL { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCompressedWAL.class); + + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @Rule + public TestName name = new TestName(); + + @Parameter + public String walProvider; + + @Parameters(name = "{index}: provider={0}") + public static Iterable data() { + return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" }); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS()); + TEST_UTIL.startMiniDFSCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() { + TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider); + TEST_UTIL.getConfiguration() + .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + } + + @Test + public void testCompressedWAL() throws Exception { + TEST_UTIL.getConfiguration() + .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); + doTest(); + } + + @Test + public void testCompressedWALWithValueCompression() throws Exception { + TEST_UTIL.getConfiguration() + .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true); + doTest(); + } + + private void doTest() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_")); + NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); + scopes.put(tableName.getName(), 0); + RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); + final int total = 1000; + final byte[] row = Bytes.toBytes("row"); + final byte[] family = Bytes.toBytes("family"); + final byte[] value = Bytes.toBytes("Test value"); + FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); + final WALFactory wals = + new WALFactory(TEST_UTIL.getConfiguration(), tableName.getNameAsString()); + + // Write the WAL + final WAL wal = wals.getWAL(regionInfo); + + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + + for (int i = 0; i < total; i++) { + WALEdit kvs = new WALEdit(); + kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); + wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); + } + wal.sync(); + final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); + wals.shutdown(); + + // Confirm the WAL can be read back + WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath); + int count = 0; + WAL.Entry entry = new WAL.Entry(); + while (reader.next(entry) != null) { + count++; + List cells = entry.getEdit().getCells(); + assertTrue("Should be one KV per WALEdit", cells.size() == 1); + for (Cell cell: cells) { + assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), row, 0, row.length)); + assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), family, 0, family.length)); + assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength(), value, 0, value.length)); + } + } + assertEquals("Should have read back as many KVs as written", total, count); + reader.close(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java new file mode 100644 index 000000000000..32ed85f6bba8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({RegionServerTests.class, LargeTests.class}) +public class TestWALSplitValueCompression extends TestWALSplit { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALSplitValueCompression.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration() + .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + TEST_UTIL.getConfiguration() + .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true); + TestWALSplit.setUpBeforeClass(); + } +}