NamingServer support acceptance report
- Project name: Implement a namingServer for service discovery and registration
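- Solution description:
With the namingserver in place, Seata begins a transaction as shown in the sequence diagram above:
1. Configure the namingserver address and the related settings on the client side.
2. After the client starts, the TM sends a service discovery request to the namingserver.
3. The namingserver returns the matching cluster list, based on the vGroup parameter sent by the TM and the transaction group mappings held in memory.
4. The client picks a suitable TC node through its load-balancing strategy and begins the transaction.
5. The TM passes the transaction group and the TC node to the RM.
6. The RM sends a branch registration request to the TC node.
7. The TC node completes the phase-two dispatch.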
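1. In the model above, the namespace provides environment isolation, the cluster layer handles transaction groups (traffic splitting), and the unit layer handles load balancing.
2. A transaction group is mapped to one cluster. A unit is then chosen from that cluster's unit list through the load-balancing strategy, and the transaction runs on the TC node that the unit's metadata points to.
3. Locating a transaction group requires the (namespace, cluster) pair.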
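The mapping described above can be sketched as a small in-memory model. This is only an illustration under stated assumptions: the class `NamingModelSketch`, its method names, and the random unit selection are hypothetical and are not taken from the Seata code base.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical in-memory model: namespace -> cluster -> units, plus vGroup -> cluster routing. */
final class NamingModelSketch {
    // namespace -> (vGroup -> clusterName); a transaction group maps to exactly one cluster
    private final Map<String, Map<String, String>> groupToCluster = new HashMap<>();
    // "namespace/clusterName" -> names of the units registered under that cluster
    private final Map<String, List<String>> clusterUnits = new HashMap<>();
    private final Random random;

    NamingModelSketch(Random random) {
        this.random = random;
    }

    void addUnit(String namespace, String clusterName, String unitName) {
        clusterUnits.computeIfAbsent(namespace + "/" + clusterName, k -> new ArrayList<>()).add(unitName);
    }

    void mapGroup(String namespace, String vGroup, String clusterName) {
        groupToCluster.computeIfAbsent(namespace, k -> new HashMap<>()).put(vGroup, clusterName);
    }

    /** Resolves the vGroup to its cluster, then picks one unit at random; returns null if nothing matches. */
    String selectUnit(String namespace, String vGroup) {
        Map<String, String> groups = groupToCluster.get(namespace);
        if (groups == null || !groups.containsKey(vGroup)) {
            return null;
        }
        List<String> units = clusterUnits.get(namespace + "/" + groups.get(vGroup));
        if (units == null || units.isEmpty()) {
            return null;
        }
        return units.get(random.nextInt(units.size()));
    }
}
```

Keying both maps by namespace keeps environments isolated: the same vGroup name in two namespaces resolves independently.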
The expected console view is as follows:
Cluster name | Transaction group list | Number of units | Metadata |
---|---|---|---|
clusterA | [groupA,groupB] | 3 | …… |
clusterB | [groupC,groupD] | 5 | …… |
Pseudocode for the metadata:
{
"clusterList": [
{
"clusterName": "cluster2",
"clusterType": "default",
"groupList": ["group1", "group2"],
"unitData": [
{
"unitName": "115482ee-cf27-45d6-b17e-31b9e2d7892f",
"namingInstanceList": [
{
"ip": "172.31.31.191",
"port": 8092,
"nettyPort": 0,
"grpcPort": 0,
"weight": 1.0,
"healthy": true,
"timeStamp": 1695042063334,
"role": "member",
"metadata": {
"weight": 1,
"cluster-type": "default"
}
}
]
},
{
"unitName": "097e6ab7-d2d2-47e4-a578-fae1a4f4c517",
"namingInstanceList": [
{
"ip": "172.31.31.191",
"port": 8091,
"nettyPort": 0,
"grpcPort": 0,
"weight": 1.0,
"healthy": true,
"timeStamp": 1695042076481,
"role": "member",
"metadata": {
"weight": 1,
"cluster-type": "default"
}
}
]
}
]
}
],
"term": 1695042076578
}
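Each instance in the metadata carries `healthy` and `weight` fields. One way a client might use them is a weighted random choice among healthy instances, sketched below. The report does not say which load-balancing strategy Seata applies, so treat the algorithm, the class `WeightedNodePickerSketch`, and its nested `Node` type as assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical weighted random choice over the instances listed in the metadata. */
final class WeightedNodePickerSketch {

    /** Minimal stand-in for one entry of namingInstanceList. */
    static final class Node {
        final String ip;
        final int port;
        final double weight;
        final boolean healthy;

        Node(String ip, int port, double weight, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.weight = weight;
            this.healthy = healthy;
        }
    }

    /**
     * Picks a healthy node with probability proportional to its weight.
     *
     * @param roll a value in [0, 1), normally taken from Random.nextDouble()
     * @return the chosen node, or null when no healthy node has a positive weight
     */
    static Node pick(List<Node> nodes, double roll) {
        List<Node> candidates = new ArrayList<>();
        double total = 0;
        for (Node n : nodes) {
            if (n.healthy && n.weight > 0) {
                candidates.add(n);
                total += n.weight;
            }
        }
        if (candidates.isEmpty()) {
            return null;
        }
        double target = roll * total;
        for (Node n : candidates) {
            target -= n.weight;
            if (target < 0) {
                return n;
            }
        }
        // Guards against floating-point rounding when roll is very close to 1.
        return candidates.get(candidates.size() - 1);
    }
}
```

Passing `roll` in as a parameter keeps the selection deterministic in tests; production code would supply a fresh random value per call.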
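Taking the third-party registry Nacos as an example, the sequence diagram is as follows:
1. The client sends a service discovery request to the namingserver.
2. The namingserver receives the request in the controller layer, which passes the cluster and namespace parameters to the manager layer to obtain the corresponding instance list.
3. The manager decides how to obtain cluster information depending on whether a third-party registry is configured. As shown in the figure above, if the third-party registry Nacos is configured, the manager forwards the request to Nacos and converts the returned data into the namingserver format. If no third-party registry is configured, the manager reads the instance list from its own in-memory hash table. In both cases it returns the list to the controller layer.
4. The controller layer wraps the list and returns it to the client.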
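The branching in step 3 can be sketched as follows. The interface `ThirdPartyRegistrySketch`, the class `InstanceManagerSketch`, and the `host#port` raw format are hypothetical placeholders; the real manager and the real Nacos response format are not shown in this report.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for a third-party registry client such as Nacos. */
interface ThirdPartyRegistrySketch {
    /** Returns raw entries; this sketch assumes the made-up format "host#port". */
    List<String> lookup(String namespace, String clusterName);
}

/** Hypothetical manager layer: delegates to a third-party registry when one is configured. */
final class InstanceManagerSketch {
    // "namespace/clusterName" -> "host:port" entries registered directly with the namingserver
    private final Map<String, List<String>> localInstances = new ConcurrentHashMap<>();
    private final ThirdPartyRegistrySketch thirdParty; // null when no third-party registry is configured

    InstanceManagerSketch(ThirdPartyRegistrySketch thirdParty) {
        this.thirdParty = thirdParty;
    }

    void registerLocal(String namespace, String clusterName, String address) {
        localInstances
                .computeIfAbsent(namespace + "/" + clusterName, k -> new CopyOnWriteArrayList<>())
                .add(address);
    }

    List<String> getInstances(String namespace, String clusterName) {
        if (thirdParty != null) {
            // Forward the request, then convert the returned data into the local format.
            List<String> converted = new ArrayList<>();
            for (String raw : thirdParty.lookup(namespace, clusterName)) {
                converted.add(raw.replace('#', ':'));
            }
            return converted;
        }
        // No third-party registry: answer from the in-memory hash table.
        return localInstances.getOrDefault(namespace + "/" + clusterName, Collections.<String>emptyList());
    }
}
```

Because the controller only sees `getInstances`, switching registries does not change the discovery response shape.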
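1. Configure multiple namingserver node addresses on the server side. When registering, the Seata server sends the registration request to all namingserver nodes (one-to-many).
2. Configure multiple namingserver node addresses on the client side. During service discovery, the Seata client selects one healthy namingserver node and sends the request to it.
3. When a server node goes down, the namingserver detects it promptly, either through the unregister interface or through heartbeat detection, and pushes the latest cluster information to the client. The client refreshes its local cache and reselects a suitable TC node through its load-balancing strategy.
4. When a namingserver node goes down, the client marks that node as unhealthy through health checks and so avoids sending requests to the failed node.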
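The client-side behavior in items 2 and 4 can be sketched like this. The class `NamingAddressSelectorSketch` is hypothetical, and the health probe is passed in as a predicate, since how the client actually calls the namingserver `health` interface is not shown in this report.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical client-side selector over the configured namingserver addresses. */
final class NamingAddressSelectorSketch {
    // address -> last known health; insertion order follows the configured order
    private final Map<String, Boolean> health = new LinkedHashMap<>();

    NamingAddressSelectorSketch(List<String> addresses) {
        for (String address : addresses) {
            health.put(address, Boolean.TRUE); // assume healthy until a check fails
        }
    }

    /** Re-runs the health probe for every configured address. */
    void refresh(Predicate<String> healthCheck) {
        health.replaceAll((address, previous) -> healthCheck.test(address));
    }

    /** Returns the first address currently marked healthy, or null if none is. */
    String pickHealthy() {
        for (Map.Entry<String, Boolean> entry : health.entrySet()) {
            if (entry.getValue()) {
                return entry.getKey();
            }
        }
        return null;
    }
}
```

A scheduled task would typically call `refresh` periodically, so a recovered node becomes eligible again without a restart.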
- Code implementation
The class diagram of the namingserver core code is as follows:
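The core server-side code is as follows:
- Completed work:
1. Organized and designed the seata-namingserver domain model and produced the design document https://www.yuque.com/minerhaoxue-saqup/lg95sq/zrvweaoox13t1aki?singleDoc# 《Namingserver方案设计-v2》 (Namingserver Design Proposal v2).
2. Implemented the data structures of the Seata domain model as planned, including clusters, units, and nodes.
3. Implemented all planned namingserver functions, including service registration, service discovery, and heartbeat detection.
4. Reserved an interface in the namingserver for third-party registries.
5. Kept compatibility with the existing third-party registries, including nacos, zookeeper, etcd, and eureka.
6. Ran full-link integration tests against the namingserver and added the related test cases.
The tests for the client and server sides are as follows: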
/*
 * Copyright 1999-2019 Seata.io Group.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.seata.discovery.registry.namingserver;

import io.seata.common.holder.ObjectHolder;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.common.http.HttpServlet;
import io.seata.discovery.registry.RegistryService;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.core.env.PropertiesPropertySource;

import java.net.InetSocketAddress;
import java.rmi.RemoteException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
import static org.junit.jupiter.api.Assertions.assertEquals;

@Disabled
class NamingserverRegistryServiceImplTest {

    private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;

    @BeforeAll
    public static void beforeClass() throws Exception {
        System.setProperty("registry.namingserver.namespace", "dev");
        System.setProperty("registry.namingserver.cluster", "cluster1");
        System.setProperty("registry.namingserver.serverAddr", "127.0.0.1:8080");
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();

        // get environment
        ConfigurableEnvironment environment = context.getEnvironment();
        MutablePropertySources propertySources = environment.getPropertySources();
        Properties customProperties = new Properties();
        customProperties.setProperty("seata.registry.namingserver.server-addr[0]", "127.0.0.1:8080");
        PropertiesPropertySource customPropertySource = new PropertiesPropertySource("customSource", customProperties);
        propertySources.addLast(customPropertySource);
        ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT, environment);
    }

    @Test
    public void unregister1() throws Exception {
        NamingserverRegistryServiceImpl namingserverRegistryService = NamingserverRegistryServiceImpl.getInstance();
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8080);
        namingserverRegistryService.register(inetSocketAddress);
        namingserverRegistryService.unregister(inetSocketAddress);
    }

    @Test
    public void getNamingAddrsTest() {
        NamingserverRegistryServiceImpl namingserverRegistryService = NamingserverRegistryServiceImpl.getInstance();
        List<String> list = namingserverRegistryService.getNamingAddrs();
        assertEquals(list.size(), 1);
    }

    @Test
    public void getNamingAddrTest() {
        NamingserverRegistryServiceImpl namingserverRegistryService = NamingserverRegistryServiceImpl.getInstance();
        String addr = namingserverRegistryService.getNamingAddr();
        assertEquals(addr, "127.0.0.1:8080");
    }

    @Test
    public void convertTest() {
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8088);
        assertEquals(inetSocketAddress.getAddress().getHostAddress(), "127.0.0.1");
        assertEquals(inetSocketAddress.getPort(), 8088);
    }

    @Test
    public void testRegister1() throws Exception {
        RegistryService registryService = new NamingserverRegistryProvider().provide();
        InetSocketAddress inetSocketAddress1 = new InetSocketAddress("127.0.0.1", 8088);

        // 1. register
        registryService.register(inetSocketAddress1);

        // 2. create vGroup in cluster
        createGroupInCluster("dev", "group1", "cluster1");

        // 3. get instances
        List<InetSocketAddress> list = registryService.lookup("group1");
        assertEquals(list.size(), 1);
        InetSocketAddress inetSocketAddress = list.get(0);
        assertEquals(inetSocketAddress.getAddress().getHostAddress(), "127.0.0.1");
        assertEquals(inetSocketAddress.getPort(), 8088);
        registryService.unregister(inetSocketAddress1);
    }

    @Test
    public void testRegister2() throws Exception {
        NamingserverRegistryServiceImpl registryService = (NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
        InetSocketAddress inetSocketAddress1 = new InetSocketAddress("127.0.0.1", 8088);
        InetSocketAddress inetSocketAddress2 = new InetSocketAddress("127.0.0.1", 8088);

        // 1. register the same address twice
        registryService.register(inetSocketAddress1);
        registryService.register(inetSocketAddress2);

        // 2. create vGroup in cluster
        String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace");
        createGroupInCluster(namespace, "group1", "cluster1");

        // 3. get instances
        List list = registryService.lookup("group1");
        assertEquals(list.size(), 1);
        registryService.unregister(inetSocketAddress1);
        registryService.unregister(inetSocketAddress2);
        registryService.unsubscribe("group1");
    }

    @Test
    public void testRegister3() throws Exception {
        NamingserverRegistryServiceImpl registryService = (NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
        InetSocketAddress inetSocketAddress1 = new InetSocketAddress("127.0.0.1", 8088);
        InetSocketAddress inetSocketAddress2 = new InetSocketAddress("127.0.0.1", 8089);
        InetSocketAddress inetSocketAddress3 = new InetSocketAddress("127.0.0.1", 8090);
        InetSocketAddress inetSocketAddress4 = new InetSocketAddress("127.0.0.1", 8091);

        // 1. register
        registryService.register(inetSocketAddress1);
        registryService.register(inetSocketAddress2);
        registryService.register(inetSocketAddress3);
        registryService.register(inetSocketAddress4);

        // 2. create vGroup in cluster
        String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace");
        createGroupInCluster(namespace, "group2", "cluster1");

        // 3. get instances
        List list = registryService.lookup("group2");
        assertEquals(list.size(), 4);
        registryService.unregister(inetSocketAddress1);
        registryService.unregister(inetSocketAddress2);
        registryService.unregister(inetSocketAddress3);
        registryService.unregister(inetSocketAddress4);
        registryService.unsubscribe("group2");
    }

    @Test
    public void testUnregister() throws Exception {
        RegistryService registryService = new NamingserverRegistryProvider().provide();
        InetSocketAddress inetSocketAddress1 = new InetSocketAddress("127.0.0.1", 8088);

        // 1. register
        registryService.register(inetSocketAddress1);

        // 2. create vGroup in cluster
        String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace");
        createGroupInCluster(namespace, "group1", "cluster1");

        // 3. get instances
        List list = registryService.lookup("group1");
        assertEquals(list.size(), 1);

        // 4. unregister
        registryService.unregister(inetSocketAddress1);

        // 5. get instances
        List list1 = registryService.lookup("group1");
        assertEquals(list1.size(), 0);
    }

    // @Disabled
    @Test
    public void testWatch() throws Exception {
        NamingserverRegistryServiceImpl registryService = (NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();

        // 1. register a node under cluster1
        InetSocketAddress inetSocketAddress1 = new InetSocketAddress("127.0.0.1", 8088);
        registryService.register(inetSocketAddress1);

        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        int delayMillis = 500;

        // 2. after a 0.5 s delay, create transaction group group1 under cluster1
        executor.schedule(() -> {
            try {
                String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace");
                createGroupInCluster(namespace, "group1", "cluster1");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            executor.shutdown(); // shut down the executor once the task has run
        }, delayMillis, TimeUnit.MILLISECONDS);

        // 3. watch transaction group group1
        long timestamp1 = System.currentTimeMillis();
        boolean needFetch = registryService.watch("group1");
        long timestamp2 = System.currentTimeMillis();

        // 4. group1 is mapped to cluster1 after 0.5 s, so data should be pushed to the client within 1 s
        assert timestamp2 - timestamp1 < 1500;

        // 5. get instances
        List<InetSocketAddress> list = registryService.lookup("group1");
        registryService.unsubscribe("group1");
        assertEquals(list.size(), 1);
        InetSocketAddress inetSocketAddress = list.get(0);
        assertEquals(inetSocketAddress.getAddress().getHostAddress(), "127.0.0.1");
        assertEquals(inetSocketAddress.getPort(), 8088);
    }

    // @Disabled
    @Test
    public void testSubscribe() throws Exception {
        NamingserverRegistryServiceImpl registryService = NamingserverRegistryServiceImpl.getInstance();
        AtomicBoolean isNotified = new AtomicBoolean(false);

        // 1. subscribe
        registryService.subscribe(vGroup -> {
            try {
                isNotified.set(true);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, "group2");

        // 2. register
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8088);
        registryService.register(inetSocketAddress);
        String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace");
        createGroupInCluster(namespace, "group2", "cluster1");

        // 3. check
        assertEquals(isNotified.get(), true);
        registryService.unsubscribe("group2");
    }

    @Test
    public void testUnsubscribe() throws Exception {
        NamingserverRegistryServiceImpl registryService = (NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
        NamingListenerimpl namingListenerimpl = new NamingListenerimpl();

        // 1. subscribe
        registryService.subscribe(namingListenerimpl, "group1");

        // 2. register
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8088);
        registryService.register(inetSocketAddress);
        String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace");
        createGroupInCluster(namespace, "group1", "cluster1");

        // 3. check
        assertEquals(namingListenerimpl.isNotified, true);
        namingListenerimpl.setNotified(false);

        // 4. unsubscribe
        registryService.unsubscribe(namingListenerimpl, "group1");

        // 5. unregister
        registryService.unregister(inetSocketAddress);

        // 6. check
        assertEquals(namingListenerimpl.isNotified, false);
    }

    public void createGroupInCluster(String namespace, String vGroup, String clusterName) throws Exception {
        Map<String, String> paraMap = new HashMap<>();
        paraMap.put("namespace", namespace);
        paraMap.put("vGroup", vGroup);
        paraMap.put("clusterName", clusterName);
        String url = "http://127.0.0.1:8080/naming/v1/createGroup";
        try (CloseableHttpResponse response = HttpServlet.doGet(url, paraMap)) {
            // The response body is not inspected; closing it releases the connection.
        } catch (Exception e) {
            throw new RemoteException("createGroup request failed", e);
        }
    }

    private class NamingListenerimpl implements NamingListener {

        public boolean isNotified = false;

        public boolean isNotified() {
            return isNotified;
        }

        public void setNotified(boolean notified) {
            isNotified = notified;
        }

        @Override
        public void onEvent(String vGroup) {
            isNotified = true;
        }
    }
}
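Test name | Functionality tested | Result |
---|---|---|
testRegister1 | Whether a server node can be discovered after it registers | Passed |
testRegister2 | Whether service discovery returns only a single entry when the same server node registers twice | Passed |
testRegister3 | Whether all server nodes can be discovered after several of them register | Passed |
testUnregister | Whether a server node is no longer discovered after it registers and then unregisters | Passed |
testWatch | Whether a change in server node registration reaches the client within 1 s | Passed |
testSubscribe | Whether a change in server node registration triggers a notification | Passed |
testUnsubscribe | Whether the client is still notified of server node registration changes after it unsubscribes | Passed |

The tests on the namingserver side are as follows: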
package io.seata.namingserver;

import io.seata.common.metadata.Cluster;
import io.seata.common.metadata.MetaResponse;
import io.seata.common.metadata.Node;
import io.seata.common.metadata.Unit;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

import static org.junit.jupiter.api.Assertions.*;

@RunWith(SpringRunner.class)
@SpringBootTest
class NamingControllerTest {

    @Value("${heartbeat.threshold}")
    private int threshold;

    @Value("${heartbeat.period}")
    private int period;

    @Autowired
    NamingController namingController;

    @Test
    void mockRegister() {
        String clusterName = "cluster1";
        String namespace = "public";
        String unitName = String.valueOf(UUID.randomUUID());
        Node node = new Node();
        node.setTransactionEndpoint(new Node.Endpoint("127.0.0.1", 8080, "netty"));
        namingController.registerInstance(namespace, clusterName, unitName, node);
        String vGroup = "vgroup1";
        namingController.changeGroup(namespace, clusterName, null, vGroup);

        // The registered node must be visible through discovery.
        MetaResponse metaResponse = namingController.discovery(vGroup, namespace);
        assertNotNull(metaResponse);
        assertNotNull(metaResponse.getClusterList());
        assertEquals(1, metaResponse.getClusterList().size());
        Cluster cluster = metaResponse.getClusterList().get(0);
        assertNotNull(cluster.getUnitData());
        assertEquals(1, cluster.getUnitData().size());
        Unit unit = cluster.getUnitData().get(0);
        assertNotNull(unit.getNamingInstanceList());
        assertEquals(1, unit.getNamingInstanceList().size());
        Node node1 = unit.getNamingInstanceList().get(0);
        assertEquals("127.0.0.1", node1.getTransactionEndpoint().getHost());
        assertEquals(8080, node1.getTransactionEndpoint().getPort());
        namingController.unregisterInstance(unitName, node);
    }

    @Test
    void mockUnregisterGracefully() {
        String clusterName = "cluster1";
        String namespace = "public";
        String unitName = String.valueOf(UUID.randomUUID());
        Node node = new Node();
        node.setTransactionEndpoint(new Node.Endpoint("127.0.0.1", 8080, "netty"));
        namingController.registerInstance(namespace, clusterName, unitName, node);
        String vGroup = "vgroup1";
        namingController.changeGroup(namespace, clusterName, null, vGroup);

        MetaResponse metaResponse = namingController.discovery(vGroup, namespace);
        assertNotNull(metaResponse);
        assertNotNull(metaResponse.getClusterList());
        assertEquals(1, metaResponse.getClusterList().size());
        Cluster cluster = metaResponse.getClusterList().get(0);
        assertNotNull(cluster.getUnitData());
        assertEquals(1, cluster.getUnitData().size());
        Unit unit = cluster.getUnitData().get(0);
        assertNotNull(unit.getNamingInstanceList());
        assertEquals(1, unit.getNamingInstanceList().size());
        Node node1 = unit.getNamingInstanceList().get(0);
        assertEquals("127.0.0.1", node1.getTransactionEndpoint().getHost());
        assertEquals(8080, node1.getTransactionEndpoint().getPort());

        // Graceful shutdown: the node unregisters and must disappear at once.
        namingController.unregisterInstance(unitName, node);
        metaResponse = namingController.discovery(vGroup, namespace);
        assertNotNull(metaResponse);
        assertNotNull(metaResponse.getClusterList());
        assertEquals(1, metaResponse.getClusterList().size());
        cluster = metaResponse.getClusterList().get(0);
        assertEquals(0, cluster.getUnitData().size());
    }

    @Test
    void mockUnregisterUngracefully() throws InterruptedException {
        String clusterName = "cluster1";
        String namespace = "public";
        String unitName = String.valueOf(UUID.randomUUID());
        Node node = new Node();
        node.setTransactionEndpoint(new Node.Endpoint("127.0.0.1", 8080, "netty"));
        namingController.registerInstance(namespace, clusterName, unitName, node);
        String vGroup = "vgroup1";
        namingController.changeGroup(namespace, clusterName, null, vGroup);

        MetaResponse metaResponse = namingController.discovery(vGroup, namespace);
        assertNotNull(metaResponse);
        assertNotNull(metaResponse.getClusterList());
        assertEquals(1, metaResponse.getClusterList().size());
        Cluster cluster = metaResponse.getClusterList().get(0);
        assertNotNull(cluster.getUnitData());
        assertEquals(1, cluster.getUnitData().size());
        Unit unit = cluster.getUnitData().get(0);
        assertNotNull(unit.getNamingInstanceList());
        assertEquals(1, unit.getNamingInstanceList().size());
        Node node1 = unit.getNamingInstanceList().get(0);
        assertEquals("127.0.0.1", node1.getTransactionEndpoint().getHost());
        assertEquals(8080, node1.getTransactionEndpoint().getPort());

        // Ungraceful shutdown: no unregister call, so wait for the heartbeat check to evict the node.
        int timeGap = threshold + period;
        Thread.sleep(timeGap);
        metaResponse = namingController.discovery(vGroup, namespace);
        assertNotNull(metaResponse);
        assertNotNull(metaResponse.getClusterList());
        assertEquals(1, metaResponse.getClusterList().size());
        cluster = metaResponse.getClusterList().get(0);
        assertEquals(0, cluster.getUnitData().size());
    }
}
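Test name | Functionality tested | Result |
---|---|---|
mockRegister | Whether registration succeeds after the namingserver receives a register request | Passed |
mockUnregisterGracefully | Whether the namingserver removes a mock TC node immediately after it shuts down gracefully | Passed |
mockUnregisterUngracefully | Whether the namingserver removes a mock TC node within the allowed time (heartbeat check period + heartbeat interval threshold) after it shuts down ungracefully | Passed |

7. Integrated and verified the namingserver on seata-samples. A stress test of 1000 runs with 2 TC nodes and 1 namingserver node gave the following results: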
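8. The namingserver exposes the following RESTful API:
Interface name | Method | Description | Parameters |
---|---|---|---|
registerInstance | Post | Service registration interface for servers | namespace: namespace the node belongs to; clusterName: name of the cluster the node belongs to; unitName: name of the unit the node belongs to; node: node entity |
unregisterInstance | Post | Service deregistration interface for servers | unitName: name of the unit the node belongs to; node: node entity |
discovery | Get | Service discovery interface for clients | namespace: namespace; vGroup: transaction group |
changeGroup | Post | Console interface for modifying a transaction group | namespace: namespace; clusterName: cluster name; unitName: unit name; vGroup: transaction group |
watch | Get | Service subscription interface for clients | clientTerm: subscription timestamp held by the client; vGroup: transaction group; timeout: timeout; request: the client's HTTP request |
health | Get | Health check interface of the namingserver | None |
clusters | Get | Console interface for monitoring the status of all clusters | namespace: namespace |
For interface details, see [namingserver (getpostman.com)](https://documenter.getpostman.com/view/26098677/2s9YC31uAW)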
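- Problems encountered and solutions:
Solution: refresh the client-side TC list cache through a combination of scheduled pull and real-time push.
As shown in the figure above, the client sends a service discovery request to the namingserver every 30 s to pull the latest TC list. During each 30 s interval, the client keeps watching the namingserver node through HTTP long polling. If any of the following changes on the namingserver side:
1. the transaction group mapping changes;
2. instances are added to or removed from the cluster;
3. the attributes of an instance in the cluster change;
then watch returns status code 200, telling the client to fetch the latest cluster information. Otherwise the namingserver holds the watch request until the HTTP long poll times out and then returns status code 304, telling the client to start the next round of watch.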
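The 200/304 decision can be sketched with a term comparison and a timed wait. This is a simplified blocking illustration, not the namingserver implementation: the class `WatchSketch` and the method `publishChange` are hypothetical, and a real servlet-based server would more likely park the request asynchronously than block a thread.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical long-poll core: returns 200 when the term advances, 304 on timeout. */
final class WatchSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private long term; // version of the cluster information, e.g. a timestamp

    /** Called when a group mapping, the instance set, or an instance attribute changes. */
    void publishChange(long newTerm) {
        lock.lock();
        try {
            if (newTerm > term) {
                term = newTerm;
                changed.signalAll(); // wake every pending watch
            }
        } finally {
            lock.unlock();
        }
    }

    /** Holds the caller until the server term exceeds clientTerm or the timeout elapses. */
    int watch(long clientTerm, long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (term <= clientTerm) {
                if (nanos <= 0L) {
                    return 304; // nothing changed: the client starts the next round of watch
                }
                nanos = changed.awaitNanos(nanos);
            }
            return 200; // something changed: the client should fetch the latest cluster information
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 304;
        } finally {
            lock.unlock();
        }
    }
}
```

The loop around `awaitNanos` matters: it rechecks the term after every wake-up, so spurious wake-ups do not produce a false 200.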
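Solution: the namingserver hands the routing information to the Seata server side for storage. The server stores it in db, file, or redis mode and sends route synchronization requests to every namingserver over both a strong link and a weak link.
As shown in the figure, when the console requests the creation or modification of a transaction group in a cluster, the namingserver first updates its in-memory mapping and then forwards the request to a TC node of that cluster. On receiving the request, the TC node persists the mapping according to its storage mode and immediately synchronizes the mapping one-to-many to every namingserver node (the strong link). To make sure all namingserver nodes reach eventual consistency, the TC also carries metadata containing the mapping in its heartbeats and so pushes it to the namingserver periodically (the weak link).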
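The two links can be sketched as a TC node that pushes its persisted mapping both on change and on heartbeat. The classes `NamingServerStub` and `TcNodeSketch` are hypothetical, an in-memory map stands in for db, file, or redis storage, and network calls are reduced to direct method calls.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical namingserver node holding the vGroup -> cluster mapping in memory only. */
final class NamingServerStub {
    final Map<String, String> groupToCluster = new HashMap<>();

    /** Replaces the local mapping with the copy sent by the TC node. */
    void sync(Map<String, String> mapping) {
        groupToCluster.clear();
        groupToCluster.putAll(mapping);
    }
}

/** Hypothetical TC node that persists the mapping and synchronizes it to all namingservers. */
final class TcNodeSketch {
    private final Map<String, String> persisted = new HashMap<>(); // stands in for db/file/redis
    private final List<NamingServerStub> namingServers;

    TcNodeSketch(List<NamingServerStub> namingServers) {
        this.namingServers = namingServers;
    }

    /** Strong link: persist the change, then push it to every namingserver immediately. */
    void changeGroup(String vGroup, String clusterName) {
        persisted.put(vGroup, clusterName);
        pushToAll();
    }

    /** Weak link: every heartbeat carries the full mapping again. */
    void heartbeat() {
        pushToAll();
    }

    private void pushToAll() {
        for (NamingServerStub server : namingServers) {
            try {
                server.sync(new HashMap<>(persisted));
            } catch (RuntimeException e) {
                // A node that missed this push is repaired by a later heartbeat.
            }
        }
    }
}
```

Because each heartbeat resends the whole mapping, a namingserver that restarted with empty memory converges again without any special recovery path.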