From 9f60f1821f88ac26b29c111d9d0ec1e0aad2c52e Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Tue, 9 Apr 2024 04:20:47 -0400 Subject: [PATCH] HBASE-28485 Re-use ZstdDecompressCtx/ZstdCompressCtx for performance (#5797) Co-authored-by: Charles Connell Signed-off-by: Andrew Purtell Signed-off-by: Nick Dimiduk --- .../io/compress/zstd/ZstdCompressor.java | 20 +++++++++++++------ .../io/compress/zstd/ZstdDecompressor.java | 19 +++++++++++------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java index 4d34d4825d31..b48db9106fb4 100644 --- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java +++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.compress.zstd; import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdCompressCtx; import com.github.luben.zstd.ZstdDictCompress; import java.io.IOException; import java.nio.ByteBuffer; @@ -39,6 +40,7 @@ public class ZstdCompressor implements CanReinit, Compressor { protected long bytesRead, bytesWritten; protected int dictId; protected ZstdDictCompress dict; + protected ZstdCompressCtx ctx; ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) { this.level = level; @@ -46,9 +48,12 @@ public class ZstdCompressor implements CanReinit, Compressor { this.inBuf = ByteBuffer.allocateDirect(bufferSize); this.outBuf = ByteBuffer.allocateDirect(bufferSize); this.outBuf.position(bufferSize); + this.ctx = new ZstdCompressCtx(); + this.ctx.setLevel(level); if (dictionary != null) { this.dictId = ZstdCodec.getDictionaryId(dictionary); this.dict = new ZstdDictCompress(dictionary, level); + this.ctx.loadDict(this.dict); } } @@ -79,12 +84,7 @@ public int compress(final byte[] b, final int off, final int len) throws IOExcep } else { outBuf.clear(); } - int written; - if (dict != null) { - written = Zstd.compress(outBuf, inBuf, dict); - } else { - written = Zstd.compress(outBuf, inBuf, level); - } + int written = ctx.compress(outBuf, inBuf); bytesWritten += written; inBuf.clear(); finished = true; @@ -170,6 +170,14 @@ public void reset() { bytesWritten = 0; finish = false; finished = false; + ctx.reset(); + ctx.setLevel(level); + if (dict != null) { + ctx.loadDict(dict); + } else { + // loadDict((byte[]) accepts null to clear the dictionary + ctx.loadDict((byte[]) null); + } } @Override diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java index ef0a0f87651f..79826c96d5e3 100644 --- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java +++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.io.compress.zstd; -import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdDecompressCtx; import com.github.luben.zstd.ZstdDictDecompress; import java.io.IOException; import java.nio.ByteBuffer; @@ -39,15 +39,18 @@ public class ZstdDecompressor implements CanReinit, Decompressor { protected boolean finished; protected int dictId; protected ZstdDictDecompress dict; + protected ZstdDecompressCtx ctx; ZstdDecompressor(final int bufferSize, final byte[] dictionary) { this.bufferSize = bufferSize; this.inBuf = ByteBuffer.allocateDirect(bufferSize); this.outBuf = ByteBuffer.allocateDirect(bufferSize); this.outBuf.position(bufferSize); + this.ctx = new ZstdDecompressCtx(); if (dictionary != null) { this.dictId = ZstdCodec.getDictionaryId(dictionary); this.dict = new ZstdDictDecompress(dictionary); + this.ctx.loadDict(this.dict); } } @@ -67,12 +70,7 @@ public int decompress(final byte[] b, final int off, final int len) throws IOExc int remaining = inBuf.remaining(); inLen -= remaining; outBuf.clear(); - int written; - if (dict != null) { - written = Zstd.decompress(outBuf, inBuf, dict); - } else { - written = Zstd.decompress(outBuf, inBuf); - } + int written = ctx.decompress(outBuf, inBuf); inBuf.clear(); outBuf.flip(); int n = Math.min(written, len); @@ -109,6 +107,13 @@ public void reset() { outBuf.clear(); outBuf.position(outBuf.capacity()); finished = false; + ctx.reset(); + if (dict != null) { + ctx.loadDict(dict); + } else { + // loadDict((byte[]) accepts null to clear the dictionary + ctx.loadDict((byte[]) null); + } } @Override