From 31589361fcea3c86686a45368b4d409020c61caa Mon Sep 17 00:00:00 2001 From: sunheyi <1061867552@qq.com> Date: Mon, 6 Feb 2023 09:55:58 +0800 Subject: [PATCH 1/2] Add command line option for configuring the memory limit --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 +- .../java/org/apache/pulsar/compaction/CompactorTool.java | 2 +- .../java/org/apache/pulsar/client/cli/PulsarClientTool.java | 5 ++++- .../org/apache/pulsar/functions/instance/InstanceUtils.java | 2 +- .../java/org/apache/pulsar/functions/worker/WorkerUtils.java | 2 +- .../org/apache/pulsar/testclient/LoadSimulationClient.java | 5 ++++- .../java/org/apache/pulsar/testclient/PerfClientUtils.java | 4 ++-- .../apache/pulsar/testclient/PerformanceBaseArguments.java | 3 +++ .../java/org/apache/pulsar/websocket/WebSocketService.java | 2 +- 9 files changed, 18 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index abbabcd3b00a1..43d1748da85cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1353,7 +1353,7 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) { return namespaceClients.computeIfAbsent(cluster, key -> { try { ClientBuilder clientBuilder = PulsarClient.builder() - .memoryLimit(0, SizeUnit.BYTES) + .memoryLimit(rootParams.memory, SizeUnit.BYTES) .enableTcpNoDelay(false) .statsInterval(0, TimeUnit.SECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java index c359823549fcd..add1b6a372b9e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java @@ -67,7 +67,7 @@ private static class Arguments { public static PulsarClient createClient(ServiceConfiguration brokerConfig) throws PulsarClientException { ClientBuilder clientBuilder = PulsarClient.builder() - .memoryLimit(0, SizeUnit.BYTES); + .memoryLimit(rootParams.memory, SizeUnit.BYTES); // Apply all arbitrary configuration. This must be called before setting any fields annotated as // @Secret on the ClientConfigurationData object because of the way they are serialized. diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index 2c3e6935b515e..5c4eac1b9b316 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -76,6 +76,9 @@ public static class RootParams { @Parameter(names = { "--tlsTrustCertsFilePath" }, description = "File path to client trust certificates") String tlsTrustCertsFilePath; + + @Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit") + Long memory=0L; } protected RootParams rootParams; @@ -162,7 +165,7 @@ protected void initRootParamsFromProperties(Properties properties) { private void updateConfig() throws UnsupportedAuthenticationException { ClientBuilder clientBuilder = PulsarClient.builder() - .memoryLimit(0, SizeUnit.BYTES); + .memoryLimit(rootParams.memory, SizeUnit.BYTES); Authentication authentication = null; if (isNotBlank(this.rootParams.authPluginClassName)) { authentication = AuthenticationFactory.create(rootParams.authPluginClassName, rootParams.authParams); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index bec738af6c99d..7ea85e517025a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -158,7 +158,7 @@ public static ClientBuilder createPulsarClientBuilder(String pulsarServiceUrl, ClientBuilder clientBuilder = null; if (isNotBlank(pulsarServiceUrl)) { clientBuilder = PulsarClient.builder() - .memoryLimit(0, SizeUnit.BYTES) + .memoryLimit(memoryLimit.get(), SizeUnit.BYTES) .serviceUrl(pulsarServiceUrl); if (authConfig != null) { if (isNotBlank(authConfig.getClientAuthenticationPlugin()) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java index af1edf5c8e80d..36ec0851942ef 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java @@ -282,7 +282,7 @@ public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authP try { ClientBuilder clientBuilder = PulsarClient.builder() - .memoryLimit(0, SizeUnit.BYTES) + .memoryLimit(rootParams.memory, SizeUnit.BYTES) .serviceUrl(pulsarServiceUrl); if (workerConfig != null) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java index 64330ae2eeea1..b6f6723772a6b 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java @@ -180,6 +180,9 @@ private static class MainArguments { @Parameter(names = { "--service-url" }, description = "Pulsar Service URL", required = true) public String serviceURL; + + @Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit") + Long memory=0L; } // Configuration class for initializing or modifying TradeUnits. @@ -318,7 +321,7 @@ public LoadSimulationClient(final MainArguments arguments) throws Exception { .serviceHttpUrl(arguments.serviceURL) .build(); client = PulsarClient.builder() - .memoryLimit(0, SizeUnit.BYTES) + .memoryLimit(arguments.memory, SizeUnit.BYTES) .serviceUrl(arguments.serviceURL) .connectionsPerBroker(4) .ioThreads(Runtime.getRuntime().availableProcessors()) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java index e40e9610bf42f..ceb3becad54ef 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java @@ -4,7 +4,7 @@ * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance + * "License"); you may not use this file except in cfompliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 @@ -67,7 +67,7 @@ public static ClientBuilder createClientBuilderFromArguments(PerformanceBaseArgu throws PulsarClientException.UnsupportedAuthenticationException { ClientBuilder clientBuilder = PulsarClient.builder() - .memoryLimit(0, SizeUnit.BYTES) + .memoryLimit(arguments.memory, SizeUnit.BYTES) .serviceUrl(arguments.serviceURL) .connectionsPerBroker(arguments.maxConnections) .ioThreads(arguments.ioThreads) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index ce402884d5ced..2e2178e0126c6 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -94,6 +94,9 @@ public abstract class PerformanceBaseArguments { @Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.") ProxyProtocol proxyProtocol = null; + @Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit") + Long memory=0L; + public abstract void fillArgumentsFromProperties(Properties prop); @SneakyThrows diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 9a8653029ce4c..74a7d3bbc697b 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -195,7 +195,7 @@ public synchronized void setLocalCluster(ClusterData clusterData) { private PulsarClient createClientInstance(ClusterData clusterData) throws IOException { ClientBuilder clientBuilder = PulsarClient.builder() // - .memoryLimit(0, SizeUnit.BYTES) + .memoryLimit(rootParams.memory, SizeUnit.BYTES) .statsInterval(0, TimeUnit.SECONDS) // .enableTls(config.isTlsEnabled()) // .allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) // From f128bf3b8fe336609476614791c38f9c23537431 Mon Sep 17 00:00:00 2001 From: sunheyi <1061867552@qq.com> Date: Fri, 10 Feb 2023 08:46:06 +0800 Subject: [PATCH 2/2] update note --- .../main/java/org/apache/pulsar/testclient/PerfClientUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java index ceb3becad54ef..57fe1a3fc2368 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java @@ -4,7 +4,7 @@ * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in cfompliance + * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0