Skip to content

Commit

Permalink
Buffer limit (#25)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Added a way to limit the number of outstanding tuples read but still not executed
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 5f3b93e commit f883f16
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ void run() throws Exception {

LimitsConfig limitsConfig = new LimitsConfig(
60000, // 60 seconds
1024 // 1GB
1024, // 1GB
1024 // 1024 outstanding tuples
);

Spawner spawner = Spawner.createSpawner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ class Payload {
private LinkedBlockingQueue<Payload> queue;
private String id;

ThreadFunctionContainer(JavaInstanceConfig instanceConfig,
ThreadFunctionContainer(JavaInstanceConfig instanceConfig, int maxBufferedTuples,
FunctionCacheManager fnCache, ThreadGroup threadGroup) {
this.javaInstanceConfig = instanceConfig;
this.fnCache = fnCache;
this.queue = new LinkedBlockingQueue<>();
this.queue = new LinkedBlockingQueue<>(maxBufferedTuples);
this.id = "fn-" + instanceConfig.getFunctionConfig().getName() + "-instance-" + instanceConfig.getInstanceId();
this.fnThread = new Thread(threadGroup,
new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,20 @@ public class ThreadFunctionContainerFactory implements FunctionContainerFactory

private final ThreadGroup threadGroup;
protected final FunctionCacheManager fnCache;
private int maxBufferedTuples;

public ThreadFunctionContainerFactory() {
public ThreadFunctionContainerFactory(int maxBufferedTuples) {
this.fnCache = new FunctionCacheManagerImpl();
this.threadGroup = new ThreadGroup(
"Pulsar Function Container Threads");
this.maxBufferedTuples = maxBufferedTuples;
}

@Override
public ThreadFunctionContainer createContainer(JavaInstanceConfig instanceConfig) {
return new ThreadFunctionContainer(
instanceConfig,
maxBufferedTuples,
fnCache,
threadGroup);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@
public class LimitsConfig {
private int timeBudgetInMs;
private int maxMemory;
private int maxBufferedTuples;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Spawner(LimitsConfig limitsConfig, AssignmentInfo assignmentInfo, String
this.limitsConfig = limitsConfig;
this.assignmentInfo = assignmentInfo;
this.pulsarBrokerRootUrl = pulsarBrokerRootUrl;
this.threadFunctionContainerFactory = new ThreadFunctionContainerFactory();
this.threadFunctionContainerFactory = new ThreadFunctionContainerFactory(limitsConfig.getMaxBufferedTuples());
}

public void start() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public ThreadFunctionContainerTest() {
URL jarUrl = getClass().getClassLoader().getResource("multifunction.jar");
this.jarFiles = Lists.newArrayList(jarUrl.getPath());
this.classpaths = Collections.emptyList();
this.factory = new ThreadFunctionContainerFactory();
this.factory = new ThreadFunctionContainerFactory(1024);
}

@After
Expand Down

0 comments on commit f883f16

Please sign in to comment.