Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUES #129]feat(doc) add design documentation directory #130

Merged
merged 2 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/design/1.RocketMQ-streams整体架构.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
### 总体架构

![img.png](../images/总体架构图.png)

数据从RocketMQ中被RocketMQ-streams消费,经过处理最终被写回到RocketMQ。
如果流处理任务中含有算子groupBy,则需要将数据按照Key进行分组,将分组数据写入shuffle topic。后续算子从
shuffle topic消费。如果还涉及count之类有状态算子,那么计算时需要读写状态,在窗口触发之后将计算结果写出。


### 任务并行度模型

![img_2.png](../images/扩容前.png)

计算实例实质上是依赖了Rocket-streams SDK的client,因此,计算实例消费的MQ依赖RocketMQ rebalance分配,
计算实例总个数也不能大于消费总MQ个数,否则将有部分计算实例处于等待状态,消费不到数据。

一个计算实例可以消费多个MQ,一个实例内也只有一张计算拓扑图。

### 状态
![img_3.png](../images/状态.png)

对于有状态算子,他的状态本地依赖RocksDB加速读取,远程依赖Mysql做持久化。允许流计算任务时,可以只依赖本地存储
RocksDB, 只需要将setLocalStorageOnly设置成true即可。这种情况下可能存在状态丢失。



### 扩缩容

![img.png](../images/缩容.png)

当计算实例从3个缩容到2个,借助于RocketMQ的rebalance,MQ会在计算实例之间重新分配。
Instance1上消费的MQ2和MQ3被分配到Instance2和Instance3上,这两个MQ的状态数据也需要迁移到Instance2
和Instance3上,这也暗示,状态数据是根据源数据分片保存的;扩容则是刚好相反的过程。
73 changes: 73 additions & 0 deletions docs/design/2.构建DataStream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
DataStreamSource中有一个PipelineBuilder,在后续构建过程中,这个PipelineBuilder会一直向后流传,
将构建过程中产生的source、stage添加进来;最后在start的时候,真正利用PipelineBuilder构建出拓扑图。

### source类型
- 设置source的namespace、configureName;
- 将source保存到PipelineBuilder中;
- 将source作为source节点保存到PipelineBuilder中的ChainPipeline中;

### ChainStage类型

所有的其他运算,包括map,filter,script,window都会先构建出ChainStage,然后以ChainStage的身份进入
PipelineBuilder,参加后续构建。

在DataStream中一个典型的添加新算子,过程如下所示:
```java

public DataStream script(String script) {
//将用户定义的cript转化成ChainStage
// ChainStage<?> stage = this.mainPipelineBuilder.createStage(new ScriptOperator(script));
//将ChainStage添加到PipelineBuilder中,构建拓扑。
this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage);
//将PipelineBuilder构建成DataStream,向后传递,后续还可以用该PipelineBuilder构建拓扑
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage);
}

```

### 创建ChainStage

PipelineBuilder创建,创建过程中会设置label,并将这个ChainStage添加到PipelineBuilder持有ChainPipeline中

- 把ChainStage添加到pipeline中
在构建过程中,所有的添加算子都使用一个共同的PipelineBuilder实例,PipelineBuilder结构如图所示,他持有
一个ChainPipeline实例,ChainPipeline实例中含有一个ISource和多个stages,还有一个label与stage的映射关系,
以及用于寻找下个stage的label。
![img.png](../images/Pipeline类图.png)
在createStage过程中,将chainStage加入到Pipeline中。

在setTopologyStages 过程中将label加入到Pipeline中;

### 设置拓扑
```java
public void setTopologyStages(ChainStage currentChainStage, List<ChainStage> nextStages) {
if (isBreak) {
return;
}
if (nextStages == null) {
return;
}
List<String> lableNames = new ArrayList<>();
for (ChainStage stage : nextStages) {
lableNames.add(stage.getLabel());
}

if (currentChainStage == null) {
this.pipeline.setChannelNextStageLabel(lableNames);
} else {
currentChainStage.setNextStageLabels(lableNames);
for (ChainStage stage : nextStages) {
stage.getPrevStageLabels().add(currentChainStage.getLabel());
}
}
}
```

如果是首个ChainStage,则设置下一跳的label;如果不是首个,需要将下个stage的label设置进入当前stage。
同时,下个stage也需要设置前一个stage的label标签。形成双向链表的结构。






53 changes: 53 additions & 0 deletions docs/design/3.启动DataStream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
### Start流程

流式计算在运行时可以拉起多个相同实例进行扩容,所以不能直接启动上述已经构建好的拓扑图,需要将上述构建好的拓扑
图保存起来,需要扩容时,直接拿出算子的副本,实例化启动即可。

### 统一管理点

- 加载统一管理点IConfigurableService;

三种方式存储:Memory, db, file

- PipelineBuilder的build方法,将构建构成中保存起来的IConfigurable,source和statge都是IConfigurable,
保存到IConfigurableService中;

- IConfigurableService的refreshConfigurable方法;

1.主要做的事可以概括:从统一管理点加载出组件,赋值,init,在调用后置方法doProcessAfterRefreshConfigurable。

2.ChainPipeline的后置方法比较特殊,会调用pipeline中各个组件的后置方法,如果这个组件是普通UDFChainStage,
那么将会反序列化,实例成StageBuilder。如果是WindowChainStage,会讲用户数据接收的window实例化出来。

3.从IConfigurable中加载实例副本出来;

4.将实例副本赋值;

5.初始化实例副本,实例都是AbstractConfigurable的继承类,调用他的的init方法。比如在初始化rocketmqSource
的时候,就会在此时调用init方法,先于启动方法调用;

6.调用IConfigurable的doProcessAfterRefreshConfigurable方法,目前只有ChainPipeline会调用,
(典型的是ChainPipeline),会在此方法中构建label与stage映射的stageMap;设置source;再调用
ChainPipeline中各个stage的doProcessAfterRefreshConfigurable方法;

7.这里ChainPipeline的stage都是UDFChainStage类似。UDFChainStage的
doProcessAfterRefreshConfigurable方法会将之前序列化好的StageBuilder反序列化,成为StageBuilder实例。

8.如果这个stage是window类型的WindowChainStage,ChainPipeline调用各个stage的
doProcessAfterRefreshConfigurable。这里会将用于数据接收的window实例化赋值;

9.OutputChainStage此时会从统一管理点IConfigurableService查询出sink实例,并赋值给自己sink字段;


### ChainPipeline的启动
```java
pipeline.startChannel();
```

将ChainPipeline作为整个数据接收的入口,并启动source;

当source有数据进来时,ChainPipeline将会收到数据;具体方法是ChainPipeline的doMessageInner方法;

该方法将数据封装承AbstractContext后,向后传递;


63 changes: 63 additions & 0 deletions docs/design/4.数据的流转过程.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
###总体过程

![img_1.png](../images/总体过程.png)

数据流转整体过程如图所示,黑色箭头线是数据流,橙色为控制流。数据的整体流向是从source中接收到,经过
AbstractSource判断是否发出系统消息,在进入ChainPipeline,ChainPipeline根据之间构建好的处理拓扑图,使用
深度优先策略找出下一个处理节点stage,交给Pipeline。Pipeline发现如果是系统消息则对stage执行特殊的控制逻辑,
如果不是,则用stage来处理具体数据。

### 无window算子执行流程
- source从RocketMQ中消费数据,进入RocketMQSource的父类AbstractSource;
- AbstractSource启动控制流,判断是否数据来自新分片,如果是,首先向下游传递一条NewSplitMessage消息,等待系
统消息处理完成返回后,才能继续处理该数据。
- NewSplitMessage进入Pipeline,如果是系统消息,stage执行该类系统消息对应的控制操作。如果不是系统消息则用
stage处理数据;
- Pipeline执行完成后,返回到ChainPipeline,选择下一个stage继续执行;
- 遍历stage直到结束。

### 含有window算子执行流程

![img_2.png](../images/有状态算子.png)

- 数据流和控制流在上述流程一致,即先进入source,然后由AbstractSource判断是否发出发出系统消息,再进入
ChainPipeline按照已经构建好的拓扑图执行。
- 不同的是,如果是window算子,那么这条数据在执行具体计算之前需要先按照groupBy分组,在执行算子,例如count。
分组操作需要借助于shuffle topic完成,即写入shuffle topic之前先按照groupBy的值,计算数据写入目的
MessageQueue,相同groupBy值的数据将被写入一个MessageQueue中。这样shuffle数据被读取时,
groupBy值相同的数据总会被一个client处理,达到按照groupBy分组处理的效果。

- ShuffleChannel会自动订阅、消费shuffle topic。数据会经过shuffle并在ShuffleChannel中再次被消费到。
- 判断是否是系统消息,如果是,执行该种类系统消息对应的控制流操作。
- 如果不是系统消息,触发window中算子计算,比如算子是count,就对某个key出现的次数加1;count算子用到的状
态会在接收到NewSplitMessage类型系统消息时提前加载好。计算结束后的状态保存到RocksDB或者mysql中。

- window到时间后,将计算结果输出到下游stage继续计算,并清理RocksDB、Mysql中对应的状态。


### 系统消息

#### NewSplitMessage
当发现数据来自新分片(MessageQueue)时,由AbstractSource产生并向下游拓扑传递。

作用于window算子,使其提前加载该分片对应的状态数据到内存,使得状态数据对该分片数据进行计算时,能使用
到对应的状态,得出正确的结果。

#### CheckPointMessage

##### 产生时机:
- 消费分片移除时;
- RocketMQ-streams向broker提交消费offset时;
- 处理完一批次消息后;

##### 作用
- 作用于各个缓存,例如将数据写入shuffle topic之前的WindowCache,使缓存中数据写出到下游。
- 作用于sink,将sink中缓存而未写出的数据写出;
- 将有状态算子的状态flush到存储;

#### RemoveSplitMessage
比较RocketMQ client触发rebalance前后消费的分片,如果某个分片不在被消费,需要将该分片移除,在移除该分配时发出
RemoveSplitMessage类型消息。

作用于window算子,将RocksDB中状态清除;

55 changes: 55 additions & 0 deletions docs/design/5.Window算子解析.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
### window算子初始化
window的实例化和初始化时机,与普通无状态算子一样,在构建DataStream阶段以stage形式加入pipeline。在启动
DataStream阶段完成window的初始化。

![img.png](../images/window.png)

- 给window初始化WindowStorage用户状态存储;

WindowStorage包括localStorage存储和remoteStorage存储;localStorage使用RocksDB,
remoteStorage使用mysql;

- 向window添加一个WindowCache的匿名实例,用于存储写入shuffle topic之前数据;
- 向window添加SQLCache,作为写入Mysql之前的缓存;
- 向window添加ShuffleChannel,作为写出shuffle和接收来自shufffle topic数据的通道;


### ShuffleChannel写出shuffle数据
AbstractShuffleWindow的doMessage方法,将数据写入shuffleChannel
```java
public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext context) {
shuffleChannel.startChannel();
return super.doMessage(message, context);
}
```

- shuffleChannel.startChannel
启动shuffleChannel中的consumer,从shuffletopic中消费数据;如果有消费到数据,将由
shuffleChannel的doMessage处理。

- AbstractWindow.doMessage方法

对于一条消息来说,window 首先需要检查是否有窗口实例,如果没有则创建。如果窗口实例已经超过最大的watermark,
数据丢弃,否则进行消息积累 消息会先经历batchAdd 然后flush加入到windowCache中;windowCache定时触发,加入到
shuffleMsgCache中,shuffleMsgCache中定时发出,用shuffleMsgCache中的producer写出到rocketmq。

### ShuffleChannel接收到shuffle数据
ShuffleChannel#doMessage方法;

将shuffle消息加入到shuffleCache中

最终进入ShuffleCache#batchInsert中

WindowOperator#shuffleCalculate中

实际窗口计算:WindowValue#calculate

计算后并不会马上触发窗口,窗口需要定时出发

### window触发
WindowFireSource#startSource启动定时任务,1s检查一次窗口是否触发WindowFireSource#fireWindowInstance
WindowOperator#fireWindowInstance

windowFireSource.executeMessage

windowFireSource.executeMessage这个方法里面会执行pipeline的下个节点
Empty file.
Binary file added docs/images/Pipeline类图.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/window.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/总体架构图.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/总体过程.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/扩容前.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/有状态算子.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/状态.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/缩容.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public class FileSourceExample {
public static void main(String[] args) {
DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
source.fromFile("/Users/junjie.cheng/jobs/access.log", false)
source.fromFile("data.txt", false)
.map(message -> message)
.toPrint(1)
.start();
Expand Down