From 9766676715c7f00d1d4a8073b2deb6861ebe2820 Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Tue, 19 Dec 2017 04:13:12 -0800 Subject: [PATCH] Opt (#22) * Create pulsar-functions module (#1) * Create pulsar-functions module * rename `sdk` package to `api` * Added the first cut of the Java interface for Pulsar functions (#2) * Don't wait for the result. Instead chain it via CompletedFuture --- .../api/examples/ExclamationFunction.java | 1 - .../container/ThreadFunctionContainer.java | 21 +++++++------------ .../subscribermanager/TopicSubscription.java | 1 - 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/pulsar-functions/api-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java b/pulsar-functions/api-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java index ea7bfae0e9bbb..1e623b1daf3e5 100644 --- a/pulsar-functions/api-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java +++ b/pulsar-functions/api-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java @@ -24,7 +24,6 @@ public class ExclamationFunction implements RequestHandler { @Override public String handleRequest(String input, Context context) { - System.out.println("input = " + input); return input + "!"; } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java index 0662282e6d89a..403373b7432b6 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/container/ThreadFunctionContainer.java @@ -41,10 +41,12 @@ class Payload { public String topicName; public String messageId; public byte[] msgData; + CompletableFuture result; Payload(String topicName, String messageId, byte[] msgData) { this.topicName = topicName; this.messageId = messageId; this.msgData = msgData; + this.result = new CompletableFuture<>(); } } @@ -59,7 +61,6 @@ class Payload { private JavaInstance javaInstance; private final FunctionCacheManager fnCache; private LinkedBlockingQueue queue; - private LinkedBlockingQueue resultQueue; private String id; ThreadFunctionContainer(JavaInstanceConfig instanceConfig, @@ -67,7 +68,6 @@ class Payload { this.javaInstanceConfig = instanceConfig; this.fnCache = fnCache; this.queue = new LinkedBlockingQueue<>(); - this.resultQueue = new LinkedBlockingQueue<>(); this.id = "fn-" + instanceConfig.getFunctionConfig().getName() + "-instance-" + instanceConfig.getInstanceId(); this.fnThread = new Thread(threadGroup, new Runnable() { @@ -81,7 +81,9 @@ public void run() { Payload payload = queue.take(); result = javaInstance.handleMessage(payload.messageId, payload.topicName, payload.msgData); - resultQueue.offer(result); + ExecutionResult actualResult = ExecutionResult.fromJavaResult(result, + javaInstanceConfig.getSerDe()); + payload.result.complete(actualResult); } catch (InterruptedException ie) { log.info("Function thread {} is interrupted", ie); } @@ -147,16 +149,9 @@ public void stop() { @Override public CompletableFuture sendMessage(String topicName, String messageId, byte[] data) { - queue.offer(new Payload(topicName, messageId, data)); - try { - JavaExecutionResult result = resultQueue.take(); - return CompletableFuture.completedFuture(ExecutionResult.fromJavaResult(result, - javaInstanceConfig.getSerDe())); - } catch (InterruptedException e) { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(e); - return future; - } + Payload payload = new Payload(topicName, messageId, data); + queue.offer(payload); + return payload.result; } @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/subscribermanager/TopicSubscription.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/subscribermanager/TopicSubscription.java index bc9498b26245c..279d916a26650 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/subscribermanager/TopicSubscription.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/subscribermanager/TopicSubscription.java @@ -66,7 +66,6 @@ public void start() throws Exception { if (null == msg) { continue; } - log.info("Received message {}", msg); String messageId = convertMessageIdToString(msg.getMessageId()); for (FunctionContainer subscriber : subscriberMap.values()) { subscriber.sendMessage(topicName, messageId, msg.getData())