From df2dcae4ed0dce12bfc210b5072e75f36b53f586 Mon Sep 17 00:00:00 2001 From: Wang Chengming <634749869@qq.com> Date: Thu, 29 Feb 2024 18:03:07 +0800 Subject: [PATCH 1/4] optimize UserThreadPoolManager (#1390) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 呈铭 --- .../com/alipay/sofa/rpc/config/UserThreadPoolManager.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/UserThreadPoolManager.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/UserThreadPoolManager.java index 6f044f986..0f667ca0f 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/UserThreadPoolManager.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/UserThreadPoolManager.java @@ -81,9 +81,7 @@ public static UserThreadPool getUserThread(String service) { public static Set getUserThreadPoolSet() { Set userThreadPoolSet = new HashSet<>(); if (hasUserThread()) { - for (UserThreadPool userThreadPool : userThreadMap.values()) { - userThreadPoolSet.add(userThreadPool); - } + userThreadPoolSet.addAll(userThreadMap.values()); } return userThreadPoolSet; } From ac2a73ef018a1bdbd9da0075dd64f0c4ae309984 Mon Sep 17 00:00:00 2001 From: Wang Chengming <634749869@qq.com> Date: Tue, 12 Mar 2024 15:59:44 +0800 Subject: [PATCH 2/4] fix #1380, create NacosRegistryProviderObserver when init method is executed (#1401) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix https://github.com/sofastack/sofa-rpc/issues/1380 * fix https://github.com/sofastack/sofa-rpc/issues/1380 --------- Co-authored-by: 呈铭 --- .../rpc/registry/nacos/NacosRegistry.java | 41 +++++++++++-------- .../rpc/registry/nacos/NacosRegistryTest.java | 37 +++++++++++++++-- 2 files changed, 58 insertions(+), 20 deletions(-) diff --git a/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java b/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java index 7161a8702..98dec21bc 100644 --- a/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java +++ b/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java @@ -20,12 +20,12 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingFactory; import com.alibaba.nacos.api.naming.NamingService; -import com.alibaba.nacos.api.naming.listener.Event; import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alipay.sofa.rpc.client.ProviderGroup; import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.annotation.VisibleForTesting; import com.alipay.sofa.rpc.common.utils.CommonUtils; import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.config.ConsumerConfig; @@ -146,6 +146,10 @@ public synchronized void init() { nacosConfig.putAll(parameters); } + if (providerObserver == null) { + providerObserver = new NacosRegistryProviderObserver(); + } + try { namingService = NamingFactory.createNamingService(nacosConfig); } catch (NacosException e) { @@ -272,26 +276,19 @@ public List subscribe(final ConsumerConfig config) { } try { - if (providerObserver == null) { - providerObserver = new NacosRegistryProviderObserver(); - } - ProviderInfoListener providerInfoListener = config.getProviderInfoListener(); providerObserver.addProviderListener(config, providerInfoListener); - EventListener eventListener = new EventListener() { - @Override - public void onEvent(Event event) { - if (event instanceof NamingEvent) { - NamingEvent namingEvent = (NamingEvent) event; - List instances = namingEvent.getInstances(); - // avoid npe - if (null == instances) { - instances = new ArrayList(); - } - instances.removeIf(i -> !i.isEnabled()); - providerObserver.updateProviders(config, instances); + EventListener eventListener = event -> { + if (event instanceof NamingEvent) { + NamingEvent namingEvent = (NamingEvent) event; + List instances = namingEvent.getInstances(); + // avoid npe + if (null == instances) { + instances = new ArrayList(); } + instances.removeIf(i -> !i.isEnabled()); + providerObserver.updateProviders(config, instances); } }; namingService.subscribe(serviceName, defaultCluster, eventListener); @@ -359,4 +356,14 @@ public void destroy() { public Properties getNacosConfig() { return nacosConfig; } + + /** + * UT only + * + * @return + */ + @VisibleForTesting + public NacosRegistryProviderObserver getProviderObserver() { + return providerObserver; + } } diff --git a/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java b/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java index 373af0eb6..dc6e7f114 100644 --- a/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java +++ b/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java @@ -18,6 +18,7 @@ import com.alipay.sofa.rpc.client.ProviderGroup; import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.struct.ConcurrentHashSet; import com.alipay.sofa.rpc.config.ApplicationConfig; import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; @@ -36,9 +37,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -66,8 +70,6 @@ public void setUp() { .setRegister(true); registry = (NacosRegistry) RegistryFactory.getRegistry(registryConfig); - registry.init(); - Assert.assertTrue(registry.start()); } /** @@ -77,7 +79,30 @@ public void setUp() { public void tearDown() { registry.destroy(); registry = null; - serverConfig.destroy(); + } + + @Test + public void testMuiltInit() throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(10); + final CountDownLatch latch = new CountDownLatch(10); + Set sets = new ConcurrentHashSet<>(); + + for (int i = 0; i < 10; i++) { + executorService.submit(() -> { + try { + registry.init(); + NacosRegistryProviderObserver providerObserver = registry.getProviderObserver(); + sets.add(providerObserver); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + executorService.shutdown(); + + Assert.assertEquals(1, sets.size()); } /** @@ -87,6 +112,8 @@ public void tearDown() { */ @Test public void testProviderObserver() throws Exception { + registry.init(); + Assert.assertTrue(registry.start()); int timeoutPerSub = 2000; //wait nacos startup ok @@ -227,6 +254,7 @@ public void testProviderObserver() throws Exception { List consumerConfigList = new ArrayList<>(); consumerConfigList.add(consumer2); registry.batchUnSubscribe(consumerConfigList); + serverConfig.destroy(); } /** @@ -236,6 +264,8 @@ public void testProviderObserver() throws Exception { */ @Test public void testVirtualHostAndVirtualPort() throws Exception { + registry.init(); + Assert.assertTrue(registry.start()); //wait nacos startup ok TimeUnit.SECONDS.sleep(10); // 模拟的场景 client -> proxy:127.7.7.7:8888 -> netty:0.0.0.0:12200 @@ -297,6 +327,7 @@ public void testVirtualHostAndVirtualPort() throws Exception { virtualHost + ":" + virtualPort); Assert.assertEquals("The provider's host should be virtualHost", virtualHost, pri.getHost()); Assert.assertEquals("The provider's port should be virtualPort", virtualPort, pri.getPort()); + serverConfig.destroy(); } private static class MockProviderInfoListener implements ProviderInfoListener { From 6cf6f00cab935bdf957d712792d37ee32aff8191 Mon Sep 17 00:00:00 2001 From: Wang Chengming <634749869@qq.com> Date: Mon, 25 Mar 2024 10:13:23 +0800 Subject: [PATCH 3/4] support sofa registry kubernetes (#1395) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 呈铭 --- all/pom.xml | 6 + bom/pom.xml | 15 + .../sofa/rpc/config/RegistryConfig.java | 30 ++ registry/pom.xml | 1 + registry/registry-kubernetes/pom.xml | 39 ++ .../kubernetes/KubernetesRegistry.java | 280 ++++++++++++++ .../kubernetes/KubernetesRegistryHelper.java | 101 +++++ .../KubernetesRegistryProviderWatcher.java | 77 ++++ .../constant/KubernetesClientConstants.java | 73 ++++ .../utils/KubernetesClientUtils.java | 28 ++ .../utils/KubernetesConfigUtils.java | 104 ++++++ .../com.alipay.sofa.rpc.registry.Registry | 1 + .../kubernetes/KubernetesRegistryTest.java | 345 ++++++++++++++++++ .../rpc/registry/kubernetes/TestService.java | 22 ++ .../rpc/registry/kubernetes/TestService2.java | 22 ++ .../registry/kubernetes/TestServiceImpl.java | 25 ++ .../registry/kubernetes/TestServiceImpl2.java | 25 ++ .../src/test/resources/log4j.xml | 16 + .../org.mockito.plugins.MockMaker | 1 + .../test/resources/sofa-rpc/rpc-config.json | 4 + 20 files changed, 1215 insertions(+) create mode 100644 registry/registry-kubernetes/pom.xml create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java create mode 100644 registry/registry-kubernetes/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java create mode 100755 registry/registry-kubernetes/src/test/resources/log4j.xml create mode 100644 registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker create mode 100644 registry/registry-kubernetes/src/test/resources/sofa-rpc/rpc-config.json diff --git a/all/pom.xml b/all/pom.xml index 3daa4e511..df0690331 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -244,6 +244,11 @@ sofa-rpc-registry-polaris ${project.version} + + com.alipay.sofa + sofa-rpc-registry-kubernetes + ${project.version} + com.alipay.sofa sofa-rpc-remoting-bolt @@ -545,6 +550,7 @@ com.alipay.sofa:sofa-rpc-registry-multicast com.alipay.sofa:sofa-rpc-registry-sofa com.alipay.sofa:sofa-rpc-registry-polaris + com.alipay.sofa:sofa-rpc-registry-kubernetes com.alipay.sofa:sofa-rpc-remoting-bolt com.alipay.sofa:sofa-rpc-remoting-http com.alipay.sofa:sofa-rpc-remoting-resteasy diff --git a/bom/pom.xml b/bom/pom.xml index a97d8cc8f..d43a6183d 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -60,6 +60,8 @@ true true + + 6.9.2 @@ -514,6 +516,19 @@ ${grpc.version} + + + io.fabric8 + kubernetes-client + ${fabric8_kubernetes_version} + + + io.fabric8 + kubernetes-server-mock + test + ${fabric8_kubernetes_version} + + org.apache.curator diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java index b11533728..674a89fa6 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java @@ -398,6 +398,36 @@ public String getParameter(String key) { return parameters == null ? null : parameters.get(key); } + /** + * Gets parameter or default. + * + * @param key the key + * @return the value + */ + public String getParameter(String key, String defaultValue) { + return getParameter(key) == null ? defaultValue : getParameter(key); + } + + /** + * Gets parameter or default. + * + * @param key the key + * @return the value + */ + public int getParameter(String key, int defaultValue) { + return getParameter(key) == null ? defaultValue : Integer.parseInt(parameters.get(key)); + } + + /** + * Gets parameter or default. + * + * @param key the key + * @return the value + */ + public boolean getParameter(String key, boolean defaultValue) { + return getParameter(key) == null ? defaultValue : Boolean.parseBoolean(parameters.get(key)); + } + @Override public String toString() { return "RegistryConfig{" + diff --git a/registry/pom.xml b/registry/pom.xml index e65411b8e..4dc28e046 100644 --- a/registry/pom.xml +++ b/registry/pom.xml @@ -22,6 +22,7 @@ registry-multicast registry-sofa registry-polaris + registry-kubernetes diff --git a/registry/registry-kubernetes/pom.xml b/registry/registry-kubernetes/pom.xml new file mode 100644 index 000000000..b3bb820ff --- /dev/null +++ b/registry/registry-kubernetes/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + + com.alipay.sofa + sofa-rpc-registry + ${revision} + + + sofa-rpc-registry-kubernetes + + + + com.alipay.sofa + sofa-rpc-log + + + com.alipay.sofa + sofa-rpc-api + + + com.alipay.sofa + sofa-rpc-codec-api + + + io.fabric8 + kubernetes-client + + + io.fabric8 + kubernetes-server-mock + test + + + + diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java new file mode 100644 index 000000000..7f4e616a4 --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.annotation.VisibleForTesting; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; +import com.alipay.sofa.rpc.log.LogCodes; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.registry.Registry; +import com.alipay.sofa.rpc.registry.kubernetes.utils.KubernetesClientUtils; +import com.alipay.sofa.rpc.registry.kubernetes.utils.KubernetesConfigUtils; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.dsl.PodResource; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Extension("kubernetes") +public class KubernetesRegistry extends Registry { + + /** + * slf4j Logger for this class + */ + private final static Logger LOGGER = LoggerFactory.getLogger(KubernetesRegistry.class); + + private KubernetesClient kubernetesClient; + + private String currentHostname; + + private String namespace; + + private KubernetesRegistryProviderWatcher kubernetesRegistryProviderWatcher; + + private final ConcurrentMap> consumerListeners = new ConcurrentHashMap<>(64); + + /** + * Instantiates a new kubernetes registry. + * + * @param registryConfig + */ + public KubernetesRegistry(RegistryConfig registryConfig) { + super(registryConfig); + } + + @Override + public synchronized void init() { + // init kubernetes config + Config config = KubernetesConfigUtils.buildKubernetesConfig(registryConfig); + // init kubernetes client + if (kubernetesClient == null) { + this.kubernetesClient = KubernetesClientUtils.buildKubernetesClient(config); + } + // init Watcher + if (kubernetesRegistryProviderWatcher == null) { + kubernetesRegistryProviderWatcher = new KubernetesRegistryProviderWatcher(); + } + this.currentHostname = System.getenv("HOSTNAME"); + this.namespace = config.getNamespace(); + } + + @Override + public boolean start() { + return true; + } + + @Override + public void register(ProviderConfig config) { + String appName = config.getAppName(); + if (!registryConfig.isRegister()) { + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + return; + } + + if (config.isRegister()) { + PodResource podResource = kubernetesClient.pods() + .inNamespace(namespace) + .withName(currentHostname); + + List serverConfigs = config.getServer(); + + if (CommonUtils.isNotEmpty(serverConfigs)) { + for (ServerConfig serverConfig : serverConfigs) { + String dataId = KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol()); + // 对外提供服务的URL + String url = KubernetesRegistryHelper.convertToUrl(podResource.get(), serverConfig, config); + + podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() + // 将ProviderConfig存在Annotations上 + .addToAnnotations(dataId, url) + // 为了过滤pod、其实value是用不到的 + .addToLabels(dataId, "") + .endMetadata().build()); + } + } + } + } + + @Override + public void unRegister(ProviderConfig config) { + String appName = config.getAppName(); + if (!registryConfig.isRegister()) { + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + return; + } + + if (config.isRegister()) { + PodResource podResource = kubernetesClient.pods() + .inNamespace(namespace) + .withName(currentHostname); + + List serverConfigs = config.getServer(); + if (CommonUtils.isNotEmpty(serverConfigs)) { + for (ServerConfig serverConfig : serverConfigs) { + String dataId = KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol()); + + podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() + .removeFromAnnotations(dataId) + .removeFromLabels(dataId) + .endMetadata() + .build()); + } + } + } + } + + @Override + public void batchUnRegister(List configs) { + // one by one + for (ProviderConfig config : configs) { + try { + this.unRegister(config); + } catch (Exception e) { + LOGGER.errorWithApp(config.getAppName(), "Batch unregister error", e); + } + } + } + + @Override + public List subscribe(ConsumerConfig config) { + String appName = config.getAppName(); + if (!registryConfig.isSubscribe()) { + // registry ignored + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + return null; + } + + if (config.isSubscribe()) { + + ProviderInfoListener providerInfoListener = config.getProviderInfoListener(); + kubernetesRegistryProviderWatcher.addProviderListener(config, providerInfoListener); + + String dataId = KubernetesRegistryHelper.buildDataId(config, config.getProtocol()); + FilterWatchListDeletable podPodListPodResourceFilterWatchListDeletable = + kubernetesClient.pods() + .inNamespace(namespace) + .withLabel(dataId); + + SharedIndexInformer inform = podPodListPodResourceFilterWatchListDeletable.inform(new ResourceEventHandler() { + @Override + public void onAdd(Pod pod) { + kubernetesRegistryProviderWatcher.updateProviders(config, getPods()); + } + + @Override + public void onUpdate(Pod pod, Pod t1) { + kubernetesRegistryProviderWatcher.updateProviders(config, getPods()); + } + + @Override + public void onDelete(Pod pod, boolean b) { + kubernetesRegistryProviderWatcher.updateProviders(config, getPods()); + } + }); + + consumerListeners.put(config, inform); + + inform.start(); + + List pods = podPodListPodResourceFilterWatchListDeletable.list().getItems(); + List providerInfos = KubernetesRegistryHelper.convertPodsToProviders(pods, config); + return Collections.singletonList(new ProviderGroup().addAll(providerInfos)); + } + + return null; + } + + @Override + public void unSubscribe(ConsumerConfig config) { + if (config.isSubscribe()) { + SharedIndexInformer informer = consumerListeners.remove(config); + if (null != informer) { + informer.stop(); + } + } + + kubernetesRegistryProviderWatcher.removeProviderListener(config); + } + + @Override + public void batchUnSubscribe(List configs) { + // one by one + for (ConsumerConfig config : configs) { + try { + this.unSubscribe(config); + } catch (Exception e) { + LOGGER.errorWithApp(config.getAppName(), "Batch unSubscribe error", e); + } + } + } + + @Override + public void destroy() { + // unRegister consumer + consumerListeners.forEach((k, v) -> unSubscribe(k)); + + // close kubernetes client + kubernetesClient.close(); + } + + private List getPods() { + return kubernetesClient.pods() + .inNamespace(namespace) + .list() + .getItems(); + } + + /** + * UT used only + */ + @VisibleForTesting + public void setCurrentHostname(String currentHostname) { + this.currentHostname = currentHostname; + } + + /** + * UT used only + */ + @VisibleForTesting + public ConcurrentMap> getConsumerListeners() { + return consumerListeners; + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java new file mode 100644 index 000000000..ca61fe851 --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderHelper; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.config.AbstractInterfaceConfig; +import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.registry.utils.RegistryUtils; +import io.fabric8.kubernetes.api.model.Pod; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class KubernetesRegistryHelper extends RegistryUtils { + + private final static Logger LOGGER = LoggerFactory.getLogger(KubernetesRegistryHelper.class); + + public static List convertPodsToProviders(List pods, ConsumerConfig config) { + List providerInfos = new ArrayList<>(); + if (CommonUtils.isEmpty(pods) || null == config) { + return providerInfos; + } + + for (Pod pod : pods) { + ProviderInfo providerInfo = getProviderInfo(pod, config); + if (null == providerInfo) { + continue; + } + providerInfos.add(providerInfo); + } + + return providerInfos; + } + + public static String convertToUrl(Pod pod, ServerConfig serverConfig, ProviderConfig providerConfig) { + String uri = ""; + String protocol = serverConfig.getProtocol(); + if (StringUtils.isNotEmpty(protocol)) { + uri = protocol + "://"; + } + uri += pod.getStatus().getPodIP() + ":" + serverConfig.getPort(); + + Map metaData = RegistryUtils.convertProviderToMap(providerConfig, serverConfig); + + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : metaData.entrySet()) { + sb.append("&").append(entry.getKey()).append("=").append(entry.getValue()); + } + if (sb.length() > 0) { + uri += sb.replace(0, 1, "?").toString(); + } + return uri; + } + + private static ProviderInfo getProviderInfo(Pod pod, ConsumerConfig config) { + try { + String dataId = buildDataId(config, config.getProtocol()); + String providerUrlString = pod.getMetadata().getAnnotations().get(dataId); + + if (StringUtils.isBlank(providerUrlString)) { + return null; + } + return ProviderHelper.toProviderInfo(providerUrlString); + } catch (Exception e) { + LOGGER.info("get provider config error with pod"); + return null; + } + } + + public static String buildDataId(AbstractInterfaceConfig config, String protocol) { + if (RpcConstants.PROTOCOL_TYPE_BOLT.equals(protocol) || RpcConstants.PROTOCOL_TYPE_TR.equals(protocol)) { + return ConfigUniqueNameGenerator.getUniqueName(config) + "@DEFAULT"; + } else { + return ConfigUniqueNameGenerator.getUniqueName(config) + "@" + protocol; + } + } +} diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java new file mode 100644 index 000000000..8c026427b --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; +import com.alipay.sofa.rpc.registry.utils.RegistryUtils; +import io.fabric8.kubernetes.api.model.Pod; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class KubernetesRegistryProviderWatcher { + + /** + * The Provider add listener map. + */ + private final ConcurrentMap> providerListenerMap = new ConcurrentHashMap<>(); + + /** + * Add provider listener. + * + * @param consumerConfig the consumer config + * @param listener the listener + */ + public void addProviderListener(ConsumerConfig consumerConfig, ProviderInfoListener listener) { + if (listener != null) { + RegistryUtils.initOrAddList(providerListenerMap, consumerConfig, listener); + } + } + + /** + * Remove provider listener. + * + * @param consumerConfig the consumer config + */ + public void removeProviderListener(ConsumerConfig consumerConfig) { + providerListenerMap.remove(consumerConfig); + } + + /** + * Update providers. + * + * @param config the config + * @param podList the pod list + */ + public void updateProviders(ConsumerConfig config, List podList) { + List providerInfoListeners = providerListenerMap.get(config); + if (CommonUtils.isNotEmpty(providerInfoListeners)) { + List providerInfos = KubernetesRegistryHelper.convertPodsToProviders(podList, config); + + for (ProviderInfoListener providerInfoListener : providerInfoListeners) { + providerInfoListener.updateAllProviders(Collections.singletonList(new ProviderGroup().addAll(providerInfos))); + } + } + } + +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java new file mode 100644 index 000000000..4bd508296 --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes.constant; + +public class KubernetesClientConstants { + + public static final String DEFAULT_MASTER_URL = "https://kubernetes.default.svc"; + + public static final String TRUST_CERTS = "trustCerts"; + + public static final String USE_HTTPS = "useHttps"; + + public static final String HTTP2_DISABLE = "http2Disable"; + + public static final String NAMESPACE = "namespace"; + + public static final String API_VERSION = "apiVersion"; + + public static final String CA_CERT_FILE = "caCertFile"; + + public static final String CA_CERT_DATA = "caCertData"; + + public static final String CLIENT_CERT_FILE = "clientCertFile"; + + public static final String CLIENT_CERT_DATA = "clientCertData"; + + public static final String CLIENT_KEY_FILE = "clientKeyFile"; + + public static final String CLIENT_KEY_DATA = "clientKeyData"; + + public static final String CLIENT_KEY_ALGO = "clientKeyAlgo"; + + public static final String CLIENT_KEY_PASSPHRASE = "clientKeyPassphrase"; + + public static final String OAUTH_TOKEN = "oauthToken"; + + public static final String USERNAME = "username"; + + public static final String PASSWORD = "password"; + + public static final String WATCH_RECONNECT_INTERVAL = "watchReconnectInterval"; + + public static final String WATCH_RECONNECT_LIMIT = "watchReconnectLimit"; + + public static final String CONNECTION_TIMEOUT = "connectionTimeout"; + + public static final String REQUEST_TIMEOUT = "requestTimeout"; + + public static final String LOGGING_INTERVAL = "loggingInterval"; + + public static final String HTTP_PROXY = "httpProxy"; + + public static final String HTTPS_PROXY = "httpsProxy"; + + public static final String PROXY_USERNAME = "proxyUsername"; + + public static final String PROXY_PASSWORD = "proxyPassword"; + +} diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java new file mode 100644 index 000000000..274d40a99 --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes.utils; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +public class KubernetesClientUtils { + + public static KubernetesClient buildKubernetesClient(Config config) { + return new KubernetesClientBuilder().withConfig(config).build(); + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java new file mode 100644 index 000000000..f7aea188f --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes.utils; + +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.config.RegistryConfig; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; + +import java.util.Base64; + +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.API_VERSION; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CA_CERT_DATA; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CA_CERT_FILE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_CERT_DATA; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_CERT_FILE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_ALGO; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_DATA; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_FILE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_PASSPHRASE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CONNECTION_TIMEOUT; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.DEFAULT_MASTER_URL; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.HTTP2_DISABLE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.HTTPS_PROXY; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.HTTP_PROXY; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.LOGGING_INTERVAL; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.NAMESPACE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.OAUTH_TOKEN; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.PASSWORD; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.PROXY_PASSWORD; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.PROXY_USERNAME; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.REQUEST_TIMEOUT; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.TRUST_CERTS; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.USERNAME; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.USE_HTTPS; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.WATCH_RECONNECT_INTERVAL; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.WATCH_RECONNECT_LIMIT; + +public class KubernetesConfigUtils { + + public static Config buildKubernetesConfig(RegistryConfig registryConfig) { + + // Init default config + Config base = Config.autoConfigure(null); + + return new ConfigBuilder(base) + .withMasterUrl(buildMasterUrl(registryConfig)) + .withApiVersion(registryConfig.getParameter(API_VERSION, base.getApiVersion())) + .withNamespace(registryConfig.getParameter(NAMESPACE, base.getNamespace())) + .withUsername(registryConfig.getParameter(USERNAME, base.getUsername())) + .withPassword(registryConfig.getParameter(PASSWORD, base.getPassword())) + .withOauthToken(registryConfig.getParameter(OAUTH_TOKEN, base.getOauthToken())) + .withCaCertFile(registryConfig.getParameter(CA_CERT_FILE, base.getCaCertFile())) + .withCaCertData(registryConfig.getParameter(CA_CERT_DATA, decodeBase64(base.getCaCertData()))) + .withClientKeyFile(registryConfig.getParameter(CLIENT_KEY_FILE, base.getClientKeyFile())) + .withClientKeyData(registryConfig.getParameter(CLIENT_KEY_DATA, decodeBase64(base.getClientKeyData()))) + .withClientCertFile(registryConfig.getParameter(CLIENT_CERT_FILE, base.getClientCertFile())) + .withClientCertData(registryConfig.getParameter(CLIENT_CERT_DATA, decodeBase64(base.getClientCertData()))) + .withClientKeyAlgo(registryConfig.getParameter(CLIENT_KEY_ALGO, base.getClientKeyAlgo())) + .withClientKeyPassphrase(registryConfig.getParameter(CLIENT_KEY_PASSPHRASE, base.getClientKeyPassphrase())) + .withConnectionTimeout(registryConfig.getParameter(CONNECTION_TIMEOUT, base.getConnectionTimeout())) + .withRequestTimeout(registryConfig.getParameter(REQUEST_TIMEOUT, base.getRequestTimeout())) + .withWatchReconnectInterval( + registryConfig.getParameter(WATCH_RECONNECT_INTERVAL, base.getWatchReconnectInterval())) + .withWatchReconnectLimit(registryConfig.getParameter(WATCH_RECONNECT_LIMIT, base.getWatchReconnectLimit())) + .withLoggingInterval(registryConfig.getParameter(LOGGING_INTERVAL, base.getLoggingInterval())) + .withTrustCerts(registryConfig.getParameter(TRUST_CERTS, base.isTrustCerts())) + .withHttp2Disable(registryConfig.getParameter(HTTP2_DISABLE, base.isHttp2Disable())) + .withHttpProxy(registryConfig.getParameter(HTTP_PROXY, base.getHttpProxy())) + .withHttpsProxy(registryConfig.getParameter(HTTPS_PROXY, base.getHttpsProxy())) + .withProxyUsername(registryConfig.getParameter(PROXY_USERNAME, base.getProxyUsername())) + .withProxyPassword(registryConfig.getParameter(PROXY_PASSWORD, base.getProxyPassword())) + .build(); + } + + private static String buildMasterUrl(RegistryConfig registryConfig) { + String address = registryConfig.getAddress(); + if (StringUtils.isBlank(address)) { + return DEFAULT_MASTER_URL; + } + if (address.startsWith("http")) { + return address; + } + return registryConfig.getParameter(USE_HTTPS, true) ? "https://" + address : "http://" + address; + } + + private static String decodeBase64(String str) { + return StringUtils.isNotEmpty(str) ? new String(Base64.getDecoder().decode(str)) : null; + } +} diff --git a/registry/registry-kubernetes/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry b/registry/registry-kubernetes/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry new file mode 100644 index 000000000..3bf1f8e63 --- /dev/null +++ b/registry/registry-kubernetes/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry @@ -0,0 +1 @@ +kubernetes=com.alipay.sofa.rpc.registry.kubernetes.KubernetesRegistry \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java new file mode 100644 index 000000000..7f543df82 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import com.alipay.sofa.rpc.context.RpcRunningState; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; +import io.fabric8.kubernetes.api.model.Endpoints; +import io.fabric8.kubernetes.api.model.EndpointsBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServiceBuilder; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static com.alipay.sofa.rpc.registry.kubernetes.KubernetesRegistryHelper.buildDataId; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class KubernetesRegistryTest { + + private static final String NAMESPACE = "TestNameSpace"; + private static final String POD_NAME = "TestPodName"; + private static final String APP_NAME = "TestAppName"; + private static final String SERVICE_NAME = "TestService"; + + public KubernetesServer mockServer; + + private NamespacedKubernetesClient mockClient; + + private static KubernetesRegistry kubernetesRegistry; + + private static RegistryConfig registryConfig; + + private static ConsumerConfig consumer; + + /** + * Ad before class. + */ + @BeforeClass + public static void adBeforeClass() { + RpcRunningState.setUnitTestMode(true); + } + + /** + * Ad after class. + */ + @AfterClass + public static void adAfterClass() { + RpcRuntimeContext.destroy(); + RpcInternalContext.removeContext(); + RpcInvokeContext.removeContext(); + } + + @Before + public void setup() { + mockServer = new KubernetesServer(false, true); + mockServer.before(); + mockClient = mockServer.getClient().inNamespace(NAMESPACE); + + registryConfig = new RegistryConfig(); + registryConfig.setProtocol("kubernetes"); + registryConfig.setAddress(mockClient.getConfiguration().getMasterUrl()); + // registryConfig.setParameter("trustCerts", "true"); + registryConfig.setParameter("namespace", NAMESPACE); + registryConfig.setParameter("useHttps", "false"); + registryConfig.setParameter("http2Disable", "true"); + + kubernetesRegistry = new KubernetesRegistry(registryConfig); + kubernetesRegistry.init(); + kubernetesRegistry.setCurrentHostname(POD_NAME); + + System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false"); + System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false"); + + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .endMetadata() + .withNewStatus() + .withPodIP("192.168.1.100") + .endStatus() + .build(); + + Service service = new ServiceBuilder() + .withNewMetadata() + .withName(SERVICE_NAME) + .endMetadata() + .withNewSpec() + .endSpec() + .build(); + + Endpoints endPoints = new EndpointsBuilder() + .withNewMetadata() + .withName(SERVICE_NAME) + .endMetadata() + .addNewSubset() + .addNewAddress() + .withIp("ip1") + .withNewTargetRef() + .withUid("uid1") + .withName(POD_NAME) + .endTargetRef() + .endAddress() + .addNewPort("Test", "Test", 12345, "TCP") + .endSubset() + .build(); + + mockClient.pods().inNamespace(NAMESPACE).create(pod); + mockClient.services().inNamespace(NAMESPACE).create(service); + mockClient.endpoints().inNamespace(NAMESPACE).create(endPoints); + + Assert.assertTrue(kubernetesRegistry.start()); + } + + @After + public void cleanup() { + kubernetesRegistry.destroy(); + mockClient.close(); + mockServer.after(); + } + + @Test + public void testAll() throws InterruptedException { + ApplicationConfig applicationConfig = new ApplicationConfig() + .setAppName(APP_NAME); + + ServerConfig serverConfig1 = new ServerConfig() + .setProtocol("bolt") + .setPort(12200) + .setDaemon(false); + + ProviderConfig providerConfig1 = new ProviderConfig() + .setApplication(applicationConfig) + .setInterfaceId(TestService.class.getName()) + .setRegistry(registryConfig) + .setRegister(true) + // .setUniqueId("standalone") + .setRef(new TestServiceImpl()) + .setDelay(20) + .setServer(serverConfig1); + + // 注册第一个providerConfig1 + kubernetesRegistry.register(providerConfig1); + + ServerConfig serverConfig2 = new ServerConfig() + .setProtocol("h2c") + .setPort(12202) + .setDaemon(false); + + ProviderConfig providerConfig2 = new ProviderConfig() + .setApplication(applicationConfig) + .setInterfaceId(TestService2.class.getName()) + .setRegistry(registryConfig) + .setRegister(true) + // .setUniqueId("standalone") + .setRef(new TestServiceImpl2()) + .setDelay(20) + .setServer(serverConfig2); + + // 注册第二个providerConfig2 + kubernetesRegistry.register(providerConfig2); + + List items = mockClient.pods().inNamespace(NAMESPACE).list().getItems(); + + Assert.assertEquals(1, items.size()); + Pod pod = items.get(0); + String annotationBolt = pod.getMetadata().getAnnotations().get(buildDataId(providerConfig1, "bolt")); + Assert.assertNotNull(annotationBolt); + String annotationH2c = pod.getMetadata().getAnnotations().get(buildDataId(providerConfig2, "h2c")); + Assert.assertNotNull(annotationH2c); + + // 订阅 + consumer = new ConsumerConfig(); + consumer.setInterfaceId("com.alipay.sofa.rpc.registry.kubernetes.TestService") + .setApplication(applicationConfig) + .setProxy("javassist") + .setSubscribe(true) + .setSerialization("java") + .setInvokeType("sync") + .setTimeout(4444); + + CountDownLatch latch = new CountDownLatch(1); + MockProviderInfoListener providerInfoListener = new MockProviderInfoListener(); + providerInfoListener.setCountDownLatch(latch); + consumer.setProviderInfoListener(providerInfoListener); + List all = kubernetesRegistry.subscribe(consumer); + providerInfoListener.updateAllProviders(all); + latch.await(5000, TimeUnit.MILLISECONDS); + Map ps = providerInfoListener.getData(); + + Assert.assertEquals(1, kubernetesRegistry.getConsumerListeners().size()); + Assert.assertTrue(ps.size() > 0); + Assert.assertEquals(1, ps.size()); + Assert.assertNotNull(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP)); + Assert.assertTrue(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP).size() > 0); + + // 一次发2个端口的再次注册 + latch = new CountDownLatch(2); + providerInfoListener.setCountDownLatch(latch); + ServerConfig serverConfig = new ServerConfig() + .setProtocol("bolt") + .setHost("0.0.0.0") + .setDaemon(false) + .setPort(12201); + providerConfig1.getServer().add(serverConfig); + kubernetesRegistry.register(providerConfig1); + latch.await(5000 * 2, TimeUnit.MILLISECONDS); + Assert.assertTrue(ps.size() > 0); + Assert.assertNotNull(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP)); + Assert.assertEquals(1, ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP).size()); + + // 反订阅 + kubernetesRegistry.unSubscribe(consumer); + Assert.assertEquals(0, kubernetesRegistry.getConsumerListeners().size()); + + // 反注册providerConfig1 + kubernetesRegistry.unRegister(providerConfig1); + // 反注册providerConfig2 + kubernetesRegistry.unRegister(providerConfig2); + + List unRegisterItems = mockClient.pods().inNamespace(NAMESPACE).list().getItems(); + Assert.assertEquals(0, unRegisterItems.get(0).getMetadata().getAnnotations().size()); + } + + private static class MockProviderInfoListener implements ProviderInfoListener { + + Map providerGroupMap = new HashMap<>(); + + private CountDownLatch countDownLatch; + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void addProvider(ProviderGroup providerGroup) { + + } + + @Override + public void removeProvider(ProviderGroup providerGroup) { + + } + + @Override + public void updateProviders(ProviderGroup providerGroup) { + + providerGroupMap.put(providerGroup.getName(), providerGroup); + if (countDownLatch != null) { + countDownLatch.countDown(); + countDownLatch = null; + } + } + + @Override + public void updateAllProviders(List providerGroups) { + providerGroupMap.clear(); + + if (providerGroups == null || providerGroups.size() == 0) { + } else { + for (ProviderGroup p : providerGroups) { + providerGroupMap.put(p.getName(), p); + } + + } + } + + public Map getData() { + return providerGroupMap; + } + } + + @Test + public void testUpdatePodAnnotations() { + + // 创建一个新的 Pod + String podName = "test-pod"; + String namespace = "test-namespace"; + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(podName) + .withNamespace(namespace) + .endMetadata() + .build(); + + // 在模拟环境中创建 Pod + pod = mockClient.pods().inNamespace(namespace).create(pod); + assertNotNull(pod); + + // 准备要更新的 annotations + Map annotations = new HashMap<>(); + annotations.put("example.com/annotation", "value"); + + // 更新 Pod 的 annotations + pod = new PodBuilder(pod) + .editMetadata() + .addToAnnotations(annotations) + .endMetadata() + .build(); + + // 在模拟环境中更新 Pod + pod = mockClient.pods().inNamespace(namespace).withName(podName).replace(pod); + + // 获取并验证 annotations 是否已更新 + assertEquals("value", pod.getMetadata().getAnnotations().get("example.com/annotation")); + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java new file mode 100644 index 000000000..8cb791154 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public interface TestService { + + String sayHello(String str); +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java new file mode 100644 index 000000000..30d7392e6 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public interface TestService2 { + + String sayHello(String str); +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java new file mode 100644 index 000000000..25b7cc7c3 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public class TestServiceImpl implements TestService { + + @Override + public String sayHello(String str) { + return str; + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java new file mode 100644 index 000000000..bc6c3aa1b --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public class TestServiceImpl2 implements TestService2 { + + @Override + public String sayHello(String str) { + return str; + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/resources/log4j.xml b/registry/registry-kubernetes/src/test/resources/log4j.xml new file mode 100755 index 000000000..e95634f16 --- /dev/null +++ b/registry/registry-kubernetes/src/test/resources/log4j.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000..ca6ee9cea --- /dev/null +++ b/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/resources/sofa-rpc/rpc-config.json b/registry/registry-kubernetes/src/test/resources/sofa-rpc/rpc-config.json new file mode 100644 index 000000000..a555027fe --- /dev/null +++ b/registry/registry-kubernetes/src/test/resources/sofa-rpc/rpc-config.json @@ -0,0 +1,4 @@ +{ + "rpc.config.order": 999, // 加载顺序,越大越后加载 + "logger.impl" : "com.alipay.sofa.rpc.log.SLF4JLoggerImpl" +} \ No newline at end of file From 70f16f3c550aeb40e9c8ac8e63870ad9c8c4992b Mon Sep 17 00:00:00 2001 From: evenliu Date: Mon, 25 Mar 2024 15:43:08 +0800 Subject: [PATCH 4/4] fix lost interface name and method name in tracer when no such method case. (#1397) Co-authored-by: liujianjun.ljj --- .../bootstrap/DefaultClientProxyInvoker.java | 1 + .../sofatracer/ProviderTracerFilter.java | 40 ++------ .../rpc/tracer/sofatracer/RpcSofaTracer.java | 18 +++- .../rpc/event/SofaTracerSubscriberTest.java | 93 +++++++++++++++++++ 4 files changed, 118 insertions(+), 34 deletions(-) create mode 100644 tracer/tracer-opentracing/src/test/java/com/alipay/sofa/rpc/event/SofaTracerSubscriberTest.java diff --git a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultClientProxyInvoker.java b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultClientProxyInvoker.java index afcfcc7af..0aadda9c9 100644 --- a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultClientProxyInvoker.java +++ b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultClientProxyInvoker.java @@ -133,6 +133,7 @@ protected void decorateRequest(SofaRequest request) { // 额外属性通过HEAD传递给服务端 request.addRequestProp(RemotingConstants.HEAD_APP_NAME, consumerConfig.getAppName()); request.addRequestProp(RemotingConstants.HEAD_PROTOCOL, consumerConfig.getProtocol()); + request.addRequestProp(RemotingConstants.HEAD_INVOKE_TYPE, request.getInvokeType()); customRequest(request, internalContext); diff --git a/tracer/tracer-opentracing/src/main/java/com/alipay/sofa/rpc/filter/sofatracer/ProviderTracerFilter.java b/tracer/tracer-opentracing/src/main/java/com/alipay/sofa/rpc/filter/sofatracer/ProviderTracerFilter.java index 3e01989ba..9bac54c5d 100644 --- a/tracer/tracer-opentracing/src/main/java/com/alipay/sofa/rpc/filter/sofatracer/ProviderTracerFilter.java +++ b/tracer/tracer-opentracing/src/main/java/com/alipay/sofa/rpc/filter/sofatracer/ProviderTracerFilter.java @@ -19,9 +19,7 @@ import com.alipay.common.tracer.core.context.trace.SofaTraceContext; import com.alipay.common.tracer.core.holder.SofaTraceContextHolder; import com.alipay.common.tracer.core.span.SofaTracerSpan; -import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.config.ProviderConfig; -import com.alipay.sofa.rpc.context.RpcInternalContext; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.core.response.SofaResponse; @@ -32,10 +30,6 @@ import com.alipay.sofa.rpc.module.SofaTracerModule; import com.alipay.sofa.rpc.tracer.sofatracer.log.tags.RpcSpanTags; -import static com.alipay.sofa.rpc.common.RemotingConstants.HEAD_APP_NAME; -import static com.alipay.sofa.rpc.common.RemotingConstants.HEAD_INVOKE_TYPE; -import static com.alipay.sofa.rpc.common.RemotingConstants.HEAD_PROTOCOL; - /** * @author zhanggeng */ @@ -50,34 +44,14 @@ public boolean needToLoad(FilterInvoker invoker) { @Override public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException { - SofaTracerSpan serverSpan = null; - try { - SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext(); - serverSpan = sofaTraceContext.getCurrentSpan(); - if (serverSpan != null) { - RpcInternalContext context = RpcInternalContext.getContext(); - serverSpan.setTag(RpcSpanTags.SERVICE, request.getTargetServiceUniqueName()); - serverSpan.setTag(RpcSpanTags.METHOD, request.getMethodName()); - serverSpan.setTag(RpcSpanTags.REMOTE_IP, context.getRemoteHostName()); // 客户端地址 - - // 从请求里获取ConsumerTracerFilter额外传递的信息 - serverSpan.setTag(RpcSpanTags.REMOTE_APP, (String) request.getRequestProp(HEAD_APP_NAME)); - serverSpan.setTag(RpcSpanTags.PROTOCOL, (String) request.getRequestProp(HEAD_PROTOCOL)); - serverSpan.setTag(RpcSpanTags.INVOKE_TYPE, (String) request.getRequestProp(HEAD_INVOKE_TYPE)); - - ProviderConfig providerConfig = (ProviderConfig) invoker.getConfig(); - serverSpan.setTag(RpcSpanTags.LOCAL_APP, providerConfig.getAppName()); - - serverSpan.setTag(RpcSpanTags.SERVER_THREAD_POOL_WAIT_TIME, - (Number) context.getAttachment(RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME)); - } - return invoker.invoke(request); - } finally { - if (serverSpan != null) { - serverSpan.setTag(RpcSpanTags.SERVER_BIZ_TIME, - (Number) RpcInternalContext.getContext().getAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE)); - } + SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext(); + SofaTracerSpan serverSpan = sofaTraceContext.getCurrentSpan(); + if (serverSpan != null) { + ProviderConfig providerConfig = (ProviderConfig) invoker.getConfig(); + serverSpan.setTag(RpcSpanTags.LOCAL_APP, providerConfig.getAppName()); } + return invoker.invoke(request); + } } diff --git a/tracer/tracer-opentracing/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/RpcSofaTracer.java b/tracer/tracer-opentracing/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/RpcSofaTracer.java index 9be88bae8..ac3a24d54 100644 --- a/tracer/tracer-opentracing/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/RpcSofaTracer.java +++ b/tracer/tracer-opentracing/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/RpcSofaTracer.java @@ -61,6 +61,10 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import static com.alipay.sofa.rpc.common.RemotingConstants.HEAD_APP_NAME; +import static com.alipay.sofa.rpc.common.RemotingConstants.HEAD_INVOKE_TYPE; +import static com.alipay.sofa.rpc.common.RemotingConstants.HEAD_PROTOCOL; + /** * SofaTracer * @@ -490,11 +494,19 @@ public void serverReceived(SofaRequest request) { // Record server receive event serverSpan.log(LogData.SERVER_RECV_EVENT_VALUE); + // Retrieve logging context information from the request + RpcInternalContext context = RpcInternalContext.getContext(); + serverSpan.setTag(RpcSpanTags.LOCAL_APP, request.getTargetAppName()); + serverSpan.setTag(RpcSpanTags.SERVICE, request.getTargetServiceUniqueName()); + serverSpan.setTag(RpcSpanTags.METHOD, request.getMethodName()); + serverSpan.setTag(RpcSpanTags.PROTOCOL, (String) request.getRequestProp(HEAD_PROTOCOL)); + serverSpan.setTag(RpcSpanTags.INVOKE_TYPE, (String) request.getRequestProp(HEAD_INVOKE_TYPE)); + serverSpan.setTag(RpcSpanTags.REMOTE_IP, context.getRemoteHostName()); + serverSpan.setTag(RpcSpanTags.REMOTE_APP, (String) request.getRequestProp(HEAD_APP_NAME)); //放到线程上下文 sofaTraceContext.push(serverSpan); //rpc 上下文 if (RpcInternalContext.isAttachmentEnable()) { - RpcInternalContext context = RpcInternalContext.getContext(); context.setAttachment(RpcConstants.INTERNAL_KEY_TRACE_ID, spanContext.getTraceId()); context.setAttachment(RpcConstants.INTERNAL_KEY_SPAN_ID, spanContext.getSpanId()); } @@ -564,6 +576,10 @@ public void serverSend(SofaRequest request, SofaResponse response, Throwable exc RpcInternalContext context = RpcInternalContext.getContext(); RpcInvokeContext invokeContext = RpcInvokeContext.getContext(); + serverSpan.setTag(RpcSpanTags.SERVER_THREAD_POOL_WAIT_TIME, + (Number) context.getAttachment(RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME)); + serverSpan.setTag(RpcSpanTags.SERVER_BIZ_TIME, + (Number) RpcInternalContext.getContext().getAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE)); serverSpan.setTag(RpcSpanTags.RESP_SERIALIZE_TIME, (Number) context.getAttachment(RpcConstants.INTERNAL_KEY_RESP_SERIALIZE_TIME)); serverSpan.setTag(RpcSpanTags.REQ_DESERIALIZE_TIME, diff --git a/tracer/tracer-opentracing/src/test/java/com/alipay/sofa/rpc/event/SofaTracerSubscriberTest.java b/tracer/tracer-opentracing/src/test/java/com/alipay/sofa/rpc/event/SofaTracerSubscriberTest.java new file mode 100644 index 000000000..4c46faa73 --- /dev/null +++ b/tracer/tracer-opentracing/src/test/java/com/alipay/sofa/rpc/event/SofaTracerSubscriberTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.event; + +import com.alipay.common.tracer.core.holder.SofaTraceContextHolder; +import com.alipay.common.tracer.core.span.SofaTracerSpan; +import com.alipay.sofa.rpc.common.RemotingConstants; +import com.alipay.sofa.rpc.common.RpcOptions; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.tracer.sofatracer.log.tags.RpcSpanTags; +import io.opentracing.tag.Tags; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Map; + +import static com.alipay.sofa.rpc.common.RemotingConstants.HEAD_APP_NAME; +import static com.alipay.sofa.rpc.common.RemotingConstants.HEAD_INVOKE_TYPE; +import static com.alipay.sofa.rpc.common.RemotingConstants.HEAD_PROTOCOL; + +/** + * @author Even + * @date 2024/2/27 19:48 + */ +public class SofaTracerSubscriberTest { + + @BeforeClass + public static void beforeClass() { + System.getProperties().put(RpcOptions.DEFAULT_TRACER, "sofaTracer"); + } + + @Test + public void testClientSendAndServerReceiveTracerEvent() { + SofaRequest sofaRequest = new SofaRequest(); + sofaRequest.setMethodName("testService"); + sofaRequest.setTimeout(1000); + sofaRequest.setInvokeType("sync"); + sofaRequest.setTargetServiceUniqueName("testInterface:1.0"); + sofaRequest.setTargetAppName("targetAppName"); + EventBus.post(new ClientStartInvokeEvent(sofaRequest)); + SofaTracerSpan currentClientSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan(); + Map clientTagsWithStr = currentClientSpan.getTagsWithStr(); + Assert.assertEquals("client", clientTagsWithStr.get(Tags.SPAN_KIND.getKey())); + Assert.assertEquals(sofaRequest.getTargetServiceUniqueName(), clientTagsWithStr.get(RpcSpanTags.SERVICE)); + Assert.assertEquals(sofaRequest.getMethodName(), clientTagsWithStr.get(RpcSpanTags.METHOD)); + Assert.assertEquals(Thread.currentThread().getName(), clientTagsWithStr.get(RpcSpanTags.CURRENT_THREAD_NAME)); + + Assert.assertNull(sofaRequest.getRequestProps()); + EventBus.post(new ClientBeforeSendEvent(sofaRequest)); + Map traceContext = (Map) sofaRequest.getRequestProps().get(RemotingConstants.RPC_TRACE_NAME); + Assert.assertNotNull(traceContext); + + sofaRequest.getRequestProps().put(HEAD_PROTOCOL, "tr"); + sofaRequest.getRequestProps().put(HEAD_INVOKE_TYPE, sofaRequest.getInvokeType()); + sofaRequest.getRequestProps().put(HEAD_APP_NAME, "callerAppName"); + RpcInternalContext.getContext().setRemoteAddress("127.0.0.1", 12200); + EventBus.post(new ServerReceiveEvent(sofaRequest)); + SofaTracerSpan currentServerSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan(); + Map serverTagsWithStr = currentServerSpan.getTagsWithStr(); + Assert.assertEquals("server", serverTagsWithStr.get(Tags.SPAN_KIND.getKey())); + Assert.assertEquals(sofaRequest.getTargetServiceUniqueName(), serverTagsWithStr.get(RpcSpanTags.SERVICE)); + Assert.assertEquals(sofaRequest.getMethodName(), serverTagsWithStr.get(RpcSpanTags.METHOD)); + Assert.assertEquals(sofaRequest.getTargetAppName(), serverTagsWithStr.get(RpcSpanTags.LOCAL_APP)); + Assert.assertEquals(sofaRequest.getInvokeType(), serverTagsWithStr.get(RpcSpanTags.INVOKE_TYPE)); + Assert.assertEquals("tr", serverTagsWithStr.get(RpcSpanTags.PROTOCOL)); + Assert.assertEquals("127.0.0.1", serverTagsWithStr.get(RpcSpanTags.REMOTE_IP)); + Assert.assertEquals("callerAppName", serverTagsWithStr.get(RpcSpanTags.REMOTE_APP)); + SofaTraceContextHolder.getSofaTraceContext().clear(); + } + + @AfterClass + public static void afterClass() { + System.getProperties().remove(RpcOptions.DEFAULT_TRACER); + } + +} \ No newline at end of file