Skip to content

Commit

Permalink
HBASE-28485 Re-use ZstdDecompressCtx/ZstdCompressCtx for performance (a…
Browse files Browse the repository at this point in the history
…pache#5797)

Co-authored-by: Charles Connell <cconnell@hubspot.com>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
  • Loading branch information
2 people authored and ndimiduk committed Apr 9, 2024
1 parent add57ca commit f6e8db8
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.compress.zstd;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdCompressCtx;
import com.github.luben.zstd.ZstdDictCompress;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -39,16 +40,20 @@ public class ZstdCompressor implements CanReinit, Compressor {
protected long bytesRead, bytesWritten;
protected int dictId;
protected ZstdDictCompress dict;
protected ZstdCompressCtx ctx;

ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) {
this.level = level;
this.bufferSize = bufferSize;
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf.position(bufferSize);
this.ctx = new ZstdCompressCtx();
this.ctx.setLevel(level);
if (dictionary != null) {
this.dictId = ZstdCodec.getDictionaryId(dictionary);
this.dict = new ZstdDictCompress(dictionary, level);
this.ctx.loadDict(this.dict);
}
}

Expand Down Expand Up @@ -79,12 +84,7 @@ public int compress(final byte[] b, final int off, final int len) throws IOExcep
} else {
outBuf.clear();
}
int written;
if (dict != null) {
written = Zstd.compress(outBuf, inBuf, dict);
} else {
written = Zstd.compress(outBuf, inBuf, level);
}
int written = ctx.compress(outBuf, inBuf);
bytesWritten += written;
inBuf.clear();
finished = true;
Expand Down Expand Up @@ -170,6 +170,14 @@ public void reset() {
bytesWritten = 0;
finish = false;
finished = false;
ctx.reset();
ctx.setLevel(level);
if (dict != null) {
ctx.loadDict(dict);
} else {
// loadDict(byte[]) accepts null to clear the dictionary
ctx.loadDict((byte[]) null);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.io.compress.zstd;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdDecompressCtx;
import com.github.luben.zstd.ZstdDictDecompress;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -39,15 +39,18 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
protected boolean finished;
protected int dictId;
protected ZstdDictDecompress dict;
protected ZstdDecompressCtx ctx;

ZstdDecompressor(final int bufferSize, final byte[] dictionary) {
  // Keep the buffer size so reset() can restore the initial state.
  this.bufferSize = bufferSize;
  // Direct buffers back the native zstd calls; outBuf starts fully
  // "consumed" (position == capacity) until the first decompress().
  this.inBuf = ByteBuffer.allocateDirect(bufferSize);
  this.outBuf = ByteBuffer.allocateDirect(bufferSize);
  this.outBuf.position(bufferSize);
  // A single long-lived decompression context, re-used across calls.
  this.ctx = new ZstdDecompressCtx();
  if (dictionary != null) {
    this.dictId = ZstdCodec.getDictionaryId(dictionary);
    this.dict = new ZstdDictDecompress(dictionary);
    this.ctx.loadDict(this.dict);
  }
}

Expand All @@ -67,12 +70,7 @@ public int decompress(final byte[] b, final int off, final int len) throws IOExc
int remaining = inBuf.remaining();
inLen -= remaining;
outBuf.clear();
int written;
if (dict != null) {
written = Zstd.decompress(outBuf, inBuf, dict);
} else {
written = Zstd.decompress(outBuf, inBuf);
}
int written = ctx.decompress(outBuf, inBuf);
inBuf.clear();
outBuf.flip();
int n = Math.min(written, len);
Expand Down Expand Up @@ -109,6 +107,13 @@ public void reset() {
outBuf.clear();
outBuf.position(outBuf.capacity());
finished = false;
ctx.reset();
if (dict != null) {
ctx.loadDict(dict);
} else {
// loadDict(byte[]) accepts null to clear the dictionary
ctx.loadDict((byte[]) null);
}
}

@Override
Expand Down

0 comments on commit f6e8db8

Please sign in to comment.