Skip to content

Commit

Permalink
Fix compilation (#35)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Fix compilation issue
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 3227065 commit da66b2d
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void run_functions_cmd() throws Exception {
Spawner spawner = Spawner.createSpawner(
functionConfig,
limitsConfig,
admin.getServiceUrl().toString());
admin.getServiceUrl().toString(), jarFile);

spawner.start();
spawner.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface FunctionContainerFactory extends AutoCloseable {
* @return function container to start/stop instance
*/
FunctionContainer createContainer(
JavaInstanceConfig instanceConfig);
JavaInstanceConfig instanceConfig, String codeFile);

@Override
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ class Payload {
private final FunctionCacheManager fnCache;
private LinkedBlockingQueue<Payload> queue;
private String id;
private String jarFile;

ThreadFunctionContainer(JavaInstanceConfig instanceConfig, int maxBufferedTuples,
FunctionCacheManager fnCache, ThreadGroup threadGroup) {
FunctionCacheManager fnCache, ThreadGroup threadGroup, String jarFile) {
this.javaInstanceConfig = instanceConfig;
this.fnCache = fnCache;
this.queue = new LinkedBlockingQueue<>(maxBufferedTuples);
this.id = "fn-" + instanceConfig.getFunctionConfig().getName() + "-instance-" + instanceConfig.getInstanceId();
this.jarFile = jarFile;
this.fnThread = new Thread(threadGroup,
new Runnable() {
@Override
Expand Down Expand Up @@ -111,7 +113,7 @@ public void start() throws Exception {
fnCache.registerFunctionInstance(
javaInstanceConfig.getFunctionId(),
javaInstanceConfig.getInstanceId(),
Arrays.asList(javaInstanceConfig.getFunctionConfig().getCodeFile()),
Arrays.asList(jarFile),
Collections.emptyList());
log.info("Initialize function class loader for function {} at function cache manager",
javaInstanceConfig.getFunctionConfig().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ public ThreadFunctionContainerFactory(int maxBufferedTuples) {
}

@Override
public ThreadFunctionContainer createContainer(JavaInstanceConfig instanceConfig) {
public ThreadFunctionContainer createContainer(JavaInstanceConfig instanceConfig, String jarFile) {
return new ThreadFunctionContainer(
instanceConfig,
maxBufferedTuples,
fnCache,
threadGroup);
threadGroup,
jarFile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class Spawner {

public static Spawner createSpawner(FunctionConfig fnConfig,
LimitsConfig limitsConfig,
String pulsarBrokerRootUrl) {
String pulsarBrokerRootUrl,
String codeFile) {
AssignmentInfo assignmentInfo = new AssignmentInfo(
fnConfig,
new FunctionID(),
Expand All @@ -44,7 +45,8 @@ public static Spawner createSpawner(FunctionConfig fnConfig,
return new Spawner(
limitsConfig,
assignmentInfo,
pulsarBrokerRootUrl);
pulsarBrokerRootUrl,
codeFile);
}

private LimitsConfig limitsConfig;
Expand All @@ -53,17 +55,20 @@ public static Spawner createSpawner(FunctionConfig fnConfig,
private ThreadFunctionContainerFactory threadFunctionContainerFactory;
private FunctionContainer functionContainer;
private SubscriberManager subscriberManager;
private String codeFile;

public Spawner(LimitsConfig limitsConfig, AssignmentInfo assignmentInfo, String pulsarBrokerRootUrl) {
public Spawner(LimitsConfig limitsConfig, AssignmentInfo assignmentInfo, String pulsarBrokerRootUrl,
String codeFile) {
this.limitsConfig = limitsConfig;
this.assignmentInfo = assignmentInfo;
this.pulsarBrokerRootUrl = pulsarBrokerRootUrl;
this.threadFunctionContainerFactory = new ThreadFunctionContainerFactory(limitsConfig.getMaxBufferedTuples());
this.codeFile = codeFile;
}

public void start() throws Exception {
subscriberManager = new SubscriberManager(createSubscriptionName(), pulsarBrokerRootUrl);
functionContainer = threadFunctionContainerFactory.createContainer(createJavaInstanceConfig());
functionContainer = threadFunctionContainerFactory.createContainer(createJavaInstanceConfig(), codeFile);
subscriberManager.addSubscriber(assignmentInfo.getFunctionConfig().getSourceTopic(), functionContainer);
functionContainer.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ JavaInstanceConfig createJavaInstanceConfig() {
public void testConstructor() {
JavaInstanceConfig config = createJavaInstanceConfig();

ThreadFunctionContainer container = factory.createContainer(config);
ThreadFunctionContainer container = factory.createContainer(config, jarFile);
assertEquals(
"fn-" + config.getFunctionConfig().getName() + "-instance-" + config.getInstanceId(),
container.getFnThread().getName());
Expand Down

0 comments on commit da66b2d

Please sign in to comment.