Skip to content

Commit

Permalink
fixup! zlib: add zstd support
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Martin authored and jkrems committed Sep 10, 2024
1 parent 3b8013e commit 0d50235
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 17 deletions.
6 changes: 6 additions & 0 deletions doc/api/zlib.md
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,12 @@ The most important options are:
* Set compression parameters according to pre-defined cLevel table. Default
level is ZSTD\_CLEVEL\_DEFAULT==3.

#### Pledged Source Size

It's possible to specify the expected total size of the uncompressed input via
`opts.pledgedSrcSize`. If the size doesn't match at the end of the input,
compression will fail with the code `ZSTD_error_srcSize_wrong`.

#### Decompressor options

These advanced options are available for controlling decompression:
Expand Down
3 changes: 3 additions & 0 deletions lib/zlib.js
Original file line number Diff line number Diff line change
Expand Up @@ -884,8 +884,11 @@ function Zstd(opts, mode, initParamsArray, maxParam) {
const handle = mode === ZSTD_COMPRESS ?
new binding.ZstdCompress() : new binding.ZstdDecompress();

const pledgedSrcSize = opts?.pledgedSrcSize ?? undefined;

this._writeState = new Uint32Array(2);
if (!handle.init(initParamsArray,
pledgedSrcSize,
this._writeState,
processCallback)) {
throw new ERR_ZLIB_INITIALIZATION_FAILED();
Expand Down
60 changes: 44 additions & 16 deletions src/node_zlib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ class ZstdCompressContext final : public ZstdContext {
CompressionError ResetStream();

// Zstd specific:
CompressionError Init();
CompressionError SetParameter(int key, uint32_t value);
CompressionError Init(uint64_t pledged_src_size);
CompressionError SetParameter(int key, int value);

// Wrap ZSTD_freeCCtx to remove the return type.
static void FreeCCtx(ZSTD_CCtx* cctx) { ZSTD_freeCCtx(cctx); }
Expand All @@ -336,6 +336,8 @@ class ZstdCompressContext final : public ZstdContext {

private:
DeleteFnPtr<ZSTD_CCtx, ZstdCompressContext::FreeCCtx> cctx_;

uint64_t pledged_src_size_ = ZSTD_CONTENTSIZE_UNKNOWN;
};

class ZstdDecompressContext final : public ZstdContext {
Expand All @@ -347,8 +349,8 @@ class ZstdDecompressContext final : public ZstdContext {
CompressionError ResetStream();

// Zstd specific:
CompressionError Init();
CompressionError SetParameter(int key, uint32_t value);
CompressionError Init(uint64_t pledged_src_size);
CompressionError SetParameter(int key, int value);

// Wrap ZSTD_freeDCtx to remove the return type.
static void FreeDCtx(ZSTD_DCtx* dctx) { ZSTD_freeDCtx(dctx); }
Expand Down Expand Up @@ -860,19 +862,37 @@ class ZstdStream final : public CompressionStream<CompressionContext> {
}

static void Init(const FunctionCallbackInfo<Value>& args) {
CHECK(args.Length() == 3 && "init(params, writeResult, writeCallback)");
Environment* env = Environment::GetCurrent(args);
Local<Context> context = env->context();

CHECK(args.Length() == 4 &&
"init(params, pledgedSrcSize, writeResult, writeCallback)");
ZstdStream* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

CHECK(args[1]->IsUint32Array());
uint32_t* write_result = reinterpret_cast<uint32_t*>(Buffer::Data(args[1]));
CHECK(args[2]->IsUint32Array());
uint32_t* write_result = reinterpret_cast<uint32_t*>(Buffer::Data(args[2]));

CHECK(args[2]->IsFunction());
Local<Function> write_js_callback = args[2].As<Function>();
CHECK(args[3]->IsFunction());
Local<Function> write_js_callback = args[3].As<Function>();
wrap->InitStream(write_result, write_js_callback);

uint64_t pledged_src_size = ZSTD_CONTENTSIZE_UNKNOWN;
if (args[1]->IsNumber()) {
int64_t signed_pledged_src_size;
if (!args[1]->IntegerValue(context).To(&signed_pledged_src_size)) {
args.GetReturnValue().Set(false);
return;
}
if (signed_pledged_src_size < 0) {
args.GetReturnValue().Set(false);
return;
}
pledged_src_size = signed_pledged_src_size;
}

AllocScope alloc_scope(wrap);
CompressionError err = wrap->context()->Init();
CompressionError err = wrap->context()->Init(pledged_src_size);
if (err.IsError()) {
wrap->EmitError(err);
args.GetReturnValue().Set(false);
Expand Down Expand Up @@ -1469,7 +1489,7 @@ CompressionError ZstdContext::GetErrorInfo() const {
}
}

CompressionError ZstdCompressContext::SetParameter(int key, uint32_t value) {
CompressionError ZstdCompressContext::SetParameter(int key, int value) {
size_t result = ZSTD_CCtx_setParameter(
cctx_.get(), static_cast<ZSTD_cParameter>(key), value);
if (ZSTD_isError(result)) {
Expand All @@ -1479,18 +1499,24 @@ CompressionError ZstdCompressContext::SetParameter(int key, uint32_t value) {
return CompressionError{};
}

CompressionError ZstdCompressContext::Init() {
CompressionError ZstdCompressContext::Init(uint64_t pledged_src_size) {
pledged_src_size_ = pledged_src_size;
cctx_.reset(ZSTD_createCCtx());
if (!cctx_) {
return CompressionError("Could not initialize zstd instance",
"ERR_ZLIB_INITIALIZATION_FAILED",
-1);
}
size_t result = ZSTD_CCtx_setPledgedSrcSize(cctx_.get(), pledged_src_size);
if (ZSTD_isError(result)) {
return CompressionError(
"Could not set pledged src size", "ERR_ZLIB_INITIALIZATION_FAILED", -1);
}
return CompressionError{};
}

CompressionError ZstdCompressContext::ResetStream() {
return Init();
return Init(pledged_src_size_);
}

void ZstdCompressContext::DoThreadPoolWork() {
Expand All @@ -1504,7 +1530,7 @@ void ZstdCompressContext::DoThreadPoolWork() {
}
}

CompressionError ZstdDecompressContext::SetParameter(int key, uint32_t value) {
CompressionError ZstdDecompressContext::SetParameter(int key, int value) {
size_t result = ZSTD_DCtx_setParameter(
dctx_.get(), static_cast<ZSTD_dParameter>(key), value);
if (ZSTD_isError(result)) {
Expand All @@ -1514,7 +1540,7 @@ CompressionError ZstdDecompressContext::SetParameter(int key, uint32_t value) {
return CompressionError{};
}

CompressionError ZstdDecompressContext::Init() {
CompressionError ZstdDecompressContext::Init(uint64_t pledged_src_size) {
dctx_.reset(ZSTD_createDCtx());
if (!dctx_) {
return CompressionError("Could not initialize zstd instance",
Expand All @@ -1525,7 +1551,9 @@ CompressionError ZstdDecompressContext::Init() {
}

CompressionError ZstdDecompressContext::ResetStream() {
return Init();
// We pass ZSTD_CONTENTSIZE_UNKNOWN because the argument is ignored for
// decompression.
return Init(ZSTD_CONTENTSIZE_UNKNOWN);
}

void ZstdDecompressContext::DoThreadPoolWork() {
Expand Down
37 changes: 37 additions & 0 deletions test/parallel/test-zlib-zstd-pledged-src-size.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const zlib = require('zlib');

function compressWithPledgedSrcSize({ pledgedSrcSize, actualSrcSize }) {
return new Promise((resolve, reject) => {
const compressor = zlib.createZstdCompress({ pledgedSrcSize });
compressor.on('error', (e) => {
reject(e);
});
compressor.on('end', resolve);
compressor.write('x'.repeat(actualSrcSize), () => {
compressor.end();
compressor.resume();
});
}).then(() => {
// Compression should only succeed if sizes match
assert.strictEqual(pledgedSrcSize, actualSrcSize);
}, (error) => {
assert.strictEqual(error.code, 'ZSTD_error_srcSize_wrong');
// Size error should only happen when sizes do not match
assert.notStrictEqual(pledgedSrcSize, actualSrcSize);
}).then(common.mustCall());
}

compressWithPledgedSrcSize({ pledgedSrcSize: 0, actualSrcSize: 0 });

compressWithPledgedSrcSize({ pledgedSrcSize: 0, actualSrcSize: 42 });

compressWithPledgedSrcSize({ pledgedSrcSize: 13, actualSrcSize: 42 });

compressWithPledgedSrcSize({ pledgedSrcSize: 42, actualSrcSize: 0 });

compressWithPledgedSrcSize({ pledgedSrcSize: 42, actualSrcSize: 13 });

compressWithPledgedSrcSize({ pledgedSrcSize: 42, actualSrcSize: 42 });
2 changes: 1 addition & 1 deletion test/parallel/test-zlib-zstd.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const fixtures = require('../common/fixtures');
const assert = require('assert');
const zlib = require('zlib');

// Test some brotli-specific properties of the brotli streams that can not
// Test some zstd-specific properties of the zstd streams that can not
// be easily covered through expanding zlib-only tests.

const sampleBuffer = fixtures.readSync('/pss-vectors.json');
Expand Down

0 comments on commit 0d50235

Please sign in to comment.