Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][cli] Add command line option for configuring the memory limit #19434

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {
return namespaceClients.computeIfAbsent(cluster, key -> {
try {
ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.memoryLimit(rootParams.memory, SizeUnit.BYTES)
.enableTcpNoDelay(false)
.statsInterval(0, TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private static class Arguments {

public static PulsarClient createClient(ServiceConfiguration brokerConfig) throws PulsarClientException {
ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES);
.memoryLimit(rootParams.memory, SizeUnit.BYTES);

// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public static class RootParams {

@Parameter(names = { "--tlsTrustCertsFilePath" }, description = "File path to client trust certificates")
String tlsTrustCertsFilePath;

@Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit")
Long memory=0L;
Comment on lines +80 to +81
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's pretty unconvenient to specify megabytes as bytes on the command line. Do we already have a parser that can accept typical units like M = megabyte etc. .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it should be changed to this ?

@parameter(names = { "-M", "--Memory", },

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shy-share IIUC it means you can specify -m 41M means configuring memory as 41MB.

Copy link
Contributor Author

@sunheyi6 sunheyi6 Feb 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tisonkun I know what you mean but I don't know how to write this. like this

    protected static final double MB = 1024 * 1024;
    @Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit{MB}")
    Long memory=0*MB;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari @nicoloboschi Does our CLI tools already provide such functionality? I can read broker.conf options like:

# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB minimum)
s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864

# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576

}

protected RootParams rootParams;
Expand Down Expand Up @@ -162,7 +165,7 @@ protected void initRootParamsFromProperties(Properties properties) {

private void updateConfig() throws UnsupportedAuthenticationException {
ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES);
.memoryLimit(rootParams.memory, SizeUnit.BYTES);
Authentication authentication = null;
if (isNotBlank(this.rootParams.authPluginClassName)) {
authentication = AuthenticationFactory.create(rootParams.authPluginClassName, rootParams.authParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public static ClientBuilder createPulsarClientBuilder(String pulsarServiceUrl,
ClientBuilder clientBuilder = null;
if (isNotBlank(pulsarServiceUrl)) {
clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.memoryLimit(memoryLimit.get(), SizeUnit.BYTES)
.serviceUrl(pulsarServiceUrl);
if (authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authP

try {
ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.memoryLimit(rootParams.memory, SizeUnit.BYTES)
.serviceUrl(pulsarServiceUrl);

if (workerConfig != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ private static class MainArguments {

@Parameter(names = { "--service-url" }, description = "Pulsar Service URL", required = true)
public String serviceURL;

@Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit")
Long memory=0L;
}

// Configuration class for initializing or modifying TradeUnits.
Expand Down Expand Up @@ -318,7 +321,7 @@ public LoadSimulationClient(final MainArguments arguments) throws Exception {
.serviceHttpUrl(arguments.serviceURL)
.build();
client = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.memoryLimit(arguments.memory, SizeUnit.BYTES)
.serviceUrl(arguments.serviceURL)
.connectionsPerBroker(4)
.ioThreads(Runtime.getRuntime().availableProcessors())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static ClientBuilder createClientBuilderFromArguments(PerformanceBaseArgu
throws PulsarClientException.UnsupportedAuthenticationException {

ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.memoryLimit(arguments.memory, SizeUnit.BYTES)
.serviceUrl(arguments.serviceURL)
.connectionsPerBroker(arguments.maxConnections)
.ioThreads(arguments.ioThreads)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public abstract class PerformanceBaseArguments {
@Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.")
ProxyProtocol proxyProtocol = null;

@Parameter(names = { "-m", "--memory", }, description = "Configure the Pulsar client memory limit")
Long memory=0L;

public abstract void fillArgumentsFromProperties(Properties prop);

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public synchronized void setLocalCluster(ClusterData clusterData) {

private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
ClientBuilder clientBuilder = PulsarClient.builder() //
.memoryLimit(0, SizeUnit.BYTES)
.memoryLimit(rootParams.memory, SizeUnit.BYTES)
.statsInterval(0, TimeUnit.SECONDS) //
.enableTls(config.isTlsEnabled()) //
.allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //
Expand Down