Skip to content

Commit

Permalink
[functions] Add resource granularities settings for functions (#9736)
Browse files Browse the repository at this point in the history
### Motivation

Currently, users could request arbitrary resource values within the range of `functionInstanceMinResources` and `functionInstanceMaxResources` when creating functions, while in a production environment, it's common to define certain resource granularities (such as 0.5cpu/512Mi) and only allow the requested resources to be multiples of these granularities (such as 1cpu/512Mi, 2cpu/1.5Gi, 4cpu/8Gi) to reduce management costs.

Moreover, sometimes it's also necessary to require different types of resources to be the same multiples of the granularity. For example, if the resource granularities are set to 0.5cpu/512Mi, then valid resource values could be:
- 0.5cpu/512Mi
- 1cpu/1Gi
- 1.5cpu/1.5Gi
- 2cpu/2Gi
- ...
  • Loading branch information
fantapsody authored May 21, 2021
1 parent d700f8d commit 047fb6a
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 10 deletions.
12 changes: 12 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,18 @@ functionRuntimeFactoryConfigs:
# ram: 17179869184
# disk: 107374182400

## Granularities of requested resources.
## If the granularity of any type of resource is set,
## the requested resource of the type must be a multiple of the granularity.
#functionInstanceResourceGranularities:
# cpu: 1.0
# ram: 1073741824
# disk: 10737418240

## If this configuration is set to be true, the amount of requested resources of all type of resources
## that have the granularity set must be the same multiples of their granularities.
#functionInstanceResourceChangeInLockStep: true

## The full class-name of an instance of RuntimeCustomizer.
## This class receives the customRuntimeOptions string and can customize details of how the runtime operates.
#runtimeCustomizerClassName: "org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private final String logDirectory = "logs/functions";
private Resources functionInstanceMinResources;
private Resources functionInstanceMaxResources;
private Resources functionInstanceResourceGranularities;
private boolean functionInstanceResourceChangeInLockStep;
private boolean authenticationEnabled;
private Integer grpcPort;
private Integer metricsPort;
Expand Down Expand Up @@ -210,6 +212,8 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic
this.changeConfigMapNamespace = factoryConfig.getChangeConfigMapNamespace();
this.functionInstanceMinResources = workerConfig.getFunctionInstanceMinResources();
this.functionInstanceMaxResources = workerConfig.getFunctionInstanceMaxResources();
this.functionInstanceResourceGranularities = workerConfig.getFunctionInstanceResourceGranularities();
this.functionInstanceResourceChangeInLockStep = workerConfig.isFunctionInstanceResourceChangeInLockStep();
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.authenticationEnabled = workerConfig.isAuthenticationEnabled();
this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar";
Expand Down Expand Up @@ -333,6 +337,7 @@ public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
KubernetesRuntime.doChecks(functionDetails, overriddenJobName);
validateMinResourcesRequired(functionDetails);
validateMaxResourcesRequired(functionDetails);
validateResourcesGranularityAndProportion(functionDetails);
secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient,
getOverriddenNamespace(functionDetails), overriddenJobName, functionDetails);
}
Expand Down Expand Up @@ -447,6 +452,47 @@ void validateMaxResourcesRequired(Function.FunctionDetails functionDetails) {
}
}

void validateResourcesGranularityAndProportion(Function.FunctionDetails functionDetails) {
final long baseMillis = 1000;
long multiples = 0L;
if (functionInstanceResourceGranularities != null) {
Double grnCpu = functionInstanceResourceGranularities.getCpu();
Long grnRam = functionInstanceResourceGranularities.getRam();
if (grnCpu != null) {
// convert cpus to milli-cores to avoid loss of precision
long grnCpuMillis = Math.round(baseMillis * grnCpu);
if (grnCpuMillis > 0) {
long cpuMillis = Math.round(baseMillis * functionDetails.getResources().getCpu());
if (cpuMillis == 0 || cpuMillis % grnCpuMillis != 0) {
throw new IllegalArgumentException(
String.format("Per instance cpu requested, %s, for function should be positive and a multiple of the granularity, %s",
functionDetails.getResources().getCpu(), grnCpu));
}
if (functionInstanceResourceChangeInLockStep) {
multiples = cpuMillis / grnCpuMillis;
}
}
}
if (grnRam != null && grnRam > 0) {
if (functionDetails.getResources().getRam() == 0 || functionDetails.getResources().getRam() % grnRam != 0) {
throw new IllegalArgumentException(
String.format("Per instance ram requested, %s, for function should be positive and a multiple of the granularity, %s",
functionDetails.getResources().getRam(), grnRam));
}
if (functionInstanceResourceChangeInLockStep && multiples > 0) {
long ramMultiples = functionDetails.getResources().getRam() / grnRam;
if (multiples != ramMultiples) {
throw new IllegalArgumentException(
String.format("Per instance cpu requested, %s, ram requested, %s," +
" for function should be positive and the same multiple of the granularity, cpu, %s, ram, %s",
functionDetails.getResources().getCpu(), functionDetails.getResources().getRam(),
grnCpu, grnRam));
}
}
}
}
}

@Override
public Optional<KubernetesFunctionAuthProvider> getAuthProvider() {
return authProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,18 @@ public String getBrokerClientTrustCertsFilePath() {
doc = "A set of the maximum amount of resources functions may request. Support for this depends on function runtime."
)
private Resources functionInstanceMaxResources;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "Granularities of requested resources. If the granularity of any type of resource is set," +
" the requested resource of the type must be a multiple of the granularity."
)
private Resources functionInstanceResourceGranularities;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "If this configuration is set to be true, the amount of requested resources of all type of resources" +
" that have the granularity set must be the same multiples of their granularities."
)
private boolean functionInstanceResourceChangeInLockStep = false;

@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,18 @@ public void tearDown() {
}

KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, Resources minResources,
Resources maxResources) throws Exception {
return createKubernetesRuntimeFactory(extraDepsDir, minResources, maxResources, Optional.empty(), Optional.empty());
Resources maxResources,
Resources resourceGranularities,
boolean resourceChangeInLockStep) throws Exception {
return createKubernetesRuntimeFactory(extraDepsDir, minResources, maxResources, resourceGranularities,
resourceChangeInLockStep, Optional.empty(), Optional.empty());
}

KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir,
Resources minResources,
Resources maxResources,
Resources resourceGranularities,
boolean resourceChangeInLockStep,
Optional<FunctionAuthProvider> functionAuthProvider,
Optional<RuntimeCustomizer> manifestCustomizer) throws Exception {
KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
Expand Down Expand Up @@ -182,6 +187,8 @@ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir,

workerConfig.setFunctionInstanceMinResources(minResources);
workerConfig.setFunctionInstanceMaxResources(maxResources);
workerConfig.setFunctionInstanceResourceGranularities(resourceGranularities);
workerConfig.setFunctionInstanceResourceChangeInLockStep(resourceChangeInLockStep);
workerConfig.setStateStorageServiceUrl(null);
workerConfig.setAuthenticationEnabled(false);

Expand All @@ -201,14 +208,14 @@ FunctionDetails createFunctionDetails() {

@Test
public void testAdmissionChecks() throws Exception {
factory = createKubernetesRuntimeFactory(null, null, null);
factory = createKubernetesRuntimeFactory(null, null, null, null, false);
FunctionDetails functionDetails = createFunctionDetails();
factory.doAdmissionChecks(functionDetails);
}

@Test
public void testValidateMinResourcesRequired() throws Exception {
factory = createKubernetesRuntimeFactory(null, null, null);
factory = createKubernetesRuntimeFactory(null, null, null, null, false);

FunctionDetails functionDetailsBase = createFunctionDetails();

Expand All @@ -233,7 +240,7 @@ public void testValidateMinResourcesRequired() throws Exception {

@Test
public void testValidateMaxResourcesRequired() throws Exception {
factory = createKubernetesRuntimeFactory(null, null, null);
factory = createKubernetesRuntimeFactory(null, null, null, null, false);

FunctionDetails functionDetailsBase = createFunctionDetails();

Expand Down Expand Up @@ -271,8 +278,64 @@ public void testValidateMinMaxResourcesRequired() throws Exception {
testMinMaxResource(0.2, null, true, "Per instance RAM requested, 0, for function is less than the minimum required, 1024");
}

@Test
public void testValidateResourcesGranularityAndProportion() throws Exception {
factory = createKubernetesRuntimeFactory(null, null, null, null, false);

Resources granularities = Resources.builder()
.cpu(0.1)
.ram(1000L)
.build();

// when resources granularities are not set
testResourceGranularities(null, null, null, false, false, null);
testResourceGranularities(0.05, 100L, null, false, false, null);

// only accept positive resource values when granularities are set
testResourceGranularities(null, null, granularities, false, true,
"Per instance cpu requested, 0.0, for function should be positive and a multiple of the granularity, 0.1");
testResourceGranularities(0.1, null, granularities, false, true,
"Per instance ram requested, 0, for function should be positive and a multiple of the granularity, 1000");
testResourceGranularities(0.1, 0L, granularities, false, true,
"Per instance ram requested, 0, for function should be positive and a multiple of the granularity, 1000");
testResourceGranularities(null, 1000L, granularities, false, true,
"Per instance cpu requested, 0.0, for function should be positive and a multiple of the granularity, 0.1");
testResourceGranularities(0.0, 1000L, granularities, false, true,
"Per instance cpu requested, 0.0, for function should be positive and a multiple of the granularity, 0.1");

// requested resources must be multiples of granularities
testResourceGranularities(0.05, 100L, granularities, false, true,
"Per instance cpu requested, 0.05, for function should be positive and a multiple of the granularity, 0.1");
testResourceGranularities(0.1, 100L, granularities, false, true,
"Per instance ram requested, 100, for function should be positive and a multiple of the granularity, 1000");
testResourceGranularities(1.01, 100L, granularities, false, true,
"Per instance cpu requested, 1.01, for function should be positive and a multiple of the granularity, 0.1");
testResourceGranularities(0.999, 100L, granularities, false, true,
"Per instance cpu requested, 0.999, for function should be positive and a multiple of the granularity, 0.1");
testResourceGranularities(1.001, 100L, granularities, false, true,
"Per instance cpu requested, 1.001, for function should be positive and a multiple of the granularity, 0.1");
testResourceGranularities(0.1, 1000L, granularities, false, false, null);
testResourceGranularities(1.0, 1000L, granularities, false, false, null);
testResourceGranularities(5.0, 1000L, granularities, false, false, null);

// resource values of different dimensions should respect lock step configs
testResourceGranularities(0.2, 1000L, granularities, false, false, null);
testResourceGranularities(0.1, 2000L, granularities, false, false, null);
testResourceGranularities(0.1, 2000L, granularities, true, true,
"Per instance cpu requested, 0.1, ram requested, 2000, for function should be positive and the same multiple of the granularity, cpu, 0.1, ram, 1000");
testResourceGranularities(0.2, 1000L, granularities, true, true,
"Per instance cpu requested, 0.2, ram requested, 1000, for function should be positive and the same multiple of the granularity, cpu, 0.1, ram, 1000");
testResourceGranularities(0.1, 1000L, granularities, true, false, null);
testResourceGranularities(0.2, 2000L, granularities, true, false, null);
testResourceGranularities(1.0, 10000L, granularities, true, false, null);
testResourceGranularities(10.0, 100000L, granularities, true, false, null);
testResourceGranularities(10.0, null, granularities, true, true,
"Per instance ram requested, 0, for function should be positive and a multiple of the granularity, 1000");
}

public void testAuthProvider(Optional<FunctionAuthProvider> authProvider) throws Exception {
factory = createKubernetesRuntimeFactory(null, null, null, authProvider, Optional.empty());
factory = createKubernetesRuntimeFactory(null, null, null, null, false,
authProvider, Optional.empty());
}


Expand Down Expand Up @@ -354,22 +417,29 @@ public void configureAuthDataStatefulSet(V1StatefulSet statefulSet, Optional<Fun
}

private void testMinResource(Double cpu, Long ram, boolean fail, String failError) throws Exception {
testResourceRestrictions(cpu, ram, Resources.builder().cpu(0.1).ram(1024L).build(), null, fail, failError);
testResourceRestrictions(cpu, ram, Resources.builder().cpu(0.1).ram(1024L).build(), null, null, false, fail, failError);
}

private void testMaxResource(Double cpu, Long ram, boolean fail, String failError) throws Exception {
testResourceRestrictions(cpu, ram, null, Resources.builder().cpu(1.0).ram(2048L).build(), fail, failError);
testResourceRestrictions(cpu, ram, null, Resources.builder().cpu(1.0).ram(2048L).build(), null, false, fail, failError);
}

private void testMinMaxResource(Double cpu, Long ram, boolean fail, String failError) throws Exception {
testResourceRestrictions(cpu, ram, Resources.builder().cpu(0.1).ram(1024L).build(),
Resources.builder().cpu(1.0).ram(2048L).build(), fail, failError);
Resources.builder().cpu(1.0).ram(2048L).build(), null, false, fail, failError);
}

private void testResourceGranularities(Double cpu, Long ram, Resources granularities, boolean changeInLockStep,
boolean fail, String failError) throws Exception {
testResourceRestrictions(cpu, ram, null, null, granularities, changeInLockStep,
fail, failError);
}

private void testResourceRestrictions(Double cpu, Long ram, Resources minResources, Resources maxResources,
Resources granularities, boolean changeInLockStep,
boolean fail, String failError) throws Exception {

factory = createKubernetesRuntimeFactory(null, minResources, maxResources);
factory = createKubernetesRuntimeFactory(null, minResources, maxResources, granularities, changeInLockStep);
FunctionDetails functionDetailsBase = createFunctionDetails();

Function.Resources.Builder resources = Function.Resources.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pulsar.functions.worker;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.net.URL;

Expand Down Expand Up @@ -97,6 +99,8 @@ public void testLoadResourceRestrictionsConfig() throws Exception {
WorkerConfig emptyWc = WorkerConfig.load(emptyUrl.toURI().getPath());
assertNull(emptyWc.getFunctionInstanceMinResources());
assertNull(emptyWc.getFunctionInstanceMaxResources());
assertNull(emptyWc.getFunctionInstanceResourceGranularities());
assertFalse(emptyWc.isFunctionInstanceResourceChangeInLockStep());

URL newK8SUrl = getClass().getClassLoader().getResource("test_worker_k8s_resource_config.yml");
WorkerConfig newK8SWc = WorkerConfig.load(newK8SUrl.toURI().getPath());
Expand All @@ -109,5 +113,12 @@ public void testLoadResourceRestrictionsConfig() throws Exception {
assertEquals(newK8SWc.getFunctionInstanceMaxResources().getCpu(), 16.0, 0.001);
assertEquals(newK8SWc.getFunctionInstanceMaxResources().getRam().longValue(), 17179869184L);
assertEquals(newK8SWc.getFunctionInstanceMaxResources().getDisk().longValue(), 107374182400L);

assertNotNull(newK8SWc.getFunctionInstanceResourceGranularities());
assertEquals(newK8SWc.getFunctionInstanceResourceGranularities().getCpu(), 1.0, 0.001);
assertEquals(newK8SWc.getFunctionInstanceResourceGranularities().getRam().longValue(), 1073741824L);
assertEquals(newK8SWc.getFunctionInstanceResourceGranularities().getDisk().longValue(), 10737418240L);

assertTrue(newK8SWc.isFunctionInstanceResourceChangeInLockStep());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@ functionInstanceMaxResources:
cpu: 16
ram: 17179869184
disk: 107374182400

functionInstanceResourceGranularities:
cpu: 1.0
ram: 1073741824
disk: 10737418240

functionInstanceResourceChangeInLockStep: true

0 comments on commit 047fb6a

Please sign in to comment.