-
Notifications
You must be signed in to change notification settings - Fork 413
1.6_深入领域
-
Contract
契约即规范,是对不同领域内数据类型的高层抽象,其在Datalink中的主要表现形式为Record,如针对关系型数据库有RdbEventRecord、针对Hbase有HRecord。在整个产品规划中,契约处于最顶层,无论采用何种基础设施、何种业务模型、何种开发语言,契约都是一套独立的规范。契约是连接Reader和Writer的纽带,Reader和Writer互不感知,它们通过识别共同的契约实现数据交换。 -
Business Model
Business Model是对数据交换业务场景的高层抽象,将不同场景的共性需求进行了归纳和总结,抽象出了一套统一的模型定义。当然它不是万能的,不能包含所有的需求点,并且是随着场景的增多不断演化的,但它是必须的,统一的模型抽象可以支撑80%场景下的功能复用。主要模型定义如下:- Media:对存储单元的抽象。如:RDBMS中的表,HBase中的表,ElasticSearch中的索引,HDFS中的一个文件路径等,都称之为Media。
- MediaSource:对存储产品的抽象。如:Mysql,Sqlserver,HBase,Kafka,Hdfs等,都称之为MediaSource。
- MediaMapping(MediaMappingColumn):对存储单元间数据同步规则的抽象。具体见领域功能介绍部分。
- MetaMapping:对存储产品间数据类型映射规则的抽象。如:Mysql的varchar映射到ElasticSearch的数据类型是String。
本小节对datalink中提供的通用功能做一下介绍
功能项 | 详述 |
---|---|
别名 | 同步配置中可以为库、表和列配置别名,以支持源和目标名称不一致的场景 |
黑名单 | 可以通过黑名单指定不同步某些列 |
白名单 | 可以通过白名单指定只同步某些列 |
通配符 |
为了简化配置,目前支持了几种类型的通配符 |
配置重载 | 使用通配符很方便,但通配符中包含的个别表也经常有特殊配置需求,如果因为这些特殊需求而放弃使用通配符,显然得不偿失,因此,系统提供了配置重载机制:在使用了通配符时,还可以对表进行单独的配置,系统会用独立的配置重载掉通用配置(注:独立配置处于禁用状态时,也会重载,不会因为禁用了就又采用通用规则了) |
优先级 | 通过配置优先级,可以灵活指定表的同步顺序,系统会按照优先级从小到大的顺序执行 优先级的默认值为5,优先级相同时,系统按照数据的原始顺序执行 优先级的排序单元是Task,即:优先级排序时,范围只限于同一个Task内,不同Task之间的配置是完全隔离的 |
拦截器 | 拦截器是系统提供的扩展机制,用户可以在拦截器中自定义同步逻辑,实现自己的特殊需求,如:过滤特定数据、对数据进行重新组装、反查数据等等。拦截器有两种配置方式:Script和Class,前者是直接配置java源码,系统动态编译并使用;后者是配置Class类名,系统在指定目录中加载class文件并使用 |
多表聚合 | 在【同步规则】中可以配置【聚合列】,用来实现多表聚合,即:将【源端】多张表的数据在【目标端】聚合到一张宽表中 当然,系统只是提供了该配置项,具体聚合逻辑还得Writer插件自行实现 |
数据合并 | 数据合并指的是在数据同步过程中,可以对同一张表的相同pk的数据进行合并,以提升同步性能 如:同步Binlog时,将同一条数据的多次update事件合并成一条,这样在目标端执行的时候,只需执行一次即可 具体可参见:com.ucar.datalink.worker.api.merge.BuiltInRdbEventRecordMerger |
主键跳过 | 利用该功能可以在同步过程中过滤掉指定主键的数据,目前支持两种配置方式: 1、指定ID,多个以逗号分割,如:100,200,300 2、指定ID区域,多个以逗号分割,如:[100-200],[300-500] |
参数扩展 | 不同场景下的同步规则相差各异,MediaMapping提供了一个扩展参数字段,用来满足不同场景下的配置需求 |
本小节对数据同步的核心流程做一下介绍,此处所说的核心流程指的是系统提供的默认流程,用户自定义的Handler必须继承自com.ucar.datalink.worker.api.handle.AbstractHandler才会使用该流程。当然,用户也可以不继承该Handler,完全自定义同步逻辑,但大部分情况下的同步流程都是类似的,建议优先复用该Handler再考虑自行扩展。
上图展示的是一次数据同步的交互流程:同步流程从TaskReader发起,TaskReader将数据放到队列,然后进行callback等待,TaskWriter负责从队列take数据(实际情况可以是多个TaskWriter消费该批次数据,简单起见,上图并没有体现一对多的关系),然后根据Record类型加载对应的Handler,等Handler处理完数据后,不论成功失败进行callback通知,然后TaskReader结束等待,如果成功则执行commit然后发起下一轮同步,如果失败则执行rollback然后重试。基本流程就是这样的,具体细节可参考代码,下面对Handler的内部流程做重点介绍:
- Mapping
Record经过Mapping阶段后,有两个重要的变化,其一,如果在MediaMapping中配置了一对多的同步映射关系,一个Record会裂变成多个,其二,Record的metadata中被附加上了MediaMapping信息。Mapping完成之后,所有的Record都有了【目标归属】 - Intercept
拦截器是一个扩展机制,执行的是用户配置的自定义逻辑,拦截阶段可以对Record进行任何形式的处理,如:过滤、合并、再加工等,具体做什么主要看实际的业务需求。系统提供的一些默认功能也是靠拦截器实现的,如:主键跳过功能,只不过这些拦截器是内置的。 - Merge
对于某些Record,在执行前是可以进行Merge操作的,如RdbEventRecord,我们可以把同一条数据的变更事件合并(如多次update合并成一次update),这样可以大大提升同步性能 - Transform
上述阶段完成之后Record已经逃脱了被忽略的命运,Transform阶段将进行最后的组装,别名、黑白名单等规则在此阶段生效,组装之后等待最后的执行 - Group
Group的目的是为了并发,一条数据可能同时同步到A,B,C三个数据库,此阶段会按照目标MediaSouce进行分组,后续load阶段,不同分组会并发执行 - Load
Load阶段完成最终的写入,需要每个Writer插件在重载Handler的时候自行实现,每个插件根据实际情况,可以对数据再次整合,但该阶段的宏观目标就是【完成写入】
下面通过一个图示,进行更详细的说明:
- R_t1_p1在Mapping阶段完成了裂变,因为需要从ucar_admin同步到ucar_driver、ucar_order和ucar_crm三个库
- R_t3_p2在Intercept阶段被过滤,因为配置了编号为1的拦截器
- R_t2_p1(insert)和R_t2_p1(update),在Merge阶段合并成了一条新的R_t2_p1(insert)
- R_t3_p1在Transform阶段变成了R_t33_p1,因为配置了表别名
- Group阶段,R_t1_p1和R_t2_p1合并到了一组,因为它们都同步到ucar_driver,另一个R_t1_p1和R_t33_p1合并到了一组,因为它们都同步到ucar_crm
- Load阶段R_t33_p1执行完之后,其它的Record才执行(并行执行),因为R_t33_p1优先级高