diff --git a/src/node_zlib.cc b/src/node_zlib.cc index de8b0335892f65..4a2c25c6e80638 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -34,6 +34,7 @@ #include #include #include +#include namespace node { @@ -96,6 +97,8 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { ~ZCtx() override { CHECK_EQ(false, write_in_progress_ && "write in progress"); Close(); + CHECK_EQ(zlib_memory_, 0); + CHECK_EQ(unreported_allocations_, 0); } void Close() { @@ -108,17 +111,15 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { CHECK(init_done_ && "close before init"); CHECK_LE(mode_, UNZIP); + AllocScope alloc_scope(this); int status = Z_OK; if (mode_ == DEFLATE || mode_ == GZIP || mode_ == DEFLATERAW) { status = deflateEnd(&strm_); - int64_t change_in_bytes = -static_cast(kDeflateContextSize); - env()->isolate()->AdjustAmountOfExternalAllocatedMemory(change_in_bytes); } else if (mode_ == INFLATE || mode_ == GUNZIP || mode_ == INFLATERAW || mode_ == UNZIP) { status = inflateEnd(&strm_); - int64_t change_in_bytes = -static_cast(kInflateContextSize); - env()->isolate()->AdjustAmountOfExternalAllocatedMemory(change_in_bytes); } + CHECK(status == Z_OK || status == Z_DATA_ERROR); mode_ = NONE; @@ -164,6 +165,8 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { CHECK(0 && "Invalid flush value"); } + AllocScope alloc_scope(ctx); + Bytef* in; Bytef* out; size_t in_off, in_len, out_off, out_len; @@ -354,6 +357,8 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { // v8 land! void AfterThreadPoolWork(int status) override { + AllocScope alloc_scope(this); + write_in_progress_ = false; if (status == UV_ECANCELED) { @@ -504,14 +509,15 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { int strategy, uint32_t* write_result, Local write_js_callback, char* dictionary, size_t dictionary_len) { + AllocScope alloc_scope(ctx); ctx->level_ = level; ctx->windowBits_ = windowBits; ctx->memLevel_ = memLevel; ctx->strategy_ = strategy; - ctx->strm_.zalloc = Z_NULL; - ctx->strm_.zfree = Z_NULL; - ctx->strm_.opaque = Z_NULL; + ctx->strm_.zalloc = AllocForZlib; + ctx->strm_.zfree = FreeForZlib; + ctx->strm_.opaque = static_cast(ctx); ctx->flush_ = Z_NO_FLUSH; @@ -539,16 +545,12 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { ctx->windowBits_, ctx->memLevel_, ctx->strategy_); - ctx->env()->isolate() - ->AdjustAmountOfExternalAllocatedMemory(kDeflateContextSize); break; case INFLATE: case GUNZIP: case INFLATERAW: case UNZIP: ctx->err_ = inflateInit2(&ctx->strm_, ctx->windowBits_); - ctx->env()->isolate() - ->AdjustAmountOfExternalAllocatedMemory(kInflateContextSize); break; default: UNREACHABLE(); @@ -604,6 +606,8 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { } static void Params(ZCtx* ctx, int level, int strategy) { + AllocScope alloc_scope(ctx); + ctx->err_ = Z_OK; switch (ctx->mode_) { @@ -621,6 +625,8 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { } void Reset() { + AllocScope alloc_scope(this); + err_ = Z_OK; switch (mode_) { @@ -659,8 +665,51 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { } } - static const int kDeflateContextSize = 16384; // approximate - static const int kInflateContextSize = 10240; // approximate + // Allocation functions provided to zlib itself. We store the real size of + // the allocated memory chunk just before the "payload" memory we return + // to zlib. + // Because we use zlib off the thread pool, we can not report memory directly + // to V8; rather, we first store it as "unreported" memory in a separate + // field and later report it back from the main thread. + static void* AllocForZlib(void* data, uInt items, uInt size) { + ZCtx* ctx = static_cast(data); + size_t real_size = + MultiplyWithOverflowCheck(static_cast(items), + static_cast(size)) + sizeof(size_t); + char* memory = UncheckedMalloc(real_size); + if (UNLIKELY(memory == nullptr)) return nullptr; + *reinterpret_cast(memory) = real_size; + ctx->unreported_allocations_.fetch_add(real_size, + std::memory_order_relaxed); + return memory + sizeof(size_t); + } + + static void FreeForZlib(void* data, void* pointer) { + if (UNLIKELY(pointer == nullptr)) return; + ZCtx* ctx = static_cast(data); + char* real_pointer = static_cast(pointer) - sizeof(size_t); + size_t real_size = *reinterpret_cast(real_pointer); + ctx->unreported_allocations_.fetch_sub(real_size, + std::memory_order_relaxed); + free(real_pointer); + } + + // This is called on the main thread after zlib may have allocated something + // in order to report it back to V8. + void AdjustAmountOfExternalAllocatedMemory() { + ssize_t report = + unreported_allocations_.exchange(0, std::memory_order_relaxed); + if (report == 0) return; + CHECK_IMPLIES(report < 0, zlib_memory_ >= static_cast(-report)); + zlib_memory_ += report; + env()->isolate()->AdjustAmountOfExternalAllocatedMemory(report); + } + + struct AllocScope { + explicit AllocScope(ZCtx* ctx) : ctx(ctx) {} + ~AllocScope() { ctx->AdjustAmountOfExternalAllocatedMemory(); } + ZCtx* ctx; + }; Bytef* dictionary_; size_t dictionary_len_; @@ -679,6 +728,8 @@ class ZCtx : public AsyncWrap, public ThreadPoolWork { unsigned int gzip_id_bytes_read_; uint32_t* write_result_; Persistent write_js_callback_; + std::atomic unreported_allocations_{0}; + size_t zlib_memory_ = 0; };