Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][cli] PIP-343: Use picocli instead of jcommander in pulsar-function #22331

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pulsar-functions/instance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@
</dependency>

<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
</dependency>

<dependency>
Expand Down
6 changes: 3 additions & 3 deletions pulsar-functions/localrun-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
<include>org.rocksdb:*</include>
<include>org.eclipse.jetty*:*</include>
<include>org.apache.avro:avro</include>
<include>com.beust:*</include>
<include>info.picocli:*</include>
<include>net.jodah:*</include>
<include>io.airlift:*</include>
<include>com.yahoo.datasketches:*</include>
Expand Down Expand Up @@ -385,8 +385,8 @@
<shadedPattern>org.apache.pulsar.shaded.com.yahoo.sketches</shadedPattern>
</relocation>
<relocation>
<pattern>com.beust</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
<pattern>info.picocli</pattern>
<shadedPattern>org.apache.pulsar.functions.runtime.shaded.info.picocli</shadedPattern>
</relocation>
<!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
<relocation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
Expand Down Expand Up @@ -87,6 +84,10 @@
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import picocli.CommandLine;
import picocli.CommandLine.ITypeConverter;
import picocli.CommandLine.Option;
import picocli.CommandLine.TypeConversionException;

@Slf4j
public class LocalRunner implements AutoCloseable {
Expand Down Expand Up @@ -115,95 +116,95 @@ private static class UserCodeClassLoader {
boolean classLoaderCreated;
}

public static class FunctionConfigConverter implements IStringConverter<FunctionConfig> {
public static class FunctionConfigConverter implements ITypeConverter<FunctionConfig> {
@Override
public FunctionConfig convert(String value) {
try {
return ObjectMapperFactory.getMapper().reader().readValue(value, FunctionConfig.class);
} catch (IOException e) {
throw new RuntimeException("Failed to parse function config:", e);
throw new TypeConversionException(e.getMessage());
}
}
}

public static class SourceConfigConverter implements IStringConverter<SourceConfig> {
public static class SourceConfigConverter implements ITypeConverter<SourceConfig> {
@Override
public SourceConfig convert(String value) {
try {
return ObjectMapperFactory.getMapper().reader().readValue(value, SourceConfig.class);
} catch (IOException e) {
throw new RuntimeException("Failed to parse source config:", e);
throw new TypeConversionException(e.getMessage());
}
}
}

public static class SinkConfigConverter implements IStringConverter<SinkConfig> {
public static class SinkConfigConverter implements ITypeConverter<SinkConfig> {
@Override
public SinkConfig convert(String value) {
try {
return ObjectMapperFactory.getMapper().reader().readValue(value, SinkConfig.class);
} catch (IOException e) {
throw new RuntimeException("Failed to parse sink config:", e);
throw new TypeConversionException(e.getMessage());
}
}
}

public static class RuntimeConverter implements IStringConverter<RuntimeEnv> {
public static class RuntimeConverter implements ITypeConverter<RuntimeEnv> {
@Override
public RuntimeEnv convert(String value) {
return RuntimeEnv.valueOf(value);
}
}

@Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig",
@Option(names = "--functionConfig", description = "The json representation of FunctionConfig",
hidden = true, converter = FunctionConfigConverter.class)
protected FunctionConfig functionConfig;
@Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig",
@Option(names = "--sourceConfig", description = "The json representation of SourceConfig",
hidden = true, converter = SourceConfigConverter.class)
protected SourceConfig sourceConfig;
@Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig",
@Option(names = "--sinkConfig", description = "The json representation of SinkConfig",
hidden = true, converter = SinkConfigConverter.class)
protected SinkConfig sinkConfig;
@Parameter(names = "--stateStorageImplClass", description = "The implemenatation class "
@Option(names = "--stateStorageImplClass", description = "The implemenatation class "
+ "state storage service (by default Apache BookKeeper)", hidden = true, required = false)
protected String stateStorageImplClass;
@Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service "
@Option(names = "--stateStorageServiceUrl", description = "The URL for the state storage service "
+ "(by default Apache BookKeeper)", hidden = true)
protected String stateStorageServiceUrl;
@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
@Option(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
protected String brokerServiceUrl;
@Parameter(names = "--webServiceUrl", description = "The URL for the Pulsar web service", hidden = true)
@Option(names = "--webServiceUrl", description = "The URL for the Pulsar web service", hidden = true)
protected String webServiceUrl = null;
@Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which "
@Option(names = "--clientAuthPlugin", description = "Client authentication plugin using which "
+ "function-process can connect to broker", hidden = true)
protected String clientAuthPlugin;
@Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
@Option(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
protected String clientAuthParams;
@Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1)
@Option(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = "1")
protected boolean useTls;
@Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n",
hidden = true, arity = 1)
@Option(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n",
hidden = true, arity = "1")
protected boolean tlsAllowInsecureConnection;
@Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true
, arity = 1)
@Option(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true
, arity = "1")
protected boolean tlsHostNameVerificationEnabled;
@Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
@Option(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
protected String tlsTrustCertFilePath;
@Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
@Option(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
protected int instanceIdOffset = 0;
@Parameter(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true,
@Option(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true,
converter = RuntimeConverter.class)
protected RuntimeEnv runtimeEnv;
@Parameter(names = "--secretsProviderClassName",
@Option(names = "--secretsProviderClassName",
description = "Whats the classname of secrets provider", hidden = true)
protected String secretsProviderClassName;
@Parameter(names = "--secretsProviderConfig",
@Option(names = "--secretsProviderConfig",
description = "Whats the config for the secrets provider", hidden = true)
protected String secretsProviderConfig;
@Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server. When running "
@Option(names = "--metricsPortStart", description = "The starting port range for metrics server. When running "
+ "instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
protected Integer metricsPortStart;
@Parameter(names = "--exitOnError", description = "The starting port range for metrics server. When running "
@Option(names = "--exitOnError", description = "The starting port range for metrics server. When running "
+ "instances as threads, one metrics server is used to host the stats for all instances.", hidden = true)
protected boolean exitOnError;

Expand All @@ -212,11 +213,10 @@ public RuntimeEnv convert(String value) {

public static void main(String[] args) throws Exception {
LocalRunner localRunner = LocalRunner.builder().build();
JCommander jcommander = new JCommander(localRunner);
jcommander.setProgramName("LocalRunner");
CommandLine jcommander = new CommandLine(localRunner);
jcommander.setCommandName("LocalRunner");

// parse args by JCommander
jcommander.parse(args);
jcommander.parseArgs(args);
try {
localRunner.start(true);
} catch (Exception e) {
Expand Down
4 changes: 2 additions & 2 deletions pulsar-functions/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
</dependency>

<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@

import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.converters.StringConverter;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.protobuf.Empty;
Expand Down Expand Up @@ -59,104 +56,104 @@
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
import picocli.CommandLine;
import picocli.CommandLine.Option;


@Slf4j
public class JavaInstanceStarter implements AutoCloseable {
@Parameter(names = "--function_details", description = "Function details json\n", required = true)
@Option(names = "--function_details", description = "Function details json\n", required = true)
public String functionDetailsJsonString;
@Parameter(
@Option(
names = "--jar",
description = "Path to Jar\n",
listConverter = StringConverter.class)
description = "Path to Jar\n")
public String jarFile;

@Parameter(
@Option(
names = "--transform_function_jar",
description = "Path to Transform Function Jar\n",
listConverter = StringConverter.class)
description = "Path to Transform Function Jar\n")
public String transformFunctionJarFile;

@Parameter(names = "--instance_id", description = "Instance Id\n", required = true)
@Option(names = "--instance_id", description = "Instance Id\n", required = true)
public int instanceId;

@Parameter(names = "--function_id", description = "Function Id\n", required = true)
@Option(names = "--function_id", description = "Function Id\n", required = true)
public String functionId;

@Parameter(names = "--function_version", description = "Function Version\n", required = true)
@Option(names = "--function_version", description = "Function Version\n", required = true)
public String functionVersion;

@Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
@Option(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
public String pulsarServiceUrl;

@Parameter(names = "--transform_function_id", description = "Transform Function Id\n")
@Option(names = "--transform_function_id", description = "Transform Function Id\n")
public String transformFunctionId;

@Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n")
@Option(names = "--client_auth_plugin", description = "Client auth plugin name\n")
public String clientAuthenticationPlugin;

@Parameter(names = "--client_auth_params", description = "Client auth param\n")
@Option(names = "--client_auth_params", description = "Client auth param\n")
public String clientAuthenticationParameters;

@Parameter(names = "--use_tls", description = "Use tls connection\n")
@Option(names = "--use_tls", description = "Use tls connection\n")
public String useTls = Boolean.FALSE.toString();

@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
@Option(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
public String tlsAllowInsecureConnection = Boolean.FALSE.toString();

@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
@Option(names = "--hostname_verification_enabled", description = "Enable hostname verification")
public String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();

@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
@Option(names = "--tls_trust_cert_path", description = "tls trust cert file path")
public String tlsTrustCertFilePath;

@Parameter(names = "--state_storage_impl_class", description = "State Storage Service "
@Option(names = "--state_storage_impl_class", description = "State Storage Service "
+ "Implementation class\n", required = false)
public String stateStorageImplClass;

@Parameter(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required = false)
@Option(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required = false)
public String stateStorageServiceUrl;

@Parameter(names = "--port", description = "Port to listen on\n", required = true)
@Option(names = "--port", description = "Port to listen on\n", required = true)
public int port;

@Parameter(names = "--metrics_port", description = "Port metrics will be exposed on\n", required = true)
@Option(names = "--metrics_port", description = "Port metrics will be exposed on\n", required = true)
public int metricsPort;

@Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true)
@Option(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true)
public int maxBufferedTuples;

@Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in "
@Option(names = "--expected_healthcheck_interval", description = "Expected interval in "
+ "seconds between healtchecks", required = true)
public int expectedHealthCheckInterval;

@Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false)
@Option(names = "--secrets_provider", description = "The classname of the secrets provider", required = false)
public String secretsProviderClassName;

@Parameter(names = "--secrets_provider_config", description = "The config that needs to be "
@Option(names = "--secrets_provider_config", description = "The config that needs to be "
+ "passed to secrets provider", required = false)
public String secretsProviderConfig;

@Parameter(names = "--cluster_name", description = "The name of the cluster this "
@Option(names = "--cluster_name", description = "The name of the cluster this "
+ "instance is running on", required = true)
public String clusterName;

@Parameter(names = "--nar_extraction_directory", description = "The directory where "
@Option(names = "--nar_extraction_directory", description = "The directory where "
+ "extraction of nar packages happen", required = false)
public String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;

@Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance",
@Option(names = "--pending_async_requests", description = "Max pending async requests per instance",
required = false)
public int maxPendingAsyncRequests = 1000;

@Parameter(names = "--web_serviceurl", description = "Pulsar Web Service Url", required = false)
@Option(names = "--web_serviceurl", description = "Pulsar Web Service Url", required = false)
public String webServiceUrl = null;

@Parameter(names = "--expose_pulsaradmin", description = "Whether the pulsar admin client "
@Option(names = "--expose_pulsaradmin", description = "Whether the pulsar admin client "
+ "exposed to function context, default is disabled.", required = false)
public Boolean exposePulsarAdminClientEnabled = false;

@Parameter(names = "--ignore_unknown_config_fields",
@Option(names = "--ignore_unknown_config_fields",
description = "Whether to ignore unknown properties when deserializing the connector configuration.",
required = false)
public Boolean ignoreUnknownConfigFields = false;
Expand All @@ -176,9 +173,8 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL
throws Exception {
Thread.currentThread().setContextClassLoader(functionInstanceClassLoader);

JCommander jcommander = new JCommander(this);
// parse args by JCommander
jcommander.parse(args);
CommandLine jcommander = new CommandLine(this);
jcommander.parseArgs(args);

InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionId(functionId);
Expand Down
Loading
Loading