Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak when running topic compaction. #6485

Merged
merged 1 commit into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public static List<ImmutablePair<MessageId,String>> extractIdsAndKeys(RawMessage
* Take a batched message and a filter, and returns a message with the only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
*
* This takes ownership of the passes in message, and if the returned optional is not empty,
* the ownership of that message is returned also.
* NOTE: this message does not alter the reference count of the RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter)
Expand Down Expand Up @@ -161,9 +160,9 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
return Optional.empty();
}
} finally {
uncompressedPayload.release();
batchBuffer.release();
metadata.recycle();
msg.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,19 @@ private static CompletableFuture<MessageIdData> readOneMessageId(LedgerHandle lh
if (rc != BKException.Code.OK) {
promise.completeExceptionally(BKException.create(rc));
} else {
try (RawMessage m = RawMessageImpl.deserializeFrom(
seq.nextElement().getEntryBuffer())) {
promise.complete(m.getMessageIdData());
} catch (NoSuchElementException e) {
log.error("No such entry {} in ledger {}", entryId, lh.getId());
promise.completeExceptionally(e);
// Need to release buffers for all entries in the sequence
if (seq.hasMoreElements()) {
LedgerEntry entry = seq.nextElement();
try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) {
entry.getEntryBuffer().release();
while (seq.hasMoreElements()) {
seq.nextElement().getEntryBuffer().release();
}
promise.complete(m.getMessageIdData());
}
} else {
promise.completeExceptionally(new NoSuchElementException(
String.format("No such entry %d in ledger %d", entryId, lh.getId())));
}
}
}, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,77 +212,88 @@ private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId

private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
if (promise.isDone()) {
return;
}
reader.readNextAsync().whenCompleteAsync(
(m, exception) -> {
if (exception != null) {
promise.completeExceptionally(exception);
return;
} else if (promise.isDone()) {
m.close();
return;
}
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = RawBatchConverter.rebatchMessage(
m, (key, subid) -> latestForKey.get(key).equals(subid));
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
messageToAdd = Optional.of(m);
}
} else {
Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
promise.completeExceptionally(new IllegalArgumentException(
"Compaction phase found empty record from sorted key-map"));
try {
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = RawBatchConverter.rebatchMessage(
m, (key, subid) -> latestForKey.get(key).equals(subid));
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
messageToAdd = Optional.of(m);
}
messageToAdd = Optional.of(m);
} else {
m.close();
Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
if (keyAndSize == null) { // pass through messages without a key
messageToAdd = Optional.of(m);
} else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
&& msg.equals(id)) { // consider message only if present into latestForKey map
if (keyAndSize.getRight() <= 0) {
promise.completeExceptionally(new IllegalArgumentException(
"Compaction phase found empty record from sorted key-map"));
}
messageToAdd = Optional.of(m);
}
}
}

if (messageToAdd.isPresent()) {
try {
outstanding.acquire();
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, messageToAdd.get())
.whenComplete((res, exception2) -> {
outstanding.release();
if (exception2 != null) {
promise.completeExceptionally(exception2);
if (messageToAdd.isPresent()) {
RawMessage message = messageToAdd.get();
try {
outstanding.acquire();
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message)
.whenComplete((res, exception2) -> {
outstanding.release();
if (exception2 != null) {
promise.completeExceptionally(exception2);
}
});
if (to.equals(id)) {
addFuture.whenComplete((res, exception2) -> {
if (exception2 == null) {
promise.complete(null);
}
});
if (to.equals(id)) {
addFuture.whenComplete((res, exception2) -> {
if (exception2 == null) {
promise.complete(null);
}
});
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
promise.completeExceptionally(ie);
} finally {
if (message != m) {
message.close();
}
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
promise.completeExceptionally(ie);
}
} else if (to.equals(id)) {
// Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not
// present under latestForKey. Complete the compaction.
try {
// make sure all inflight writes have finished
outstanding.acquire(MAX_OUTSTANDING);
promise.complete(null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
promise.completeExceptionally(e);
} else if (to.equals(id)) {
// Reached to last-id and phase-one found it deleted-message while iterating on ledger so,
// not present under latestForKey. Complete the compaction.
try {
// make sure all inflight writes have finished
outstanding.acquire(MAX_OUTSTANDING);
promise.complete(null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
promise.completeExceptionally(e);
}
return;
}
return;
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
} finally {
m.close();
}
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
}, scheduler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,13 @@ public void testBatchingRebatch() throws Exception {
}

RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
try {
RawMessage m1 = reader.readNextAsync().get();
try (RawMessage m1 = reader.readNextAsync().get()) {
RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get();
List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m2);
Assert.assertEquals(idsAndKeys.size(), 1);
Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
m2.close();
Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1);
} finally {
reader.closeAsync().get();
}
Expand Down