Skip to content

Commit

Permalink
Fix waitingCursors memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
gavingaozhangmin committed Feb 14, 2022
1 parent 1c0e17d commit 9018954
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3482,6 +3482,7 @@ public void deactivateCursor(ManagedCursor cursor) {
if (!cursor.isDurable()) {
nonDurableActiveCursors.removeCursor(cursor.getName());
}
waitingCursors.remove(cursor);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,19 @@
import java.io.IOException;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response.Status;

import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
Expand Down Expand Up @@ -348,4 +347,17 @@ public void createSubscriptionBySpecifyingStringPosition() throws IOException, P

producer.close();
}

@Test
public void testWaitingCurosrCausedMemoryLeak() throws Exception {
String topic = "persistent://my-property/my-ns/my-topic";
for (int i=0;i<10;i++) {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionType(SubscriptionType.Failover).subscriptionName("test" + i).subscribe();
consumer.close();
}
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger());
assertEquals(ml.getWaitingCursorsCount(), 0);
}
}

0 comments on commit 9018954

Please sign in to comment.