- 一个RPC框架
文档: http://dubbo.apache.org/zh-cn/docs/user/quick-start.html
一般情况下dubbo分为如下几个模块
- dubbo-client: 客户端
- dubbo-server: 服务端
- server-api:接口
- server-provider: 接口实现
- 依赖server-api
项目结构如下
-
server-api 接口
/**
 * Service contract published by the server-api module; the provider implements it
 * and the client depends on it.
 */
public interface DubboHello {

    /**
     * The single operation exposed by this contract.
     *
     * @param msg text supplied by the caller
     * @return the reply built by the implementation
     */
    String hello(String msg);
}
-
server-provider接口实现
/**
 * Provider-side implementation of {@link DubboHello}.
 */
public class DubboHelloImpl implements DubboHello {

    /** Text placed in front of every reply. */
    private static final String REPLY_PREFIX = "dubbo : ";

    /**
     * Builds the reply by prepending the fixed prefix to the incoming message.
     *
     * @param msg text supplied by the caller
     * @return the prefix followed by {@code msg}
     */
    @Override
    public String hello(String msg) {
        return REPLY_PREFIX + msg;
    }
}
-
dubbo-client客户端
// First attempt at a consumer, kept to show why it cannot work: dubboHello is assigned
// null and never bound to any provider, so the hello(...) call below throws
// NullPointerException when main runs.
public class HelloClient { public static void main(String[] args) { DubboHello dubboHello = null; String helloDubbo = dubboHello.hello("hello dubbo"); System.out.println(helloDubbo); } }
- 这样没有办法完成方法调用:此时 client 只依赖 `server-api`(只有接口,没有实现),还需要进行服务注册
-
server-provider 添加dubbo依赖
<properties> <dubbo.version>2.7.2</dubbo.version> </properties> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> <version>${dubbo.version}</version> </dependency>
-
dubbo-server.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!-- Provider application info, used to compute dependencies --> <dubbo:application name="dubbo-server-provider" owner="hufier"/> <!-- Registry address is "N/A": no multicast registry is configured in this file --> <dubbo:registry address="N/A"/> <!-- Expose services over the dubbo protocol on port 20880 --> <dubbo:protocol name="dubbo" port="20880"/> <!-- Declare the service interface to expose --> <dubbo:service interface="com.huifer.dubbo.server.api.DubboHello" ref="dubboHello"/> <!-- Implement the service as an ordinary local bean; its id is the value used by dubbo:service ref --> <bean id="dubboHello" class="com.huifer.dubbo.server.provider.DubboHelloImpl"/> </beans>
-
Bootstrap
public class Bootstrap { public static void main(String[] args) throws IOException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "dubbo-server.xml"); context.start(); System.in.read(); } }
-
启动
dubbo://0.0.0.0:20880/com.huifer.dubbo.server.api.DubboHello?anyhost=true&application=dubbo-server-provider&bean.name=com.huifer.dubbo.server.api.DubboHello&bind.ip=0.0.0.0&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.huifer.dubbo.server.api.DubboHello&methods=hello&owner=hufier&pid=12916&register=true&release=2.7.2&side=provider&timestamp=1560395544486, dubbo version: 2.7.2, current host: 0.0.0.0
-
dubbo-client
也需要加入dubbo
依赖 -
dubbo-client.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!-- Consumer application name, used to compute dependencies; it is not a matching condition and should differ from the provider's name --> <dubbo:application name="dubbo-client" /> <!-- Registry address is "N/A": no multicast registry is configured in this file --> <dubbo:registry address="N/A" /> <!-- Generate the remote service proxy, usable like a local bean under the id dubboHello; the url attribute carries the provider address --> <dubbo:reference id="dubboHello" interface="com.huifer.dubbo.server.api.DubboHello" url="dubbo://0.0.0.0:20880/com.huifer.dubbo.server.api.DubboHello"/> </beans>
-
HelloClient
public class HelloClient { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "dubbo-client.xml"); DubboHello dubboHello = (DubboHello) context.getBean("dubboHello"); String helloDubbo = dubboHello.hello("hello dubbo"); System.out.println(helloDubbo); } }
-
运行结果
dubbo : hello dubbo
-
-
依赖
<dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> <version>${dubbo.version}</version> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-dependencies-zookeeper</artifactId> <version>${dubbo.version}</version> </dependency>
-
服务端地址配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="dubbo-server-provider" owner="hufier"/>
<dubbo:registry address="zookeeper://192.168.1.107:2181"/>
<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880"/>
<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.huifer.dubbo.server.api.DubboHello" ref="dubboHello"/>
<!-- 和本地bean一样实现服务 id=server:ref-->
<bean id="dubboHello" class="com.huifer.dubbo.server.provider.DubboHelloImpl"/>
</beans>
-
客户端配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 --> <dubbo:application name="dubbo-client" /> <dubbo:registry address="zookeeper://192.168.1.107:2181"/> <!-- 生成远程服务代理,可以和本地bean一样使用demoService --> <dubbo:reference id="dubboHello" interface="com.huifer.dubbo.server.api.DubboHello" /> </beans>
启动一个dubbo服务端项目在zookeeper的结构
- 启动一个dubbo服务端,内含包括
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<!-- Provider application info, used to compute dependencies -->
<dubbo:application name="dubbo-server-provider" owner="hufier"/>
<dubbo:registry address="zookeeper://192.168.1.107:2181"/>
<!-- Expose services over the dubbo protocol on port 20880 -->
<dubbo:protocol name="dubbo" port="20880"/>
<!-- Declare the service interfaces to expose; the protocol attribute may list several protocols -->
<dubbo:service interface="com.huifer.dubbo.server.api.DubboHello" ref="dubboHello" protocol="dubbo"/>
<dubbo:service interface="com.huifer.dubbo.server.api.DubboHello2" ref="dubboHello2" protocol="hessian"/>
<!-- Implement the services as ordinary local beans; each id is the value used by a dubbo:service ref -->
<bean id="dubboHello" class="com.huifer.dubbo.server.provider.DubboHelloImpl"/>
<bean id="dubboHello2" class="com.huifer.dubbo.server.provider.DubboHelloImpl2"/>
</beans>
- dubbo在zookeeper下的存放结构
graph TD;
/ --> /dubbo
/dubbo --> /DubboHello
/dubbo --> /DubboHello2
/DubboHello --> /providers
/providers --> 具体协议dubbo+接口
/providers --> 具体协议hessian+接口
- 定义接口
public interface DubboVersion1 {
/**
* 第一个版本的hello
* @param o
* @return
*/
String sayHelloV1(String o);
}
-
同一个接口2个版本实现
public class DubboV1 implements DubboVersion1 { @Override public String sayHelloV1(String o) { return "version1 : " + o; } }
public class DubboV2 implements DubboVersion1 { @Override public String sayHelloV1(String o) { return "version2 : " + o; } }
-
服务端配置
<dubbo:service interface="com.huifer.dubbo.server.api.DubboVersion1" ref="dv1" protocol="dubbo" version="1.0.0"/> <dubbo:service interface="com.huifer.dubbo.server.api.DubboVersion1" ref="dv2" protocol="dubbo" version="2.0.0"/> <bean id="dv1" class="com.huifer.dubbo.server.provider.DubboV1" /> <bean id="dv2" class="com.huifer.dubbo.server.provider.DubboV2"/>
-
客户端配置
<!-- The interface must be DubboVersion1: the client casts the "acac" bean to DubboVersion1, and the provider publishes versions 1.0.0 and 2.0.0 under that interface. -->
<dubbo:reference id="acac" interface="com.huifer.dubbo.server.api.DubboVersion1" version="1.0.0"/>
-
客户端代码
public class HelloClient { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "dubbo-client.xml"); DubboVersion1 dv1 = (DubboVersion1) context.getBean("acac"); System.out.println(dv1.sayHelloV1("hello-dubbo")); } }
-
想要替换成
version=2.0.0
只需要将 客户端配置中version
进行修改即可 -
zookeeper变化
- 在这个路径中存放了
version=xxx
[dubbo%3A%2F%2F0.0.0.0%3A20880%2Fcom.huifer.dubbo.server.api.DubboVersion1%3Fanyhost%3Dtrue%26application%3Ddubbo-server-provider%26bean.name%3Dcom.huifer.dubbo.server.api.DubboVersion1%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcom.huifer.dubbo.server.api.DubboVersion1%26methods%3DsayHelloV1%26owner%3Dhufier%26pid%3D10848%26register%3Dtrue%26release%3D2.7.2%26revision%3D1.0.0%26side%3Dprovider%26timestamp%3D1560473277839%26version%3D1.0.0, dubbo%3A%2F%2F0.0.0.0%3A20880%2Fcom.huifer.dubbo.server.api.DubboVersion12%3Fanyhost%3Dtrue%26application%3Ddubbo-server-provider%26bean.name%3Dcom.huifer.dubbo.server.api.DubboVersion12%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcom.huifer.dubbo.server.api.DubboVersion1%26methods%3DsayHelloV1%26owner%3Dhufier%26pid%3D10848%26register%3Dtrue%26release%3D2.7.2%26revision%3D2.0.0%26side%3Dprovider%26timestamp%3D1560473277917%26version%3D2.0.0]
- 在这个路径中存放了
<dubbo:reference id="acac"
interface="com.huifer.dubbo.server.api.DubboVersion1"
version="2.0.0"
cluster="failsafe"
/>
org.apache.dubbo.rpc.cluster.support
包下
- failsafe
- 失败安全:请求失败时只记录错误日志并忽略异常,不向调用方抛出
- failover
- 默认
- 请求失败重试其他服务器
- 重试次数:2 ,不包含第一次请求,总共发送3次请求
- 搜索查询推荐使用
- failfast
- 快速失败,失败直接报错
- 数据操作增加、删除、修改推荐使用
- failback
- 失败自动恢复:后台记录失败请求,定时重发
- 数据操作增加、删除、修改推荐使用
- forking
- 同时调用N个节点(由forks设置),多个节点谁先返回就返回这个结果
- forks:最大并行数量
- broadcast
- 广播:逐个调用所有提供者,只要有一台服务报错就报错
| 标签 | 用途 | 解释 |
|---|---|---|
| `<dubbo:service/>` | 服务配置 | 用于暴露一个服务,定义服务的元信息,一个服务可以用多个协议暴露,一个服务也可以注册到多个注册中心 |
| `<dubbo:reference/>` [2] | 引用配置 | 用于创建一个远程服务代理,一个引用可以指向多个注册中心 |
| `<dubbo:protocol/>` | 协议配置 | 用于配置提供服务的协议信息,协议由提供方指定,消费方被动接受 |
| `<dubbo:application/>` | 应用配置 | 用于配置当前应用信息,不管该应用是提供者还是消费者 |
| `<dubbo:module/>` | 模块配置 | 用于配置当前模块信息,可选 |
| `<dubbo:registry/>` | 注册中心配置 | 用于配置连接注册中心相关信息 |
| `<dubbo:monitor/>` | 监控中心配置 | 用于配置连接监控中心相关信息,可选 |
| `<dubbo:provider/>` | 提供方配置 | 当 ProtocolConfig 和 ServiceConfig 某属性没有配置时,采用此缺省值,可选 |
| `<dubbo:consumer/>` | 消费方配置 | 当 ReferenceConfig 某属性没有配置时,采用此缺省值,可选 |
| `<dubbo:method/>` | 方法配置 | 用于 ServiceConfig 和 ReferenceConfig 指定方法级的配置信息 |
| `<dubbo:argument/>` | 参数配置 | 用于指定方法参数配置 |
官方文档-配置: http://dubbo.apache.org/zh-cn/docs/user/configuration/xml.html
-
客户端>服务端
-
方法>接口>全局配置
-
同级别客户端优先
mock 客户端配置
-
定义mock类
/**
 * Fallback implementation of {@link DubboVersion1} used for service degradation.
 * It is the class named by the {@code mock} attribute of the client's
 * {@code dubbo:reference}.
 */
public class MockDemo implements DubboVersion1 {

    /**
     * Returns the degraded reply for a call.
     *
     * @param o the argument of the original call
     * @return the mock label, the " : " separator and {@code o}
     */
    @Override
    public String sayHelloV1(String o) {
        // Same "label : payload" layout as DubboV1/DubboV2, so the reply matches the
        // documented result "服务降级-mock : hello-dubbo".
        return "服务降级-mock : " + o;
    }
}
-
客户端配置
`timeout="1"` 要求调用在 1 毫秒内完成,超时后走 mock 降级
<dubbo:reference id="acac" interface="com.huifer.dubbo.server.api.DubboVersion1" version="2.0.0" timeout="1" mock="com.huifer.dubbo.client.mock.MockDemo" />
public class HelloClient { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "dubbo-client.xml"); DubboVersion1 dv1 = (DubboVersion1) context.getBean("acac"); System.out.println(dv1.sayHelloV1("hello-dubbo")); } }
-
运行结果
服务降级-mock : hello-dubbo
调用者-->BaseService:调用
BaseService --> BaseServiceImpl1:发现
BaseService --> BaseServiceImpl2:发现
-
基本接口
public interface BaseService { String hello(String msg); }
-
实现1
public class BaseServiceImplV1 implements BaseService { @Override public String hello(String msg) { return "v1 : " + msg; } }
-
实现2
public class BaseServiceImplV2 implements BaseService { @Override public String hello(String msg) { return "v2 : " + msg; } }
-
配置文件
在 `src/main/resources/` 下创建 `META-INF/services` 目录,并以接口全类名作为文件名创建文件,文件内容为该接口各实现类的全类名(每行一个)
-
启动
public class Run { public static void main(String[] args) { ServiceLoader<BaseService> baseServices = ServiceLoader.load(BaseService.class); for (BaseService baseService : baseServices) { System.out.println(baseService.hello("fff"));; } } }
-
运行结果
v1 : fff v2 : fff
-
官方文档 http://dubbo.apache.org/zh-cn/docs/source_code_guide/dubbo-spi.html
-
dubbo中spi资源应该放在哪里?
- /META-INF/dubbo下
-
文件编写
- 文件名: 接口的全限定类名
- 内容: key=value(key 为扩展名,value 为实现类的全限定类名)
-
接口
@SPI public interface Robot { void sayHello(); }
-
接口实现1
public class Bumblebee implements Robot { @Override public void sayHello() { System.out.println("Hello, I am Bumblebee."); } }
-
接口实现2
public class OptimusPrime implements Robot { @Override public void sayHello() { System.out.println("Hello, I am Optimus Prime."); } }
-
配置文件
optimusPrime=com.huifer.dubbo.client.spi.OptimusPrime
bumblebee=com.huifer.dubbo.client.spi.Bumblebee
-
启动器
public class SpiDemo { public static void main(String[] args) { ExtensionLoader<Robot> extensionLoader = ExtensionLoader.getExtensionLoader(Robot.class); Robot optimusPrime = extensionLoader.getExtension("optimusPrime"); optimusPrime.sayHello(); Robot bumblebee = extensionLoader.getExtension("bumblebee"); bumblebee.sayHello(); } }
- 下图为一个错误的拼写
相关issues: apache/dubbo#4310 问题在单词拼写 😢
调用链
org.apache.dubbo.common.extension.ExtensionLoader#getExtensionLoader
org.apache.dubbo.common.extension.ExtensionLoader#getAdaptiveExtension
org.apache.dubbo.common.extension.ExtensionLoader#createAdaptiveExtension
org.apache.dubbo.common.extension.ExtensionLoader#getAdaptiveExtensionClass
org.apache.dubbo.common.extension.ExtensionLoader#getExtensionClasses
org.apache.dubbo.common.extension.ExtensionLoader#loadExtensionClasses
org.apache.dubbo.common.extension.ExtensionLoader#cacheDefaultExtensionName
org.apache.dubbo.common.extension.ExtensionLoader#createAdaptiveExtensionClass
-
org.apache.dubbo.common.extension.ExtensionLoader#getExtensionLoader
- 作用: 传入一个class 来获得
ExtensionLoader
@SuppressWarnings("unchecked") public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { if (type == null) { throw new IllegalArgumentException("Extension type == null"); } if (!type.isInterface()) { throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!"); } if (!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type (" + type + ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!"); } // 从EXTENSION_LOADERS中获取 这是个Map<Class,ExtensionLoader> ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { // 创建一个 ExtensionLoader EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader; }
- 作用: 传入一个class 来获得
-
继续debug进入到了
org.apache.dubbo.common.extension.ExtensionLoader#getAdaptiveExtension
方法- 将instance 创建到内存中
@SuppressWarnings("unchecked") public T getAdaptiveExtension() { Object instance = cachedAdaptiveInstance.get(); if (instance == null) { if (createAdaptiveInstanceError == null) { synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { try { // 将 instance 放入cachedAdaptiveInstance instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t); } } } } else { throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } } return (T) instance; }
-
createAdaptiveExtension()
@SuppressWarnings("unchecked") private T createAdaptiveExtension() { try { // 获得一个适配器实例 return injectExtension((T) getAdaptiveExtensionClass().newInstance()); } catch (Exception e) { throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e); } }
-
getAdaptiveExtensionClass()
private Class<?> getAdaptiveExtensionClass() { getExtensionClasses(); if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } return cachedAdaptiveClass = createAdaptiveExtensionClass(); }
-
getExtensionClasses
- 获取所有扩展类
Holder<Map<String, Class<?>>> cachedClasses
/**
 * Returns the extension-class map held by {@code cachedClasses}, loading it through
 * {@code loadExtensionClasses()} when the holder is still empty. The holder is read
 * again inside {@code synchronized (cachedClasses)} before loading.
 *
 * @return the map stored in {@code cachedClasses}
 */
private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}
- 获取所有扩展类
-
loadExtensionClasses
加载dubbo预设扩展点
private Map<String, Class<?>> loadExtensionClasses() { cacheDefaultExtensionName(); Map<String, Class<?>> extensionClasses = new HashMap<>(); loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName()); loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba")); loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName()); loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba")); loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName()); loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba")); return extensionClasses; }
private static final String SERVICES_DIRECTORY = "META-INF/services/"; private static final String DUBBO_DIRECTORY = "META-INF/dubbo/"; private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
org.apache.dubbo.common.extension.ExtensionFactory
adaptive=org.apache.dubbo.common.extension.factory.AdaptiveExtensionFactory spi=org.apache.dubbo.common.extension.factory.SpiExtensionFactory spring=org.apache.dubbo.config.spring.extension.SpringExtensionFactory
这些内容会被加载,当然项目中的也会被加载
-
cacheDefaultExtensionName
-
createAdaptiveExtensionClass
- 创建扩展点
private Class<?> createAdaptiveExtensionClass() { String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); ClassLoader classLoader = findClassLoader(); org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader); }
-
xml解析器
org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
org.apache.dubbo.config.spring.schema.DubboBeanDefinitionParser
- 将配置文件中的标签解析成具体的类
public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } @Override public void init() { registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true)); registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser()); } }
-
org.apache.dubbo.config.spring.ServiceBean
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware { // ... // 入口 if (!supportedApplicationListener) { export(); } }
-
supportedApplicationListener是什么?
@Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; SpringExtensionFactory.addApplicationContext(applicationContext); supportedApplicationListener = addApplicationListener(applicationContext, this); }
InitializingBean
org.springframework.beans.factory.InitializingBean#afterPropertiesSet
类初始化成功后做什么
DisposableBean
org.springframework.beans.factory.DisposableBean#destroy
类销毁后做什么
ApplicationContextAware
-
org.springframework.context.ApplicationContextAware#setApplicationContext
容器初始化时设置application context
-
ApplicationListener
org.springframework.context.ApplicationListener#onApplicationEvent
spring的监听事件,监听bean
BeanNameAware
org.springframework.beans.factory.BeanNameAware#setBeanName
设置bean的名称
ApplicationEventPublisherAware
org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher
spring的事件发布者
-
加载流程图调用链见最后总结
xml-->实体:DubboNamespaceHandler和DubboBeanDefinitionParser解析
实体-->ServiceBean#afterPropertiesSet:初始化后执行afterPropertiesSet方法
ServiceBean#afterPropertiesSet-->ServiceConfig#export:执行export方法
ServiceConfig#export-->ServiceConfig#doExport: 执行doExport
ServiceConfig#doExport-->ServiceConfig#doExportUrls:获取注册中心
ServiceConfig#doExportUrls-->ServiceConfig#doExportUrlsFor1Protocol: 将配置文件中protocol转换成map,再由map转换成url...
-
doExportUrls
<dubbo:registry address="zookeeper://192.168.1.107:2181"/>
-
doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 本地服务 if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } } private void exportLocal(URL url) { URL local = URLBuilder.from(url) .setProtocol(LOCAL_PROTOCOL) .setHost(LOCALHOST_VALUE) .setPort(0) .build(); // proxyFactory获取一个Invoker对象添加到Exporter中 Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local); }
-
Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
-
protocol
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
public interface Protocol {}
`Protocol` 是接口,方法的实现从哪来?来自 `org.apache.dubbo.common.extension.ExtensionLoader#createAdaptiveExtensionClass` 动态生成的适配类,
具体查看上文
private Class<?> createAdaptiveExtensionClass() { String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); ClassLoader classLoader = findClassLoader(); org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader); }
import org.apache.dubbo.common.extension.ExtensionLoader; public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol { public void destroy() { throw new UnsupportedOperationException( "The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException( "The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!"); } public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException { if (arg0 == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); } if (arg0.getUrl() == null) { throw new IllegalArgumentException( "org.apache.dubbo.rpc.Invoker argument getUrl() == null"); } org.apache.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) { throw new IllegalStateException( "Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url .toString() + ") use keys([protocol])"); } org.apache.dubbo.rpc.Protocol extension = ExtensionLoader .getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException { if (arg1 == null) { throw new IllegalArgumentException("url == null"); } org.apache.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? 
"dubbo" : url.getProtocol()); if (extName == null) { throw new IllegalStateException( "Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url .toString() + ") use keys([protocol])"); } org.apache.dubbo.rpc.Protocol extension = ExtensionLoader .getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } }
- 在服务注册阶段
org.apache.dubbo.rpc.Protocol extension = ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
具体为org.apache.dubbo.registry.integration.RegistryProtocol
- 在服务注册阶段
-
org.apache.dubbo.registry.integration.RegistryProtocol#export
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); // url to export locally URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker // 发布服务 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } // Deprecated! Subscribe to override rules in 2.6.x or before. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); }
-
doLocalExport
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); }
protocol.export(invokerDelegate), originInvoker);
又一次使用Protocol$Adaptive
此时调用
DubboProtocol
-
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
@Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } openServer(url); optimizeSerialization(url); return exporter; }
-
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer
private void openServer(URL url) { // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); // 创建server } } } else { // server supports reset, use together with override server.reset(url); } } }
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
完成
-
-
-
final Registry registry = getRegistry(originInvoker);
执行后注册到zookeeper中-
org.apache.dubbo.registry.integration.RegistryProtocol#getRegistry
private Registry getRegistry(final Invoker<?> originInvoker) { URL registryUrl = getRegistryUrl(originInvoker); return registryFactory.getRegistry(registryUrl); } private URL getRegistryUrl(Invoker<?> originInvoker) { URL registryUrl = originInvoker.getUrl(); if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY); registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY); } return registryUrl; }
完成协议转换,此时这个url为zookeeper协议地址 ,并非所有的都转换为zookeeper,根据配置转换
<dubbo:registry address="zookeeper://192.168.1.107:2181"/>
return registryFactory.getRegistry(registryUrl);
其中的registryFactory
就是ZookeeperRegistryFactory
-
org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry
@Override public Registry getRegistry(URL url) { url = URLBuilder.from(url) .setPath(RegistryService.class.getName()) .addParameter(INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(EXPORT_KEY, REFER_KEY) .build(); String key = url.toServiceStringWithoutResolving(); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } //create registry by spi/ioc // 创建zookeeper Registry org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); } }
-
org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
@Override public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); }
-
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(PATH_SEPARATOR)) { group = PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(state -> { if (state == StateListener.RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } }); }
通过zookeeper客户端连接
此时
final Registry registry = getRegistry(originInvoker);
为org.apache.dubbo.registry.zookeeper.ZookeeperRegistry
-
-
org.apache.dubbo.registry.integration.RegistryProtocol#register
boolean register = registeredProviderUrl.getParameter("register", true); if (register) { register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } public void register(URL registryUrl, URL registeredProviderUrl) { Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registeredProviderUrl); }
-
org.apache.dubbo.registry.support.FailbackRegistry#register
ZookeeperRegistry
的父类org.apache.dubbo.registry.support.FailbackRegistry
方法@Override public void register(URL url) { super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // Sending a registration request to the server side // 注册 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly addFailedRegistered(url); } }
-
doRegister
方法由org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
实现@Override public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
- 创建节点
-
-
-
-
-
-
-
org.apache.dubbo.config.spring.ServiceBean
org.apache.dubbo.config.spring.ServiceBean#afterPropertiesSet
org.apache.dubbo.config.ServiceConfig#export
org.apache.dubbo.config.ServiceConfig#doExport
org.apache.dubbo.config.ServiceConfig#doExportUrls
org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
org.apache.dubbo.config.ServiceConfig#exportLocal
org.apache.dubbo.common.extension.ExtensionLoader#createAdaptiveExtensionClass
中的具体export方法通过适配器的方式进行调用- `org.apache.dubbo.registry.integration.RegistryProtocol#export` - `final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);`方法后续调用链 - `org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport` - `org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export` - `org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export` - `org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export` - `org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer` - `org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer` - ```java final Registry registry = getRegistry(originInvoker); ``` 后续调用链 - `org.apache.dubbo.registry.integration.RegistryProtocol#getRegistry` - `org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry` - `org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry` - `org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry` - `org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry` ```java Registry registry = registryFactory.getRegistry(registryUrl); ``` 执行完成 - `org.apache.dubbo.registry.support.FailbackRegistry#register` - `org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister`创建节点
graph TD
-
配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 --> <dubbo:application name="dubbo-client"/> <dubbo:registry address="zookeeper://192.168.1.107:2181"/> <!-- 生成远程服务代理,可以和本地bean一样使用demoService --> <dubbo:reference id="dubboHello" interface="com.huifer.dubbo.server.api.DubboHello"/> <dubbo:reference id="dubboHello2" interface="com.huifer.dubbo.server.api.DubboHello2"/> <dubbo:reference id="acac" interface="com.huifer.dubbo.server.api.DubboVersion1" version="2.0.0" timeout="1" mock="com.huifer.dubbo.client.mock.MockDemo" /> </beans>
-
reference
<dubbo:reference id="dubboHello" interface="com.huifer.dubbo.server.api.DubboHello"/>
<xsd:element name="reference" type="referenceType"> <xsd:annotation> <xsd:documentation><![CDATA[ Reference service config ]]></xsd:documentation> <xsd:appinfo> <tool:annotation> <tool:exports type="org.apache.dubbo.config.ReferenceConfig"/> </tool:annotation> </xsd:appinfo> </xsd:annotation> </xsd:element>
-
org.apache.dubbo.config.spring.ReferenceBean
-
org.apache.dubbo.config.ReferenceConfig#createProxy
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) private T createProxy(Map<String, String> map) { if (shouldJvmRefer(map)) { URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); invoker = REF_PROTOCOL.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { urls.clear(); // reference retry init will add url to urls, lead to OOM if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. String[] us = SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (StringUtils.isEmpty(url.getPath())) { url = url.setPath(interfaceName); } if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // assemble URL from register center's configuration // if protocols not injvm checkRegistry // 通过注册中心来注册 if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){ checkRegistry(); List<URL> us = loadRegistries(false); if (CollectionUtils.isNotEmpty(us)) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } } if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } } if (urls.size() == 1) { invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { 
invokers.add(REF_PROTOCOL.refer(interfaceClass, url)); if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // use last registry url } } if (registryURL != null) { // registry url is available // use RegistryAwareCluster only when register's CLUSTER is available URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME); // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker invoker = CLUSTER.join(new StaticDirectory(u, invokers)); } else { // not a registry url, must be direct invoke. invoker = CLUSTER.join(new StaticDirectory(invokers)); } } } if (shouldCheck() && !invoker.isAvailable()) { throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } /** * @since 2.7.0 * ServiceData Store */ MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map); metadataReportService.publishConsumer(consumerURL); } // create service proxy return (T) PROXY_FACTORY.getProxy(invoker); }
-
List<URL> us = loadRegistries(false);
org.apache.dubbo.config.AbstractInterfaceConfig#loadRegistries
protected List<URL> loadRegistries(boolean provider) { // check && override if necessary List<URL> registryList = new ArrayList<URL>(); if (CollectionUtils.isNotEmpty(registries)) { for (RegistryConfig config : registries) { String address = config.getAddress(); if (StringUtils.isEmpty(address)) { address = ANYHOST_VALUE; } if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); appendParameters(map, application); appendParameters(map, config); map.put(PATH_KEY, RegistryService.class.getName()); appendRuntimeParameters(map); if (!map.containsKey(PROTOCOL_KEY)) { map.put(PROTOCOL_KEY, DUBBO_PROTOCOL); } List<URL> urls = UrlUtils.parseURLs(address, map); for (URL url : urls) { url = URLBuilder.from(url) .addParameter(REGISTRY_KEY, url.getProtocol()) .setProtocol(REGISTRY_PROTOCOL) .build(); if ((provider && url.getParameter(REGISTER_KEY, true)) || (!provider && url.getParameter(SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
protected List<RegistryConfig> registries;
内部变量 -
-
URL monitorUrl = loadMonitor(u);
org.apache.dubbo.config.AbstractInterfaceConfig#loadMonitor
protected URL loadMonitor(URL registryURL) { checkMonitor(); Map<String, String> map = new HashMap<String, String>(); map.put(INTERFACE_KEY, MonitorService.class.getName()); appendRuntimeParameters(map); //set ip String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY); if (StringUtils.isEmpty(hostToRegistry)) { hostToRegistry = NetUtils.getLocalHost(); } else if (NetUtils.isInvalidLocalHost(hostToRegistry)) { throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry); } map.put(REGISTER_IP_KEY, hostToRegistry); appendParameters(map, monitor); appendParameters(map, application); String address = monitor.getAddress(); String sysaddress = System.getProperty("dubbo.monitor.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } if (ConfigUtils.isNotEmpty(address)) { if (!map.containsKey(PROTOCOL_KEY)) { if (getExtensionLoader(MonitorFactory.class).hasExtension(LOGSTAT_PROTOCOL)) { map.put(PROTOCOL_KEY, LOGSTAT_PROTOCOL); } else { map.put(PROTOCOL_KEY, DUBBO_PROTOCOL); } } return UrlUtils.parseURL(address, map); // 返回协议地址 } else if (REGISTRY_PROTOCOL.equals(monitor.getProtocol()) && registryURL != null) { return URLBuilder.from(registryURL) .setProtocol(DUBBO_PROTOCOL) .addParameter(PROTOCOL_KEY, REGISTRY_PROTOCOL) .addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)) .build(); //返回协议地址 } return null; }
-
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
org.apache.dubbo.registry.integration.RegistryProtocol#refer
@Override @SuppressWarnings("unchecked") public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = URLBuilder.from(url) .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)) .removeParameter(REGISTRY_KEY) .build(); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); }
此时的url为zookeeper协议
-
org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
-
-
org.apache.dubbo.rpc.cluster.Cluster
@SPI(FailoverCluster.NAME) // failover public interface Cluster { /** * Merge the directory invokers to a virtual invoker. * * @param <T> * @param directory * @return cluster invoker * @throws RpcException */ @Adaptive <T> Invoker<T> join(Directory<T> directory) throws RpcException; }
-
org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker
public AbstractClusterInvoker(Directory<T> directory, URL url) { if (directory == null) { throw new IllegalArgumentException("service directory == null"); } this.directory = directory; //sticky: invoker.isAvailable() should always be checked before using when availablecheck is true. this.availablecheck = url.getParameter(CLUSTER_AVAILABLE_CHECK_KEY, DEFAULT_CLUSTER_AVAILABLE_CHECK); }
-
return (T) PROXY_FACTORY.getProxy(invoker);
- PROXY_FACTORY是什么?
package com.huifer.dubbo.client; import org.apache.dubbo.common.extension.ExtensionLoader; public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory { public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException { if (arg0 == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); } if (arg0.getUrl() == null) { throw new IllegalArgumentException( "org.apache.dubbo.rpc.Invoker argument getUrl() == null"); } org.apache.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if (extName == null) { throw new IllegalStateException( "Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])"); } org.apache.dubbo.rpc.ProxyFactory extension = ExtensionLoader .getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0); } public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException { if (arg0 == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); } if (arg0.getUrl() == null) { throw new IllegalArgumentException( "org.apache.dubbo.rpc.Invoker argument getUrl() == null"); } org.apache.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if (extName == null) { throw new IllegalStateException( "Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])"); } org.apache.dubbo.rpc.ProxyFactory extension = ExtensionLoader .getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0, arg1); } public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException { if (arg2 == null) 
{ throw new IllegalArgumentException("url == null"); } org.apache.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if (extName == null) { throw new IllegalStateException( "Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])"); } org.apache.dubbo.rpc.ProxyFactory extension = ExtensionLoader .getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getInvoker(arg0, arg1, arg2); } }
-
org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper#getProxy(org.apache.dubbo.rpc.Invoker<T>)
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public <T> T getProxy(Invoker<T> invoker) throws RpcException { T proxy = proxyFactory.getProxy(invoker); if (GenericService.class != invoker.getInterface()) { URL url = invoker.getUrl(); String stub = url.getParameter(STUB_KEY, url.getParameter(LOCAL_KEY)); if (ConfigUtils.isNotEmpty(stub)) { Class<?> serviceType = invoker.getInterface(); if (ConfigUtils.isDefault(stub)) { if (url.hasParameter(STUB_KEY)) { stub = serviceType.getName() + "Stub"; } else { stub = serviceType.getName() + "Local"; } } try { Class<?> stubClass = ReflectUtils.forName(stub); if (!serviceType.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName()); } try { Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType); proxy = (T) constructor.newInstance(new Object[]{proxy}); //export stub service URLBuilder urlBuilder = URLBuilder.from(url); if (url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT)) { urlBuilder.addParameter(STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ",")); urlBuilder.addParameter(IS_SERVER_KEY, Boolean.FALSE.toString()); try { export(proxy, (Class) invoker.getInterface(), urlBuilder.build()); } catch (Exception e) { LOGGER.error("export a stub service error.", e); } } } catch (NoSuchMethodException e) { throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e); } } catch (Throwable t) { LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t); // ignore } } } return proxy; }
-
proxy相关类以及方法
-
org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
追溯directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
-
org.apache.dubbo.registry.support.FailbackRegistry#subscribe
@Override public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // Sending a subscription request to the server side // 向服务器订阅 org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe 实现 doSubscribe(url, listener); } catch (Exception e) { Throwable t = e; List<URL> urls = getCacheUrls(url); if (CollectionUtils.isNotEmpty(urls)) { notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); } else { // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } // Record a failed registration request to a failed list, retry regularly addFailedSubscribed(url, listener); } }
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
-
-
org.apache.dubbo.registry.integration.RegistryDirectory#refreshInvoker
-
如果传入的invoker列表不是空的,这意味着它是最新的invoker列表
-
如果传入的invokerUrl列表是空的,这意味着该规则只是一个覆盖规则或路由
url转换成invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
-
-
调用链
-
org.apache.dubbo.config.spring.ReferenceBean#afterPropertiesSet
-
org.apache.dubbo.config.spring.ReferenceBean#getObject
-
org.apache.dubbo.config.ReferenceConfig#get
-
org.apache.dubbo.config.ReferenceConfig#init
-
org.apache.dubbo.config.ReferenceConfig#createProxy
-
注册中心方式
org.apache.dubbo.config.AbstractInterfaceConfig#loadRegistries
org.apache.dubbo.config.AbstractInterfaceConfig#loadMonitor
-
org.apache.dubbo.registry.integration.RegistryProtocol#refer
-
org.apache.dubbo.registry.integration.RegistryProtocol#refer
-
org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
-
org.apache.dubbo.rpc.cluster.support.FailoverCluster#join
org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker
org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#FailoverClusterInvoker
org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#AbstractClusterInvoker(org.apache.dubbo.rpc.cluster.Directory<T>, org.apache.dubbo.common.URL)
-
org.apache.dubbo.registry.integration.RegistryDirectory#subscribe
-
org.apache.dubbo.registry.integration.RegistryDirectory#subscribe
org.apache.dubbo.registry.support.FailbackRegistry#subscribe(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener)
-
-
-
org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper#getProxy(org.apache.dubbo.rpc.Invoker<T>)
-
-
-
-
-
-
-
InvokerInvocationHandler-->MockInvoker:invoke()
MockInvoker-->AbstractClusterInvoker:invoke()
AbstractClusterInvoker-->AbstractDirectory:list获取
AbstractDirectory-->RegistryDirectory:doList()方法
RegistryDirectory-->AbstractDirectory:返回invokers
AbstractDirectory-->AbstractClusterInvoker:invokers获取
AbstractClusterInvoker-->FailoverClusterInvoker:doInvoke()
FailoverClusterInvoker--> MockInvokersSelector:select()
MockInvokersSelector-->AbstractLoadBalance:doSelect()
AbstractLoadBalance--> RandomLoadBalance:random
RandomLoadBalance-->AbstractLoadBalance:random
AbstractLoadBalance-->MockInvokersSelector:invoke
MockInvokersSelector-->FailoverClusterInvoker:invoke
FailoverClusterInvoker-->AbstractProxyInvoker:invoke
AbstractProxyInvoker-->DubboInvoker:doInvoke
DubboInvoker-->InvokerInvocationHandler:result
org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
/**
 * Selects the next invoker using weighted round-robin.
 *
 * <p>Per-invoker state is kept in {@code methodWeightMap}, keyed first by
 * {@code serviceKey + "." + methodName} and then by the invoker URL's identity string. On each
 * call every candidate's counter is advanced by its current weight; the candidate with the
 * largest counter is chosen and the sum of all weights is then subtracted from its counter.
 *
 * <p>State entries are obtained with {@code computeIfAbsent}, so concurrent callers always
 * operate on the instance that is actually stored in the map. Using {@code putIfAbsent} and
 * ignoring its result would let the losing thread update a detached instance.
 *
 * <p>When the number of state entries differs from the number of invokers, entries whose
 * {@code lastUpdate} is older than {@code RECYCLE_PERIOD} are dropped. This is done on a copy
 * that replaces the stored map, guarded by {@code updateLock} so only one thread does it.
 *
 * @param invokers candidate invokers; the first element is read unconditionally, so the list
 *                 must not be empty
 * @param url not read by this method
 * @param invocation supplies the method name for the state key and is passed to
 *                   {@code getWeight}
 * @return the selected invoker
 */
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    // Round-robin state of every invoker for this service method.
    ConcurrentMap<String, WeightedRoundRobin> map =
            methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<String, WeightedRoundRobin>());
    // Sum of the weights of all candidates.
    int totalWeight = 0;
    long maxCurrent = Long.MIN_VALUE;
    long now = System.currentTimeMillis();
    Invoker<T> selectedInvoker = null;
    WeightedRoundRobin selectedWRR = null;
    for (Invoker<T> invoker : invokers) {
        String identifyString = invoker.getUrl().toIdentityString();
        // Current weight of this invoker.
        int weight = getWeight(invoker, invocation);
        // Atomically fetch-or-create so every thread shares the instance held by the map.
        WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
            WeightedRoundRobin created = new WeightedRoundRobin();
            created.setWeight(weight);
            return created;
        });
        if (weight != weightedRoundRobin.getWeight()) {
            // weight changed
            weightedRoundRobin.setWeight(weight);
        }
        long cur = weightedRoundRobin.increaseCurrent();
        weightedRoundRobin.setLastUpdate(now);
        if (cur > maxCurrent) {
            maxCurrent = cur;
            selectedInvoker = invoker;
            selectedWRR = weightedRoundRobin;
        }
        totalWeight += weight;
    }
    if (!updateLock.get() && invokers.size() != map.size()) {
        if (updateLock.compareAndSet(false, true)) {
            try {
                // copy -> modify -> update reference
                ConcurrentMap<String, WeightedRoundRobin> newMap =
                        new ConcurrentHashMap<String, WeightedRoundRobin>(map);
                newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
                methodWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }
    }
    if (selectedInvoker != null) {
        selectedWRR.sel(totalWeight);
        return selectedInvoker;
    }
    // should not happen here
    return invokers.get(0);
}
/**
 * Mutable round-robin bookkeeping for a single invoker: its weight, a running counter and the
 * time it was last touched.
 */
protected static class WeightedRoundRobin {
    /** Weight of the invoker; also the step by which {@link #current} grows. */
    private int weight;
    /** Running counter; it changes every time the entry is advanced or selected. */
    private AtomicLong current = new AtomicLong(0);
    /** Time of the most recent use, as supplied by the caller. */
    private long lastUpdate;

    /** Returns the stored weight. */
    public int getWeight() {
        return this.weight;
    }

    /**
     * Stores a new weight and resets the running counter to zero.
     *
     * @param weight the new weight
     */
    public void setWeight(int weight) {
        this.weight = weight;
        this.current.set(0);
    }

    /**
     * Adds the stored weight to the running counter.
     *
     * @return the counter value after the addition
     */
    public long increaseCurrent() {
        return this.current.addAndGet(this.weight);
    }

    /**
     * Subtracts {@code total} from the running counter.
     *
     * @param total the amount to subtract
     */
    public void sel(int total) {
        this.current.addAndGet(-total);
    }

    /** Returns the time recorded by the last {@link #setLastUpdate(long)} call. */
    public long getLastUpdate() {
        return this.lastUpdate;
    }

    /**
     * Records when this entry was last used.
     *
     * @param lastUpdate the time to record
     */
    public void setLastUpdate(long lastUpdate) {
        this.lastUpdate = lastUpdate;
    }
}