Skip to content

Commit

Permalink
[Broker] Fix replicated subscriptions direct memory leak (apache#11396)
Browse files Browse the repository at this point in the history
- .release() wasn't called for marker messages sent by ReplicatedSubscriptionsController. 
- Use writeMarker method to publish marker messages and call release in try-finally block.
  • Loading branch information
lhotari authored and Technoboy- committed Jul 22, 2021
1 parent 8543df1 commit 939bb64
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscrip
}

ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
topic.publishMessage(subscriptionUpdate, this);
writeMarker(subscriptionUpdate);
}

private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
Expand All @@ -140,8 +140,7 @@ private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest requ
request.getSourceCluster(),
localCluster,
lastMsgId.getLedgerId(), lastMsgId.getEntryId());

topic.publishMessage(marker, this);
writeMarker(marker);
}

private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
Expand Down Expand Up @@ -276,7 +275,11 @@ void snapshotCompleted(String snapshotId) {
}

void writeMarker(ByteBuf marker) {
topic.publishMessage(marker, this);
try {
topic.publishMessage(marker, this);
} finally {
marker.release();
}
}

/**
Expand Down

0 comments on commit 939bb64

Please sign in to comment.