Skip to content

Commit

Permalink
[fix][tools] Only apply maxPendingMessagesAcrossPartitions if it pres…
Browse files Browse the repository at this point in the history
…ents (#15283)
  • Loading branch information
codelipenghui authored Apr 24, 2022
1 parent a812f29 commit 188d4f4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -597,10 +597,12 @@ private static void runProducer(int producerId,
.sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) //
.compressionType(arguments.compression) //
.maxPendingMessages(arguments.maxOutstanding) //
.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions)
.accessMode(arguments.producerAccessMode)
// enable round robin message routing if it is a partitioned topic
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
if (arguments.maxPendingMessagesAcrossPartitions > 0) {
producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions);
}

AtomicReference<Transaction> transactionAtomicReference;
if (arguments.isEnableTransaction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,25 @@ public void testDefaultIMessageFormatter() {
Assert.assertTrue(msgFormatter instanceof DefaultMessageFormatter);
}

@Test
public void testMaxOutstanding() throws Exception {
String argString = "%s -r 10 -u %s -au %s -m 5 -o 10000";
String topic = testTopic + UUID.randomUUID().toString();
String args = String.format(argString, topic, pulsar.getBrokerServiceUrl(), pulsar.getWebServiceAddress());
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub")
.subscriptionType(SubscriptionType.Key_Shared).subscribe();
new Thread(() -> {
try {
PerformanceProducer.main(args.split(" "));
} catch (Exception e) {
log.error("Failed to start perf producer");
}
}).start();
Awaitility.await()
.untilAsserted(() -> {
Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
});
consumer.close();
}
}

0 comments on commit 188d4f4

Please sign in to comment.