diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a36a05d2ae2b0..4855d2ef41ec4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1374,6 +1374,10 @@ public synchronized PulsarClient getClient() throws PulsarServerException { if (this.client == null) { try { ClientConfigurationData conf = new ClientConfigurationData(); + + // Disable memory limit for broker client + conf.setMemoryLimitBytes(0); + conf.setServiceUrl(this.getConfiguration().isTlsEnabled() ? this.brokerServiceUrlTls : this.brokerServiceUrl); conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ff3d248d592b9..6fe48994bf0c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -127,6 +127,7 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @@ -1199,6 +1200,10 @@ public PulsarClient getReplicationClient(String cluster, Optional c .enableTcpNoDelay(false) .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker()) .statsInterval(0, TimeUnit.SECONDS); + + // Disable memory limit for replication client + clientBuilder.memoryLimit(0, SizeUnit.BYTES); + if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) { clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters()); } else if (pulsar.getConfiguration().isAuthenticationEnabled()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java index c3a78e491454e..47c0c7e2b02cc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java @@ -80,4 +80,8 @@ public void releaseMemory(long size) { public long currentUsage() { return currentUsage.get(); } + + public boolean isMemoryLimited() { + return memoryLimit > 0; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 7930477b4a97d..c2bf8a216e292 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -92,6 +92,10 @@ public class PulsarClientImpl implements PulsarClient { private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class); + // default limits for producers when memory limit controller is disabled + private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES = 1000; + private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000; + protected final ClientConfigurationData conf; private final boolean createdExecutorProviders; private LookupService lookup; @@ -251,7 +255,14 @@ public ProducerBuilder newProducer() { @Override public ProducerBuilder newProducer(Schema schema) { - return new ProducerBuilderImpl<>(this, schema); + ProducerBuilderImpl producerBuilder = new ProducerBuilderImpl<>(this, schema); + if (!memoryLimitController.isMemoryLimited()) { + // set default limits for producers when memory limit controller is disabled + producerBuilder.maxPendingMessages(NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES); + producerBuilder.maxPendingMessagesAcrossPartitions( + NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS); + } + return producerBuilder; } @Override