Skip to content

Commit

Permalink
Refactor (apache#18)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Refactored serialization/deserialization into an interface
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 5b0ed1b commit 52c9c24
Show file tree
Hide file tree
Showing 15 changed files with 212 additions and 106 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ flexible messaging model and an intuitive client API.</description>
<module>tests</module>
<module>pulsar-log4j2-appender</module>
<!-- functions related modules -->
<module>pulsar-functions-parent</module>
<module>pulsar-functions</module>
</modules>

<issueManagement>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-functions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<version>1.22.0-incubating-SNAPSHOT</version>
</parent>

<artifactId>pulsar-functions-parent</artifactId>
<artifactId>pulsar-functions</artifactId>
<name>Pulsar Functions :: Parent</name>

<modules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.instance.JavaInstanceConfig;
import org.apache.pulsar.functions.runtime.container.SerDe;
import org.apache.pulsar.functions.spawner.LimitsConfig;
import org.apache.pulsar.functions.spawner.Spawner;

Expand All @@ -57,12 +58,16 @@ class LocalRunner extends CliCommand {
@Parameter(names = "--sink-topic", description = "Output Topic Name\n")
private String sinkTopicName;

@Parameter(names = "--serde-classname", description = "SerDe\n")
private String serDeClassName;

@Parameter(names = "--function-config", description = "Function Config\n")
private String fnConfigFile;

@Override
void run() throws Exception {
FunctionConfig fc;
SerDe serDe = null;
if (null != fnConfigFile) {
fc = FunctionConfig.load(fnConfigFile);
} else {
Expand All @@ -80,6 +85,9 @@ void run() throws Exception {
if (null != className) {
fc.setClassName(className);
}
if (null != serDeClassName) {
serDe = createSerDe(serDeClassName);
}
if (null != jarFiles) {
fc.setJarFiles(jarFiles);
} else {
Expand All @@ -96,6 +104,7 @@ void run() throws Exception {
Spawner spawner = Spawner.createSpawner(
fc,
limitsConfig,
serDe,
admin.getServiceUrl().toString());

spawner.start();
Expand All @@ -104,6 +113,21 @@ void run() throws Exception {

}

SerDe createSerDe(String className) {
SerDe retval;
try {
Class<?> clazz = Class.forName(className);
retval = (SerDe) clazz.newInstance();
} catch (ClassNotFoundException ex) {
throw new RuntimeException(ex + " User class must be in class path.");
} catch (InstantiationException ex) {
throw new RuntimeException(ex + " User class must be concrete.");
} catch (IllegalAccessException ex) {
throw new RuntimeException(ex + " User class must have a no-arg constructor.");
}
return retval;
}

public CmdFunctions(PulsarAdmin admin) {
super("functions", admin);
cmdRunner = new LocalRunner();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
package org.apache.pulsar.functions.instance;

import lombok.*;
import org.apache.pulsar.functions.spawner.ExecutionResult;


import java.io.ByteArrayOutputStream;
import java.util.concurrent.TimeoutException;

/**
Expand All @@ -35,14 +32,14 @@
@Getter
@EqualsAndHashCode
@ToString
public class JavaExecutionResult implements ExecutionResult {
public class JavaExecutionResult {
private Exception userException;
private TimeoutException timeoutException;
private byte[] result;
private Object result;

public void reset() {
this.setUserException(null);
this.setTimeoutException(null);
this.setResult((byte[])null);
setUserException(null);
setTimeoutException(null);
setResult(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.api.RawRequestHandler;
import org.apache.pulsar.functions.api.RequestHandler;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.container.SerDe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
Expand Down Expand Up @@ -57,6 +56,7 @@ enum SupportedTypes {
private RawRequestHandler rawRequestHandler;
private ExecutorService executorService;
private JavaExecutionResult executionResult;
private SerDe serDe;

public static Object createObject(String userClassName) {
Object object;
Expand Down Expand Up @@ -90,6 +90,7 @@ public JavaInstance(JavaInstanceConfig config, Object object) {

executorService = Executors.newFixedThreadPool(1);
this.executionResult = new JavaExecutionResult();
this.serDe = config.getSerDe();
}

private void computeInputAndOutputTypes() {
Expand Down Expand Up @@ -125,26 +126,23 @@ private SupportedTypes computeSupportedType(Type type) {
public JavaExecutionResult handleMessage(String messageId, String topicName, byte[] data) {
context.setCurrentMessageContext(messageId, topicName);
executionResult.reset();
Future<?> future = executorService.submit(new Runnable() {
@Override
public void run() {
if (requestHandler != null) {
try {
Object input = deserialize(data);
Object output = requestHandler.handleRequest(input, context);
executionResult.setResult(serialize(output));
} catch (Exception ex) {
executionResult.setUserException(ex);
}
} else if (rawRequestHandler != null) {
try {
ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
rawRequestHandler.handleRequest(inputStream, outputStream, context);
executionResult.setResult(outputStream.toByteArray());
} catch (Exception ex) {
executionResult.setUserException(ex);
}
Future<?> future = executorService.submit(() -> {
if (requestHandler != null) {
try {
Object input = serDe.deserialize(data);
Object output = requestHandler.handleRequest(input, context);
executionResult.setResult(output);
} catch (Exception ex) {
executionResult.setUserException(ex);
}
} else if (rawRequestHandler != null) {
try {
ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
rawRequestHandler.handleRequest(inputStream, outputStream, context);
executionResult.setResult(outputStream.toByteArray());
} catch (Exception ex) {
executionResult.setUserException(ex);
}
}
});
Expand All @@ -164,56 +162,4 @@ public void run() {

return executionResult;
}

private byte[] serialize(Object resultValue) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
try {
out = new ObjectOutputStream(bos);
out.writeObject(resultValue);
out.flush();
return bos.toByteArray();
} catch (Exception ex) {
} finally {
try {
bos.close();
} catch (IOException ex) {
// ignore close exception
}
}
return null;
}

private Object deserialize(byte[] data) throws Exception {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
try {
bis = new ByteArrayInputStream(data);
ois = new ObjectInputStream(bis);
obj = ois.readObject();
} finally {
if (bis != null) {
bis.close();
}
if (ois != null) {
ois.close();
}
}
switch (inputType) {
case INTEGER:
case LONG:
case DOUBLE:
case FLOAT:
case SHORT:
case BYTE:
case STRING:
case MAP:
case LIST:
return obj;
default: {
throw new RuntimeException("Unknown SupportedType " + inputType);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.FunctionID;
import org.apache.pulsar.functions.runtime.InstanceID;
import org.apache.pulsar.functions.runtime.container.SerDe;

/**
* This is the config passed to the Java Instance. Contains all the information
Expand All @@ -41,6 +42,7 @@ public class JavaInstanceConfig {
private FunctionConfig functionConfig;
private FunctionID functionId;
private String functionVersion;
private SerDe serDe;
private int timeBudgetInMs;
private int maxMemory;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.runtime.container;

import lombok.*;
import org.apache.pulsar.functions.instance.JavaExecutionResult;

import java.io.PrintWriter;
import java.io.StringWriter;

/**
* An interface that represents the result of a function call.
*/
@Data
@Setter
@Getter
@EqualsAndHashCode
@ToString
@AllArgsConstructor
public class ExecutionResult {
private String userException;
private boolean timedOut;
private byte[] result;
public static ExecutionResult fromJavaResult(JavaExecutionResult result, SerDe serDe) {
String userException = null;
if (result.getUserException() != null ) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
result.getUserException().printStackTrace(pw);
userException = sw.toString();
}
boolean timedOut = result.getTimeoutException() != null;
byte[] output = null;
if (result.getResult() != null) {
output = serDe.serialize(result.getResult());
}
return new ExecutionResult(userException, timedOut, output);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.pulsar.functions.runtime.container;

import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.spawner.ExecutionResult;

import java.util.concurrent.CompletableFuture;

Expand Down
Loading

0 comments on commit 52c9c24

Please sign in to comment.