Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] The partitionedProducer maxPendingMessages always is 0 #23593

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
Expand Down Expand Up @@ -1340,6 +1341,49 @@ public void testProducerQueueFullBlocking() throws Exception {
setup();
}

@Test
public void testProducerQueueFullBlockingWithPartitionedTopic() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic-xyzx2";
admin.topics().createPartitionedTopic(topicName, 2);

@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();

// 1. Producer connect
PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) client.newProducer()
.topic(topicName)
.maxPendingMessages(1)
.blockIfQueueFull(true)
.sendTimeout(1, TimeUnit.SECONDS)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

// 2. Stop broker
cleanup();

// 2. producer publish messages
long startTime = System.nanoTime();
producer.sendAsync("msg".getBytes());

// Verify thread was not blocked
long delayNs = System.nanoTime() - startTime;
assertTrue(delayNs < TimeUnit.SECONDS.toNanos(1));

// Next send operation must block, until all the messages in the queue expire
startTime = System.nanoTime();
producer.sendAsync("msg".getBytes());
delayNs = System.nanoTime() - startTime;
assertTrue(delayNs > TimeUnit.MILLISECONDS.toNanos(500));
assertTrue(delayNs < TimeUnit.MILLISECONDS.toNanos(1500));

// 4. producer disconnect
producer.close();

// 5. Restart broker
setup();
}

@Test
public void testProducerQueueFullNonBlocking() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic-xyzx";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.netty.util.Timeout;
Expand Down Expand Up @@ -84,9 +86,16 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerCo
: null;

// MaxPendingMessagesAcrossPartitions doesn't support partial partition such as SinglePartition correctly
int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
conf.setMaxPendingMessages(maxPendingMessages);
int maxPendingMessages = conf.getMaxPendingMessages();
int maxPendingMessagesAcrossPartitions = conf.getMaxPendingMessagesAcrossPartitions();
if (maxPendingMessagesAcrossPartitions != DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS) {
int maxPendingMsgsForOnePartition = maxPendingMessagesAcrossPartitions / numPartitions;
maxPendingMessages = (maxPendingMessages == DEFAULT_MAX_PENDING_MESSAGES)
? maxPendingMsgsForOnePartition
: Math.min(maxPendingMessages, maxPendingMsgsForOnePartition);
conf.setMaxPendingMessages(maxPendingMessages);
}


final List<Integer> indexList;
if (conf.isLazyStartPartitionedProducers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,44 @@ public void testGetNumOfPartitions() throws Exception {
assertEquals(producerImpl.getNumOfPartitions(), 0);
}

@Test
public void testMaxPendingQueueSize() throws Exception {
String topicName = "test-max-pending-queue-size";
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
conf.setStatsIntervalSeconds(100);

ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
@Cleanup("shutdownGracefully")
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);

@Cleanup
PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);

// Test set maxPendingMessage to 10
ProducerConfigurationData producerConfData = new ProducerConfigurationData();
producerConfData.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
producerConfData.setCustomMessageRouter(new CustomMessageRouter());
producerConfData.setMaxPendingMessages(10);
PartitionedProducerImpl partitionedProducerImpl = new PartitionedProducerImpl(
clientImpl, topicName, producerConfData, 1, null, null, null);
assertEquals(partitionedProducerImpl.getConfiguration().getMaxPendingMessages(), 10);

// Test set MaxPendingMessagesAcrossPartitions=5
producerConfData.setMaxPendingMessages(ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES);
producerConfData.setMaxPendingMessagesAcrossPartitions(5);
partitionedProducerImpl = new PartitionedProducerImpl(
clientImpl, topicName, producerConfData, 1, null, null, null);
assertEquals(partitionedProducerImpl.getConfiguration().getMaxPendingMessages(), 5);

// Test set maxPendingMessage=10 and MaxPendingMessagesAcrossPartitions=10 with 2 partitions
producerConfData.setMaxPendingMessages(10);
producerConfData.setMaxPendingMessagesAcrossPartitions(10);
partitionedProducerImpl = new PartitionedProducerImpl(
clientImpl, topicName, producerConfData, 2, null, null, null);
assertEquals(partitionedProducerImpl.getConfiguration().getMaxPendingMessages(), 5);
}


@Test
public void testOnTopicsExtended() throws Exception {
Expand Down
Loading