diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 410274b4b0c95..29889bb58108a 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -233,6 +233,18 @@ functionRuntimeFactoryConfigs: # ram: 17179869184 # disk: 107374182400 +## Granularities of requested resources. +## If the granularity of any type of resource is set, +## the requested resource of the type must be a multiple of the granularity. +#functionInstanceResourceGranularities: +# cpu: 1.0 +# ram: 1073741824 +# disk: 10737418240 + +## If this configuration is set to be true, the amount of requested resources of all type of resources +## that have the granularity set must be the same multiples of their granularities. +#functionInstanceResourceChangeInLockStep: true + ## The full class-name of an instance of RuntimeCustomizer. ## This class receives the customRuntimeOptions string and can customize details of how the runtime operates. #runtimeCustomizerClassName: "org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer" diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index 9155cf3dcc14d..e1b7970158f4b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -95,6 +95,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private final String logDirectory = "logs/functions"; private Resources functionInstanceMinResources; private Resources functionInstanceMaxResources; + private Resources functionInstanceResourceGranularities; + private boolean functionInstanceResourceChangeInLockStep; private boolean authenticationEnabled; private Integer grpcPort; private Integer metricsPort; @@ -210,6 +212,8 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic this.changeConfigMapNamespace = factoryConfig.getChangeConfigMapNamespace(); this.functionInstanceMinResources = workerConfig.getFunctionInstanceMinResources(); this.functionInstanceMaxResources = workerConfig.getFunctionInstanceMaxResources(); + this.functionInstanceResourceGranularities = workerConfig.getFunctionInstanceResourceGranularities(); + this.functionInstanceResourceChangeInLockStep = workerConfig.isFunctionInstanceResourceChangeInLockStep(); this.secretsProviderConfigurator = secretsProviderConfigurator; this.authenticationEnabled = workerConfig.isAuthenticationEnabled(); this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar"; @@ -333,6 +337,7 @@ public void doAdmissionChecks(Function.FunctionDetails functionDetails) { KubernetesRuntime.doChecks(functionDetails, overriddenJobName); validateMinResourcesRequired(functionDetails); validateMaxResourcesRequired(functionDetails); + validateResourcesGranularityAndProportion(functionDetails); secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, getOverriddenNamespace(functionDetails), overriddenJobName, functionDetails); } @@ -447,6 +452,47 @@ void validateMaxResourcesRequired(Function.FunctionDetails functionDetails) { } } + void validateResourcesGranularityAndProportion(Function.FunctionDetails functionDetails) { + final long baseMillis = 1000; + long multiples = 0L; + if (functionInstanceResourceGranularities != null) { + Double grnCpu = functionInstanceResourceGranularities.getCpu(); + Long grnRam = functionInstanceResourceGranularities.getRam(); + if (grnCpu != null) { + // convert cpus to milli-cores to avoid loss of precision + long grnCpuMillis = Math.round(baseMillis * grnCpu); + if (grnCpuMillis > 0) { + long cpuMillis = Math.round(baseMillis * functionDetails.getResources().getCpu()); + if (cpuMillis == 0 || cpuMillis % grnCpuMillis != 0) { + throw new IllegalArgumentException( + String.format("Per instance cpu requested, %s, for function should be positive and a multiple of the granularity, %s", + functionDetails.getResources().getCpu(), grnCpu)); + } + if (functionInstanceResourceChangeInLockStep) { + multiples = cpuMillis / grnCpuMillis; + } + } + } + if (grnRam != null && grnRam > 0) { + if (functionDetails.getResources().getRam() == 0 || functionDetails.getResources().getRam() % grnRam != 0) { + throw new IllegalArgumentException( + String.format("Per instance ram requested, %s, for function should be positive and a multiple of the granularity, %s", + functionDetails.getResources().getRam(), grnRam)); + } + if (functionInstanceResourceChangeInLockStep && multiples > 0) { + long ramMultiples = functionDetails.getResources().getRam() / grnRam; + if (multiples != ramMultiples) { + throw new IllegalArgumentException( + String.format("Per instance cpu requested, %s, ram requested, %s," + + " for function should be positive and the same multiple of the granularity, cpu, %s, ram, %s", + functionDetails.getResources().getCpu(), functionDetails.getResources().getRam(), + grnCpu, grnRam)); + } + } + } + } + } + @Override public Optional getAuthProvider() { return authProvider; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 84978d88057a7..64cbc095f4018 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -467,6 +467,18 @@ public String getBrokerClientTrustCertsFilePath() { doc = "A set of the maximum amount of resources functions may request. Support for this depends on function runtime." ) private Resources functionInstanceMaxResources; + @FieldContext( + category = CATEGORY_FUNC_RUNTIME_MNG, + doc = "Granularities of requested resources. If the granularity of any type of resource is set," + + " the requested resource of the type must be a multiple of the granularity." + ) + private Resources functionInstanceResourceGranularities; + @FieldContext( + category = CATEGORY_FUNC_RUNTIME_MNG, + doc = "If this configuration is set to be true, the amount of requested resources of all type of resources" + + " that have the granularity set must be the same multiples of their granularities." + ) + private boolean functionInstanceResourceChangeInLockStep = false; @FieldContext( category = CATEGORY_FUNC_RUNTIME_MNG, diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java index 62f7a42d110be..176d5e901dda9 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java @@ -139,13 +139,18 @@ public void tearDown() { } KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, Resources minResources, - Resources maxResources) throws Exception { - return createKubernetesRuntimeFactory(extraDepsDir, minResources, maxResources, Optional.empty(), Optional.empty()); + Resources maxResources, + Resources resourceGranularities, + boolean resourceChangeInLockStep) throws Exception { + return createKubernetesRuntimeFactory(extraDepsDir, minResources, maxResources, resourceGranularities, + resourceChangeInLockStep, Optional.empty(), Optional.empty()); } KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, Resources minResources, Resources maxResources, + Resources resourceGranularities, + boolean resourceChangeInLockStep, Optional functionAuthProvider, Optional manifestCustomizer) throws Exception { KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory()); @@ -182,6 +187,8 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, workerConfig.setFunctionInstanceMinResources(minResources); workerConfig.setFunctionInstanceMaxResources(maxResources); + workerConfig.setFunctionInstanceResourceGranularities(resourceGranularities); + workerConfig.setFunctionInstanceResourceChangeInLockStep(resourceChangeInLockStep); workerConfig.setStateStorageServiceUrl(null); workerConfig.setAuthenticationEnabled(false); @@ -201,14 +208,14 @@ FunctionDetails createFunctionDetails() { @Test public void testAdmissionChecks() throws Exception { - factory = createKubernetesRuntimeFactory(null, null, null); + factory = createKubernetesRuntimeFactory(null, null, null, null, false); FunctionDetails functionDetails = createFunctionDetails(); factory.doAdmissionChecks(functionDetails); } @Test public void testValidateMinResourcesRequired() throws Exception { - factory = createKubernetesRuntimeFactory(null, null, null); + factory = createKubernetesRuntimeFactory(null, null, null, null, false); FunctionDetails functionDetailsBase = createFunctionDetails(); @@ -233,7 +240,7 @@ public void testValidateMinResourcesRequired() throws Exception { @Test public void testValidateMaxResourcesRequired() throws Exception { - factory = createKubernetesRuntimeFactory(null, null, null); + factory = createKubernetesRuntimeFactory(null, null, null, null, false); FunctionDetails functionDetailsBase = createFunctionDetails(); @@ -271,8 +278,64 @@ public void testValidateMinMaxResourcesRequired() throws Exception { testMinMaxResource(0.2, null, true, "Per instance RAM requested, 0, for function is less than the minimum required, 1024"); } + @Test + public void testValidateResourcesGranularityAndProportion() throws Exception { + factory = createKubernetesRuntimeFactory(null, null, null, null, false); + + Resources granularities = Resources.builder() + .cpu(0.1) + .ram(1000L) + .build(); + + // when resources granularities are not set + testResourceGranularities(null, null, null, false, false, null); + testResourceGranularities(0.05, 100L, null, false, false, null); + + // only accept positive resource values when granularities are set + testResourceGranularities(null, null, granularities, false, true, + "Per instance cpu requested, 0.0, for function should be positive and a multiple of the granularity, 0.1"); + testResourceGranularities(0.1, null, granularities, false, true, + "Per instance ram requested, 0, for function should be positive and a multiple of the granularity, 1000"); + testResourceGranularities(0.1, 0L, granularities, false, true, + "Per instance ram requested, 0, for function should be positive and a multiple of the granularity, 1000"); + testResourceGranularities(null, 1000L, granularities, false, true, + "Per instance cpu requested, 0.0, for function should be positive and a multiple of the granularity, 0.1"); + testResourceGranularities(0.0, 1000L, granularities, false, true, + "Per instance cpu requested, 0.0, for function should be positive and a multiple of the granularity, 0.1"); + + // requested resources must be multiples of granularities + testResourceGranularities(0.05, 100L, granularities, false, true, + "Per instance cpu requested, 0.05, for function should be positive and a multiple of the granularity, 0.1"); + testResourceGranularities(0.1, 100L, granularities, false, true, + "Per instance ram requested, 100, for function should be positive and a multiple of the granularity, 1000"); + testResourceGranularities(1.01, 100L, granularities, false, true, + "Per instance cpu requested, 1.01, for function should be positive and a multiple of the granularity, 0.1"); + testResourceGranularities(0.999, 100L, granularities, false, true, + "Per instance cpu requested, 0.999, for function should be positive and a multiple of the granularity, 0.1"); + testResourceGranularities(1.001, 100L, granularities, false, true, + "Per instance cpu requested, 1.001, for function should be positive and a multiple of the granularity, 0.1"); + testResourceGranularities(0.1, 1000L, granularities, false, false, null); + testResourceGranularities(1.0, 1000L, granularities, false, false, null); + testResourceGranularities(5.0, 1000L, granularities, false, false, null); + + // resource values of different dimensions should respect lock step configs + testResourceGranularities(0.2, 1000L, granularities, false, false, null); + testResourceGranularities(0.1, 2000L, granularities, false, false, null); + testResourceGranularities(0.1, 2000L, granularities, true, true, + "Per instance cpu requested, 0.1, ram requested, 2000, for function should be positive and the same multiple of the granularity, cpu, 0.1, ram, 1000"); + testResourceGranularities(0.2, 1000L, granularities, true, true, + "Per instance cpu requested, 0.2, ram requested, 1000, for function should be positive and the same multiple of the granularity, cpu, 0.1, ram, 1000"); + testResourceGranularities(0.1, 1000L, granularities, true, false, null); + testResourceGranularities(0.2, 2000L, granularities, true, false, null); + testResourceGranularities(1.0, 10000L, granularities, true, false, null); + testResourceGranularities(10.0, 100000L, granularities, true, false, null); + testResourceGranularities(10.0, null, granularities, true, true, + "Per instance ram requested, 0, for function should be positive and a multiple of the granularity, 1000"); + } + public void testAuthProvider(Optional authProvider) throws Exception { - factory = createKubernetesRuntimeFactory(null, null, null, authProvider, Optional.empty()); + factory = createKubernetesRuntimeFactory(null, null, null, null, false, + authProvider, Optional.empty()); } @@ -354,22 +417,29 @@ public void configureAuthDataStatefulSet(V1StatefulSet statefulSet, Optional