Skip to content

Commit

Permalink
Refactor cli (#27)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Refactor cli to establish a base FunctionCommand interface that all subcommands can piggyback
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 117f816 commit f026ef8
Showing 1 changed file with 33 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,69 +37,77 @@ public class CmdFunctions extends CmdBase {
private final LocalRunner cmdRunner;

@Getter
@Parameters(commandDescription = "Run function locally")
class LocalRunner extends CliCommand {

abstract class FunctionsCommand extends CliCommand {
@Parameter(names = "--name", description = "Function Name\n")
private String name;
protected String name;
@Parameter(names = "--function-classname", description = "Function Class Name\n")
private String className;
protected String className;
@Parameter(
names = "--function-classpath",
description = "Function Classpath\n",
listConverter = StringConverter.class)
private List<String> jarFiles;
names = "--function-classpath",
description = "Function Classpath\n",
listConverter = StringConverter.class)
protected List<String> jarFiles;
@Parameter(names = "--source-topic", description = "Input Topic Name\n")
private String sourceTopicName;
protected String sourceTopicName;
@Parameter(names = "--sink-topic", description = "Output Topic Name\n")
private String sinkTopicName;
protected String sinkTopicName;

@Parameter(names = "--serde-classname", description = "SerDe\n")
private String serDeClassName;
protected String serDeClassName;
protected SerDe serDe;

@Parameter(names = "--function-config", description = "Function Config\n")
private String fnConfigFile;
protected String fnConfigFile;
protected FunctionConfig functionConfig;

@Override
void run() throws Exception {
FunctionConfig fc;
SerDe serDe = null;
if (null != fnConfigFile) {
fc = FunctionConfig.load(fnConfigFile);
functionConfig = FunctionConfig.load(fnConfigFile);
} else {
fc = new FunctionConfig();
functionConfig = new FunctionConfig();
}
if (null != sourceTopicName) {
fc.setSourceTopic(sourceTopicName);
functionConfig.setSourceTopic(sourceTopicName);
}
if (null != sinkTopicName) {
fc.setSinkTopic(sinkTopicName);
functionConfig.setSinkTopic(sinkTopicName);
}
if (null != name) {
fc.setName(name);
functionConfig.setName(name);
}
if (null != className) {
fc.setClassName(className);
functionConfig.setClassName(className);
}
if (null != serDeClassName) {
serDe = createSerDe(serDeClassName);
}
if (null != jarFiles) {
fc.setJarFiles(jarFiles);
functionConfig.setJarFiles(jarFiles);
} else {
fc.setJarFiles(Lists.newArrayList());
functionConfig.setJarFiles(Lists.newArrayList());
}

// Construct the spawner
run_functions_cmd();
}

abstract void run_functions_cmd() throws Exception;
}

@Getter
@Parameters(commandDescription = "Run function locally")
class LocalRunner extends FunctionsCommand {

@Override
void run_functions_cmd() throws Exception {
LimitsConfig limitsConfig = new LimitsConfig(
60000, // 60 seconds
1024, // 1GB
1024 // 1024 outstanding tuples
);

Spawner spawner = Spawner.createSpawner(
fc,
functionConfig,
limitsConfig,
serDe,
admin.getServiceUrl().toString());
Expand Down

0 comments on commit f026ef8

Please sign in to comment.