Skip to content

Commit

Permalink
Add ApiClientFactory.
Browse files Browse the repository at this point in the history
Update ConfigMapSyncServiceImpl, ConfigMapSyncTask.
Delete start.out
  • Loading branch information
wukong121 committed Oct 6, 2023
1 parent f0a3a5e commit b3e8e9e
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.alibaba.nacos.config.server.service.kubernetes;

import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;

import java.io.FileReader;
import java.io.IOException;

public class ApiClientFactory {

/**
* use the Java API from an application outside a kubernetes cluster.
*
* @param config ConfigMapSyncConfig
* @return apiclient
* @throws IOException exception
*/
public static ApiClient createOutsideApiClient(ConfigMapSyncConfig config) throws IOException {
String kubeConfigPath = config.getKubeConfig();

// loading the out-of-cluster config, a kubeconfig from file-system
ApiClient apiClient = ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(new FileReader(kubeConfigPath)))
.build();

// set the global default api-client to the in-cluster one from above
Configuration.setDefaultApiClient(apiClient);
return apiClient;
}

/**
* use the Java API from an application inside a kubernetes cluster.
*
* @return apiclient
*/
public static ApiClient createInsideApiClient() {
return Configuration.getDefaultApiClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -105,35 +107,20 @@ public void start() throws IOException {
}

/**
* use the Java API from an application outside a kubernetes cluster.
* you should load a kubeConfig to generate apiClient instead of getting it from coreV1api.
*/
public ApiClient getOutsideApiClient() throws IOException {
String kubeConfigPath = configMapSyncConfig.getKubeConfig();

// loading the out-of-cluster config, a kubeconfig from file-system
ApiClient apiClient = ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(new FileReader(kubeConfigPath))).build();

// set the global default api-client to the in-cluster one from above
Configuration.setDefaultApiClient(apiClient);
return apiClient;
}

/**
* Start watch ConfigMap.
* Start watch ConfigMap.
*/
public void startWatchConfigMap() throws IOException {
ApiClient apiClient;
CoreV1Api coreV1Api;
if (configMapSyncConfig.isOutsideCluster()) {
Loggers.MAIN.info("[{}] use outside cluster Apiclient.", "configMap-sync");
apiClient = getOutsideApiClient();
coreV1Api = new CoreV1Api(apiClient);
apiClient = ApiClientFactory.createOutsideApiClient(configMapSyncConfig);

} else {
Loggers.MAIN.info("[{}] use local cluster Apiclient.", "configMap-sync");
coreV1Api = new CoreV1Api();
apiClient = coreV1Api.getApiClient();
apiClient = ApiClientFactory.createInsideApiClient();
}
coreV1Api = new CoreV1Api(apiClient);
OkHttpClient httpClient = apiClient.getHttpClient().newBuilder().readTimeout(Duration.ZERO).build();
apiClient.setHttpClient(httpClient);
factory = new SharedInformerFactory(apiClient);
Expand All @@ -146,37 +133,43 @@ public void startWatchConfigMap() throws IOException {
@Override
public void onAdd(V1ConfigMap obj) {
if (obj == null || obj.getMetadata() == null || obj.getData() == null) {
Loggers.MAIN.warn("[{}] onAdd error: obj={}, metadata={}, data={}", "configMap-sync", obj,
obj != null ? obj.getMetadata() : null, obj != null ? obj.getData() : null);
return;
}
Loggers.MAIN.info("[{}] adding configMap...", "configMap-sync");
ConfigInfo configInfo = configMapToNacosConfigInfo(obj);
String srcIp = apiClient.getBasePath();

try {
publishConfigMap(obj, srcIp);
} catch (NacosException e) {
throw new RuntimeException(e);
Loggers.MAIN.error("[{}] catch an exception: " + e.getErrMsg(), "configMap-sync");
}
}

@Override
public void onUpdate(V1ConfigMap oldObj, V1ConfigMap newObj) {
if (oldObj == null || oldObj.getMetadata() == null || oldObj.getData() == null) {
Loggers.MAIN.warn("[{}] onUpdate error: obj={}, metadata={}, data={}", "configMap-sync", oldObj,
oldObj != null ? oldObj.getMetadata() : null, oldObj != null ? oldObj.getData() : null);
return;
}
Loggers.MAIN.info(
"[{}] update configMap " + oldObj.getMetadata().getName() + " to " + newObj.getMetadata()
.getName(), "configMap-sync");
"[{}] update configMap " + Objects.requireNonNull(oldObj.getMetadata()).getName() + " to "
+ Objects.requireNonNull(newObj.getMetadata()).getName(), "configMap-sync");
compareConfigMaps(oldObj, newObj);
ConfigInfo configInfo = configMapToNacosConfigInfo(newObj);
String srcIp = apiClient.getBasePath();
try {
publishConfigMap(newObj, srcIp);
} catch (NacosException e) {
throw new RuntimeException(e);
Loggers.MAIN.error("[{}] catch an exception: " + e.getErrMsg(), "configMap-sync");
}
}

@Override
public void onDelete(V1ConfigMap obj, boolean deletedFinalStateUnknown) {
if (obj == null || obj.getMetadata() == null || obj.getData() == null) {
Loggers.MAIN.warn("[{}] onDelete error: obj={}, metadata={}, data={}", "configMap-sync", obj,
obj != null ? obj.getMetadata() : null, obj != null ? obj.getData() : null);
return;
}
Loggers.MAIN.info("[{}] delete configMap " + obj.getMetadata().getName(), "configMap-sync");
Expand All @@ -185,23 +178,22 @@ public void onDelete(V1ConfigMap obj, boolean deletedFinalStateUnknown) {
}

});
// 启动SharedInformerFactory在后台运行事件监听器
// start event listener
factory.startAllRegisteredInformers();
}

private void compareConfigMaps(V1ConfigMap previousConfigMap, V1ConfigMap currentConfigMap) {
// 将先前的 ConfigMap 和当前的 ConfigMap 转换成 JSON 字符串
// convert configMap to json
String previousJson = gson.toJson(previousConfigMap);
String currentJson = gson.toJson(currentConfigMap);

// 将 JSON 字符串解析成 JsonElement
// convert json string to JsonElement
JsonElement previousElement = gson.fromJson(previousJson, JsonElement.class);
JsonElement currentElement = gson.fromJson(currentJson, JsonElement.class);

if (previousElement.isJsonObject() && currentElement.isJsonObject()) {
JsonObject previousObj = previousElement.getAsJsonObject();
JsonObject currentObj = currentElement.getAsJsonObject();
// 比较两个 JsonObject,找出哪些字段发生了变化
compareJsonObjects(previousObj, currentObj);
} else {
Loggers.MAIN.error("[{}] Element is not json.", "configMap-sync");
Expand All @@ -210,8 +202,9 @@ private void compareConfigMaps(V1ConfigMap previousConfigMap, V1ConfigMap curren

/**
* compare old and new ConfigMap.
*
* @param previousObj old ConfigMap
* @param currentObj new ConfigMap
* @param currentObj new ConfigMap
*/
private void compareJsonObjects(JsonObject previousObj, JsonObject currentObj) {
for (String key : previousObj.keySet()) {
Expand All @@ -234,42 +227,45 @@ private void compareJsonObjects(JsonObject previousObj, JsonObject currentObj) {

/**
* Convert ConfigMap to nacos ConfigInfo.
*
* @param configMap k8sConfigMap
* @return nacos configInfo
*/
public ConfigInfo configMapToNacosConfigInfo(V1ConfigMap configMap) {
Loggers.MAIN.info("[{}] Converting configMap to nacos ConfigInfo...", "configMap-sync");
String dataId = configMap.getMetadata().getName();
String dataId = Objects.requireNonNull(configMap.getMetadata()).getName();
String group = ConfigMapSyncConfig.K8S_GROUP;
String tenant = configMap.getMetadata().getNamespace();
String appName = null;
// 将 ConfigMap 的数据转换为 Nacos 的配置内容
Map<String, String> dataMap = configMap.getData();
Map<String, String> dataMap = configMap.getData() != null ? configMap.getData() : new HashMap<>();
String content = getContent(dataMap);
return new ConfigInfo(dataId, group, tenant, appName, content);
return new ConfigInfo(dataId, group, tenant, null, content);
}

/**
* publish ConfigMap.
*
* @param configMap k8sConfigMap
* @param srcIp source Ip
* @param srcIp source Ip
* @throws NacosException nacos exception
*/
public void publishConfigMap(V1ConfigMap configMap, String srcIp) throws NacosException {
Loggers.MAIN.info("[{}] Converting configMap to nacos ConfigForm...", "configMap-sync");

final ConfigForm configForm = new ConfigForm();
String configMapNamespace = configMap.getMetadata().getNamespace();
String configMapNamespace = Objects.requireNonNull(configMap.getMetadata()).getNamespace();
List<Namespace> namespaceList = namespaceOperationService.getNamespaceList();
String randomNamespace = UUID.randomUUID().toString();
Optional<Namespace> matchingNamespace = namespaceList.stream()
.filter(namespace -> Objects.equals(namespace.getNamespaceShowName(), configMapNamespace)).findFirst();
if (!matchingNamespace.isPresent()) {
namespaceOperationService.createNamespace(randomNamespace, configMapNamespace, "");
Boolean success = namespaceOperationService.createNamespace(randomNamespace, configMapNamespace, "");
while (!success) {
Loggers.MAIN.error("[{}] createNamespace failed.", "configMap-sync");
success = namespaceOperationService.createNamespace(randomNamespace, configMapNamespace, "");
}
}
String namespaceId = matchingNamespace.map(Namespace::getNamespace).orElse(randomNamespace);
String dataId = configMap.getMetadata().getName();
String content = getContent(configMap.getData());
String content = getContent(Objects.requireNonNull(configMap.getData()));
String group = ConfigMapSyncConfig.K8S_GROUP;
Pair<String, String> pair = EncryptionHandler.encryptHandler(dataId, content);
content = pair.getSecond();
Expand All @@ -292,8 +288,9 @@ public void publishConfigMap(V1ConfigMap configMap, String srcIp) throws NacosEx

/**
* delete k8sConfigMap.
*
* @param configMap k8sConfigMap
* @param clientIp client Ip
* @param clientIp client Ip
*/
public void deleteConfigMap(V1ConfigMap configMap, String clientIp) {
String dataId = configMap.getMetadata().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package com.alibaba.nacos.config.server.service.kubernetes;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -50,7 +52,7 @@ public class ConfigMapSyncTask {
private ApiClient apiClient;

@Autowired
@Qualifier("embeddedConfigInfoPersistServiceImpl")
@Qualifier(value = "embeddedConfigInfoPersistServiceImpl")
private ConfigInfoPersistService configInfoPersistService;

@Autowired
Expand All @@ -66,12 +68,11 @@ public class ConfigMapSyncTask {
@PostConstruct
public void init() throws IOException {
if (configMapSyncConfig.isOutsideCluster()) {
apiClient = configMapSyncService.getOutsideApiClient();
coreV1Api = new CoreV1Api(apiClient);
apiClient = ApiClientFactory.createOutsideApiClient(configMapSyncConfig);
} else {
coreV1Api = new CoreV1Api();
apiClient = coreV1Api.getApiClient();
apiClient = ApiClientFactory.createInsideApiClient();
}
coreV1Api = new CoreV1Api(apiClient);
OkHttpClient httpClient = apiClient.getHttpClient().newBuilder().readTimeout(Duration.ZERO).build();
apiClient.setHttpClient(httpClient);
}
Expand All @@ -81,15 +82,21 @@ public void init() throws IOException {
*/
@Scheduled(fixedDelay = 3600000)
public void checkAndSyncConfigMaps() {
List<V1ConfigMap> configMapList;
try {
List<V1ConfigMap> configMapList = coreV1Api.listConfigMapForAllNamespaces(null, null, null, null, null,
null, null, null, null, null).getItems();
for (V1ConfigMap configMap : configMapList) {
String dataId = Objects.requireNonNull(configMap.getMetadata()).getName();
String group = "K8S_GROUP";
String namespace = configMap.getMetadata().getNamespace();
String content = Objects.requireNonNull(configMap.getData()).toString();
ConfigInfoWrapper configInfo = configInfoPersistService.findConfigInfo(dataId, group, namespace);
configMapList = coreV1Api.listConfigMapForAllNamespaces(true, null, null, null, null,
null, null, null, null, true).getItems();
} catch (ApiException e) {
Loggers.MAIN.error("[{}] catch exception: " + e, "configMap-sync");
throw new RuntimeException(e);
}
for (V1ConfigMap configMap : configMapList) {
String dataId = Objects.requireNonNull(configMap.getMetadata(), "getMetadata() returns a null").getName();
String group = ConfigMapSyncConfig.K8S_GROUP;
String namespace = configMap.getMetadata().getNamespace();
String content = Objects.requireNonNull(configMap.getData(), "getData() returns a null.").toString();
ConfigInfoWrapper configInfo = configInfoPersistService.findConfigInfo(dataId, group, namespace);
try {
if (configInfo == null) {
Loggers.MAIN.info("[{}] find a missed config.", "configMap-sync");
configMapSyncService.publishConfigMap(configMap, apiClient.getBasePath());
Expand All @@ -99,9 +106,9 @@ public void checkAndSyncConfigMaps() {
configMapSyncService.publishConfigMap(configMap, apiClient.getBasePath());
}
}
} catch (NacosException e) {
Loggers.MAIN.error("[{}] catch exception: " + e, "configMap-sync");
}
} catch (Exception e) {
e.printStackTrace();
}
}

Expand All @@ -112,6 +119,10 @@ public void checkAndSyncConfigMaps() {
* @return whether contents are equal.
*/
private Boolean compareContent(String configMapContent, String nacosContent) {
if (configMapContent == null && nacosContent != null || configMapContent != null && nacosContent == null) {
return false;
}
assert configMapContent != null;
configMapContent = configMapContent.substring(1, configMapContent.length() - 1);
Map<String, String> configMapPair = contentToMap(configMapContent);
nacosContent = nacosContent.replace("\n", ", ");
Expand Down
2 changes: 0 additions & 2 deletions distribution/logs/start.out

This file was deleted.

0 comments on commit b3e8e9e

Please sign in to comment.