Skip to content

Commit

Permalink
Close Zstd Dictionary after execution to avoid any memory leak. (#9403)
Browse files Browse the repository at this point in the history
* Close Dictionary after every execution to avoid any memory leak

Signed-off-by: Mohit Godwani <mgodwan@amazon.com>

* Close Dictionary after every execution to avoid any memory leak

Signed-off-by: Mohit Godwani <mgodwan@amazon.com>

* Add changelog

Signed-off-by: Mohit Godwani <mgodwan@amazon.com>

---------

Signed-off-by: Mohit Godwani <mgodwan@amazon.com>
  • Loading branch information
mgodwan authored Aug 17, 2023
1 parent e1c40b4 commit 5cc7313
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add Segment download stats to remotestore stats API ([#8718](https://github.com/opensearch-project/OpenSearch/pull/8718))
- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168))
- Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855))
- Fix memory leak when using Zstd Dictionary ([#9403](https://github.com/opensearch-project/OpenSearch/pull/9403))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ private void compress(byte[] bytes, int offset, int length, DataOutput out) thro

// dictionary compression first
doCompress(bytes, offset, dictLength, cctx, out);
cctx.loadDict(new ZstdDictCompress(bytes, offset, dictLength, compressionLevel));
try (ZstdDictCompress dictCompress = new ZstdDictCompress(bytes, offset, dictLength, compressionLevel)) {
cctx.loadDict(dictCompress);

for (int start = offset + dictLength; start < end; start += blockLength) {
int l = Math.min(blockLength, end - start);
doCompress(bytes, start, l, cctx, out);
for (int start = offset + dictLength; start < end; start += blockLength) {
int l = Math.min(blockLength, end - start);
doCompress(bytes, start, l, cctx, out);
}
}
}
}
Expand Down Expand Up @@ -170,32 +172,33 @@ public void decompress(DataInput in, int originalLength, int offset, int length,

// decompress dictionary first
doDecompress(in, dctx, bytes, dictLength);

dctx.loadDict(new ZstdDictDecompress(bytes.bytes, 0, dictLength));

int offsetInBlock = dictLength;
int offsetInBytesRef = offset;

// Skip unneeded blocks
while (offsetInBlock + blockLength < offset) {
final int compressedLength = in.readVInt();
in.skipBytes(compressedLength);
offsetInBlock += blockLength;
offsetInBytesRef -= blockLength;
try (ZstdDictDecompress dictDecompress = new ZstdDictDecompress(bytes.bytes, 0, dictLength)) {
dctx.loadDict(dictDecompress);

int offsetInBlock = dictLength;
int offsetInBytesRef = offset;

// Skip unneeded blocks
while (offsetInBlock + blockLength < offset) {
final int compressedLength = in.readVInt();
in.skipBytes(compressedLength);
offsetInBlock += blockLength;
offsetInBytesRef -= blockLength;
}

// Read blocks that intersect with the interval we need
while (offsetInBlock < offset + length) {
bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
int l = Math.min(blockLength, originalLength - offsetInBlock);
doDecompress(in, dctx, bytes, l);
offsetInBlock += blockLength;
}

bytes.offset = offsetInBytesRef;
bytes.length = length;

assert bytes.isValid() : "decompression output is corrupted";
}

// Read blocks that intersect with the interval we need
while (offsetInBlock < offset + length) {
bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
int l = Math.min(blockLength, originalLength - offsetInBlock);
doDecompress(in, dctx, bytes, l);
offsetInBlock += blockLength;
}

bytes.offset = offsetInBytesRef;
bytes.length = length;

assert bytes.isValid() : "decompression output is corrupted";
}
}

Expand Down

0 comments on commit 5cc7313

Please sign in to comment.