
[Feature][Master+API+Scheduler] Propose for master refactor and scheduler module #4355

Closed · lenboo opened this issue Dec 31, 2020 · 10 comments

Labels: feature (new feature)


lenboo commented Dec 31, 2020

Background:

refer: #4083

At present, the master has the following problems:

There is a lot of polling, which results in unnecessary time cost

A distributed lock is taken when fetching commands, which creates a concurrency bottleneck

Too many threads (nProcessInstance * nTaskInstances) are used, which wastes system resources

Polling the database creates a database query pressure bottleneck

Here is the proposal:

1. Reconstruct the communication function (a sketch of the asynchronous path follows this list)

  • Synchronous: the sending thread sends a message and blocks -> the receiver receives and processes the message (stores to DB, writes data, etc.) -> the receiver returns a response to the sender -> the sender unblocks

  • Asynchronous: the sending thread sends a message and caches it -> the receiver receives and processes the message -> after processing, the receiver replies to the sender with a confirmation command -> on receiving the confirmation, the sender removes the cached message
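A minimal sketch of the asynchronous path, assuming hypothetical AsyncSender/Command classes (the real remoting classes may differ): the sender caches each message until the receiver's confirmation arrives.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the asynchronous send path described above;
// class and method names are illustrative, not the actual remoting API.
public class AsyncSender {

    static class Command {
        long opaque;   // correlation id between request and confirmation
        byte[] body;
    }

    // messages sent but not yet acknowledged by the receiver
    private final Map<Long, Command> pending = new ConcurrentHashMap<>();
    private final AtomicLong idGenerator = new AtomicLong();

    /** Send without blocking: cache first, then write to the transport. */
    public void send(Command command) {
        command.opaque = idGenerator.incrementAndGet();
        pending.put(command.opaque, command);   // keep until confirmed
        writeToChannel(command);                // non-blocking network write
    }

    /** Called when the receiver's confirmation command arrives. */
    public void onAck(long opaque) {
        pending.remove(opaque);                 // safe to drop the cached message
    }

    // a periodic task could re-send entries that stay in 'pending' too long
    private void writeToChannel(Command command) { /* e.g. Netty writeAndFlush */ }
}
```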

2. Add a scheduler module

image

  • Implement the HA function

  • Implement the scheduler startup process: first scan the CMD table, then start the CMD listener, caching CMD data into a local queue

  • Listen for and process received CMDs (synchronous)

  • A CMD cache-queue processing thread sends CMDs to the masters according to a policy

  • Implement the CMD dispatch policy so that multiple policies can be supported at the same time and new ones added easily, e.g. CMD priority, master load, and so on (see the strategy sketch after this list)
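One plausible shape for the dispatch policy is a small strategy interface; all type names here are hypothetical and shown only to illustrate the extensibility point:

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical types for illustration only.
class MasterInfo {
    final String host;
    final double load;   // e.g. reported via heartbeat
    MasterInfo(String host, double load) { this.host = host; this.load = load; }
}

class Cmd {
    final int priority;
    Cmd(int priority) { this.priority = priority; }
}

/** A pluggable policy decides which master receives a given CMD. */
interface DispatchPolicy {
    MasterInfo select(Cmd cmd, List<MasterInfo> activeMasters);
}

/** Example policy: send the CMD to the least-loaded master. */
class LowestLoadPolicy implements DispatchPolicy {
    @Override
    public MasterInfo select(Cmd cmd, List<MasterInfo> activeMasters) {
        return activeMasters.stream()
                .min(Comparator.comparingDouble(m -> m.load))
                .orElseThrow(() -> new IllegalStateException("no active master"));
    }
}
```

A priority-based policy could first order the local CMD queue by priority and then delegate master selection to a load-based policy; both fit behind the same interface.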

3. API

  • The API receives a workflow execution command or a pause/stop command and sends it to the specified scheduler/master. On failure it retries up to three times; after the third failure it reports a failure (see the retry sketch below).
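A sketch of the "retry three times, then fail" rule, with sendOnce(...) as a placeholder for the real RPC call:

```java
// Hypothetical sketch of API-side command delivery with bounded retries.
public class ApiCommandSender {
    private static final int MAX_ATTEMPTS = 3;

    public void sendWithRetry(byte[] command, String target) {
        Exception last = null;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                sendOnce(command, target);   // placeholder for the real RPC
                return;                      // delivered successfully
            } catch (Exception e) {
                last = e;                    // try again, up to three attempts
            }
        }
        throw new IllegalStateException(
                "delivery failed after " + MAX_ATTEMPTS + " attempts", last);
    }

    private void sendOnce(byte[] command, String target) throws Exception {
        // RPC to the chosen scheduler/master
    }
}
```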

4. Fault-tolerance process changes:

  • Master fault tolerance

    • Workflow instances owned by the failed master: find the unfinished workflow instances, generate fault-tolerance CMDs, and send them to the active scheduler

      • Find the unfinished workflow instances and generate the fault-tolerance CMDs

      • Send the CMDs to the active scheduler

    • Find the unfinished commands and send them to the scheduler for reprocessing (remove the host and reassign a master)

      • Find the unfinished commands by the host of the dead master

      • Send the commands back to the scheduler and let the scheduler reassign them

  • Worker fault tolerance (a sketch follows this list)

    • When a worker dies, the master picks up the event and finds all unfinished tasks on that worker from the DB

    • Check whether each task's DAG belongs to this master, and change the state of the tasks that do to "fault tolerant"

    • Trigger the task state change
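A sketch of the worker fault-tolerance flow; the DAO and type names are assumptions for illustration, not the actual DolphinScheduler classes:

```java
import java.util.List;

// Hypothetical sketch of the worker fault-tolerance steps above.
class WorkerFaultToleranceHandler {

    interface TaskDao {
        List<Task> findUnfinishedByWorker(String workerHost);
        void updateState(Task task, String state);
    }

    static class Task {
        int dagId;   // workflow instance the task belongs to
    }

    private final TaskDao taskDao;
    WorkerFaultToleranceHandler(TaskDao taskDao) { this.taskDao = taskDao; }

    /** Invoked when the master observes that a worker has gone down. */
    void onWorkerDown(String workerHost) {
        // 1. find all unfinished tasks that were running on the dead worker
        for (Task task : taskDao.findUnfinishedByWorker(workerHost)) {
            // 2. only handle tasks whose DAG this master owns
            if (ownsDag(task.dagId)) {
                // 3. mark fault tolerant and re-enter state processing
                taskDao.updateState(task, "NEED_FAULT_TOLERANCE");
                triggerStateChange(task);
            }
        }
    }

    private boolean ownsDag(int dagId) { return true; /* local DAG lookup */ }
    private void triggerStateChange(Task task) { /* enqueue a state event */ }
}
```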

5. Master execution process

image

  • Modify the master's workflow-processing thread pool: a thread handles a workflow from fetching the CMD and generating the workflow instance until all initial tasks have been submitted to the queue

    • When the master submits a task and finds that the task is already running, it needs to:

      • Check whether the task instance's worker is alive and whether the worker's start time < the task's start time; if the worker failed over, change the task state to "fault tolerance" and trigger the task state change

      • Notify the worker running the task to report to the current master instead (change the host the worker reports to)

  • Add master task-status listening

    • Receive task/workflow status from worker/API/master

    • Save each received task status into the corresponding DAG's unprocessed-status queue

    • If no corresponding DAG can be found for a received task status, return a receive failure

  • Add a master task-status processing thread pool; all task statuses of the same workflow can only be processed sequentially (see the dispatcher sketch after this section)

    • When the master receives a task status, it must determine whether the DAG the task belongs to is handled by this master; if not, the status does not belong to it and must be discarded

    • Determine whether a thread is already processing this DAG's statuses; if one already exists, do not process the status again

    • Determine whether the task status exists in the DAG's unprocessed-status list; if not, discard the status

    • Start a thread to process the DAG's task statuses, and put the DAG as key and its future as value into a map, to support the checks above

    • After the task statuses are processed, remove the map entry and release the thread

  • Add a polling thread for status that must be fetched from outside the workflow (dependencies / sub-workflows).

    • Query the status of dependent tasks

      • Query whether each depended-on task has succeeded

      • Combine the results according to the dependency conditions to produce the dependency state

    • Query sub-workflow status

      • Query the parent-child relationship table

      • Query the status of the sub-workflow

      • Update the state of the sub-task based on the state of the sub-workflow

  • Add a timeout-monitoring queue: tasks/workflows that need monitoring are added to the queue, and a time wheel / thread watches for timeouts; when a timeout occurs, timeout handling is triggered (see the time-wheel sketch after this section).

    • A newly submitted task adds a timeout-monitoring entry to the timeout queue

    • If a task in the timeout queue times out, it is alerted on or killed according to the timeout policy

    • When the task reaches a completed state, it is removed from the timeout queue

  • The master's CMD-listening thread marks each CMD and caches it in the local CMD queue.

    • Listen for received CMDs, mark each one with this master's host, cache it in the local CMD queue, and return a success message to the sender

  • CMDs that the master cannot process can be fed back to the scheduler for redistribution

    • When the master's CMD-processing thread finds that resources are insufficient, it feeds the CMD back to the scheduler for redistribution; if the feedback succeeds, the CMD is deleted from the local CMD queue

  • Actively report resource usage: the master reports its resource usage to the scheduler.

    • During heartbeat, the master sends resource-usage information to both ZK and the scheduler
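For the task-status processing pool above, here is a minimal sketch of the "one in-flight handler per DAG" rule, using a map from DAG id to the Future of the thread currently draining its status queue; all names are illustrative, not the actual implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch: at most one thread processes a given DAG's status
// queue, so statuses of the same workflow are handled sequentially.
class DagStateDispatcher {
    private final ExecutorService pool = Executors.newFixedThreadPool(16);
    private final Map<Integer, Future<?>> inFlight = new ConcurrentHashMap<>();

    void onStateEvent(int dagId, Runnable drainStateQueue) {
        // start a handler only if none is already working on this DAG
        inFlight.computeIfAbsent(dagId, id -> pool.submit(() -> {
            try {
                drainStateQueue.run();   // process this DAG's queued statuses
            } finally {
                inFlight.remove(id);     // release the slot when done
            }
        }));
    }
}
```

Because computeIfAbsent is atomic, two status events for the same DAG can never start two handlers concurrently. And here is a sketch of the timeout-monitoring queue built on Netty's HashedWheelTimer (Netty is already a DolphinScheduler dependency; the surrounding names are assumptions):

```java
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Illustrative time-wheel based timeout monitor for tasks/workflows.
class TimeoutMonitor {
    private final HashedWheelTimer wheel = new HashedWheelTimer(1, TimeUnit.SECONDS);
    private final Map<Integer, Timeout> watched = new ConcurrentHashMap<>();

    /** Register a newly submitted task that has a timeout policy. */
    void watch(int taskId, long timeoutSeconds) {
        Timeout handle = wheel.newTimeout(
                t -> onTimeout(taskId), timeoutSeconds, TimeUnit.SECONDS);
        watched.put(taskId, handle);
    }

    /** Remove the task from the wheel once it reaches a terminal state. */
    void complete(int taskId) {
        Timeout handle = watched.remove(taskId);
        if (handle != null) {
            handle.cancel();
        }
    }

    private void onTimeout(int taskId) {
        watched.remove(taskId);
        // per timeout policy: send an alert, or ask the worker to kill the task
    }
}
```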

6. Timing

  • Master timing: prevent a schedule from firing more than once; duplicate triggers may occur while the master sends the scheduled command to the scheduler.

    • Use Quartz distributed scheduling so that each schedule fires only once

    • Add a unique index (definitionId + schedulerTime + dateTime) to the CMD table and the workflow instance table to prevent duplicates (a DDL sketch follows).
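A possible shape of that unique index in SQL; the table names follow the existing t_ds_* convention, but the exact column names are assumptions derived from the proposal, not the actual schema:

```sql
-- Hypothetical DDL: reject duplicate commands / workflow instances for the
-- same definition and scheduled time. Column names are illustrative only.
ALTER TABLE t_ds_command
    ADD UNIQUE KEY uniq_cmd (definition_id, scheduler_time, date_time);

ALTER TABLE t_ds_process_instance
    ADD UNIQUE KEY uniq_instance (definition_id, scheduler_time, date_time);
```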

lenboo added the feature label Dec 31, 2020
lenboo closed this as completed Dec 31, 2020

lenboo commented Dec 31, 2020

The whole structure:

image

lenboo changed the title from [Feature][Master+API+Scheduler]master refactor and scheduler module to [Feature][Master+API+Scheduler] Propose for master refactor and scheduler module Jan 4, 2021
lenboo reopened this Jan 6, 2021
lenboo closed this as completed Jan 19, 2021
lenboo reopened this Jan 19, 2021

lenboo commented Jun 10, 2021

About the Scheduler module
This is indeed a good solution: it removes the zk lock and reduces the number of database polls. But the HA mode is still a single point of work, and when the number of CMDs is large, dispatching CMDs will become the bottleneck. So we are considering a truly decentralized, distributed design for CMD processing.

We are considering another option: hash-assign CMDs and remove the scheduler (a slot sketch follows this list).

  1. When a master starts, it computes its own slots (possibly more than one) from zk and the configuration file (slot configuration)
  2. The master fetches CMDs from the DB according to its own masterIds, taking cmdId modulo the slot count and fetching only the CMDs that match its slots
  3. Master online/offline handling (master1=m1, master2=m2)
    1. When m1 comes online, it first computes its own masterIds and then starts fetching tasks to execute
    2. When m2 comes online and m1 receives the online message, m1 acquires a global lock, stops fetching CMDs, recomputes its own masterIds, and sends a sync message to all masters
    3. On receiving the message, the other masters stop fetching CMDs, recompute their own masterIds, and reply with the result to m1
    4. Once m1 confirms that all masters have replied, it sends another message to all masters to start fetching new CMDs, and releases the global lock
    5. If m1's wait for replies times out or fails, it must redo the synchronization with all masters.
    6. If m3/m4 receive a message they have already processed, they skip the processing flow.

image

image
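A sketch of the slot ownership check with hypothetical names, where a command belongs to the master owning slot cmdId mod totalSlots:

```java
import java.util.Set;

// Hypothetical sketch of hash-based CMD ownership: a master only fetches
// commands whose id maps onto one of its own slots.
class SlotAssignment {
    private final int totalSlots;       // from the slot configuration
    private final Set<Integer> mySlots; // computed from zk + config at startup

    SlotAssignment(int totalSlots, Set<Integer> mySlots) {
        this.totalSlots = totalSlots;
        this.mySlots = mySlots;
    }

    /** True if this master should fetch and execute the given command. */
    boolean owns(long cmdId) {
        return mySlots.contains((int) (cmdId % totalSlots));
    }
}
```

The DB fetch can filter on the same remainder, so two masters never take the same command as long as their slot sets are disjoint.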


CalvinKirs (Member) commented:

+1 Sounds good to me.

echohlne (Contributor) commented:

It seems like a huge challenge, but it's worth focusing on

CalvinKirs (Member) commented:

> It seems like a huge challenge, but it's worth focusing on

Yup, do you have any ideas about this (design or involvement in the development of the code)?

echohlne (Contributor) commented:

@CalvinKirs Sorry for the delay. If there is a chance, I am willing to participate in the development.


lenboo commented Oct 14, 2021

This work has already been done, so I am closing this issue.

lenboo closed this as completed Oct 14, 2021