Skip to content

Commit

Permalink
Fix memory leak when running topic compaction. (apache#6485)
Browse files Browse the repository at this point in the history
Fixes apache#6482

### Motivation
Prevent topic compaction from leaking direct memory

### Modifications

Several leaks were discovered using Netty leak detection and code review.
* `CompactedTopicImpl.readOneMessageId` would get an `Enumeration` of `LedgerEntry`, but did not release the underlying buffers. Fix: iterate though the `Enumeration` and release underlying buffer. Instead of logging the case where the `Enumeration` did not contain any elements, complete the future exceptionally with the message (will be logged by Caffeine).
* Two main sources of leak in `TwoPhaseCompactor`. The `RawBacthConverter.rebatchMessage` method failed to close/release a `ByteBuf` (uncompressedPayload). Also, the return ByteBuf of `RawBacthConverter.rebatchMessage` was not closed. The first one was easy to fix (release buffer), to fix the second one and make the code easier to read, I decided to not let `RawBacthConverter.rebatchMessage`  close the message read from the topic, instead the message read from the topic can be closed in a try/finally clause surrounding most of the method body handing a message from a topic (in phase two loop). Then if a new message was produced by `RawBacthConverter.rebatchMessage` we check that after we have added the message to the compact ledger and release the message.

### Verifying this change
Modified `RawReaderTest.testBatchingRebatch` to show new contract.

One can run the test described to reproduce the issue, to verify no leak is detected.

(cherry picked from commit f2ec1b4)
  • Loading branch information
racorn authored and tuteng committed Mar 21, 2020
1 parent 1e1dd06 commit 5911b8b
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public static List<ImmutablePair<MessageId,String>> extractIdsAndKeys(RawMessage
* Take a batched message and a filter, and returns a message with the only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
*
* This takes ownership of the passes in message, and if the returned optional is not empty,
* the ownership of that message is returned also.
* NOTE: this message does not alter the reference count of the RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter)
Expand Down Expand Up @@ -161,9 +160,9 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
return Optional.empty();
}
} finally {
uncompressedPayload.release();
batchBuffer.release();
metadata.recycle();
msg.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,19 @@ private static CompletableFuture<MessageIdData> readOneMessageId(LedgerHandle lh
if (rc != BKException.Code.OK) {
promise.completeExceptionally(BKException.create(rc));
} else {
try (RawMessage m = RawMessageImpl.deserializeFrom(
seq.nextElement().getEntryBuffer())) {
promise.complete(m.getMessageIdData());
} catch (NoSuchElementException e) {
log.error("No such entry {} in ledger {}", entryId, lh.getId());
promise.completeExceptionally(e);
// Need to release buffers for all entries in the sequence
if (seq.hasMoreElements()) {
LedgerEntry entry = seq.nextElement();
try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) {
entry.getEntryBuffer().release();
while (seq.hasMoreElements()) {
seq.nextElement().getEntryBuffer().release();
}
promise.complete(m.getMessageIdData());
}
} else {
promise.completeExceptionally(new NoSuchElementException(
String.format("No such entry %d in ledger %d", entryId, lh.getId())));
}
}
}, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,77 +212,88 @@ private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId

private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
if (promise.isDone()) {
return;
}
reader.readNextAsync().whenCompleteAsync(
(m, exception) -> {
if (exception != null) {
promise.completeExceptionally(exception);
return;
} else if (promise.isDone()) {
m.close();
return;
}
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = RawBatchConverter.rebatchMessage(
m, (key, subid) -> latestForKey.get(key).equals(subid));
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
messageToAdd = Optional.of(m);
}
} else {
Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
promise.completeExceptionally(new IllegalArgumentException(
"Compaction phase found empty record from sorted key-map"));
try {
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = RawBatchConverter.rebatchMessage(
m, (key, subid) -> latestForKey.get(key).equals(subid));
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
messageToAdd = Optional.of(m);
}
messageToAdd = Optional.of(m);
} else {
m.close();
Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
promise.completeExceptionally(new IllegalArgumentException(
"Compaction phase found empty record from sorted key-map"));
}
messageToAdd = Optional.of(m);
}
}
}

if (messageToAdd.isPresent()) {
try {
outstanding.acquire();
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, messageToAdd.get())
.whenComplete((res, exception2) -> {
outstanding.release();
if (exception2 != null) {
promise.completeExceptionally(exception2);
if (messageToAdd.isPresent()) {
RawMessage message = messageToAdd.get();
try {
outstanding.acquire();
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message)
.whenComplete((res, exception2) -> {
outstanding.release();
if (exception2 != null) {
promise.completeExceptionally(exception2);
}
});
if (to.equals(id)) {
addFuture.whenComplete((res, exception2) -> {
if (exception2 == null) {
promise.complete(null);
}
});
if (to.equals(id)) {
addFuture.whenComplete((res, exception2) -> {
if (exception2 == null) {
promise.complete(null);
}
});
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
promise.completeExceptionally(ie);
} finally {
if (message != m) {
message.close();
}
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
promise.completeExceptionally(ie);
}
} else if (to.equals(id)) {
// Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not
// present under latestForKey. Complete the compaction.
try {
// make sure all inflight writes have finished
outstanding.acquire(MAX_OUTSTANDING);
promise.complete(null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
promise.completeExceptionally(e);
} else if (to.equals(id)) {
// Reached to last-id and phase-one found it deleted-message while iterating on ledger so,
// not present under latestForKey. Complete the compaction.
try {
// make sure all inflight writes have finished
outstanding.acquire(MAX_OUTSTANDING);
promise.complete(null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
promise.completeExceptionally(e);
}
return;
}
return;
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
} finally {
m.close();
}
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
}, scheduler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,13 @@ public void testBatchingRebatch() throws Exception {
}

RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
try {
RawMessage m1 = reader.readNextAsync().get();
try (RawMessage m1 = reader.readNextAsync().get()) {
RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get();
List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m2);
Assert.assertEquals(idsAndKeys.size(), 1);
Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
m2.close();
Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1);
} finally {
reader.closeAsync().get();
}
Expand Down

0 comments on commit 5911b8b

Please sign in to comment.