diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index a2071f1c480d2c..4ecf1e0fff4f3c 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -214,3 +214,4 @@ brokerClientTrustCertsFilePath: ######################## connectorsDirectory: ./connectors +functionsDirectory: ./functions diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java new file mode 100644 index 00000000000000..44163cbb7580f1 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.functions; + +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Basic information about a Pulsar function. + */ +@Data +@NoArgsConstructor +public class FunctionDefinition { + + /** + * The name of the function type. + */ + private String name; + + /** + * Description to be used for user help. + */ + private String description; + + /** + * The class name for the function implementation. + * + *

If not defined, it will be assumed this function cannot act as a data. + */ + private String functionClass; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java index e33cbfa6930629..bc6af797e813c3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java @@ -139,12 +139,12 @@ public class NarClassLoader extends URLClassLoader { public static NarClassLoader getFromArchive(File narPath, Set additionalJars, String narExtractionDirectory) throws IOException { - File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory)); - try { - return new NarClassLoader(unpacked, additionalJars, NarClassLoader.class.getClassLoader()); - } catch (ClassNotFoundException | NoClassDefFoundError e) { - throw new IOException(e); - } + return NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.class.getClassLoader(), + NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR); + } + + public static NarClassLoader getFromArchive(File narPath, Set additionalJars) throws IOException { + return NarClassLoader.getFromArchive(narPath, additionalJars, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR); } public static NarClassLoader getFromArchive(File narPath, Set additionalJars, ClassLoader parent, diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index d2ad02859c7fde..c91b6577059dce 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -81,6 +81,9 @@ message FunctionDetails { string runtimeFlags = 17; ComponentType componentType = 18; string customRuntimeOptions = 19; + /* If specified, this will refer to an archive that is + * already present in the server */ + string builtin = 20; } message ConsumerSpec { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 4fd21f024aa6e5..0e526ea22e4165 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -79,6 +79,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { private static final String CATEGORY_STATE = "State Management"; @Category private static final String CATEGORY_CONNECTORS = "Connectors"; + @Category + private static final String CATEGORY_FUNCTIONS = "Functions"; @FieldContext( category = CATEGORY_WORKER, @@ -140,6 +142,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { doc = "The directory where nar packages are extractors" ) private String narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR; + @FieldContext( + category = CATEGORY_FUNCTIONS, + doc = "The path to the location to locate builtin functions" + ) + private String functionsDirectory = "./functions"; @FieldContext( category = CATEGORY_FUNC_METADATA_MNG, doc = "The pulsar topic used for storing function metadata" diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 9a945456ff98a5..3b1f7f48358790 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -54,6 +54,8 @@ public class FunctionConfigUtils { public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader) throws IllegalArgumentException { + + boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getJar()) && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN); Class[] typeArgs = null; if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { @@ -249,6 +251,11 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader functionDetailsBuilder.setCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions()); } + if (isBuiltin) { + String builtin = functionConfig.getJar().replaceFirst("^builtin://", ""); + functionDetailsBuilder.setBuiltin(builtin); + } + return functionDetailsBuilder.build(); } @@ -596,12 +603,6 @@ private static void doCommonChecks(FunctionConfig functionConfig) { throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity"); } - if (!isEmpty(functionConfig.getJar()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getJar()) - && functionConfig.getJar().startsWith(BUILTIN)) { - if (!new File(functionConfig.getJar()).exists()) { - throw new IllegalArgumentException("The supplied jar file does not exist"); - } - } if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy()) && functionConfig.getPy().startsWith(BUILTIN)) { if (!new File(functionConfig.getPy()).exists()) { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java new file mode 100644 index 00000000000000..15514219389fbd --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/FunctionUtils.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.functions; + +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; + +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.functions.FunctionDefinition; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.functions.utils.Exceptions; +import org.apache.pulsar.functions.api.Function; + + +@UtilityClass +@Slf4j +public class FunctionUtils { + + private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml"; + + /** + * Extract the Pulsar Function class from a functionctor archive. + */ + public static String getFunctionClass(ClassLoader classLoader) throws IOException { + NarClassLoader ncl = (NarClassLoader) classLoader; + String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME); + + FunctionDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, + FunctionDefinition.class); + if (StringUtils.isEmpty(conf.getFunctionClass())) { + throw new IOException( + String.format("The '%s' functionctor does not provide a function implementation", conf.getName())); + } + + try { + // Try to load source class and check it implements Function interface + Class functionClass = ncl.loadClass(conf.getFunctionClass()); + if (!(Function.class.isAssignableFrom(functionClass))) { + throw new IOException( + "Class " + conf.getFunctionClass() + " does not implement interface " + Function.class.getName()); + } + } catch (Throwable t) { + Exceptions.rethrowIOException(t); + } + + return conf.getFunctionClass(); + } + + public static FunctionDefinition getFunctionDefinition(String narPath) throws IOException { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) { + String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME); + return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, FunctionDefinition.class); + } + } + + public static Functions searchForFunctions(String functionsDirectory) throws IOException { + Path path = Paths.get(functionsDirectory).toAbsolutePath(); + log.info("Searching for functions in {}", path); + + Functions functions = new Functions(); + + if (!path.toFile().exists()) { + log.warn("Functions archive directory not found"); + return functions; + } + + try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) { + for (Path archive : stream) { + try { + FunctionDefinition cntDef = FunctionUtils.getFunctionDefinition(archive.toString()); + log.info("Found function {} from {}", cntDef, archive); + log.error(cntDef.getName()); + log.error(cntDef.getFunctionClass()); + if (!StringUtils.isEmpty(cntDef.getFunctionClass())) { + functions.functions.put(cntDef.getName(), archive); + } + + functions.functionsDefinitions.add(cntDef); + } catch (Throwable t) { + log.warn("Failed to load function from {}", archive, t); + } + } + } + + Collections.sort(functions.functionsDefinitions, + (c1, c2) -> String.CASE_INSENSITIVE_ORDER.compare(c1.getName(), c2.getName())); + + return functions; + } +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java new file mode 100644 index 00000000000000..a7538a8afa35fd --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functions/Functions.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.functions; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import lombok.Data; + +import org.apache.pulsar.common.functions.FunctionDefinition; + +@Data +public class Functions { + final List functionsDefinitions = new ArrayList<>(); + final Map functions = new TreeMap<>(); +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 0a9575568a9c8e..846f655a0a3a1e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -77,16 +77,18 @@ public class FunctionActioner { private final RuntimeFactory runtimeFactory; private final Namespace dlogNamespace; private final ConnectorsManager connectorsManager; + private final FunctionsManager functionsManager; private final PulsarAdmin pulsarAdmin; public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, - ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) { + ConnectorsManager connectorsManager,FunctionsManager functionsManager,PulsarAdmin pulsarAdmin) { this.workerConfig = workerConfig; this.runtimeFactory = runtimeFactory; this.dlogNamespace = dlogNamespace; this.connectorsManager = connectorsManager; + this.functionsManager = functionsManager; this.pulsarAdmin = pulsarAdmin; } @@ -422,6 +424,10 @@ private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws I } } + if (!StringUtils.isEmpty(functionDetails.getBuiltin())) { + return functionsManager.getFunctionArchive(functionDetails.getBuiltin()).toFile(); + } + throw new IOException("Could not find built in archive definition"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 3f522ddda6ceaa..98183eaac6ddf5 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -131,7 +131,7 @@ public int size() { public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, - MembershipManager membershipManager, ConnectorsManager connectorsManager, + MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager, FunctionMetaDataManager functionMetaDataManager) throws Exception { this.workerConfig = workerConfig; this.workerService = workerService; @@ -196,7 +196,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer this.runtimeFactory.initialize(workerConfig, authConfig, secretsProviderConfigurator, functionAuthProvider, runtimeCustomizer); this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory, - dlogNamespace, connectorsManager, workerService.getBrokerAdmin()); + dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin()); this.membershipManager = membershipManager; this.functionMetaDataManager = functionMetaDataManager; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java new file mode 100644 index 00000000000000..427cc1eb7185c0 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import lombok.extern.slf4j.Slf4j; + +import org.apache.pulsar.common.functions.FunctionDefinition; +import org.apache.pulsar.functions.utils.functions.FunctionUtils; +import org.apache.pulsar.functions.utils.functions.Functions; +@Slf4j +public class FunctionsManager { + + private Functions functions; + + public FunctionsManager(WorkerConfig workerConfig) throws IOException { + this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory()); + } + + public List getFunctions() { + return functions.getFunctionsDefinitions(); + } + + public Path getFunctionArchive(String functionType) { + return functions.getFunctions().get(functionType); + } + + public void reloadFunctions(WorkerConfig workerConfig) throws IOException { + this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory()); + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 6fe750046016a0..edaade62678eca 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -66,6 +66,7 @@ public class WorkerService { private AuthenticationService authenticationService; private AuthorizationService authorizationService; private ConnectorsManager connectorsManager; + private FunctionsManager functionsManager; private PulsarAdmin brokerAdmin; private PulsarAdmin functionAdmin; private final MetricsGenerator metricsGenerator; @@ -170,6 +171,7 @@ public void start(URI dlogUri, this.workerConfig, this.schedulerManager, this.client); this.connectorsManager = new ConnectorsManager(workerConfig); + this.functionsManager = new FunctionsManager(workerConfig); //create membership manager String coordinationTopic = workerConfig.getClusterCoordinationTopic(); @@ -180,7 +182,7 @@ public void start(URI dlogUri, // create function runtime manager this.functionRuntimeManager = new FunctionRuntimeManager( - this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionMetaDataManager); + this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionsManager, functionMetaDataManager); // Setting references to managers in scheduler this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java index 99faadd7b8f7f1..141769d4dc0690 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java @@ -304,6 +304,10 @@ public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder fu } } + if (!StringUtils.isEmpty(functionDetails.getBuiltin())) { + return true; + } + return false; } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index d806c69a1dee08..3ed7f380cdd58b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -40,6 +40,7 @@ import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.WorkerUtils; +import org.apache.commons.lang3.StringUtils; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import javax.ws.rs.WebApplicationException; @@ -48,6 +49,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.nio.file.Path; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -651,11 +653,30 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant final File componentPackageFile) throws IOException { // The rest end points take precedence over whatever is there in function config + Path archivePath = null; functionConfig.setTenant(tenant); functionConfig.setNamespace(namespace); functionConfig.setName(componentName); FunctionConfigUtils.inferMissingArguments(functionConfig); - ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile); + + if (!StringUtils.isEmpty(functionConfig.getJar())) { + String builtinArchive = functionConfig.getJar(); + if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) { + builtinArchive = builtinArchive.replaceFirst("^builtin://", ""); + } + try { + archivePath = this.worker().getFunctionsManager().getFunctionArchive(builtinArchive); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("No Function archive %s found", archivePath)); + } + } + ClassLoader clsLoader = null; + if(archivePath != null){ + clsLoader = FunctionConfigUtils.validate(functionConfig, archivePath.toFile()); + } + else{ + clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile); + } return FunctionConfigUtils.convert(functionConfig, clsLoader); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index e066404d28c4b6..dc49036a5bebb2 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -78,7 +78,7 @@ public void testStartFunctionWithDLNamespace() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), mock(PulsarAdmin.class)); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class)); Runtime runtime = mock(Runtime.class); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant") @@ -124,7 +124,7 @@ public void testStartFunctionWithPkgUrl() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), mock(PulsarAdmin.class)); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class)); // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call // RuntimeSpawner @@ -186,7 +186,7 @@ public void testFunctionAuthDisabled() throws Exception { @SuppressWarnings("resource") FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, - new ConnectorsManager(workerConfig), mock(PulsarAdmin.class)); + new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class)); String pkgPathLocation = "http://invalid/my-file.jar"; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index a6569b337996a6..2e6ed5bb5de5b3 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -101,6 +101,7 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class))); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); @@ -183,6 +184,7 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class))); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); @@ -268,6 +270,7 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); @@ -397,6 +400,7 @@ public void testReassignment() throws Exception { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); @@ -574,6 +578,7 @@ public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); @@ -632,7 +637,7 @@ public void testExternallyManagedRuntimeUpdate() throws Exception { FunctionActioner functionActioner = spy(new FunctionActioner( workerConfig, - kubernetesRuntimeFactory, null, null, null)); + kubernetesRuntimeFactory, null, null, null, null)); // test new assignment update functions FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( @@ -641,6 +646,7 @@ public void testExternallyManagedRuntimeUpdate() throws Exception { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); functionRuntimeManager.setFunctionActioner(functionActioner); @@ -744,6 +750,7 @@ public void testFunctionRuntimeSetCorrectly() { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); fail(); @@ -767,6 +774,7 @@ public void testFunctionRuntimeSetCorrectly() { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); fail(); @@ -790,6 +798,7 @@ public void testFunctionRuntimeSetCorrectly() { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); fail(); @@ -813,6 +822,7 @@ public void testFunctionRuntimeSetCorrectly() { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class); @@ -841,6 +851,7 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), KubernetesRuntimeFactory.class); @@ -868,6 +879,7 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class); @@ -891,6 +903,7 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), mock(FunctionMetaDataManager.class)); assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 2d0460ba883759..14d104424fe39b 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -154,6 +154,7 @@ public void testCheckFailuresNoFailures() throws Exception { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), functionMetaDataManager)); MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient, pulsarAdmin)); @@ -226,6 +227,7 @@ public void testCheckFailuresSomeFailures() throws Exception { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), functionMetaDataManager)); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin)); @@ -313,6 +315,7 @@ public void testCheckFailuresSomeUnassigned() throws Exception { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), functionMetaDataManager)); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin)); @@ -392,6 +395,7 @@ public void testHeartBeatFunctionWorkerDown() throws Exception { mock(Namespace.class), mock(MembershipManager.class), mock(ConnectorsManager.class), + mock(FunctionsManager.class), functionMetaDataManager)); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin));