Skip to content

Commit

Permalink
[Broker] Fix replicated subscriptions direct memory leak (#11396)
Browse files Browse the repository at this point in the history
- .release() wasn't called for marker messages sent by ReplicatedSubscriptionsController.
- Use writeMarker method to publish marker messages and call release in try-finally block.

(cherry picked from commit 8841c81)
  • Loading branch information
lhotari committed Jul 22, 2021
1 parent d6c2667 commit 54b05fe
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscrip
}

ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
topic.publishMessage(subscriptionUpdate, this);
writeMarker(subscriptionUpdate);
}

private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
Expand All @@ -134,8 +134,7 @@ private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest requ
request.getSourceCluster(),
localCluster,
lastMsgId.getLedgerId(), lastMsgId.getEntryId());

topic.publishMessage(marker, this);
writeMarker(marker);
}

private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
Expand Down Expand Up @@ -228,7 +227,11 @@ void snapshotCompleted(String snapshotId) {
}

void writeMarker(ByteBuf marker) {
topic.publishMessage(marker, this);
try {
topic.publishMessage(marker, this);
} finally {
marker.release();
}
}

/**
Expand Down

0 comments on commit 54b05fe

Please sign in to comment.