From 340aaaf1fe821e5bbe324c7939d34e9d2310057f Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 17 Jan 2023 17:03:40 +0100 Subject: [PATCH] HBASE-27574 Implement ClusterManager interface for Kubernetes A basic implementation that supports taking destructive actions. Assume that services are running behind a resilient `Deployment` of some kind, and that the cluster will handle starting up replacement processes. Requires specification of a scoping namespace. --- hbase-it/pom.xml | 19 ++ .../hbase/KubernetesClusterManager.java | 258 ++++++++++++++++++ 2 files changed, 277 insertions(+) create mode 100644 hbase-it/src/test/java/org/apache/hadoop/hbase/KubernetesClusterManager.java diff --git a/hbase-it/pom.xml b/hbase-it/pom.xml index e6bb502b0362..1dde07924604 100644 --- a/hbase-it/pom.xml +++ b/hbase-it/pom.xml @@ -32,6 +32,7 @@ Integration and System tests for HBase + 17.0.0 **/Test*.java **/IntegrationTest*.java @@ -174,6 +175,24 @@ javax.servlet-api test + + io.kubernetes + client-java + ${k8s.version} + test + + + io.kubernetes + client-java-api + ${k8s.version} + test + + + com.google.code.findbugs + jsr305 + + + junit junit diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/KubernetesClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/KubernetesClusterManager.java new file mode 100644 index 000000000000..72189cd951dd --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/KubernetesClusterManager.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import com.google.gson.JsonSyntaxException; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1PodList; +import io.kubernetes.client.util.ClientBuilder; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cluster manager for K8s context. Supports taking destructive actions, and checking for the + * presence of a running process. Assumes services are running behind some type of `Deployment` that + * will handle starting replacement processes after a destructive action. Requires that the + * configuration specify a Kubernetes namespace. + *

+ * Note that the k8s java client is a bit dodgy unable to read responses when we call delete (i.e. + * kill) and sometimes when checking isRunning; makes for noisy logs and sometimes the operations + * overrun each other but generally succeed. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class KubernetesClusterManager extends Configured implements ClusterManager { + private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusterManager.class); + + private static final String NAMESPACE_CONF_KEY = + KubernetesClusterManager.class.getCanonicalName() + ".namespace"; + private static final String ZOOKEEPER_ROLE_SELECTOR_CONF_KEY = + KubernetesClusterManager.class.getCanonicalName() + ".zookeeper_role"; + private static final String NAMENODE_ROLE_SELECTOR_CONF_KEY = + KubernetesClusterManager.class.getCanonicalName() + ".namenode_role"; + private static final String JOURNALNODE_ROLE_SELECTOR_CONF_KEY = + KubernetesClusterManager.class.getCanonicalName() + ".journalnode_role"; + private static final String DATANODE_ROLE_SELECTOR_CONF_KEY = + KubernetesClusterManager.class.getCanonicalName() + ".datanode_role"; + private static final String MASTER_ROLE_SELECTOR_CONF_KEY = + KubernetesClusterManager.class.getCanonicalName() + ".master_role"; + private static final String REGIONSERVER_ROLE_SELECTOR_CONF_KEY = + KubernetesClusterManager.class.getCanonicalName() + ".regionserver_role"; + private static final Set SUPPORTED_SERVICE_TYPES = buildSupportedServiceTypesSet(); + + private CoreV1Api api; + private String namespace; + + private static Set buildSupportedServiceTypesSet() { + final Set set = new HashSet<>(); + set.add(ServiceType.ZOOKEEPER_SERVER); + set.add(ServiceType.HADOOP_NAMENODE); + set.add(ServiceType.HADOOP_JOURNALNODE); + set.add(ServiceType.HADOOP_DATANODE); + set.add(ServiceType.HBASE_MASTER); + set.add(ServiceType.HBASE_REGIONSERVER); + return Collections.unmodifiableSet(set); + } + + @Override + public void setConf(Configuration configuration) { + // This is usually called from the constructor (The call to super goes to + // Configured which does a setConf in its construction) but the configuration + // null at that time. After construction, setConf is called again. + // Assume single-thread doing ClusterManager setup. + if (configuration == null) { + LOG.debug("Skipping because provided configuration=null"); + return; + } + if (getConf() != null) { + LOG.debug("Skipping because configuration already set, getConf={}", getConf()); + return; + } + super.setConf(configuration); + this.namespace = configuration.get(NAMESPACE_CONF_KEY); + LOG.info( + "Configuration={}, namespace={}, hbase.rootdir={}, hbase.zookeeper.quorum={}, " + + "hbase.client.zookeeper.quorum={}", + configuration, this.namespace, configuration.get("hbase.rootdir"), + configuration.get("hbase.zookeeper.quorum"), + configuration.get("hbase.client.zookeeper.quorum")); + final CoreV1Api coreV1Api; + try { + ApiClient client = ClientBuilder.cluster().build(); + io.kubernetes.client.openapi.Configuration.setDefaultApiClient(client); + coreV1Api = new CoreV1Api(); + } catch (IOException ioe) { + throw new RuntimeException("Failed Kubernetes ApiClient construction", ioe); + } + this.api = coreV1Api; + } + + @Override + public void start(ServiceType service, String hostname, int port) { + assertSupportedServiceType(service); + // presumably kubernetes is automatically starting pods, so nothing to do here. + } + + @Override + public void stop(ServiceType service, String hostname, int port) throws IOException { + assertSupportedServiceType(service); + kill(hostname); + } + + @Override + public void restart(ServiceType service, String hostname, int port) throws IOException { + assertSupportedServiceType(service); + kill(hostname); + } + + @Override + public void kill(ServiceType service, String hostname, int port) throws IOException { + assertSupportedServiceType(service); + kill(hostname); + } + + @Override + public void suspend(ServiceType service, String hostname, int port) { + throw unsupportedActionType("suspend"); + } + + @Override + public void resume(ServiceType service, String hostname, int port) { + throw unsupportedActionType("resume"); + } + + @Override + public boolean isRunning(ServiceType service, String hostname, int port) throws IOException { + final String roleSelector = roleSelectorForServiceType(service); + return isRunning(roleSelector, hostname); + } + + private String roleSelectorForServiceType(final ServiceType serviceType) { + assertSupportedServiceType(serviceType); + switch (serviceType) { + case HADOOP_NAMENODE: + return getConf().get(NAMENODE_ROLE_SELECTOR_CONF_KEY, "namenode"); + case HADOOP_DATANODE: + return getConf().get(DATANODE_ROLE_SELECTOR_CONF_KEY, "datanode"); + case HADOOP_JOURNALNODE: + return getConf().get(JOURNALNODE_ROLE_SELECTOR_CONF_KEY, "journalnode"); + case ZOOKEEPER_SERVER: + return getConf().get(ZOOKEEPER_ROLE_SELECTOR_CONF_KEY, "zookeeper"); + case HBASE_MASTER: + return getConf().get(MASTER_ROLE_SELECTOR_CONF_KEY, "master"); + case HBASE_REGIONSERVER: + return getConf().get(REGIONSERVER_ROLE_SELECTOR_CONF_KEY, "regionserver"); + default: + throw new RuntimeException("should not happen"); + } + } + + private boolean isRunning(String roleSelector, String hostname) throws IOException { + if (api == null) { + return false; + } + final V1PodList list; + try { + list = api.listNamespacedPod(namespace, null, null, null, null, + "role in (" + roleSelector + ")", null, null, null, null, null); + } catch (ApiException e) { + throw convertApiException(e); + } + if (list == null) { + return false; + } + for (V1Pod item : list.getItems()) { + final String podName = + Optional.ofNullable(item).map(V1Pod::getMetadata).map(V1ObjectMeta::getName).orElse(null); + if (StringUtils.isEmpty(podName)) { + LOG.warn("Listing of namespace '{}' contains entry with empty pod name.", namespace); + continue; + } + if (hostname.startsWith(podName)) { + return true; + } + } + return false; + } + + /** + * @param hostname fully qualified hostname of the target pod. + * @return The pod name without the domain. + */ + private static String hostNameToPodName(String hostname) { + return hostname.substring(0, hostname.indexOf(".")); + } + + private void kill(String hostname) throws IOException { + try { + final String podName = hostNameToPodName(hostname); + LOG.debug("Deleting pod: {}.{}", namespace, podName); + api.deleteNamespacedPod(podName, namespace, null, null, null, null, null, null); + } catch (JsonSyntaxException e) { + handleJsonSyntaxException(e); + } catch (ApiException e) { + throw convertApiException(e); + } + } + + /** + * See kubernetes-client/java#86 + */ + private void handleJsonSyntaxException(JsonSyntaxException e) throws JsonSyntaxException { + if (!(e.getCause() instanceof IllegalStateException)) { + throw e; + } + final IllegalStateException ise = (IllegalStateException) e.getCause(); + if ( + ise.getMessage() != null && ise.getMessage().contains("BEGIN_OBJECT") + && ise.getMessage().contains("Expected") + ) { + LOG.info("Operation probably succeeded but parse of result failed, " + + "see https://github.com/kubernetes-client/java/issues/86"); + } else { + throw e; + } + } + + private static IOException convertApiException(final ApiException e) { + return new IOException( + "response body: " + e.getResponseBody() + ", response code: " + e.getCode(), e); + } + + private static RuntimeException unsupportedActionType(final String action) { + return new RuntimeException("Unable to service request for action=" + action); + } + + private static void assertSupportedServiceType(final ServiceType serviceType) { + if (!SUPPORTED_SERVICE_TYPES.contains(serviceType)) { + throw new RuntimeException("Unsupported ServiceType " + serviceType); + } + } +}