diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index a2071f1c480d2..4ecf1e0fff4f3 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -214,3 +214,4 @@ brokerClientTrustCertsFilePath:
########################
connectorsDirectory: ./connectors
+functionsDirectory: ./functions
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java
new file mode 100644
index 0000000000000..44163cbb7580f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.functions;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Basic information about a Pulsar function.
+ */
+@Data
+@NoArgsConstructor
+public class FunctionDefinition {
+
+ /**
+ * The name of the function type.
+ */
+ private String name;
+
+ /**
+ * Description to be used for user help.
+ */
+ private String description;
+
+ /**
+ * The class name for the function implementation.
+ *
+ *
If not defined, it will be assumed this function cannot act as a data.
+ */
+ private String functionClass;
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index e33cbfa693062..bc6af797e813c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -139,12 +139,12 @@ public class NarClassLoader extends URLClassLoader {
public static NarClassLoader getFromArchive(File narPath, Set additionalJars,
String narExtractionDirectory) throws IOException {
- File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
- try {
- return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader());
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IOException(e);
- }
+ return NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.class.getClassLoader(),
+ NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
+ }
+
+ public static NarClassLoader getFromArchive(File narPath, Set additionalJars) throws IOException {
+ return NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
}
public static NarClassLoader getFromArchive(File narPath, Set additionalJars, ClassLoader parent,
diff --git a/pulsar-functions/localrun-shaded/pom.xml b/pulsar-functions/localrun-shaded/pom.xml
index fce2f24c7ba93..f27c4b57056fd 100644
--- a/pulsar-functions/localrun-shaded/pom.xml
+++ b/pulsar-functions/localrun-shaded/pom.xml
@@ -20,7 +20,7 @@
-->
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4.0.0
@@ -71,10 +71,11 @@
-
+
+
com.google
org.apache.pulsar.functions.runtime.shaded.com.google
@@ -179,10 +180,11 @@
org.apache.distributedlog
org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog
-
+
+
org.inferred
org.apache.pulsar.functions.runtime.shaded.org.inferred
@@ -199,10 +201,11 @@
dlshade
org.apache.pulsar.functions.runtime.shaded.dlshade
-
+
+
net.java.dev.jna
org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna
@@ -315,10 +318,11 @@
com.beust
org.apache.pulsar.functions.runtime.shaded.com.beust
-
+
+
org.hamcrest
org.apache.pulsar.functions.runtime.shaded.org.hamcrest
@@ -357,4 +361,4 @@
-
+
\ No newline at end of file
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 8229de2f6eac7..2eb6c97742b81 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -44,8 +44,11 @@
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.functions.Functions;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
+import org.apache.pulsar.functions.worker.WorkerConfig;
import java.io.File;
import java.io.IOException;
@@ -212,6 +215,14 @@ public void start(boolean blocking) throws Exception {
.getProtectionDomain().getCodeSource().getLocation().getFile();
}
+ boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
+ if(isBuiltin){
+ WorkerConfig workerConfig = WorkerConfig.load(System.getenv("PULSAR_HOME") + "/conf/functions_worker.yml");
+ Functions functions = FunctionUtils.searchForFunctions(System.getenv("PULSAR_HOME") + workerConfig.getFunctionsDirectory().replaceFirst("^.", ""));
+ String functionType = functionConfig.getJar().replaceFirst("^builtin://", "");
+ userCodeFile = functions.getFunctions().get(functionType).toString();
+ }
+
if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
classLoader = FunctionConfigUtils.validate(functionConfig, file);
@@ -440,9 +451,10 @@ private String isBuiltInSource(String sourceType) throws IOException {
// Validate the connector source type from the locally available connectors
Connectors connectors = getConnectors();
- if (connectors.getSources().containsKey(sourceType)) {
+ String source = sourceType.replaceFirst("^builtin://", "");
+ if (connectors.getSources().containsKey(source)) {
// Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
- return connectors.getSources().get(sourceType).toString();
+ return connectors.getSources().get(source).toString();
} else {
return null;
}
@@ -452,9 +464,10 @@ private String isBuiltInSink(String sinkType) throws IOException {
// Validate the connector source type from the locally available connectors
Connectors connectors = getConnectors();
- if (connectors.getSinks().containsKey(sinkType)) {
+ String sink = sinkType.replaceFirst("^builtin://", "");
+ if (connectors.getSinks().containsKey(sink)) {
// Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
- return connectors.getSinks().get(sinkType).toString();
+ return connectors.getSinks().get(sink).toString();
} else {
return null;
}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 97215bd185251..f01d63c2a9b88 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -81,6 +81,9 @@ message FunctionDetails {
string runtimeFlags = 17;
ComponentType componentType = 18;
string customRuntimeOptions = 19;
+ /* If specified, this will refer to an archive that is
+ * already present in the server */
+ string builtin = 20;
}
message ConsumerSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index b1cf7cbd27130..8e256a6ac598f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -79,6 +79,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private static final String CATEGORY_STATE = "State Management";
@Category
private static final String CATEGORY_CONNECTORS = "Connectors";
+ @Category
+ private static final String CATEGORY_FUNCTIONS = "Functions";
@FieldContext(
category = CATEGORY_WORKER,
@@ -145,6 +147,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "Should we validate connector config during submission"
)
private Boolean validateConnectorConfig = false;
+ @FieldContext(
+ category = CATEGORY_FUNCTIONS,
+ doc = "The path to the location to locate builtin functions"
+ )
+ private String functionsDirectory = "./functions";
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The pulsar topic used for storing function metadata"
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 4e8f2d9255c20..c6615e11b3fdd 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -54,6 +54,8 @@ public class FunctionConfigUtils {
public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
throws IllegalArgumentException {
+
+ boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
Class>[] typeArgs = null;
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
@@ -249,6 +251,11 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
functionDetailsBuilder.setCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
}
+ if (isBuiltin) {
+ String builtin = functionConfig.getJar().replaceFirst("^builtin://", "");
+ functionDetailsBuilder.setBuiltin(builtin);
+ }
+
return functionDetailsBuilder.build();
}
@@ -596,12 +603,6 @@ private static void doCommonChecks(FunctionConfig functionConfig) {
throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
}
- if (!isEmpty(functionConfig.getJar()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getJar())
- && functionConfig.getJar().startsWith(BUILTIN)) {
- if (!new File(functionConfig.getJar()).exists()) {
- throw new IllegalArgumentException("The supplied jar file does not exist");
- }
- }
if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
&& functionConfig.getPy().startsWith(BUILTIN)) {
if (!new File(functionConfig.getPy()).exists()) {
@@ -698,6 +699,10 @@ public static FunctionConfig validateUpdate(FunctionConfig existingConfig, Funct
mergedConfig.setClassName(newConfig.getClassName());
}
+ if (!StringUtils.isEmpty(newConfig.getJar())) {
+ mergedConfig.setJar(newConfig.getJar());
+ }
+
if (newConfig.getInputSpecs() == null) {
newConfig.setInputSpecs(new HashMap<>());
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
new file mode 100644
index 0000000000000..15514219389fb
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.functions;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.utils.Exceptions;
+import org.apache.pulsar.functions.api.Function;
+
+
+@UtilityClass
+@Slf4j
+public class FunctionUtils {
+
+ private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";
+
+ /**
+ * Extract the Pulsar Function class from a functionctor archive.
+ */
+ public static String getFunctionClass(ClassLoader classLoader) throws IOException {
+ NarClassLoader ncl = (NarClassLoader) classLoader;
+ String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+ FunctionDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+ FunctionDefinition.class);
+ if (StringUtils.isEmpty(conf.getFunctionClass())) {
+ throw new IOException(
+ String.format("The '%s' functionctor does not provide a function implementation", conf.getName()));
+ }
+
+ try {
+ // Try to load source class and check it implements Function interface
+ Class functionClass = ncl.loadClass(conf.getFunctionClass());
+ if (!(Function.class.isAssignableFrom(functionClass))) {
+ throw new IOException(
+ "Class " + conf.getFunctionClass() + " does not implement interface " + Function.class.getName());
+ }
+ } catch (Throwable t) {
+ Exceptions.rethrowIOException(t);
+ }
+
+ return conf.getFunctionClass();
+ }
+
+ public static FunctionDefinition getFunctionDefinition(String narPath) throws IOException {
+ try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
+ String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+ return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class);
+ }
+ }
+
+ public static Functions searchForFunctions(String functionsDirectory) throws IOException {
+ Path path = Paths.get(functionsDirectory).toAbsolutePath();
+ log.info("Searching for functions in {}", path);
+
+ Functions functions = new Functions();
+
+ if (!path.toFile().exists()) {
+ log.warn("Functions archive directory not found");
+ return functions;
+ }
+
+ try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) {
+ for (Path archive : stream) {
+ try {
+ FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toString());
+ log.info("Found function {} from {}", cntDef, archive);
+ log.error(cntDef.getName());
+ log.error(cntDef.getFunctionClass());
+ if (!StringUtils.isEmpty(cntDef.getFunctionClass())) {
+ functions.functions.put(cntDef.getName(), archive);
+ }
+
+ functions.functionsDefinitions.add(cntDef);
+ } catch (Throwable t) {
+ log.warn("Failed to load function from {}", archive, t);
+ }
+ }
+ }
+
+ Collections.sort(functions.functionsDefinitions,
+ (c1, c2) -> String.CASE_INSENSITIVE_ORDER.compare(c1.getName(), c2.getName()));
+
+ return functions;
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java
new file mode 100644
index 0000000000000..a7538a8afa35f
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.functions;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import lombok.Data;
+
+import org.apache.pulsar.common.functions.FunctionDefinition;
+
+@Data
+public class Functions {
+ final List functionsDefinitions = new ArrayList<>();
+ final Map functions = new TreeMap<>();
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index ad93ba211a3ce..ccf00d5054bd4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -78,16 +78,18 @@ public class FunctionActioner {
private final RuntimeFactory runtimeFactory;
private final Namespace dlogNamespace;
private final ConnectorsManager connectorsManager;
+ private final FunctionsManager functionsManager;
private final PulsarAdmin pulsarAdmin;
public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
Namespace dlogNamespace,
- ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) {
+ ConnectorsManager connectorsManager,FunctionsManager functionsManager,PulsarAdmin pulsarAdmin) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
this.dlogNamespace = dlogNamespace;
this.connectorsManager = connectorsManager;
+ this.functionsManager = functionsManager;
this.pulsarAdmin = pulsarAdmin;
}
@@ -439,6 +441,10 @@ private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws I
}
}
+ if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
+ return functionsManager.getFunctionArchive(functionDetails.getBuiltin()).toFile();
+ }
+
throw new IOException("Could not find built in archive definition");
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 5ac73d96bdb27..075146eee26b5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -131,7 +131,7 @@ public int size() {
public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
- MembershipManager membershipManager, ConnectorsManager connectorsManager,
+ MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager,
FunctionMetaDataManager functionMetaDataManager) throws Exception {
this.workerConfig = workerConfig;
this.workerService = workerService;
@@ -196,7 +196,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
this.runtimeFactory.initialize(workerConfig, authConfig, secretsProviderConfigurator, functionAuthProvider, runtimeCustomizer);
this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
- dlogNamespace, connectorsManager, workerService.getBrokerAdmin());
+ dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin());
this.membershipManager = membershipManager;
this.functionMetaDataManager = functionMetaDataManager;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
new file mode 100644
index 0000000000000..427cc1eb7185c
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.common.functions.FunctionDefinition;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
+import org.apache.pulsar.functions.utils.functions.Functions;
+@Slf4j
+public class FunctionsManager {
+
+ private Functions functions;
+
+ public FunctionsManager(WorkerConfig workerConfig) throws IOException {
+ this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+ }
+
+ public List getFunctions() {
+ return functions.getFunctionsDefinitions();
+ }
+
+ public Path getFunctionArchive(String functionType) {
+ return functions.getFunctions().get(functionType);
+ }
+
+ public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
+ this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
+ }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 6fe750046016a..edaade62678ec 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -66,6 +66,7 @@ public class WorkerService {
private AuthenticationService authenticationService;
private AuthorizationService authorizationService;
private ConnectorsManager connectorsManager;
+ private FunctionsManager functionsManager;
private PulsarAdmin brokerAdmin;
private PulsarAdmin functionAdmin;
private final MetricsGenerator metricsGenerator;
@@ -170,6 +171,7 @@ public void start(URI dlogUri,
this.workerConfig, this.schedulerManager, this.client);
this.connectorsManager = new ConnectorsManager(workerConfig);
+ this.functionsManager = new FunctionsManager(workerConfig);
//create membership manager
String coordinationTopic = workerConfig.getClusterCoordinationTopic();
@@ -180,7 +182,7 @@ public void start(URI dlogUri,
// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
- this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionMetaDataManager);
+ this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionsManager, functionMetaDataManager);
// Setting references to managers in scheduler
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index a4e46a84e5f81..5cad320eb1853 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -309,6 +309,10 @@ public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder fu
}
}
+ if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
+ return true;
+ }
+
return false;
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index d806c69a1dee0..3ed7f380cdd58 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -40,6 +40,7 @@
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.commons.lang3.StringUtils;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import javax.ws.rs.WebApplicationException;
@@ -48,6 +49,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -651,11 +653,30 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant
final File componentPackageFile) throws IOException {
// The rest end points take precedence over whatever is there in function config
+ Path archivePath = null;
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(componentName);
FunctionConfigUtils.inferMissingArguments(functionConfig);
- ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+
+ if (!StringUtils.isEmpty(functionConfig.getJar())) {
+ String builtinArchive = functionConfig.getJar();
+ if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+ builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+ }
+ try {
+ archivePath = this.worker().getFunctionsManager().getFunctionArchive(builtinArchive);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("No Function archive %s found", archivePath));
+ }
+ }
+ ClassLoader clsLoader = null;
+ if(archivePath != null){
+ clsLoader = FunctionConfigUtils.validate(functionConfig, archivePath.toFile());
+ }
+ else{
+ clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+ }
return FunctionConfigUtils.convert(functionConfig, clsLoader);
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index e066404d28c4b..dc49036a5bebb 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -78,7 +78,7 @@ public void testStartFunctionWithDLNamespace() throws Exception {
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
- new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+ new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
Runtime runtime = mock(Runtime.class);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
@@ -124,7 +124,7 @@ public void testStartFunctionWithPkgUrl() throws Exception {
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
- new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+ new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
// (1) test with file url. functionActioner should be able to consider file-url and it should be able to call
// RuntimeSpawner
@@ -186,7 +186,7 @@ public void testFunctionAuthDisabled() throws Exception {
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
- new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+ new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
String pkgPathLocation = "http://invalid/my-file.jar";
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index a6569b337996a..2e6ed5bb5de5b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -101,6 +101,7 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class)));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -183,6 +184,7 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class)));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -268,6 +270,7 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -397,6 +400,7 @@ public void testReassignment() throws Exception {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -574,6 +578,7 @@ public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
@@ -632,7 +637,7 @@ public void testExternallyManagedRuntimeUpdate() throws Exception {
FunctionActioner functionActioner = spy(new FunctionActioner(
workerConfig,
- kubernetesRuntimeFactory, null, null, null));
+ kubernetesRuntimeFactory, null, null, null, null));
// test new assignment update functions
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
@@ -641,6 +646,7 @@ public void testExternallyManagedRuntimeUpdate() throws Exception {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
@@ -744,6 +750,7 @@ public void testFunctionRuntimeSetCorrectly() {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
fail();
@@ -767,6 +774,7 @@ public void testFunctionRuntimeSetCorrectly() {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
fail();
@@ -790,6 +798,7 @@ public void testFunctionRuntimeSetCorrectly() {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
fail();
@@ -813,6 +822,7 @@ public void testFunctionRuntimeSetCorrectly() {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
@@ -841,6 +851,7 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), KubernetesRuntimeFactory.class);
@@ -868,6 +879,7 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class);
@@ -891,6 +903,7 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 2d0460ba88375..14d104424fe39 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -154,6 +154,7 @@ public void testCheckFailuresNoFailures() throws Exception {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient, pulsarAdmin));
@@ -226,6 +227,7 @@ public void testCheckFailuresSomeFailures() throws Exception {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
@@ -313,6 +315,7 @@ public void testCheckFailuresSomeUnassigned() throws Exception {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));
@@ -392,6 +395,7 @@ public void testHeartBeatFunctionWorkerDown() throws Exception {
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
+ mock(FunctionsManager.class),
functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));