diff --git a/console/pom.xml b/console/pom.xml index fb3785c88c5..27b8875c295 100644 --- a/console/pom.xml +++ b/console/pom.xml @@ -50,6 +50,10 @@ ${project.groupId} nacos-istio + + ${project.groupId} + nacos-k8s-sync + diff --git a/console/src/main/resources/application.properties b/console/src/main/resources/application.properties index 273d6dcd38e..7d7f6fd0448 100644 --- a/console/src/main/resources/application.properties +++ b/console/src/main/resources/application.properties @@ -151,7 +151,13 @@ nacos.core.auth.plugin.nacos.token.secret.key=SecretKey0123456789012345678901234 ### If turn on the MCP server: nacos.istio.mcp.server.enabled=false +#*************** K8s Related Configurations ***************# +### If turn on the K8s sync: +nacos.k8s.sync.enabled=false +### If use the Java API from an application outside a kubernetes cluster +#nacos.k8s.sync.outsideCluster=false +#nacos.k8s.sync.kubeConfig=/.kube/config ###*************** Add from 1.3.0 ***************### diff --git a/distribution/conf/application.properties b/distribution/conf/application.properties index 60b24d8232e..d5e71aa755b 100644 --- a/distribution/conf/application.properties +++ b/distribution/conf/application.properties @@ -177,6 +177,14 @@ nacos.core.auth.plugin.nacos.token.secret.key=SecretKey0123456789012345678901234 ### If turn on the MCP server: nacos.istio.mcp.server.enabled=false +#*************** K8s Related Configurations ***************# +### If turn on the K8s sync: +nacos.k8s.sync.enabled=false + +### If use the Java API from an application outside a kubernetes cluster +#nacos.k8s.sync.outsideCluster=false +#nacos.k8s.sync.kubeConfig=/.kube/config + #*************** Core Related Configurations ***************# ### set the WorkerID manually diff --git a/distribution/conf/application.properties.example b/distribution/conf/application.properties.example index e4359aa0842..1e93fcd707a 100644 --- a/distribution/conf/application.properties.example +++ b/distribution/conf/application.properties.example @@ -178,6 +178,14 @@ nacos.core.auth.server.identity.value=security ### If turn on the MCP server: nacos.istio.mcp.server.enabled=false +#*************** K8s Related Configurations ***************# +### If turn on the K8s sync: +nacos.k8s.sync.enabled=false + +### If use the Java API from an application outside a kubernetes cluster +#nacos.k8s.sync.outsideCluster=false +#nacos.k8s.sync.kubeConfig=/.kube/config + #*************** Core Related Configurations ***************# ### set the WorkerID manually diff --git a/distribution/conf/nacos-logback.xml b/distribution/conf/nacos-logback.xml index 15f7623a2ab..a540a7bfc8c 100644 --- a/distribution/conf/nacos-logback.xml +++ b/distribution/conf/nacos-logback.xml @@ -582,6 +582,23 @@ + + ${LOG_HOME}/k8s-sync-main.log + true + + ${LOG_HOME}/k8s-sync-main.log.%d{yyyy-MM-dd}.%i + 2GB + 7 + 7GB + true + + + %date %level %msg%n%n + UTF-8 + + + @@ -702,6 +719,11 @@ + + + + + diff --git a/k8s-sync/pom.xml b/k8s-sync/pom.xml new file mode 100644 index 00000000000..10fe2050bfc --- /dev/null +++ b/k8s-sync/pom.xml @@ -0,0 +1,59 @@ + + + + + + nacos-all + com.alibaba.nacos + ${revision} + ../pom.xml + + + 4.0.0 + nacos-k8s-sync + jar + + nacos-k8s-sync ${project.version} + http://nacos.io + + + + ${project.groupId} + nacos-core + + + ${project.groupId} + nacos-naming + + + + io.kubernetes + client-java-api + + + io.kubernetes + client-java + + + org.springframework + spring-beans + 5.3.20 + compile + + + \ No newline at end of file diff --git a/k8s-sync/src/main/java/com/alibaba/nacos/k8s/sync/K8sSyncConfig.java b/k8s-sync/src/main/java/com/alibaba/nacos/k8s/sync/K8sSyncConfig.java new file mode 100644 index 00000000000..95fe9fe36f8 --- /dev/null +++ b/k8s-sync/src/main/java/com/alibaba/nacos/k8s/sync/K8sSyncConfig.java @@ -0,0 +1,49 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.k8s.sync; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * Configurations for k8s integration. + * + * @author EmanuelGi + */ +@Component +public class K8sSyncConfig { + @Value("${nacos.k8s.sync.enabled:false}") + private boolean enabled = false; + + @Value("${nacos.k8s.sync.outsideCluster:false}") + private boolean outsideCluster = false; + + @Value("${nacos.k8s.sync.kubeConfig:}") + private String kubeConfig; + + public boolean isEnabled() { + return enabled; + } + + public boolean isOutsideCluster() { + return outsideCluster; + } + + public String getKubeConfig() { + return kubeConfig; + } +} diff --git a/k8s-sync/src/main/java/com/alibaba/nacos/k8s/sync/K8sSyncServer.java b/k8s-sync/src/main/java/com/alibaba/nacos/k8s/sync/K8sSyncServer.java new file mode 100644 index 00000000000..65c40ceec45 --- /dev/null +++ b/k8s-sync/src/main/java/com/alibaba/nacos/k8s/sync/K8sSyncServer.java @@ -0,0 +1,438 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.k8s.sync; + +import com.alibaba.nacos.api.common.Constants; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.naming.core.InstanceOperatorClientImpl; +import com.alibaba.nacos.naming.core.ServiceOperatorV2Impl; +import com.alibaba.nacos.naming.core.v2.ServiceManager; +import com.alibaba.nacos.naming.core.v2.pojo.Service; +import io.kubernetes.client.informer.ResourceEventHandler; +import io.kubernetes.client.informer.SharedIndexInformer; +import io.kubernetes.client.informer.SharedInformerFactory; +import io.kubernetes.client.informer.cache.Lister; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.Configuration; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1EndpointAddress; +import io.kubernetes.client.openapi.models.V1EndpointSubset; +import io.kubernetes.client.openapi.models.V1Endpoints; +import io.kubernetes.client.openapi.models.V1EndpointsList; +import io.kubernetes.client.openapi.models.V1Service; +import io.kubernetes.client.openapi.models.V1ServiceList; +import io.kubernetes.client.openapi.models.V1ServicePort; +import io.kubernetes.client.util.CallGeneratorParams; +import io.kubernetes.client.util.ClientBuilder; +import io.kubernetes.client.util.KubeConfig; +import okhttp3.OkHttpClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.io.FileReader; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Start and stop k8s-sync. + * + * @author EmanuelGi + */ +@Component +public class K8sSyncServer { + + @Autowired + private K8sSyncConfig k8sSyncConfig; + + @Autowired + private ServiceOperatorV2Impl serviceOperatorV2; + + @Autowired + private InstanceOperatorClientImpl instanceOperatorClient; + + private SharedInformerFactory factory; + + /** + * start. + * + * @throws IOException io exception + */ + @PostConstruct + public void start() throws IOException { + if (!k8sSyncConfig.isEnabled()) { + Loggers.MAIN.info("The Nacos k8s-sync is disabled."); + return; + } + Loggers.MAIN.info("Starting Nacos k8s-sync ..."); + startInformer(); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + Loggers.MAIN.info("Stopping Nacos k8s-sync ..."); + K8sSyncServer.this.stop(); + Loggers.MAIN.info("Nacos k8s-sync stopped..."); + } + }); + } + + /** + * start informer. + * + * @throws IOException io exception + */ + public void startInformer() throws IOException { + ApiClient apiClient; + CoreV1Api coreV1Api; + + if (k8sSyncConfig.isOutsideCluster()) { + apiClient = getOutsideApiClient(); + coreV1Api = new CoreV1Api(); + } else { + coreV1Api = new CoreV1Api(); + apiClient = coreV1Api.getApiClient(); + } + + OkHttpClient httpClient = apiClient.getHttpClient().newBuilder().build(); + apiClient.setHttpClient(httpClient); + + factory = new SharedInformerFactory(apiClient); + SharedIndexInformer serviceInformer = + factory.sharedIndexInformerFor( + (CallGeneratorParams params) -> { + return coreV1Api.listServiceForAllNamespacesCall( + null, + null, + null, + null, + null, + null, + params.resourceVersion, + null, + params.timeoutSeconds, + params.watch, + null); + }, + V1Service.class, + V1ServiceList.class); + + SharedIndexInformer endpointInformer = + factory.sharedIndexInformerFor( + (CallGeneratorParams params) -> { + return coreV1Api.listEndpointsForAllNamespacesCall( + null, + null, + null, + null, + null, + null, + params.resourceVersion, + null, + params.timeoutSeconds, + params.watch, + null); + }, + V1Endpoints.class, + V1EndpointsList.class); + + serviceInformer.addEventHandler( + new ResourceEventHandler() { + @Override + public void onAdd(V1Service service) { + if (service.getMetadata() == null || service.getSpec() == null) { + return; + } + String serviceName = service.getMetadata().getName(); + String namespace = service.getMetadata().getNamespace(); + List servicePorts = service.getSpec().getPorts(); + try { + registerService(namespace, serviceName, servicePorts, false, endpointInformer); + Loggers.MAIN.info("add service, namespace:" + namespace + " serviceName: " + serviceName); + } catch (Exception e) { + Loggers.MAIN.warn("add service fail, message:" + e.getMessage() + " namespace:" + + namespace + " serviceName: " + serviceName); + } + } + + @Override + public void onUpdate(V1Service oldService, V1Service newService) { + if (oldService.getMetadata() == null || oldService.getSpec() == null + || newService.getMetadata() == null || newService.getSpec() == null) { + return; + } + List oldServicePorts = oldService.getSpec().getPorts(); + String serviceName = newService.getMetadata().getName(); + String namespace = newService.getMetadata().getNamespace(); + List newServicePorts = newService.getSpec().getPorts(); + boolean portChanged = compareServicePorts(oldServicePorts, newServicePorts); + try { + registerService(namespace, serviceName, newServicePorts, portChanged, endpointInformer); + Loggers.MAIN.info("update service, namespace: " + namespace + " serviceName: " + serviceName); + } catch (Exception e) { + Loggers.MAIN.warn("update service fail, message: " + e.getMessage() + " namespace: " + + namespace + " serviceName: " + serviceName); + } + } + + @Override + public void onDelete(V1Service service, boolean deletedFinalStateUnknown) { + if (service.getMetadata() == null) { + return; + } + String serviceName = service.getMetadata().getName(); + String namespace = service.getMetadata().getNamespace(); + try { + unregisterService(namespace, serviceName); + Loggers.MAIN.info("delete service, namespace:" + namespace + " serviceName:" + serviceName); + } catch (Exception e) { + Loggers.MAIN.warn("delete service fail, message: " + e.getMessage() + + " namespace:" + namespace + " serviceName:" + serviceName); + } + } + }); + + endpointInformer.addEventHandler(new ResourceEventHandler() { + @Override + public void onAdd(V1Endpoints obj) { + if (obj.getMetadata() == null) { + return; + } + String serviceName = obj.getMetadata().getName(); + String namespace = obj.getMetadata().getNamespace(); + Set addIpSet = getIpFromEndpoints(obj); + + //TODO 因为需要指定namespace,这里servicelister需要重新new,是否可以优化,比如说作为单例的放到map中 + Lister serviceLister = new Lister<>(serviceInformer.getIndexer(), namespace); + V1Service service = serviceLister.get(serviceName); + List servicePorts = service.getSpec().getPorts(); + try { + registerInstances(addIpSet, namespace, serviceName, servicePorts); + Loggers.MAIN.info("add instances, namespace:" + namespace + " serviceName: " + serviceName); + } catch (NacosException e) { + Loggers.MAIN.warn("add instances fail, message:" + e.getMessage() + " namespace:" + namespace + ", serviceName: " + serviceName); + } + } + + @Override + public void onUpdate(V1Endpoints oldObj, V1Endpoints newObj) { + if (newObj.getMetadata() == null) { + return; + } + String serviceName = newObj.getMetadata().getName(); + String namespace = newObj.getMetadata().getNamespace(); + Lister serviceLister = new Lister<>(serviceInformer.getIndexer(), namespace); + V1Service service = serviceLister.get(serviceName); + List servicePorts = service.getSpec().getPorts(); + try { + registerService(namespace, serviceName, servicePorts, false, endpointInformer); + Loggers.MAIN.info("update instances, namespace:" + namespace + " serviceName: " + serviceName); + } catch (NacosException e) { + Loggers.MAIN.warn("update instances fail, message:" + e.getMessage() + " namespace:" + + namespace + ", serviceName: " + serviceName); + } + } + + @Override + public void onDelete(V1Endpoints obj, boolean deletedFinalStateUnknown) { + if (obj.getMetadata() == null) { + return; + } + String serviceName = obj.getMetadata().getName(); + String namespace = obj.getMetadata().getNamespace(); + Set deleteIpSet = getIpFromEndpoints(obj); + try { + List oldInstanceList = instanceOperatorClient.listAllInstances(namespace, serviceName); + unregisterInstances(deleteIpSet, namespace, serviceName, oldInstanceList); + Loggers.MAIN.info("delete instances, namespace:" + namespace + ", serviceName: " + serviceName); + } catch (NacosException e) { + Loggers.MAIN.info("delete instances fail, namespace:" + namespace + ", serviceName: " + serviceName); + } + } + }); + factory.startAllRegisteredInformers(); + } + + /** + * create instance. + * + * @param ip instance ip + * @param targetPort instance port + * @param serviceName service name + * @param port service port + * @return instance + */ + public Instance createInstance(String ip, int targetPort, String serviceName, int port) { + Instance instance = new Instance(); + instance.setIp(ip); + instance.setPort(targetPort); + instance.setClusterName(serviceName); + instance.setEphemeral(false); + instance.setHealthy(true); + instance.addMetadata("servicePort", String.valueOf(port)); + return instance; + } + + /** + * register service. + * + * @param namespace service namespace + * @param serviceName service name + * @param servicePorts service ports + * @param portChanged port is changed or not + * @throws NacosException nacos exception during registering + */ + public void registerService(String namespace, String serviceName, List servicePorts, boolean portChanged, + SharedIndexInformer endpointInformer) throws NacosException { + //TODO defaultnamespace 常量 + + Service service = Service.newService(namespace, Constants.DEFAULT_GROUP, serviceName, false); + ServiceManager.getInstance().getSingleton(service); + + //NotifyCenter.publishEvent(new NamingTraceEvent.RegisterServiceTraceEvent(System.currentTimeMillis(), + // namespace, Constants.DEFAULT_GROUP, serviceName)); + + Set oldIpSet = new HashSet<>(); + List oldInstanceList = instanceOperatorClient.listAllInstances(namespace, serviceName); + for (Instance instance:oldInstanceList) { + oldIpSet.add(instance.getIp()); + } + Lister endpointLister = new Lister<>(endpointInformer.getIndexer(), namespace); + V1Endpoints endpoints = endpointLister.get(serviceName); + Set newIpSet = getIpFromEndpoints(endpoints); + + //unregister deleted instance + Set deleteIpSet = new HashSet<>(); + deleteIpSet.addAll(oldIpSet); + deleteIpSet.removeAll(newIpSet); + unregisterInstances(deleteIpSet, namespace, serviceName, oldInstanceList); + //register added instance + Set addIpSet = new HashSet<>(); + addIpSet.addAll(newIpSet); + if (!portChanged) { + addIpSet.removeAll(oldIpSet); + } + registerInstances(addIpSet, namespace, serviceName, servicePorts); + } + + /** + * unregister service. + * + * @param namespace service namespace + * @param serviceName service name + * @throws NacosException nacos exception during unregistering + */ + public void unregisterService(String namespace, String serviceName) throws NacosException { + List instancelist = instanceOperatorClient.listAllInstances(namespace, serviceName); + for (Instance instance:instancelist) { + instanceOperatorClient.removeInstance(namespace, serviceName, instance); + } + serviceOperatorV2.delete(namespace, serviceName); + } + + /** + * register instances. + * + * @param addIpSet add ip set + * @param namespace service namespace + * @param serviceName service name + * @param servicePorts servie ports + * @throws NacosException nacos exception during registering instances + */ + public void registerInstances(Set addIpSet, String namespace, String serviceName, + List servicePorts) throws NacosException { + for (V1ServicePort servicePort:servicePorts) { + int port = servicePort.getPort(); + if (!servicePort.getTargetPort().isInteger()) { + continue; + } + int targetPort = servicePort.getTargetPort().getIntValue(); + for (String ip:addIpSet) { + Instance instance = createInstance(ip, targetPort, serviceName, port); + instanceOperatorClient.registerInstance(namespace, serviceName, instance); + } + } + //TODO:register instance后是否需要发布事件 + } + + /** + * unregister instances. + * + * @param deleteIpSet delete ip set + * @param namespace service namespace + * @param serviceName service name + * @param oldInstanceList old instance list from nacos service + */ + public void unregisterInstances(Set deleteIpSet, String namespace, String serviceName, + List oldInstanceList) { + for (Instance instance:oldInstanceList) { + if (deleteIpSet.contains(instance.getIp())) { + instanceOperatorClient.removeInstance(namespace, serviceName, instance); + } + } + } + + public Set getIpFromEndpoints(V1Endpoints endpoints) { + Set ipSet = new HashSet<>(); + List endpointSubsetList = endpoints.getSubsets(); + for (V1EndpointSubset endpointSubset:endpointSubsetList) { + for (V1EndpointAddress endpointAddress:endpointSubset.getAddresses()) { + ipSet.add(endpointAddress.getIp()); + } + } + return ipSet; + } + + /** + * compare oldServicePorts and newServicePorts. + * + * @param oldServicePorts old service ports list + * @param newServicePorts new service ports list + */ + public boolean compareServicePorts(List oldServicePorts, List newServicePorts) { + if (oldServicePorts.size() != newServicePorts.size()) { + return false; + } + return oldServicePorts.containsAll(newServicePorts) && newServicePorts.containsAll(oldServicePorts); + } + + /** + * use the Java API from an application outside a kubernetes cluster. + * you should load a kubeConfig to generate apiClient instead of getting it from coreV1api. + */ + public ApiClient getOutsideApiClient() throws IOException { + String kubeConfigPath = k8sSyncConfig.getKubeConfig(); + + // loading the out-of-cluster config, a kubeconfig from file-system + ApiClient apiClient = ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(new FileReader(kubeConfigPath))).build(); + + // set the global default api-client to the in-cluster one from above + Configuration.setDefaultApiClient(apiClient); + return apiClient; + } + + /** + * stop. + */ + public void stop() { + if (factory != null) { + factory.stopAllRegisteredInformers(); + } + } +} diff --git a/k8s-sync/src/main/java/com/alibaba/nacos/k8s/sync/Loggers.java b/k8s-sync/src/main/java/com/alibaba/nacos/k8s/sync/Loggers.java new file mode 100644 index 00000000000..b4c028d2f09 --- /dev/null +++ b/k8s-sync/src/main/java/com/alibaba/nacos/k8s/sync/Loggers.java @@ -0,0 +1,30 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.k8s.sync; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Loggers Holder. + * + * @author EmanuelGi + */ +public class Loggers { + + public static final Logger MAIN = LoggerFactory.getLogger("com.alibaba.nacos.k8s.sync.main"); +} diff --git a/pom.xml b/pom.xml index 72403f46669..072cd1727b9 100644 --- a/pom.xml +++ b/pom.xml @@ -595,6 +595,7 @@ sys plugin plugin-default-impl + k8s-sync @@ -698,6 +699,11 @@ nacos-istio ${project.version} + + ${project.groupId} + nacos-k8s-sync + ${project.version} + ${project.groupId} nacos-consistency @@ -975,6 +981,18 @@ 2.9.0 compile + + + io.kubernetes + client-java-api + 14.0.0 + + + + io.kubernetes + client-java + 14.0.0 +