Skip to content

Commit

Permalink
[Broker] Disable memory limit controller for broker client and replic…
Browse files Browse the repository at this point in the history
…ation clients (apache#15723)

- disable memory limit by default for broker client and replication clients
- restore maxPendingMessages and maxPendingMessagesAcrossPartitions when memory limit is
  disabled so that pre-PIP-120 default configuration is restored when limit is disabled
  • Loading branch information
lhotari committed May 24, 2022
1 parent abec5d8 commit 4a2f75e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,10 @@ public synchronized PulsarClient getClient() throws PulsarServerException {
if (this.client == null) {
try {
ClientConfigurationData conf = new ClientConfigurationData();

// Disable memory limit for broker client
conf.setMemoryLimitBytes(0);

conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
? this.brokerServiceUrlTls : this.brokerServiceUrl);
conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand Down Expand Up @@ -1199,6 +1200,10 @@ public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> c
.enableTcpNoDelay(false)
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
.statsInterval(0, TimeUnit.SECONDS);

// Disable memory limit for replication client
clientBuilder.memoryLimit(0, SizeUnit.BYTES);

if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) {
clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters());
} else if (pulsar.getConfiguration().isAuthenticationEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,8 @@ public void releaseMemory(long size) {
public long currentUsage() {
return currentUsage.get();
}

public boolean isMemoryLimited() {
return memoryLimit > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ public class PulsarClientImpl implements PulsarClient {

private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);

// default limits for producers when memory limit controller is disabled
private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES = 1000;
private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 50000;

protected final ClientConfigurationData conf;
private final boolean createdExecutorProviders;
private LookupService lookup;
Expand Down Expand Up @@ -251,7 +255,14 @@ public ProducerBuilder<byte[]> newProducer() {

@Override
public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {
return new ProducerBuilderImpl<>(this, schema);
ProducerBuilderImpl<T> producerBuilder = new ProducerBuilderImpl<>(this, schema);
if (!memoryLimitController.isMemoryLimited()) {
// set default limits for producers when memory limit controller is disabled
producerBuilder.maxPendingMessages(NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES);
producerBuilder.maxPendingMessagesAcrossPartitions(
NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS);
}
return producerBuilder;
}

@Override
Expand Down

0 comments on commit 4a2f75e

Please sign in to comment.