Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Improve the Kubernetes Resource handling #3664

Merged
merged 7 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,41 @@ String getKubernetesUri() {
return KubernetesContext.getSchedulerURI(configuration);
}

Resource getContainerResource(PackingPlan packingPlan) {
ContainerResourcePair getContainerResourcePair(PackingPlan packingPlan) {
// Align resources to maximal requested resource
PackingPlan updatedPackingPlan = packingPlan.cloneWithHomogeneousScheduledResource();
SchedulerUtils.persistUpdatedPackingPlan(Runtime.topologyName(runtimeConfiguration),
updatedPackingPlan, Runtime.schedulerStateManagerAdaptor(runtimeConfiguration));

return updatedPackingPlan.getContainers().iterator().next().getScheduledResource().get();
return new ContainerResourcePair(
updatedPackingPlan.getContainers().iterator().next().getRequiredResource(),
updatedPackingPlan.getContainers().iterator().next().getScheduledResource().get()
);
}

abstract boolean submit(PackingPlan packingPlan);

abstract boolean killTopology();

abstract boolean restart(int shardId);

public static class ContainerResourcePair {
private final Resource requiredResource;
private final Resource scheduledResource;

public ContainerResourcePair(
Resource requiredResource,
Resource scheduledResource) {
this.requiredResource = requiredResource;
this.scheduledResource = scheduledResource;
}

public Resource getRequiredResource() {
return requiredResource;
}

public Resource getScheduledResource() {
return scheduledResource;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;

import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.custom.V1Patch;
Expand Down Expand Up @@ -100,7 +99,7 @@ boolean submit(PackingPlan packingPlan) {
throw new TopologySubmissionException("K8S scheduler does not allow upper case topologies.");
}

final Resource containerResource = getContainerResource(packingPlan);
final ContainerResourcePair resourcePair = getContainerResourcePair(packingPlan);

final V1Service topologyService = createTopologyyService();
try {
Expand All @@ -118,7 +117,7 @@ boolean submit(PackingPlan packingPlan) {
for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
numberOfInstances = Math.max(numberOfInstances, containerPlan.getInstances().size());
}
final V1StatefulSet statefulSet = createStatefulSet(containerResource, numberOfInstances);
final V1StatefulSet statefulSet = createStatefulSet(resourcePair, numberOfInstances);

try {
final V1StatefulSet response =
Expand Down Expand Up @@ -316,7 +315,7 @@ private V1Service createTopologyyService() {
return service;
}

private V1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances) {
private V1StatefulSet createStatefulSet(ContainerResourcePair containerResource, int numberOfInstances) {
final String topologyName = getTopologyName();
final Config runtimeConfiguration = getRuntimeConfiguration();

Expand Down Expand Up @@ -384,7 +383,7 @@ private Map<String, String> getLabels(String topologyName) {
return labels;
}

private V1PodSpec getPodSpec(List<String> executorCommand, Resource resource,
private V1PodSpec getPodSpec(List<String> executorCommand, ContainerResourcePair resource,
int numberOfInstances) {
final V1PodSpec podSpec = new V1PodSpec();

Expand Down Expand Up @@ -429,7 +428,7 @@ private void addVolumesIfPresent(V1PodSpec spec) {
}
}

private V1Container getContainer(List<String> executorCommand, Resource resource,
private V1Container getContainer(List<String> executorCommand, ContainerResourcePair resource,
int numberOfInstances) {
final Config configuration = getConfiguration();
final V1Container container = new V1Container().name("executor");
Expand Down Expand Up @@ -463,10 +462,16 @@ private V1Container getContainer(List<String> executorCommand, Resource resource
final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
final Map<String, Quantity> requests = new HashMap<>();
requests.put(KubernetesConstants.MEMORY,
Quantity.fromString(KubernetesUtils.Megabytes(resource.getRam())));
Quantity.fromString(KubernetesUtils.Megabytes(resource.getRequiredResource().getRam())));
requests.put(KubernetesConstants.CPU,
Quantity.fromString(Double.toString(roundDecimal(resource.getCpu(), 3))));
Quantity.fromString(Double.toString(roundDecimal(resource.getRequiredResource().getCpu(), 3))));
resourceRequirements.setRequests(requests);
final Map<String, Quantity> limits = new HashMap<>();
limits.put(KubernetesConstants.MEMORY,
Quantity.fromString(KubernetesUtils.Megabytes(resource.getScheduledResource().getRam())));
limits.put(KubernetesConstants.CPU,
Quantity.fromString(Double.toString(roundDecimal(resource.getScheduledResource().getCpu(), 3))));
resourceRequirements.setLimits(limits);
container.setResources(resourceRequirements);

// set container ports
Expand Down