From b412e069fac663566d3e5cc7eb725eb11f1882b2 Mon Sep 17 00:00:00 2001 From: Nicholas Nezis Date: Thu, 21 Nov 2019 01:52:53 -0500 Subject: [PATCH 1/5] Removed the deprecated k8s models --- deploy/kubernetes/general/apiserver.yaml | 2 +- .../general/bookkeeper.statefulset.yaml | 12 +++++++++-- .../general/bookkeeper.statefulset_empty.yaml | 12 +++++++++-- deploy/kubernetes/general/bookkeeper.yaml | 13 ++++++++++-- deploy/kubernetes/general/tools.yaml | 19 +++++++++++++++++- deploy/kubernetes/general/zookeeper.yaml | 8 ++++++-- deploy/kubernetes/gke/gcs-apiserver.yaml | 2 +- deploy/kubernetes/helm/templates/bookie.yaml | 10 ++++++++-- deploy/kubernetes/helm/templates/tools.yaml | 6 +++++- deploy/kubernetes/minikube/apiserver.yaml | 2 +- deploy/kubernetes/minikube/bookkeeper.yaml | 7 ++++++- deploy/kubernetes/minikube/tools.yaml | 19 +++++++++++++++++- deploy/kubernetes/minikube/zookeeper.yaml | 20 ++++++++++++++++++- 13 files changed, 114 insertions(+), 18 deletions(-) diff --git a/deploy/kubernetes/general/apiserver.yaml b/deploy/kubernetes/general/apiserver.yaml index b11f7acf63d..470dff330d2 100644 --- a/deploy/kubernetes/general/apiserver.yaml +++ b/deploy/kubernetes/general/apiserver.yaml @@ -45,7 +45,7 @@ subjects: --- -apiVersion: extensions/v1beta1 +apiVersion: apps/v1 kind: Deployment metadata: name: heron-apiserver diff --git a/deploy/kubernetes/general/bookkeeper.statefulset.yaml b/deploy/kubernetes/general/bookkeeper.statefulset.yaml index 89aaf2d9130..e5c9fe2c19a 100644 --- a/deploy/kubernetes/general/bookkeeper.statefulset.yaml +++ b/deploy/kubernetes/general/bookkeeper.statefulset.yaml @@ -36,7 +36,7 @@ data: BK_useHostNameAsBookieID: "true" --- -apiVersion: apps/v1beta1 +apiVersion: apps/v1 kind: StatefulSet metadata: name: bookie @@ -44,6 +44,10 @@ metadata: app: bookkeeper component: bookie spec: + selector: + matchLabels: + app: bookkeeper + component: bookie serviceName: "bookkeeper" replicas: 3 template: @@ -145,11 +149,15 @@ spec: ## Auto-Recovery makes sure to restore the replication factor when any bookie ## crashes and it's not recovering on its own. ## -apiVersion: apps/v1beta1 +apiVersion: apps/v1 kind: Deployment metadata: name: bookie-autorecovery spec: + selector: + matchLabels: + app: bookkeeper + component: bookkeeper-replication replicas: 2 template: metadata: diff --git a/deploy/kubernetes/general/bookkeeper.statefulset_empty.yaml b/deploy/kubernetes/general/bookkeeper.statefulset_empty.yaml index 219e3c276a3..90a158f71ae 100644 --- a/deploy/kubernetes/general/bookkeeper.statefulset_empty.yaml +++ b/deploy/kubernetes/general/bookkeeper.statefulset_empty.yaml @@ -36,7 +36,7 @@ data: BK_useHostNameAsBookieID: "true" --- -apiVersion: apps/v1beta1 +apiVersion: apps/v1 kind: StatefulSet metadata: name: bookie @@ -44,6 +44,10 @@ metadata: app: bookkeeper component: bookie spec: + selector: + matchLabels: + app: bookkeeper + component: bookie serviceName: "bookkeeper" replicas: 3 template: @@ -129,11 +133,15 @@ spec: ## Auto-Recovery makes sure to restore the replication factor when any bookie ## crashes and it's not recovering on its own. 
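The theme of this first patch: every workload object moves off the deprecated `extensions/v1beta1` and `apps/v1beta1` groups to `apps/v1`, where `spec.selector` is required and must match the pod template labels, which is why each converted Deployment, StatefulSet, and DaemonSet gains a `matchLabels` block above. The same rule, sketched with the Kubernetes Java client this series later adopts (a minimal sketch; the `heron-tracker` name and labels are illustrative, borrowed from the tools manifest):

```java
import java.util.Collections;
import java.util.Map;

import io.kubernetes.client.models.V1Container;
import io.kubernetes.client.models.V1Deployment;
import io.kubernetes.client.models.V1DeploymentSpec;
import io.kubernetes.client.models.V1LabelSelector;
import io.kubernetes.client.models.V1ObjectMeta;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.models.V1PodTemplateSpec;

public class SelectorMigration {
  static V1Deployment trackerDeployment() {
    // apps/v1 makes spec.selector mandatory and immutable; it must select
    // the labels on the pod template or the API server rejects the object.
    final Map<String, String> labels = Collections.singletonMap("app", "heron-tracker");
    return new V1Deployment()
        .apiVersion("apps/v1")
        .kind("Deployment")
        .metadata(new V1ObjectMeta().name("heron-tracker"))
        .spec(new V1DeploymentSpec()
            .replicas(1)
            .selector(new V1LabelSelector().matchLabels(labels))
            .template(new V1PodTemplateSpec()
                .metadata(new V1ObjectMeta().labels(labels))
                .spec(new V1PodSpec().containers(Collections.singletonList(
                    new V1Container().name("heron-tracker").image("heron/heron:latest"))))));
  }
}
```

Because the selector is immutable under `apps/v1`, objects originally created under the beta groups typically need to be deleted and recreated rather than patched in place.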
## -apiVersion: apps/v1beta1 +apiVersion: apps/v1 kind: Deployment metadata: name: bookie-autorecovery spec: + selector: + matchLabels: + app: bookkeeper + component: bookkeeper-replication replicas: 2 template: metadata: diff --git a/deploy/kubernetes/general/bookkeeper.yaml b/deploy/kubernetes/general/bookkeeper.yaml index 4fd985bc20f..2ba48912276 100644 --- a/deploy/kubernetes/general/bookkeeper.yaml +++ b/deploy/kubernetes/general/bookkeeper.yaml @@ -38,7 +38,7 @@ data: ## cannot be moved across different nodes. ## For this reason, we run BK as a daemon set, one for each node in the ## cluster, unless restricted by label selectors -apiVersion: extensions/v1beta1 +apiVersion: apps/v1 kind: DaemonSet metadata: name: bookie @@ -46,6 +46,11 @@ metadata: app: bookkeeper component: bookie spec: + selector: + matchLabels: + app: bookkeeper + component: bookie + cluster: bookkeeper template: metadata: labels: @@ -130,11 +135,15 @@ spec: ## Auto-Recovery makes sure to restore the replication factor when any bookie ## crashes and it's not recovering on its own. ## -apiVersion: apps/v1beta1 +apiVersion: apps/v1 kind: Deployment metadata: name: bookie-autorecovery spec: + selector: + matchLabels: + app: bookkeeper + component: bookkeeper-replication replicas: 2 template: metadata: diff --git a/deploy/kubernetes/general/tools.yaml b/deploy/kubernetes/general/tools.yaml index d4ab5ce52ba..096841b1a81 100644 --- a/deploy/kubernetes/general/tools.yaml +++ b/deploy/kubernetes/general/tools.yaml @@ -18,12 +18,15 @@ ## ## Deployment Pod for tracker and ui ## -apiVersion: extensions/v1beta1 +apiVersion: apps/v1 kind: Deployment metadata: name: heron-tracker namespace: default spec: + selector: + matchLabels: + app: heron-tracker template: metadata: labels: @@ -57,6 +60,13 @@ spec: --name=localzk --hostport=zookeeper:2181 --rootpath="/heron" + resources: + requests: + cpu: "100m" + memory: "200M" + limits: + cpu: "200m" + memory: "300M" - name: heron-ui image: heron/heron:latest ports: @@ -68,6 +78,13 @@ spec: heron-ui --port=8889 --base_url=/api/v1/namespaces/default/services/heron-ui:8889/proxy + resources: + requests: + cpu: "100m" + memory: "200M" + limits: + cpu: "200m" + memory: "300M" --- ## diff --git a/deploy/kubernetes/general/zookeeper.yaml b/deploy/kubernetes/general/zookeeper.yaml index c644621ace9..835bee57a31 100644 --- a/deploy/kubernetes/general/zookeeper.yaml +++ b/deploy/kubernetes/general/zookeeper.yaml @@ -33,7 +33,7 @@ spec: --- ## Define a StatefulSet for ZK servers -apiVersion: apps/v1beta1 +apiVersion: apps/v1 kind: StatefulSet metadata: name: zk @@ -43,6 +43,10 @@ metadata: spec: serviceName: zookeeper replicas: 1 + selector: + matchLabels: + app: heron + component: zookeeper template: metadata: labels: @@ -66,7 +70,7 @@ spec: topologyKey: "kubernetes.io/hostname" containers: - name: zookeeper - image: heron/heron:0.16.2 + image: heron/heron:latest command: ["sh", "-c"] args: - > diff --git a/deploy/kubernetes/gke/gcs-apiserver.yaml b/deploy/kubernetes/gke/gcs-apiserver.yaml index ee0b3808333..d268a0f7f3a 100644 --- a/deploy/kubernetes/gke/gcs-apiserver.yaml +++ b/deploy/kubernetes/gke/gcs-apiserver.yaml @@ -45,7 +45,7 @@ subjects: --- -apiVersion: extensions/v1beta1 +apiVersion: apps/v1 kind: Deployment metadata: name: heron-apiserver diff --git a/deploy/kubernetes/helm/templates/bookie.yaml b/deploy/kubernetes/helm/templates/bookie.yaml index fecc7f16bc6..7c627729642 100644 --- a/deploy/kubernetes/helm/templates/bookie.yaml +++ b/deploy/kubernetes/helm/templates/bookie.yaml @@ 
-41,6 +41,7 @@ data: BK_indexDirectories: "/bookkeeper/data/ledgers" BK_zkServers: {{ .Release.Name }}-zookeeper:{{ .Values.zookeeper.clientPort }} BK_autoRecoveryDaemonEnabled: "true" + BK_useHostNameAsBookieID: "true" # TODO: Issue 458: https://github.com/apache/bookkeeper/issues/458 {{- if eq .Values.bookkeeper.prometheus.enabled true }} BK_enableStatistics: "true" @@ -54,10 +55,10 @@ data: ## For this reason, we run BK as a daemon set, one for each node in the ## cluster, unless restricted by label selectors {{- if or (eq .Values.platform "gke") (eq .Values.platform "minikube") }} -apiVersion: apps/v1beta1 +apiVersion: apps/v1 kind: StatefulSet {{- else }} -apiVersion: extensions/v1beta1 +apiVersion: apps/v1 kind: DaemonSet {{- end }} @@ -67,6 +68,11 @@ metadata: app: {{ .Release.Name }}-bookkeeper component: {{ .Release.Name }}-bookie spec: + selector: + matchLabels: + app: {{ .Release.Name }}-bookkeeper + component: {{ .Release.Name }}-bookie + cluster: {{ .Release.Name }}-bookkeeper {{- if or (eq .Values.platform "gke") (eq .Values.platform "minikube") }} serviceName: {{ .Release.Name }}-bookkeeper replicas: {{ $bookieReplicas }} diff --git a/deploy/kubernetes/helm/templates/tools.yaml b/deploy/kubernetes/helm/templates/tools.yaml index 72f92dd7d40..779f0623930 100644 --- a/deploy/kubernetes/helm/templates/tools.yaml +++ b/deploy/kubernetes/helm/templates/tools.yaml @@ -28,7 +28,7 @@ data: HERON_APISERVER_MEM_MAX: {{ $apiServerMemory | quote }} --- -apiVersion: extensions/v1beta1 +apiVersion: apps/v1 kind: Deployment metadata: name: {{ .Release.Name }}-tools @@ -38,6 +38,10 @@ metadata: release: {{ .Release.Name }} heritage: {{ .Release.Service }} spec: + selector: + matchLabels: + app: {{ .Release.Name }}-tools + release: {{ .Release.Name }} template: metadata: labels: diff --git a/deploy/kubernetes/minikube/apiserver.yaml b/deploy/kubernetes/minikube/apiserver.yaml index 2a89c1a6623..3c4a9b29950 100644 --- a/deploy/kubernetes/minikube/apiserver.yaml +++ b/deploy/kubernetes/minikube/apiserver.yaml @@ -46,7 +46,7 @@ subjects: --- -apiVersion: extensions/v1beta1 +apiVersion: apps/v1 kind: Deployment metadata: name: heron-apiserver diff --git a/deploy/kubernetes/minikube/bookkeeper.yaml b/deploy/kubernetes/minikube/bookkeeper.yaml index d5ec6687e7e..f5778c3c303 100644 --- a/deploy/kubernetes/minikube/bookkeeper.yaml +++ b/deploy/kubernetes/minikube/bookkeeper.yaml @@ -36,7 +36,7 @@ data: ## cannot be moved across different nodes. 
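Alongside the API-group bump, the tracker and ui containers gain explicit resource requests and limits (0.1 to 0.2 CPU, 200M to 300M memory). The same settings expressed through the Java client's `Quantity` type, assuming the `Map<String, Quantity>` signature of the client version pinned at the end of this series:

```java
import java.util.HashMap;
import java.util.Map;

import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.models.V1Container;
import io.kubernetes.client.models.V1ResourceRequirements;

public class TrackerResources {
  // Mirrors the requests/limits added to the tracker and ui containers:
  // request 0.1 CPU / 200 MB, cap at 0.2 CPU / 300 MB.
  static V1Container withResources(V1Container container) {
    final Map<String, Quantity> requests = new HashMap<>();
    requests.put("cpu", Quantity.fromString("100m"));
    requests.put("memory", Quantity.fromString("200M"));

    final Map<String, Quantity> limits = new HashMap<>();
    limits.put("cpu", Quantity.fromString("200m"));
    limits.put("memory", Quantity.fromString("300M"));

    return container.resources(
        new V1ResourceRequirements().requests(requests).limits(limits));
  }
}
```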
## For this reason, we run BK as a daemon set, one for each node in the ## cluster, unless restricted by label selectors -apiVersion: extensions/v1beta1 +apiVersion: apps/v1 kind: DaemonSet metadata: name: bookie @@ -44,6 +44,11 @@ metadata: app: bookkeeper component: bookie spec: + selector: + matchLabels: + app: bookkeeper + component: bookie + cluster: bookkeeper template: metadata: labels: diff --git a/deploy/kubernetes/minikube/tools.yaml b/deploy/kubernetes/minikube/tools.yaml index 535d9b6112d..75643d8e76f 100644 --- a/deploy/kubernetes/minikube/tools.yaml +++ b/deploy/kubernetes/minikube/tools.yaml @@ -18,12 +18,15 @@ ## ## Deployment Pod for tracker and ui ## -apiVersion: extensions/v1beta1 +apiVersion: apps/v1 kind: Deployment metadata: name: heron-tracker namespace: default spec: + selector: + matchLabels: + app: heron-tracker template: metadata: labels: @@ -43,6 +46,13 @@ spec: --name=kubernetes --hostport=zookeeper:2181 --rootpath="/heron" + resources: + requests: + cpu: "100m" + memory: "200M" + limits: + cpu: "200m" + memory: "300M" - name: heron-ui image: heron/heron:latest ports: @@ -54,6 +64,13 @@ spec: heron-ui --port=8889 --base_url=/api/v1/namespaces/default/services/heron-ui:8889/proxy + resources: + requests: + cpu: "100m" + memory: "200M" + limits: + cpu: "200m" + memory: "300M" --- ## diff --git a/deploy/kubernetes/minikube/zookeeper.yaml b/deploy/kubernetes/minikube/zookeeper.yaml index f4e5f047ee2..779db9d0fd1 100644 --- a/deploy/kubernetes/minikube/zookeeper.yaml +++ b/deploy/kubernetes/minikube/zookeeper.yaml @@ -20,7 +20,7 @@ ## ## Define a StatefulSet for ZK servers -apiVersion: apps/v1beta1 +apiVersion: apps/v1 kind: StatefulSet metadata: name: zk @@ -30,6 +30,10 @@ metadata: spec: serviceName: zookeeper replicas: 1 + selector: + matchLabels: + app: heron + component: zookeeper template: metadata: labels: @@ -37,6 +41,20 @@ spec: component: zookeeper spec: + # Make sure multiple pods of ZK don't get scheduled on the + # same node, unless there are no other available nodes + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: "app" + operator: In + values: + - zookeeper + topologyKey: "kubernetes.io/hostname" containers: - name: zookeeper image: heron/heron:latest From 27d8a0afee3b460b3ae72141ff931e723997e0c0 Mon Sep 17 00:00:00 2001 From: Nicholas Nezis Date: Thu, 21 Nov 2019 02:19:00 -0500 Subject: [PATCH 2/5] Upgraded Kubernetes Java client --- WORKSPACE | 2 +- .../kubernetes/AppsV1Controller.java | 471 ++++++++++++++++++ .../kubernetes/AppsV1beta1Controller.java | 38 +- .../kubernetes/KubernetesCompat.java | 6 +- .../kubernetes/KubernetesScheduler.java | 2 +- .../heron/scheduler/kubernetes/Volumes.java | 8 +- .../scheduler/kubernetes/VolumesTests.java | 2 +- 7 files changed, 500 insertions(+), 29 deletions(-) create mode 100644 heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java diff --git a/WORKSPACE b/WORKSPACE index 5853ef5da78..01ea9262df8 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -537,7 +537,7 @@ maven_jar( # end Pulsar Client # Kubernetes java client -kubernetes_client_version = "1.0.0-beta1" +kubernetes_client_version = "6.0.1" squareup_okhttp_version = "2.7.5" maven_jar( diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java new file mode 100644 index 00000000000..a58d0f97b30 --- 
/dev/null +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java @@ -0,0 +1,471 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.scheduler.kubernetes; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.squareup.okhttp.Response; + +import org.apache.heron.api.utils.TopologyUtils; +import org.apache.heron.scheduler.TopologyRuntimeManagementException; +import org.apache.heron.scheduler.TopologySubmissionException; +import org.apache.heron.scheduler.utils.Runtime; +import org.apache.heron.scheduler.utils.SchedulerUtils; +import org.apache.heron.scheduler.utils.SchedulerUtils.ExecutorPort; +import org.apache.heron.spi.common.Config; +import org.apache.heron.spi.packing.PackingPlan; +import org.apache.heron.spi.packing.Resource; + +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.api.AppsV1Api; +import io.kubernetes.client.openapi.models.V1Container; +import io.kubernetes.client.openapi.models.V1ContainerPort; +import io.kubernetes.client.openapi.models.V1DeleteOptions; +import io.kubernetes.client.openapi.models.V1EnvVar; +import io.kubernetes.client.openapi.models.V1EnvVarSource; +import io.kubernetes.client.openapi.models.V1LabelSelector; +import io.kubernetes.client.openapi.models.V1ObjectFieldSelector; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1PodSpec; +import io.kubernetes.client.openapi.models.V1PodTemplateSpec; +import io.kubernetes.client.openapi.models.V1ResourceRequirements; +import io.kubernetes.client.openapi.models.V1Toleration; +import io.kubernetes.client.openapi.models.V1Volume; +import io.kubernetes.client.openapi.models.V1VolumeMount; +import io.kubernetes.client.openapi.models.V1StatefulSet; +import io.kubernetes.client.openapi.models.V1StatefulSetSpec; + +public class AppsV1Controller extends KubernetesController { + + private static final Logger LOG = + Logger.getLogger(AppsV1Controller.class.getName()); + + private static final String ENV_SHARD_ID = "SHARD_ID"; + + private final AppsV1Api client; + + AppsV1Controller(Config configuration, Config runtimeConfiguration) { + super(configuration, runtimeConfiguration); + final ApiClient apiClient = new ApiClient().setBasePath(getKubernetesUri()); 
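The controller above points its `ApiClient` at an explicit API server URI via `setBasePath`. For comparison, a hedged sketch of the in-cluster alternative the same client offers (assumes the `io.kubernetes.client.util.Config` helper from the 3.x client line; it is not part of this patch):

```java
import java.io.IOException;

import io.kubernetes.client.ApiClient;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.util.Config;

public class InClusterClient {
  // Builds the same AppsV1Api, but from the pod's mounted service-account
  // credentials instead of an explicit API server URI.
  static AppsV1Api fromCluster() throws IOException {
    final ApiClient apiClient = Config.fromCluster();
    return new AppsV1Api(apiClient);
  }
}
```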
+ client = new AppsV1Api(apiClient); + } + + @Override + boolean submit(PackingPlan packingPlan) { + final String topologyName = getTopologyName(); + if (!topologyName.equals(topologyName.toLowerCase())) { + throw new TopologySubmissionException("K8S scheduler does not allow upper case topologies."); + } + + final Resource containerResource = getContainerResource(packingPlan); + + // find the max number of instances in a container so we can open + // enough ports if remote debugging is enabled. + int numberOfInstances = 0; + for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) { + numberOfInstances = Math.max(numberOfInstances, containerPlan.getInstances().size()); + } + final V1StatefulSet statefulSet = createStatefulSet(containerResource, numberOfInstances); + + try { + final Response response = + client.createNamespacedStatefulSetCall(getNamespace(), statefulSet, null, + null, null).execute(); + if (!response.isSuccessful()) { + LOG.log(Level.SEVERE, "Error creating topology message: " + response.message()); + KubernetesUtils.logResponseBodyIfPresent(LOG, response); + // construct a message based on the k8s API server response + throw new TopologySubmissionException( + KubernetesUtils.errorMessageFromResponse(response)); + } + } catch (IOException | ApiException e) { + KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology", e); + throw new TopologySubmissionException(e.getMessage()); + } + + return true; + } + + @Override + boolean killTopology() { + return + isStatefulSet() + ? deleteStatefulSet() + : + new KubernetesCompat().killTopology(getKubernetesUri(), getTopologyName(), getNamespace()); + } + + @Override + boolean restart(int shardId) { + final String message = "Restarting the whole topology is not supported yet. 
" + + "Please kill and resubmit the topology."; + LOG.log(Level.SEVERE, message); + return false; + } + + @Override + public Set + addContainers(Set containersToAdd) { + final V1StatefulSet statefulSet; + try { + statefulSet = getStatefulSet(); + } catch (ApiException ae) { + final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody(); + throw new TopologyRuntimeManagementException(message, ae); + } + final int currentContainerCount = statefulSet.getSpec().getReplicas(); + final int newContainerCount = currentContainerCount + containersToAdd.size(); + + final V1StatefulSetSpec newSpec = new V1StatefulSetSpec(); + newSpec.setReplicas(newContainerCount); + + try { + doPatch(newSpec); + } catch (ApiException ae) { + throw new TopologyRuntimeManagementException( + ae.getMessage() + "\ndetails\n" + ae.getResponseBody()); + } + + return containersToAdd; + } + + @Override + public void removeContainers(Set containersToRemove) { + final V1StatefulSet statefulSet; + try { + statefulSet = getStatefulSet(); + } catch (ApiException ae) { + final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody(); + throw new TopologyRuntimeManagementException(message, ae); + } + final int currentContainerCount = statefulSet.getSpec().getReplicas(); + final int newContainerCount = currentContainerCount - containersToRemove.size(); + + final V1StatefulSetSpec newSpec = new V1StatefulSetSpec(); + newSpec.setReplicas(newContainerCount); + + try { + doPatch(newSpec); + } catch (ApiException e) { + throw new TopologyRuntimeManagementException( + e.getMessage() + "\ndetails\n" + e.getResponseBody()); + } + } + + private void doPatch(V1StatefulSetSpec patchedSpec) throws ApiException { + final String body = + String.format(JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT, + patchedSpec.getReplicas().toString()); + final ArrayList arr = new ArrayList<>(); + arr.add(((JsonElement) deserialize(body, JsonElement.class)).getAsJsonObject()); + LOG.fine("Update body: " + arr); + client.patchNamespacedStatefulSet(getTopologyName(), getNamespace(), arr, null); + } + + private static final String JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT = + "{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%s}"; + + private Object deserialize(String jsonStr, Class targetClass) { + return (new Gson()).fromJson(jsonStr, targetClass); + } + + V1StatefulSet getStatefulSet() throws ApiException { + return client.readNamespacedStatefulSet(getTopologyName(), getNamespace(), null, null, null); + } + + boolean deleteStatefulSet() { + try { + final V1DeleteOptions options = new V1DeleteOptions(); + options.setGracePeriodSeconds(0L); + options.setPropagationPolicy(KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY); + final Response response = client.deleteNamespacedStatefulSetCall(getTopologyName(), + getNamespace(), options, null, null, null, null, null, null) + .execute(); + + if (!response.isSuccessful()) { + LOG.log(Level.SEVERE, "Error killing topology message: " + response.message()); + KubernetesUtils.logResponseBodyIfPresent(LOG, response); + + throw new TopologyRuntimeManagementException( + KubernetesUtils.errorMessageFromResponse(response)); + } + } catch (IOException | ApiException e) { + KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology", e); + return false; + } + + return true; + } + + boolean isStatefulSet() { + try { + final Response response = + client.readNamespacedStatefulSetCall(getTopologyName(), getNamespace(), + null, null, null, null, null) + .execute(); + return 
response.isSuccessful(); + } catch (IOException | ApiException e) { + LOG.warning("isStatefulSet check " + e.getMessage()); + } + return false; + } + + protected List getExecutorCommand(String containerId) { + final Map ports = + KubernetesConstants.EXECUTOR_PORTS.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> e.getValue().toString())); + + final Config configuration = getConfiguration(); + final Config runtimeConfiguration = getRuntimeConfiguration(); + final String[] executorCommand = + SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration, + containerId, ports); + return Arrays.asList( + "sh", + "-c", + KubernetesUtils.getConfCommand(configuration) + + " && " + KubernetesUtils.getFetchCommand(configuration, runtimeConfiguration) + + " && " + setShardIdEnvironmentVariableCommand() + + " && " + String.join(" ", executorCommand) + ); + } + + private static String setShardIdEnvironmentVariableCommand() { + return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID); + } + + + private V1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances) { + final String topologyName = getTopologyName(); + final Config runtimeConfiguration = getRuntimeConfiguration(); + + final V1StatefulSet statefulSet = new V1StatefulSet(); + + // setup stateful set metadata + final V1ObjectMeta objectMeta = new V1ObjectMeta(); + objectMeta.name(topologyName); + statefulSet.metadata(objectMeta); + + // create the stateful set spec + final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec(); + statefulSetSpec.serviceName(topologyName); + statefulSetSpec.setReplicas(Runtime.numContainers(runtimeConfiguration).intValue()); + + // Parallel pod management tells the StatefulSet controller to launch or terminate + // all Pods in parallel, and not to wait for Pods to become Running and Ready or completely + // terminated prior to launching or terminating another Pod. 
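A note on scaling: `addContainers` and `removeContainers` above adjust the StatefulSet with a JSON patch rather than a full object update. A standalone sketch of that call, using the same patch body `doPatch()` builds (the four-argument `patchNamespacedStatefulSet` signature assumes the 3.x client this series ends up pinning):

```java
import java.util.ArrayList;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.AppsV1Api;

public class StatefulSetScaler {
  // Sends [{"op":"replace","path":"/spec/replicas","value":<n>}], the same
  // JSON-patch body doPatch() builds, as a standalone call.
  static void scale(AppsV1Api api, String name, String namespace, int replicas)
      throws ApiException {
    final String patch = String.format(
        "{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%d}", replicas);
    final ArrayList<JsonObject> body = new ArrayList<>();
    body.add(new Gson().fromJson(patch, JsonElement.class).getAsJsonObject());
    api.patchNamespacedStatefulSet(name, namespace, body, null);
  }
}
```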
+ statefulSetSpec.setPodManagementPolicy("Parallel"); + + // add selector match labels "app=heron" and "topology=topology-name" + // so the we know which pods to manage + final V1LabelSelector selector = new V1LabelSelector(); + selector.matchLabels(getMatchLabels(topologyName)); + statefulSetSpec.selector(selector); + + // create a pod template + final V1PodTemplateSpec podTemplateSpec = new V1PodTemplateSpec(); + + // set up pod meta + final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(topologyName)); + templateMetaData.annotations(getPrometheusAnnotations()); + podTemplateSpec.setMetadata(templateMetaData); + + final List command = getExecutorCommand("$" + ENV_SHARD_ID); + podTemplateSpec.spec(getPodSpec(command, containerResource, numberOfInstances)); + + statefulSetSpec.setTemplate(podTemplateSpec); + + statefulSet.spec(statefulSetSpec); + + return statefulSet; + } + + private Map getPrometheusAnnotations() { + final Map annotations = new HashMap<>(); + annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true"); + annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_PORT, + KubernetesConstants.PROMETHEUS_PORT); + + return annotations; + } + + private Map getMatchLabels(String topologyName) { + final Map labels = new HashMap<>(); + labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE); + labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName); + return labels; + } + + private Map getLabels(String topologyName) { + final Map labels = new HashMap<>(); + labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE); + labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName); + return labels; + } + + private V1PodSpec getPodSpec(List executorCommand, Resource resource, + int numberOfInstances) { + final V1PodSpec podSpec = new V1PodSpec(); + + // set the termination period to 0 so pods can be deleted quickly + podSpec.setTerminationGracePeriodSeconds(0L); + + // set the pod tolerations so pods are rescheduled when nodes go down + // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions + podSpec.setTolerations(getTolerations()); + + podSpec.containers(Collections.singletonList( + getContainer(executorCommand, resource, numberOfInstances))); + + addVolumesIfPresent(podSpec); + + return podSpec; + } + + private List getTolerations() { + final List tolerations = new ArrayList<>(); + KubernetesConstants.TOLERATIONS.forEach(t -> { + final V1Toleration toleration = + new V1Toleration() + .key(t) + .operator("Exists") + .effect("NoExecute") + .tolerationSeconds(10L); + tolerations.add(toleration); + }); + + return tolerations; + } + + private void addVolumesIfPresent(V1PodSpec spec) { + final Config config = getConfiguration(); + if (KubernetesContext.hasVolume(config)) { + final V1Volume volume = Volumes.get().create(config); + if (volume != null) { + LOG.fine("Adding volume: " + volume.toString()); + spec.volumes(Collections.singletonList(volume)); + } + } + } + + private V1Container getContainer(List executorCommand, Resource resource, + int numberOfInstances) { + final Config configuration = getConfiguration(); + final V1Container container = new V1Container().name("executor"); + + // set up the container images + container.setImage(KubernetesContext.getExecutorDockerImage(configuration)); + + // set up the container command + container.setCommand(executorCommand); + + if (KubernetesContext.hasImagePullPolicy(configuration)) { + 
container.setImagePullPolicy(KubernetesContext.getKubernetesImagePullPolicy(configuration)); + } + + // setup the environment variables for the container + final V1EnvVar envVarHost = new V1EnvVar(); + envVarHost.name(KubernetesConstants.ENV_HOST) + .valueFrom(new V1EnvVarSource() + .fieldRef(new V1ObjectFieldSelector() + .fieldPath(KubernetesConstants.POD_IP))); + + final V1EnvVar envVarPodName = new V1EnvVar(); + envVarPodName.name(KubernetesConstants.ENV_POD_NAME) + .valueFrom(new V1EnvVarSource() + .fieldRef(new V1ObjectFieldSelector() + .fieldPath(KubernetesConstants.POD_NAME))); + container.setEnv(Arrays.asList(envVarHost, envVarPodName)); + + + // set container resources + final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements(); + final Map requests = new HashMap<>(); + requests.put(KubernetesConstants.MEMORY, + KubernetesUtils.Megabytes(resource.getRam())); + requests.put(KubernetesConstants.CPU, Double.toString(resource.getCpu())); + resourceRequirements.setRequests(requests); + container.setResources(resourceRequirements); + + // set container ports + final boolean debuggingEnabled = + TopologyUtils.getTopologyRemoteDebuggingEnabled( + Runtime.topology(getRuntimeConfiguration())); + container.setPorts(getContainerPorts(debuggingEnabled, numberOfInstances)); + + // setup volume mounts + mountVolumeIfPresent(container); + + return container; + } + + private List getContainerPorts(boolean remoteDebugEnabled, + int numberOfInstances) { + List ports = new ArrayList<>(); + KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> { + final V1ContainerPort port = new V1ContainerPort(); + port.setName(p.getName()); + port.setContainerPort(v); + ports.add(port); + }); + + + if (remoteDebugEnabled) { + IntStream.range(0, numberOfInstances).forEach(i -> { + final String portName = + KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + String.valueOf(i); + final V1ContainerPort port = new V1ContainerPort(); + port.setName(portName); + port.setContainerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i); + ports.add(port); + }); + } + + return ports; + } + + private void mountVolumeIfPresent(V1Container container) { + final Config config = getConfiguration(); + if (KubernetesContext.hasContainerVolume(config)) { + final V1VolumeMount mount = + new V1VolumeMount() + .name(KubernetesContext.getContainerVolumeName(config)) + .mountPath(KubernetesContext.getContainerVolumeMountPath(config)); + container.volumeMounts(Collections.singletonList(mount)); + } + } +} diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java index 2aea58d1f01..f9cb0da3b1b 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java @@ -47,25 +47,25 @@ import org.apache.heron.spi.packing.PackingPlan; import org.apache.heron.spi.packing.Resource; -import io.kubernetes.client.ApiClient; -import io.kubernetes.client.ApiException; -import io.kubernetes.client.apis.AppsV1beta1Api; -import io.kubernetes.client.models.V1Container; -import io.kubernetes.client.models.V1ContainerPort; -import io.kubernetes.client.models.V1DeleteOptions; -import io.kubernetes.client.models.V1EnvVar; -import io.kubernetes.client.models.V1EnvVarSource; -import io.kubernetes.client.models.V1LabelSelector; -import 
io.kubernetes.client.models.V1ObjectFieldSelector; -import io.kubernetes.client.models.V1ObjectMeta; -import io.kubernetes.client.models.V1PodSpec; -import io.kubernetes.client.models.V1PodTemplateSpec; -import io.kubernetes.client.models.V1ResourceRequirements; -import io.kubernetes.client.models.V1Toleration; -import io.kubernetes.client.models.V1Volume; -import io.kubernetes.client.models.V1VolumeMount; -import io.kubernetes.client.models.V1beta1StatefulSet; -import io.kubernetes.client.models.V1beta1StatefulSetSpec; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.api.AppsV1beta1Api; +import io.kubernetes.client.openapi.models.V1Container; +import io.kubernetes.client.openapi.models.V1ContainerPort; +import io.kubernetes.client.openapi.models.V1DeleteOptions; +import io.kubernetes.client.openapi.models.V1EnvVar; +import io.kubernetes.client.openapi.models.V1EnvVarSource; +import io.kubernetes.client.openapi.models.V1LabelSelector; +import io.kubernetes.client.openapi.models.V1ObjectFieldSelector; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1PodSpec; +import io.kubernetes.client.openapi.models.V1PodTemplateSpec; +import io.kubernetes.client.openapi.models.V1ResourceRequirements; +import io.kubernetes.client.openapi.models.V1Toleration; +import io.kubernetes.client.openapi.models.V1Volume; +import io.kubernetes.client.openapi.models.V1VolumeMount; +import io.kubernetes.client.openapi.models.V1beta1StatefulSet; +import io.kubernetes.client.openapi.models.V1beta1StatefulSetSpec; public class AppsV1beta1Controller extends KubernetesController { diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java index 4197535a4ce..b7a7005c96b 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java @@ -27,9 +27,9 @@ import org.apache.heron.scheduler.TopologyRuntimeManagementException; -import io.kubernetes.client.ApiClient; -import io.kubernetes.client.ApiException; -import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.api.CoreV1Api; public class KubernetesCompat { diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java index bb6a919f605..71545beab3c 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java @@ -50,7 +50,7 @@ public class KubernetesScheduler implements IScheduler, IScalable { private UpdateTopologyManager updateTopologyManager; protected KubernetesController getController() { - return new AppsV1beta1Controller(configuration, runtimeConfiguration); + return new AppsV1Controller(configuration, runtimeConfiguration); } @Override diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java index 32194af8bce..90f15bd7334 100644 --- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java @@ -24,10 +24,10 @@ import org.apache.heron.spi.common.Config; -import io.kubernetes.client.models.V1AWSElasticBlockStoreVolumeSource; -import io.kubernetes.client.models.V1HostPathVolumeSource; -import io.kubernetes.client.models.V1NFSVolumeSource; -import io.kubernetes.client.models.V1Volume; +import io.kubernetes.client.openapi.models.V1AWSElasticBlockStoreVolumeSource; +import io.kubernetes.client.openapi.models.V1HostPathVolumeSource; +import io.kubernetes.client.openapi.models.V1NFSVolumeSource; +import io.kubernetes.client.openapi.models.V1Volume; final class Volumes { diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java index 32ff5ecea58..95f82f44e5f 100644 --- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java +++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java @@ -24,7 +24,7 @@ import org.apache.heron.spi.common.Config; -import io.kubernetes.client.models.V1Volume; +import io.kubernetes.client.openapi.models.V1Volume; public class VolumesTests { From 8d977e2318a0425e39d511ce66dba084c32d16d3 Mon Sep 17 00:00:00 2001 From: Nicholas Nezis Date: Wed, 27 Nov 2019 23:21:26 -0500 Subject: [PATCH 3/5] Updated to use Kubernetes client 6.0.1 --- WORKSPACE | 2 +- .../kubernetes/AppsV1Controller.java | 51 +- .../kubernetes/AppsV1beta1Controller.java | 471 ------------------ .../kubernetes/KubernetesCompat.java | 6 +- .../heron/scheduler/kubernetes/Volumes.java | 8 +- .../scheduler/kubernetes/VolumesTests.java | 2 +- 6 files changed, 38 insertions(+), 502 deletions(-) delete mode 100644 heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java diff --git a/WORKSPACE b/WORKSPACE index 01ea9262df8..5d55887b3fa 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -537,7 +537,7 @@ maven_jar( # end Pulsar Client # Kubernetes java client -kubernetes_client_version = "6.0.1" +kubernetes_client_version = "2.0.0" squareup_okhttp_version = "2.7.5" maven_jar( diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java index a58d0f97b30..62bb24cc856 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java @@ -47,25 +47,27 @@ import org.apache.heron.spi.packing.PackingPlan; import org.apache.heron.spi.packing.Resource; -import io.kubernetes.client.openapi.ApiClient; -import io.kubernetes.client.openapi.ApiException; -import io.kubernetes.client.openapi.api.AppsV1Api; -import io.kubernetes.client.openapi.models.V1Container; -import io.kubernetes.client.openapi.models.V1ContainerPort; -import io.kubernetes.client.openapi.models.V1DeleteOptions; -import io.kubernetes.client.openapi.models.V1EnvVar; -import io.kubernetes.client.openapi.models.V1EnvVarSource; -import io.kubernetes.client.openapi.models.V1LabelSelector; -import io.kubernetes.client.openapi.models.V1ObjectFieldSelector; -import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.openapi.models.V1PodSpec; -import io.kubernetes.client.openapi.models.V1PodTemplateSpec; -import 
io.kubernetes.client.openapi.models.V1ResourceRequirements; -import io.kubernetes.client.openapi.models.V1Toleration; -import io.kubernetes.client.openapi.models.V1Volume; -import io.kubernetes.client.openapi.models.V1VolumeMount; -import io.kubernetes.client.openapi.models.V1StatefulSet; -import io.kubernetes.client.openapi.models.V1StatefulSetSpec; +import io.kubernetes.client.ApiClient; +import io.kubernetes.client.ApiException; +import io.kubernetes.client.apis.AppsV1Api; +import io.kubernetes.client.custom.Quantity; +import io.kubernetes.client.custom.Quantity.Format; +import io.kubernetes.client.models.V1Container; +import io.kubernetes.client.models.V1ContainerPort; +import io.kubernetes.client.models.V1DeleteOptions; +import io.kubernetes.client.models.V1EnvVar; +import io.kubernetes.client.models.V1EnvVarSource; +import io.kubernetes.client.models.V1LabelSelector; +import io.kubernetes.client.models.V1ObjectFieldSelector; +import io.kubernetes.client.models.V1ObjectMeta; +import io.kubernetes.client.models.V1PodSpec; +import io.kubernetes.client.models.V1PodTemplateSpec; +import io.kubernetes.client.models.V1ResourceRequirements; +import io.kubernetes.client.models.V1Toleration; +import io.kubernetes.client.models.V1Volume; +import io.kubernetes.client.models.V1VolumeMount; +import io.kubernetes.client.models.V1StatefulSet; +import io.kubernetes.client.models.V1StatefulSetSpec; public class AppsV1Controller extends KubernetesController { @@ -414,10 +416,10 @@ private V1Container getContainer(List executorCommand, Resource resource // set container resources final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements(); - final Map requests = new HashMap<>(); + final Map requests = new HashMap<>(); requests.put(KubernetesConstants.MEMORY, - KubernetesUtils.Megabytes(resource.getRam())); - requests.put(KubernetesConstants.CPU, Double.toString(resource.getCpu())); + Quantity.fromString(KubernetesUtils.Megabytes(resource.getRam()))); + requests.put(KubernetesConstants.CPU, Quantity.fromString(Double.toString(roundDecimal(resource.getCpu(), 3)))); resourceRequirements.setRequests(requests); container.setResources(resourceRequirements); @@ -468,4 +470,9 @@ private void mountVolumeIfPresent(V1Container container) { container.volumeMounts(Collections.singletonList(mount)); } } + + public static double roundDecimal(double value, int places) { + double scale = Math.pow(10, places); + return Math.round(value * scale) / scale; + } } diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java deleted file mode 100644 index f9cb0da3b1b..00000000000 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1beta1Controller.java +++ /dev/null @@ -1,471 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.heron.scheduler.kubernetes; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.squareup.okhttp.Response; - -import org.apache.heron.api.utils.TopologyUtils; -import org.apache.heron.scheduler.TopologyRuntimeManagementException; -import org.apache.heron.scheduler.TopologySubmissionException; -import org.apache.heron.scheduler.utils.Runtime; -import org.apache.heron.scheduler.utils.SchedulerUtils; -import org.apache.heron.scheduler.utils.SchedulerUtils.ExecutorPort; -import org.apache.heron.spi.common.Config; -import org.apache.heron.spi.packing.PackingPlan; -import org.apache.heron.spi.packing.Resource; - -import io.kubernetes.client.openapi.ApiClient; -import io.kubernetes.client.openapi.ApiException; -import io.kubernetes.client.openapi.api.AppsV1beta1Api; -import io.kubernetes.client.openapi.models.V1Container; -import io.kubernetes.client.openapi.models.V1ContainerPort; -import io.kubernetes.client.openapi.models.V1DeleteOptions; -import io.kubernetes.client.openapi.models.V1EnvVar; -import io.kubernetes.client.openapi.models.V1EnvVarSource; -import io.kubernetes.client.openapi.models.V1LabelSelector; -import io.kubernetes.client.openapi.models.V1ObjectFieldSelector; -import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.openapi.models.V1PodSpec; -import io.kubernetes.client.openapi.models.V1PodTemplateSpec; -import io.kubernetes.client.openapi.models.V1ResourceRequirements; -import io.kubernetes.client.openapi.models.V1Toleration; -import io.kubernetes.client.openapi.models.V1Volume; -import io.kubernetes.client.openapi.models.V1VolumeMount; -import io.kubernetes.client.openapi.models.V1beta1StatefulSet; -import io.kubernetes.client.openapi.models.V1beta1StatefulSetSpec; - -public class AppsV1beta1Controller extends KubernetesController { - - private static final Logger LOG = - Logger.getLogger(AppsV1beta1Controller.class.getName()); - - private static final String ENV_SHARD_ID = "SHARD_ID"; - - private final AppsV1beta1Api client; - - AppsV1beta1Controller(Config configuration, Config runtimeConfiguration) { - super(configuration, runtimeConfiguration); - final ApiClient apiClient = new ApiClient().setBasePath(getKubernetesUri()); - client = new AppsV1beta1Api(apiClient); - } - - @Override - boolean submit(PackingPlan packingPlan) { - final String topologyName = getTopologyName(); - if (!topologyName.equals(topologyName.toLowerCase())) { - throw new TopologySubmissionException("K8S scheduler does not allow upper case topologies."); - } - - final Resource containerResource = getContainerResource(packingPlan); - - // find the max number of instances in a container so we can open - // enough ports if 
remote debugging is enabled. - int numberOfInstances = 0; - for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) { - numberOfInstances = Math.max(numberOfInstances, containerPlan.getInstances().size()); - } - final V1beta1StatefulSet statefulSet = createStatefulSet(containerResource, numberOfInstances); - - try { - final Response response = - client.createNamespacedStatefulSetCall(getNamespace(), statefulSet, null, - null, null).execute(); - if (!response.isSuccessful()) { - LOG.log(Level.SEVERE, "Error creating topology message: " + response.message()); - KubernetesUtils.logResponseBodyIfPresent(LOG, response); - // construct a message based on the k8s API server response - throw new TopologySubmissionException( - KubernetesUtils.errorMessageFromResponse(response)); - } - } catch (IOException | ApiException e) { - KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology", e); - throw new TopologySubmissionException(e.getMessage()); - } - - return true; - } - - @Override - boolean killTopology() { - return - isStatefulSet() - ? deleteStatefulSet() - : - new KubernetesCompat().killTopology(getKubernetesUri(), getTopologyName(), getNamespace()); - } - - @Override - boolean restart(int shardId) { - final String message = "Restarting the whole topology is not supported yet. " - + "Please kill and resubmit the topology."; - LOG.log(Level.SEVERE, message); - return false; - } - - @Override - public Set - addContainers(Set containersToAdd) { - final V1beta1StatefulSet statefulSet; - try { - statefulSet = getStatefulSet(); - } catch (ApiException ae) { - final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody(); - throw new TopologyRuntimeManagementException(message, ae); - } - final int currentContainerCount = statefulSet.getSpec().getReplicas(); - final int newContainerCount = currentContainerCount + containersToAdd.size(); - - final V1beta1StatefulSetSpec newSpec = new V1beta1StatefulSetSpec(); - newSpec.setReplicas(newContainerCount); - - try { - doPatch(newSpec); - } catch (ApiException ae) { - throw new TopologyRuntimeManagementException( - ae.getMessage() + "\ndetails\n" + ae.getResponseBody()); - } - - return containersToAdd; - } - - @Override - public void removeContainers(Set containersToRemove) { - final V1beta1StatefulSet statefulSet; - try { - statefulSet = getStatefulSet(); - } catch (ApiException ae) { - final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody(); - throw new TopologyRuntimeManagementException(message, ae); - } - final int currentContainerCount = statefulSet.getSpec().getReplicas(); - final int newContainerCount = currentContainerCount - containersToRemove.size(); - - final V1beta1StatefulSetSpec newSpec = new V1beta1StatefulSetSpec(); - newSpec.setReplicas(newContainerCount); - - try { - doPatch(newSpec); - } catch (ApiException e) { - throw new TopologyRuntimeManagementException( - e.getMessage() + "\ndetails\n" + e.getResponseBody()); - } - } - - private void doPatch(V1beta1StatefulSetSpec patchedSpec) throws ApiException { - final String body = - String.format(JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT, - patchedSpec.getReplicas().toString()); - final ArrayList arr = new ArrayList<>(); - arr.add(((JsonElement) deserialize(body, JsonElement.class)).getAsJsonObject()); - LOG.fine("Update body: " + arr); - client.patchNamespacedStatefulSet(getTopologyName(), getNamespace(), arr, null); - } - - private static final String JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT = - 
"{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%s}"; - - private Object deserialize(String jsonStr, Class targetClass) { - return (new Gson()).fromJson(jsonStr, targetClass); - } - - V1beta1StatefulSet getStatefulSet() throws ApiException { - return client.readNamespacedStatefulSet(getTopologyName(), getNamespace(), null, null, null); - } - - boolean deleteStatefulSet() { - try { - final V1DeleteOptions options = new V1DeleteOptions(); - options.setGracePeriodSeconds(0L); - options.setPropagationPolicy(KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY); - final Response response = client.deleteNamespacedStatefulSetCall(getTopologyName(), - getNamespace(), options, null, null, null, null, null, null) - .execute(); - - if (!response.isSuccessful()) { - LOG.log(Level.SEVERE, "Error killing topology message: " + response.message()); - KubernetesUtils.logResponseBodyIfPresent(LOG, response); - - throw new TopologyRuntimeManagementException( - KubernetesUtils.errorMessageFromResponse(response)); - } - } catch (IOException | ApiException e) { - KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology", e); - return false; - } - - return true; - } - - boolean isStatefulSet() { - try { - final Response response = - client.readNamespacedStatefulSetCall(getTopologyName(), getNamespace(), - null, null, null, null, null) - .execute(); - return response.isSuccessful(); - } catch (IOException | ApiException e) { - LOG.warning("isStatefulSet check " + e.getMessage()); - } - return false; - } - - protected List getExecutorCommand(String containerId) { - final Map ports = - KubernetesConstants.EXECUTOR_PORTS.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, - e -> e.getValue().toString())); - - final Config configuration = getConfiguration(); - final Config runtimeConfiguration = getRuntimeConfiguration(); - final String[] executorCommand = - SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration, - containerId, ports); - return Arrays.asList( - "sh", - "-c", - KubernetesUtils.getConfCommand(configuration) - + " && " + KubernetesUtils.getFetchCommand(configuration, runtimeConfiguration) - + " && " + setShardIdEnvironmentVariableCommand() - + " && " + String.join(" ", executorCommand) - ); - } - - private static String setShardIdEnvironmentVariableCommand() { - return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID); - } - - - private V1beta1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances) { - final String topologyName = getTopologyName(); - final Config runtimeConfiguration = getRuntimeConfiguration(); - - final V1beta1StatefulSet statefulSet = new V1beta1StatefulSet(); - - // setup stateful set metadata - final V1ObjectMeta objectMeta = new V1ObjectMeta(); - objectMeta.name(topologyName); - statefulSet.metadata(objectMeta); - - // create the stateful set spec - final V1beta1StatefulSetSpec statefulSetSpec = new V1beta1StatefulSetSpec(); - statefulSetSpec.serviceName(topologyName); - statefulSetSpec.setReplicas(Runtime.numContainers(runtimeConfiguration).intValue()); - - // Parallel pod management tells the StatefulSet controller to launch or terminate - // all Pods in parallel, and not to wait for Pods to become Running and Ready or completely - // terminated prior to launching or terminating another Pod. 
- statefulSetSpec.setPodManagementPolicy("Parallel"); - - // add selector match labels "app=heron" and "topology=topology-name" - // so the we know which pods to manage - final V1LabelSelector selector = new V1LabelSelector(); - selector.matchLabels(getMatchLabels(topologyName)); - statefulSetSpec.selector(selector); - - // create a pod template - final V1PodTemplateSpec podTemplateSpec = new V1PodTemplateSpec(); - - // set up pod meta - final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(topologyName)); - templateMetaData.annotations(getPrometheusAnnotations()); - podTemplateSpec.setMetadata(templateMetaData); - - final List command = getExecutorCommand("$" + ENV_SHARD_ID); - podTemplateSpec.spec(getPodSpec(command, containerResource, numberOfInstances)); - - statefulSetSpec.setTemplate(podTemplateSpec); - - statefulSet.spec(statefulSetSpec); - - return statefulSet; - } - - private Map getPrometheusAnnotations() { - final Map annotations = new HashMap<>(); - annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true"); - annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_PORT, - KubernetesConstants.PROMETHEUS_PORT); - - return annotations; - } - - private Map getMatchLabels(String topologyName) { - final Map labels = new HashMap<>(); - labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE); - labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName); - return labels; - } - - private Map getLabels(String topologyName) { - final Map labels = new HashMap<>(); - labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE); - labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName); - return labels; - } - - private V1PodSpec getPodSpec(List executorCommand, Resource resource, - int numberOfInstances) { - final V1PodSpec podSpec = new V1PodSpec(); - - // set the termination period to 0 so pods can be deleted quickly - podSpec.setTerminationGracePeriodSeconds(0L); - - // set the pod tolerations so pods are rescheduled when nodes go down - // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions - podSpec.setTolerations(getTolerations()); - - podSpec.containers(Collections.singletonList( - getContainer(executorCommand, resource, numberOfInstances))); - - addVolumesIfPresent(podSpec); - - return podSpec; - } - - private List getTolerations() { - final List tolerations = new ArrayList<>(); - KubernetesConstants.TOLERATIONS.forEach(t -> { - final V1Toleration toleration = - new V1Toleration() - .key(t) - .operator("Exists") - .effect("NoExecute") - .tolerationSeconds(10L); - tolerations.add(toleration); - }); - - return tolerations; - } - - private void addVolumesIfPresent(V1PodSpec spec) { - final Config config = getConfiguration(); - if (KubernetesContext.hasVolume(config)) { - final V1Volume volume = Volumes.get().create(config); - if (volume != null) { - LOG.fine("Adding volume: " + volume.toString()); - spec.volumes(Collections.singletonList(volume)); - } - } - } - - private V1Container getContainer(List executorCommand, Resource resource, - int numberOfInstances) { - final Config configuration = getConfiguration(); - final V1Container container = new V1Container().name("executor"); - - // set up the container images - container.setImage(KubernetesContext.getExecutorDockerImage(configuration)); - - // set up the container command - container.setCommand(executorCommand); - - if (KubernetesContext.hasImagePullPolicy(configuration)) { - 
container.setImagePullPolicy(KubernetesContext.getKubernetesImagePullPolicy(configuration)); - } - - // setup the environment variables for the container - final V1EnvVar envVarHost = new V1EnvVar(); - envVarHost.name(KubernetesConstants.ENV_HOST) - .valueFrom(new V1EnvVarSource() - .fieldRef(new V1ObjectFieldSelector() - .fieldPath(KubernetesConstants.POD_IP))); - - final V1EnvVar envVarPodName = new V1EnvVar(); - envVarPodName.name(KubernetesConstants.ENV_POD_NAME) - .valueFrom(new V1EnvVarSource() - .fieldRef(new V1ObjectFieldSelector() - .fieldPath(KubernetesConstants.POD_NAME))); - container.setEnv(Arrays.asList(envVarHost, envVarPodName)); - - - // set container resources - final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements(); - final Map requests = new HashMap<>(); - requests.put(KubernetesConstants.MEMORY, - KubernetesUtils.Megabytes(resource.getRam())); - requests.put(KubernetesConstants.CPU, Double.toString(resource.getCpu())); - resourceRequirements.setRequests(requests); - container.setResources(resourceRequirements); - - // set container ports - final boolean debuggingEnabled = - TopologyUtils.getTopologyRemoteDebuggingEnabled( - Runtime.topology(getRuntimeConfiguration())); - container.setPorts(getContainerPorts(debuggingEnabled, numberOfInstances)); - - // setup volume mounts - mountVolumeIfPresent(container); - - return container; - } - - private List getContainerPorts(boolean remoteDebugEnabled, - int numberOfInstances) { - List ports = new ArrayList<>(); - KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> { - final V1ContainerPort port = new V1ContainerPort(); - port.setName(p.getName()); - port.setContainerPort(v); - ports.add(port); - }); - - - if (remoteDebugEnabled) { - IntStream.range(0, numberOfInstances).forEach(i -> { - final String portName = - KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + String.valueOf(i); - final V1ContainerPort port = new V1ContainerPort(); - port.setName(portName); - port.setContainerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i); - ports.add(port); - }); - } - - return ports; - } - - private void mountVolumeIfPresent(V1Container container) { - final Config config = getConfiguration(); - if (KubernetesContext.hasContainerVolume(config)) { - final V1VolumeMount mount = - new V1VolumeMount() - .name(KubernetesContext.getContainerVolumeName(config)) - .mountPath(KubernetesContext.getContainerVolumeMountPath(config)); - container.volumeMounts(Collections.singletonList(mount)); - } - } -} diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java index b7a7005c96b..4197535a4ce 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesCompat.java @@ -27,9 +27,9 @@ import org.apache.heron.scheduler.TopologyRuntimeManagementException; -import io.kubernetes.client.openapi.ApiClient; -import io.kubernetes.client.openapi.ApiException; -import io.kubernetes.client.openapi.api.CoreV1Api; +import io.kubernetes.client.ApiClient; +import io.kubernetes.client.ApiException; +import io.kubernetes.client.apis.CoreV1Api; public class KubernetesCompat { diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java index 90f15bd7334..32194af8bce 100644 --- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/Volumes.java @@ -24,10 +24,10 @@ import org.apache.heron.spi.common.Config; -import io.kubernetes.client.openapi.models.V1AWSElasticBlockStoreVolumeSource; -import io.kubernetes.client.openapi.models.V1HostPathVolumeSource; -import io.kubernetes.client.openapi.models.V1NFSVolumeSource; -import io.kubernetes.client.openapi.models.V1Volume; +import io.kubernetes.client.models.V1AWSElasticBlockStoreVolumeSource; +import io.kubernetes.client.models.V1HostPathVolumeSource; +import io.kubernetes.client.models.V1NFSVolumeSource; +import io.kubernetes.client.models.V1Volume; final class Volumes { diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java index 95f82f44e5f..32ff5ecea58 100644 --- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java +++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java @@ -24,7 +24,7 @@ import org.apache.heron.spi.common.Config; -import io.kubernetes.client.openapi.models.V1Volume; +import io.kubernetes.client.models.V1Volume; public class VolumesTests { From 6d5b48b4630984cfd9d53b963c009d76ba11657b Mon Sep 17 00:00:00 2001 From: Nicholas Nezis Date: Mon, 2 Dec 2019 08:40:58 -0500 Subject: [PATCH 4/5] Updated Kubernetes client version --- WORKSPACE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/WORKSPACE b/WORKSPACE index 5d55887b3fa..74cb167c27d 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -537,7 +537,7 @@ maven_jar( # end Pulsar Client # Kubernetes java client -kubernetes_client_version = "2.0.0" +kubernetes_client_version = "3.0.0" squareup_okhttp_version = "2.7.5" maven_jar( From 09d0ba4c3025fd563b6d0df13740f2f54bab6545 Mon Sep 17 00:00:00 2001 From: Nicholas Nezis Date: Wed, 4 Dec 2019 00:02:30 -0500 Subject: [PATCH 5/5] Removing unused import --- .../org/apache/heron/scheduler/kubernetes/AppsV1Controller.java | 1 - 1 file changed, 1 deletion(-) diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java index 62bb24cc856..58ac941561e 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/AppsV1Controller.java @@ -51,7 +51,6 @@ import io.kubernetes.client.ApiException; import io.kubernetes.client.apis.AppsV1Api; import io.kubernetes.client.custom.Quantity; -import io.kubernetes.client.custom.Quantity.Format; import io.kubernetes.client.models.V1Container; import io.kubernetes.client.models.V1ContainerPort; import io.kubernetes.client.models.V1DeleteOptions;
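Patch 3's move to `Quantity`-typed resource maps also introduced the `roundDecimal` helper: Kubernetes CPU quantities bottom out at millicores, so doubles coming out of packing-plan arithmetic are rounded to three decimal places before `Quantity.fromString` sees them. A self-contained illustration of that helper:

```java
public class CpuRounding {
  // Same helper as the patch: CPU quantities resolve to millicores, so three
  // decimal places is the finest precision worth serializing.
  static double roundDecimal(double value, int places) {
    double scale = Math.pow(10, places);
    return Math.round(value * scale) / scale;
  }

  public static void main(String[] args) {
    // Double arithmetic on packing plans can yield values like this; rounding
    // keeps the string handed to Quantity.fromString() clean.
    System.out.println(roundDecimal(2.5000000000000004, 3)); // 2.5
  }
}
```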