Skip to content

Commit

Permalink
[fix][test] Fix jvm oom on Unit Test broker group 1 (#16542)
Browse files Browse the repository at this point in the history
(cherry picked from commit 3752a11)
  • Loading branch information
codelipenghui committed Jul 15, 2022
1 parent daeb418 commit 4868463
Showing 1 changed file with 15 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,17 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
ClientConfigurationData conf =
((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf) {
// test that reproduces the issue https://github.com/apache/pulsar/issues/12024
// where closing the consumer leads to an endless receive loop
@Test
public void testMultiTopicsConsumerCloses() throws Exception {
String topicNameBase = "persistent://my-property/my-ns/my-topic-consumer-closes-";

ClientConfigurationData conf = ((ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()))
.getClientConfigurationData();

@Cleanup
PulsarClientImpl client = new PulsarClientImpl(conf) {
{
ScheduledExecutorService internalExecutorService =
(ScheduledExecutorService) super.getScheduledExecutorProvider().getExecutor();
Expand All @@ -85,31 +91,23 @@ public ExecutorService getInternalExecutorService() {
return internalExecutorServiceDelegate;
}
};
}

// test that reproduces the issue https://github.com/apache/pulsar/issues/12024
// where closing the consumer leads to an endless receive loop
@Test
public void testMultiTopicsConsumerCloses() throws Exception {
String topicNameBase = "persistent://my-property/my-ns/my-topic-consumer-closes-";

@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
Producer<byte[]> producer1 = client.newProducer()
.topic(topicNameBase + "1")
.enableBatching(false)
.create();
@Cleanup
Producer<byte[]> producer2 = pulsarClient.newProducer()
Producer<byte[]> producer2 = client.newProducer()
.topic(topicNameBase + "2")
.enableBatching(false)
.create();
@Cleanup
Producer<byte[]> producer3 = pulsarClient.newProducer()
Producer<byte[]> producer3 = client.newProducer()
.topic(topicNameBase + "3")
.enableBatching(false)
.create();

Consumer<byte[]> consumer = pulsarClient
Consumer<byte[]> consumer = client
.newConsumer()
.topics(Lists.newArrayList(topicNameBase + "1", topicNameBase + "2", topicNameBase + "3"))
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
Expand Down

0 comments on commit 4868463

Please sign in to comment.