Skip to content

Commit

Permalink
Made use of LimitsConfig in JavaInstanceConfig (apache#92)
Browse files Browse the repository at this point in the history
* Made use of LimitsConfig in JavaInstanceConfig

* Add uncommited file
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 71997bb commit 493e54a
Show file tree
Hide file tree
Showing 16 changed files with 31 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@
import lombok.Getter;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarFunctionsAdmin;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.functions.annotation.Annotations;
import org.apache.pulsar.functions.api.RequestHandler;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.container.ThreadFunctionContainerFactory;
import org.apache.pulsar.functions.runtime.serde.SerDe;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;
import org.apache.pulsar.functions.utils.Reflections;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.runtime.spawner;
package org.apache.pulsar.functions.fs;

import java.io.File;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ public String getFunctionVersion() {

@Override
public long getMemoryLimit() {
return config.getMaxMemory();
return config.getLimitsConfig().getMaxMemoryMb() * 1024 * 1024;
}

@Override
public long getTimeBudgetInMs() {
return config.getTimeBudgetInMs();
return config.getLimitsConfig().getMaxTimeMs();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public JavaInstance(JavaInstanceConfig config, ClassLoader clsLoader) {
throw new RuntimeException("User class must be either a Request or Raw Request Handler");
}

if (config.getTimeBudgetInMs() > 0) {
if (config.getLimitsConfig().getMaxTimeMs() > 0) {
log.info("Spinning up a executor service since time budget is infinite");
executorService = Executors.newFixedThreadPool(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.FunctionID;
import org.apache.pulsar.functions.runtime.InstanceID;
import org.apache.pulsar.functions.fs.LimitsConfig;

/**
* This is the config passed to the Java Instance. Contains all the information
Expand All @@ -38,9 +39,8 @@
@ToString
public class JavaInstanceConfig {
private InstanceID instanceId;
private FunctionConfig functionConfig;
private FunctionID functionId;
private String functionVersion;
private int timeBudgetInMs;
private int maxMemory;
private FunctionConfig functionConfig;
private LimitsConfig limitsConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionStatus;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.instance.JavaInstanceConfig;
import org.apache.pulsar.functions.runtime.FunctionID;
Expand Down Expand Up @@ -98,8 +99,7 @@ private JavaInstanceConfig createJavaInstanceConfig() {
javaInstanceConfig.setFunctionConfig(assignmentInfo.getFunctionConfig());
javaInstanceConfig.setFunctionId(assignmentInfo.getFunctionId());
javaInstanceConfig.setFunctionVersion(assignmentInfo.getFunctionVersion());
javaInstanceConfig.setTimeBudgetInMs(limitsConfig.getMaxTimeMs());
javaInstanceConfig.setMaxMemory(limitsConfig.getMaxMemoryMb());
javaInstanceConfig.setLimitsConfig(limitsConfig);
return javaInstanceConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.instance.JavaInstanceConfig;
import org.apache.pulsar.functions.runtime.FunctionID;
import org.apache.pulsar.functions.runtime.InstanceID;
Expand Down Expand Up @@ -81,8 +82,10 @@ JavaInstanceConfig createJavaInstanceConfig() {
config.setFunctionId(new FunctionID());
config.setFunctionVersion("1.0");
config.setInstanceId(new InstanceID());
config.setMaxMemory(2048);
config.setTimeBudgetInMs(2000);
LimitsConfig limitsConfig = new LimitsConfig();
limitsConfig.setMaxTimeMs(2000);
limitsConfig.setMaxMemoryMb(2048);
config.setLimitsConfig(limitsConfig);

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.RequestHandler;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.serde.JavaSerDe;
import org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -82,10 +83,12 @@ public Void handleRequest(String input, Context context) throws Exception {

private static JavaInstanceConfig createInstanceConfig() {
FunctionConfig functionConfig = new FunctionConfig();
LimitsConfig limitsConfig = new LimitsConfig();
functionConfig.setInputSerdeClassName(Utf8StringSerDe.class.getName());
functionConfig.setOutputSerdeClassName(Utf8StringSerDe.class.getName());
JavaInstanceConfig instanceConfig = new JavaInstanceConfig();
instanceConfig.setFunctionConfig(functionConfig);
instanceConfig.setLimitsConfig(limitsConfig);
return instanceConfig;
}

Expand All @@ -96,7 +99,7 @@ private static JavaInstanceConfig createInstanceConfig() {
@Test
public void testLongRunningFunction() throws Exception {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getLimitsConfig().setMaxTimeMs(2000);
JavaInstance instance = new JavaInstance(
config, new LongRunningHandler(), Thread.currentThread().getContextClassLoader());
String testString = "ABC123";
Expand All @@ -113,7 +116,7 @@ public void testLongRunningFunction() throws Exception {
@Test
public void testLambda() {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getLimitsConfig().setMaxTimeMs(2000);
JavaInstance instance = new JavaInstance(
config,
(RequestHandler<String, String>) (input, context) -> input + "-lambda",
Expand Down Expand Up @@ -165,7 +168,7 @@ public void testVoidInputClasses() {
@Test
public void testVoidOutputClasses() {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getLimitsConfig().setMaxTimeMs(2000);
JavaInstance instance = new JavaInstance(
config, new VoidOutputHandler(), Thread.currentThread().getContextClassLoader());
String testString = "ABC123";
Expand All @@ -181,7 +184,7 @@ public void testVoidOutputClasses() {
@Test
public void testInconsistentInputType() {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getLimitsConfig().setMaxTimeMs(2000);
config.getFunctionConfig().setInputSerdeClassName(JavaSerDe.class.getName());

try {
Expand All @@ -201,7 +204,7 @@ public void testInconsistentInputType() {
@Test
public void testInconsistentOutputType() {
JavaInstanceConfig config = createInstanceConfig();
config.setTimeBudgetInMs(2000);
config.getLimitsConfig().setMaxTimeMs(2000);
config.getFunctionConfig().setOutputSerdeClassName(JavaSerDe.class.getName());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;

import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;

@Data
@Setter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.slf4j.bridge.SLF4JBridgeHandler;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.container.ThreadFunctionContainerFactory;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.worker.request.ServiceRequestManager;
import org.apache.pulsar.functions.worker.rest.WorkerServer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;

@Data
@Setter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.pulsar.functions.annotation.Annotations;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionStatus;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;
import org.apache.pulsar.functions.worker.FunctionMetaData;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.functions.worker;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;

import java.net.URISyntaxException;
import org.testng.annotations.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.fs.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.runtime.serde.Utf8StringSerDe;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.fs.LimitsConfig;
import org.apache.pulsar.functions.worker.FunctionMetaData;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.PackageLocationMetaData;
Expand Down

0 comments on commit 493e54a

Please sign in to comment.