-
Notifications
You must be signed in to change notification settings - Fork 413
1.4_深入(Re)Balance
Datalink的HA和Reblance机制深入参考了kafka和kafka-connect(版本<0.10.1.0>)的相关设计思想,结合自身的特点和场景进行了重写和改造,相关概念介绍如下:
- Central Co-Ordination
Datalink-Manager提供了GroupCoordinator,整个集群的 (Re-)Balance都必须经由中心协调器统一处理,这样可以保证整个集群的状态统一,尽量避免 "脑裂" 的发生 - Worker
Datalink-Worker(类似Kafka-Connect的Worker)提供了了WorkerCoordinator,负责和GroupCoordinator进行通信和协调,共同实现Failure-Detection、Balancing、HA等功能 - Group
进行(Re-)Balance Processing的基本单位是Group,不同Group之间完全隔离,同一Group中的Task和Worker根据不同的策略进行动态分配和组合
(Re-)Balance流程分为两个阶段
- 第一阶段(Group Membership)
该阶段主要用来进行【成员发现】和【Leader选举】 - 第二阶段(State Synchronization)
该阶段主要用来进行【状态收集】和【资源分配】
下面对两个阶段做进一步介绍
每个Worker启动完成之后,会拿到两个重要信息:自己所属的Group和GroupCoordinator的地址,这两个信息是Worker完成(Re-)Balance的必要条件
- Phase 1 : Joining the Group,主要流程如下所示:
1)触发(Re-)Balance
常见的触发条件有:Worker加入Group、Worker退出Group、新增Task、删除Task、Manager发生主备切换等
2)Worker发送JoinGroupRequest给Manager
JoinGroupRequest携带的主要信息有:GroupId,WorkerId,协议类型和相关元数据
3)Manager端进行Collect并Await
Manager以Group为单位对Worker进行管理,等待Group下所有worker的JoinGroupRequest请求(依据last generation的worker列表做判断),直至触发等待超时
4)Manager发送Response
当所有Worker已加入Group或触发了等待超时,Manager便随机选中一个Worker为Leader,并给所有worker发送JoinGroupResponse(kafka中还有协议选择的相关操作,我们暂未支持)
- Phase 2 : Synchronizing Group State
Join阶段完成后,每个Worker都会收到JoinGroupResponse,然后开始Sync,Sync阶段主要流程如下所示:
1)Worker判断自己是否是Leader,是的话进行Assign操作并把分配信息放到SyncGroupRequest中发送給Manager,不是的话直接发送一个空的SyncGroupRequest给Manager
2)Manager等待Leader-Worker的SyncGroupRequest,收到之后解析出每个worker对应的Assignment,然后构造SyncGroupResponse将Assignment信息分别发给每个worker
3)Worker收到SyncGroupResponse,解析Assignment信息,然后执行分配给自己的Task
- 补充
JoinGroupRequest、JoinGroupResponse、SyncGroupRequest、SyncGroupResponse的详情可参见 : org.apache.kafka.common.requests.*下的相关实现
介绍完Coordinating的主流程,接下来详细介绍一下Manager端Goup的状态机
-
Empty
当Group内没有任何Worker时,状态为Empty,实际场景中,Group的初始状态为Empty(即:未发生任何Balance之前),(Re-)Balance之后如果分组内没有任何worker了分组状态也会被重置为Empty -
PreparingRebalance
当GroupCoordinator检测到需要进行(Re-)Balance时,会把Group状态置为PreparingRebalance,检测条件主要有两个:GroupCoordinator收到了Worker主动发送的JoinGroup-Request请求,GroupCoordinator通过heartbeat机制检测到发生了Member-Failure。Group一旦进入PreparingRebalance状态,立即启动定时器,等待Group下所有剩余Worker的JoinGroupRequest请求,直至所有worker都加入或触发定时器超时,然后把状态转换AwaitingSync。当Group处于PreparingRebalance状态时,如果收到了Heartbeats或者SyncGroupRequest请求,GroupCoordinator会返回错误给worker,提示"A rebalance is in progress",worker基于此信息得知自己需要进行Re-join操作,随后发送Join请求。 -
AwaitingSync
AwaitingSync的上一状态只能是PreparingRebalance。Group处于AwaitingSync状态时,主要任务是等待Leader-Worker的SyncGroupRequest请求,一旦等到Leader的请求,便从请求信息中取出Assingments,将其放到GroupMetadata中,然后把Group状态设置为Stable。当然,Group处于AwaitingSync状态时,也会收到非Leader-Worker的Sync请求,其处理策略是把请求暂存,继续等待Leader,拿到Assingments结果后再对请求进行响应。当Group处于AwaitingSync状态时,如果收到了Heartbeats请求,会返回错误给worker,提示"A rebalance is in progress",worker基于此信息进行Re-join操作;如果收到了JoinGroupRequest请求,会把Group状态置为PreparingRebalance,然后给正在等待SyncGroupResponse响应的worker发送信息,提示"A rebalance is in progress",然后workers发起新一轮的Re-join请求。 -
Stable
Stable的上一状态只能是AwaitingSync。因为AwaitingSync阶段一旦收到Leader的请求,Group状态便立即转为Stable,所以,处于Stable状态下的Group,仍然会收到SyncGroupRequest请求,此时很简单,从GroupMetadata中取出对应worker的分配结果,直接发送SyncGroupResponse即可。Stable状态下关于心跳的说明,引用kafka文档中的一句话:Heartbeats are accepted from members in this state and are used to keep group members active or to indicate that they need to join the group. -
Dead
可以从任何一个状态转换为Dead状态,关于此状态的解释:Group has no more members and its metadata is being removed。目前Manager的高可用采用的是主备模式,并且暂未提供主备动态切换功能,所以Group状态转换为Dead的处理逻辑也还暂未实现。
本小节对Worker端的coordination逻辑做一个深入介绍,因为细节非常多,此处重在介绍主流程
说明:和"group state machine"不同的是,上图展示的并不是一个状态机,图中标识的各种状态也并没有在worker中进行定义,此图是对worker的coordinating逻辑做的一个抽象和总结
-
Down
表示worker还未启动 -
PreparingJoin
该状态表示worker需要进行(Re-)join操作,worker的tick线程检测到该状态之后,会触发(Re-)join,worker的状态也随之转换为Join-ing。Worker进入PreparingJoin状态的触发条件有很多,分别是: 23 incomplete 进程启动后的初始状态为PreparingJoin 7 incomplete JoinResponse返回了[特定异常],状态由Join-ing转换为PreparingJoin 8 incomplete SyncResponse返回了[特定异常],状态由Sync-ing转换为PreparingJoin 9 incomplete HeartbeatResponse返回了[特定异常],状态由Stable转换为PreparingJoin 10 incomplete WorkerKeeper检测到新增加了Task或删除了Task,触发PreparingJoin 关于[特定异常],做一下说明,此处表述的[特定异常]主要有: 24 incomplete GROUP_COORDINATOR_NOT_AVAILABLE 17 incomplete NOT_COORDINATOR_FOR_GROUP 18 incomplete REBALANCE_IN_PROGRESS 19 incomplete ILLEGAL_GENERATION 20 incomplete UNKNOWN_MEMBER_ID 26 incomplete 超时异常
当worker收到这些异常时,会触发PreparingJoin,此外,如果是GROUP_COORDINATOR_NOT_AVAILABLE或NOT_COORDINATOR_FOR_GROUP,还会触发"Re Discover Coordinator"操作,如果是ILLEGAL_GENERATION或UNKNOWN_MEMBER_ID,还会触发"Reset Generation"操作
-
Join-ing
该状态表示worker正在进行"Joining Group"操作,对应于Manager端的PreparingBalance状态。Join-ing状态开始于发送JoinRequest请求,结束于收到JoinResponse响应,如果响应信息正常则进入Sync-ing状态,否则进入PreparingJoin状态。Worker发送JoinRequest之前会检测本地是否有Task正在运行,如果有,则关闭Task之后才发送请求。 -
Sync-ing
该状态表示worker正在进行"Sync state"操作,对应于Manager端的AwaitingSync状态。Sync-ing状态开始于发送SyncRequest请求,结束于收到SyncResponse响应,如果响应信息正常则进入Stable状态,否则进入PreparingJoin状态。Worker发送SyncRequest之前会判断自己是否是leader,如果是,则进行资源分配,将分配结果放到SyncRequest中发送。Worker收到SyncResponse响应之后,从响应信息中取出自己需要运行的Task,然后运行这些Task。 -
Stable
该状态表示Worker正在稳定运行,也是worker绝大部分时间所处的状态,此状态下的worker依靠心跳维持和manager的会话,当会话超时或Manager通过心跳通知worker需要进行(Re-)Balance时,worker进入PreparingJoin状态。注:heartbeat在Join-ing和Sync-ing状态下是处于禁用状态的。
如前文所述,Join阶段完成之后,Manager会从Group中选中一个Worker为Leader,在Sync阶段,由这个Leader负责Task的分配,然后将分配结果回传给Manager,Manager再将结果下发给该分组下的所有worker。那么,为何我们不在Manager进行任务分配呢,直接在Join-Response中将分配结果下发给Worker,这样还可以省去Sync阶段,缩短Reblance的时间,其原因如下所示:
因为我们深度参考了kafka的设计,先来看一下kafka不在broker进行assign的原因:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal,
总结来看其原因主要有:
- 部署运维不方便
每次新增或修改Assignments策略,都需要对broker进行重新部署,当集群规模大的时候费时又费力,运维成本非常高 - 扩展性差
kafka的生态体系越来越丰富,衍生出了越来越多的子产品,这些子产品对HA和(Re-)Balance的设计要求基本相同,最大的不同点就在资源的分配上,如kafka-connect,分配的资源是job和task,而不是partition。如果在broker端进行"resource assignment",那么每新增一个子产品,都要为broker引入新的领域概念,broker将变的越来越重、越来越复杂,极大的限制了系统的扩展性;如果在客户端进行"resource assignment",broker只提供"group management",那么不仅能最大限度的让所有产品复用HA基础设置,还能实现对broker的零侵入,系统的扩展性大大增强。 - 耦合性高
即使没有基于kafka扩充新产品的需求,但broker和consumer也都分别有自己的【领域模型】,如果在broker端进行assign,一些简单的分配策略也许还没什么问题(比如:Round Robin),但进行一些特殊的或高级的资源分配策略时,可能需要用到consumer自己领域内的一些【指标】或【元数据】,显然这些都不合适在broker端实现
基于以上的描述,datalink中倒没有部署运维不方便的问题,因为Manager只有两个节点做HA互备,但从【扩展性】和【耦合性】两个维度看的话,在Worker端进行Assign还是上上策,虽然多了一个Sync阶段,但是这个对性能的影响并不大,完全在可接受范围内,so,Woker Assignment First
目前我们的Task分配策略还比较简单,只支持RoundRobin一种方式,新的策略还在规划设计中,简要介绍如下:
- 基于资源的分配
通过MetaData获取到Worker的CPU、内存、网卡等的配置参数,通过运行统计信息获取到Task的流量排行,基于这些信息实现Task和Worker的最优分配,解决分配倾斜的问题 - 降低Task的不可用时间
在现有的方案中,Worker每次Re-join之前都会关闭其运行的Task,但Re-balance之后,这些Task很可能仍然在此worker上运行,需要进行优化,尽量避免Task的重启 - Sticky Assign
增加对【粘性分配】的支持,可以显示指定Task和Worker之间的绑定关系,Task优先分配给绑定关系中配置的worke,除非worker已不在分组中
说明:如下链接的相关描述都是基于kafka0.9,同【Kafka-0.10】以及【datalink】相比,很多细节是有不少差异的,仅供参考
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design