diff --git "a/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md" "b/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md" new file mode 100644 index 00000000..8e564a8c --- /dev/null +++ "b/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md" @@ -0,0 +1,33 @@ +### 总体架构 + +![img.png](../images/总体架构图.png) + +数据从RocketMQ中被RocketMQ-streams消费,经过处理最终被写回到RocketMQ。 +如果流处理任务中含有算子groupBy,则需要将数据按照Key进行分组,将分组数据写入shuffle topic。后续算子从 +shuffle topic消费。如果还涉及count之类有状态算子,那么计算时需要读写状态,在窗口触发之后将计算结果写出。 + + +### 任务并行度模型 + +![img_2.png](../images/扩容前.png) + +计算实例实质上是依赖了Rocket-streams SDK的client,因此,计算实例消费的MQ依赖RocketMQ rebalance分配, +计算实例总个数也不能大于消费总MQ个数,否则将有部分计算实例处于等待状态,消费不到数据。 + +一个计算实例可以消费多个MQ,一个实例内也只有一张计算拓扑图。 + +### 状态 +![img_3.png](../images/状态.png) + +对于有状态算子,他的状态本地依赖RocksDB加速读取,远程依赖Mysql做持久化。允许流计算任务时,可以只依赖本地存储 +RocksDB, 只需要将setLocalStorageOnly设置成true即可。这种情况下可能存在状态丢失。 + + + +### 扩缩容 + +![img.png](../images/缩容.png) + +当计算实例从3个缩容到2个,借助于RocketMQ的rebalance,MQ会在计算实例之间重新分配。 +Instance1上消费的MQ2和MQ3被分配到Instance2和Instance3上,这两个MQ的状态数据也需要迁移到Instance2 +和Instance3上,这也暗示,状态数据是根据源数据分片保存的;扩容则是刚好相反的过程。 diff --git "a/docs/design/2.\346\236\204\345\273\272DataStream.md" "b/docs/design/2.\346\236\204\345\273\272DataStream.md" new file mode 100644 index 00000000..af12e27a --- /dev/null +++ "b/docs/design/2.\346\236\204\345\273\272DataStream.md" @@ -0,0 +1,73 @@ +DataStreamSource中有一个PipelineBuilder,在后续构建过程中,这个PipelineBuilder会一直向后流传, +将构建过程中产生的source、stage添加进来;最后在start的时候,真正利用PipelineBuilder构建出拓扑图。 + +### source类型 + - 设置source的namespace、configureName; + - 将source保存到PipelineBuilder中; + - 将source作为source节点保存到PipelineBuilder中的ChainPipeline中; + +### ChainStage类型 + +所有的其他运算,包括map,filter,script,window都会先构建出ChainStage,然后以ChainStage的身份进入 +PipelineBuilder,参加后续构建。 + +在DataStream中一个典型的添加新算子,过程如下所示: +```java + +public DataStream script(String script) { + //将用户定义的cript转化成ChainStage + // ChainStage stage = this.mainPipelineBuilder.createStage(new ScriptOperator(script)); + //将ChainStage添加到PipelineBuilder中,构建拓扑。 + this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage); + //将PipelineBuilder构建成DataStream,向后传递,后续还可以用该PipelineBuilder构建拓扑 + return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage); +} + +``` + +### 创建ChainStage + +PipelineBuilder创建,创建过程中会设置label,并将这个ChainStage添加到PipelineBuilder持有ChainPipeline中 + +- 把ChainStage添加到pipeline中 + 在构建过程中,所有的添加算子都使用一个共同的PipelineBuilder实例,PipelineBuilder结构如图所示,他持有 + 一个ChainPipeline实例,ChainPipeline实例中含有一个ISource和多个stages,还有一个label与stage的映射关系, + 以及用于寻找下个stage的label。 + ![img.png](../images/Pipeline类图.png) +在createStage过程中,将chainStage加入到Pipeline中。 + +在setTopologyStages 过程中将label加入到Pipeline中; + +### 设置拓扑 +```java +public void setTopologyStages(ChainStage currentChainStage, List nextStages) { + if (isBreak) { + return; + } + if (nextStages == null) { + return; + } + List lableNames = new ArrayList<>(); + for (ChainStage stage : nextStages) { + lableNames.add(stage.getLabel()); + } + + if (currentChainStage == null) { + this.pipeline.setChannelNextStageLabel(lableNames); + } else { + currentChainStage.setNextStageLabels(lableNames); + for (ChainStage stage : nextStages) { + stage.getPrevStageLabels().add(currentChainStage.getLabel()); + } + } + } +``` + +如果是首个ChainStage,则设置下一跳的label;如果不是首个,需要将下个stage的label设置进入当前stage。 +同时,下个stage也需要设置前一个stage的label标签。形成双向链表的结构。 + + + + + + diff --git "a/docs/design/3.\345\220\257\345\212\250DataStream.md" "b/docs/design/3.\345\220\257\345\212\250DataStream.md" new file mode 100644 index 00000000..91f91f2f --- /dev/null +++ "b/docs/design/3.\345\220\257\345\212\250DataStream.md" @@ -0,0 +1,53 @@ +### Start流程 + +流式计算在运行时可以拉起多个相同实例进行扩容,所以不能直接启动上述已经构建好的拓扑图,需要将上述构建好的拓扑 +图保存起来,需要扩容时,直接拿出算子的副本,实例化启动即可。 + +### 统一管理点 + +- 加载统一管理点IConfigurableService; + + 三种方式存储:Memory, db, file + +- PipelineBuilder的build方法,将构建构成中保存起来的IConfigurable,source和statge都是IConfigurable, + 保存到IConfigurableService中; + +- IConfigurableService的refreshConfigurable方法; + + 1.主要做的事可以概括:从统一管理点加载出组件,赋值,init,在调用后置方法doProcessAfterRefreshConfigurable。 + + 2.ChainPipeline的后置方法比较特殊,会调用pipeline中各个组件的后置方法,如果这个组件是普通UDFChainStage, + 那么将会反序列化,实例成StageBuilder。如果是WindowChainStage,会讲用户数据接收的window实例化出来。 + + 3.从IConfigurable中加载实例副本出来; + + 4.将实例副本赋值; + + 5.初始化实例副本,实例都是AbstractConfigurable的继承类,调用他的的init方法。比如在初始化rocketmqSource + 的时候,就会在此时调用init方法,先于启动方法调用; + + 6.调用IConfigurable的doProcessAfterRefreshConfigurable方法,目前只有ChainPipeline会调用, + (典型的是ChainPipeline),会在此方法中构建label与stage映射的stageMap;设置source;再调用 + ChainPipeline中各个stage的doProcessAfterRefreshConfigurable方法; + + 7.这里ChainPipeline的stage都是UDFChainStage类似。UDFChainStage的 + doProcessAfterRefreshConfigurable方法会将之前序列化好的StageBuilder反序列化,成为StageBuilder实例。 + + 8.如果这个stage是window类型的WindowChainStage,ChainPipeline调用各个stage的 + doProcessAfterRefreshConfigurable。这里会将用于数据接收的window实例化赋值; + + 9.OutputChainStage此时会从统一管理点IConfigurableService查询出sink实例,并赋值给自己sink字段; + + +### ChainPipeline的启动 +```java +pipeline.startChannel(); +``` + +将ChainPipeline作为整个数据接收的入口,并启动source; + +当source有数据进来时,ChainPipeline将会收到数据;具体方法是ChainPipeline的doMessageInner方法; + +该方法将数据封装承AbstractContext后,向后传递; + + diff --git "a/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md" "b/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md" new file mode 100644 index 00000000..1eb791cb --- /dev/null +++ "b/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md" @@ -0,0 +1,63 @@ +###总体过程 + +![img_1.png](../images/总体过程.png) + +数据流转整体过程如图所示,黑色箭头线是数据流,橙色为控制流。数据的整体流向是从source中接收到,经过 +AbstractSource判断是否发出系统消息,在进入ChainPipeline,ChainPipeline根据之间构建好的处理拓扑图,使用 +深度优先策略找出下一个处理节点stage,交给Pipeline。Pipeline发现如果是系统消息则对stage执行特殊的控制逻辑, +如果不是,则用stage来处理具体数据。 + +### 无window算子执行流程 +- source从RocketMQ中消费数据,进入RocketMQSource的父类AbstractSource; +- AbstractSource启动控制流,判断是否数据来自新分片,如果是,首先向下游传递一条NewSplitMessage消息,等待系 + 统消息处理完成返回后,才能继续处理该数据。 +- NewSplitMessage进入Pipeline,如果是系统消息,stage执行该类系统消息对应的控制操作。如果不是系统消息则用 +stage处理数据; +- Pipeline执行完成后,返回到ChainPipeline,选择下一个stage继续执行; +- 遍历stage直到结束。 + +### 含有window算子执行流程 + +![img_2.png](../images/有状态算子.png) + +- 数据流和控制流在上述流程一致,即先进入source,然后由AbstractSource判断是否发出发出系统消息,再进入 + ChainPipeline按照已经构建好的拓扑图执行。 +- 不同的是,如果是window算子,那么这条数据在执行具体计算之前需要先按照groupBy分组,在执行算子,例如count。 +分组操作需要借助于shuffle topic完成,即写入shuffle topic之前先按照groupBy的值,计算数据写入目的 + MessageQueue,相同groupBy值的数据将被写入一个MessageQueue中。这样shuffle数据被读取时, + groupBy值相同的数据总会被一个client处理,达到按照groupBy分组处理的效果。 + +- ShuffleChannel会自动订阅、消费shuffle topic。数据会经过shuffle并在ShuffleChannel中再次被消费到。 +- 判断是否是系统消息,如果是,执行该种类系统消息对应的控制流操作。 +- 如果不是系统消息,触发window中算子计算,比如算子是count,就对某个key出现的次数加1;count算子用到的状 + 态会在接收到NewSplitMessage类型系统消息时提前加载好。计算结束后的状态保存到RocksDB或者mysql中。 + +- window到时间后,将计算结果输出到下游stage继续计算,并清理RocksDB、Mysql中对应的状态。 + + +### 系统消息 + +#### NewSplitMessage +当发现数据来自新分片(MessageQueue)时,由AbstractSource产生并向下游拓扑传递。 + +作用于window算子,使其提前加载该分片对应的状态数据到内存,使得状态数据对该分片数据进行计算时,能使用 +到对应的状态,得出正确的结果。 + +#### CheckPointMessage + +##### 产生时机: +- 消费分片移除时; +- RocketMQ-streams向broker提交消费offset时; +- 处理完一批次消息后; + +##### 作用 +- 作用于各个缓存,例如将数据写入shuffle topic之前的WindowCache,使缓存中数据写出到下游。 +- 作用于sink,将sink中缓存而未写出的数据写出; +- 将有状态算子的状态flush到存储; + +#### RemoveSplitMessage +比较RocketMQ client触发rebalance前后消费的分片,如果某个分片不在被消费,需要将该分片移除,在移除该分配时发出 +RemoveSplitMessage类型消息。 + +作用于window算子,将RocksDB中状态清除; + diff --git "a/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md" "b/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md" new file mode 100644 index 00000000..4e8be748 --- /dev/null +++ "b/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md" @@ -0,0 +1,55 @@ +### window算子初始化 +window的实例化和初始化时机,与普通无状态算子一样,在构建DataStream阶段以stage形式加入pipeline。在启动 +DataStream阶段完成window的初始化。 + +![img.png](../images/window.png) + +- 给window初始化WindowStorage用户状态存储; + + WindowStorage包括localStorage存储和remoteStorage存储;localStorage使用RocksDB, + remoteStorage使用mysql; + +- 向window添加一个WindowCache的匿名实例,用于存储写入shuffle topic之前数据; +- 向window添加SQLCache,作为写入Mysql之前的缓存; +- 向window添加ShuffleChannel,作为写出shuffle和接收来自shufffle topic数据的通道; + + +### ShuffleChannel写出shuffle数据 +AbstractShuffleWindow的doMessage方法,将数据写入shuffleChannel +```java +public AbstractContext doMessage(IMessage message, AbstractContext context) { + shuffleChannel.startChannel(); + return super.doMessage(message, context); +} +``` + +- shuffleChannel.startChannel +启动shuffleChannel中的consumer,从shuffletopic中消费数据;如果有消费到数据,将由 + shuffleChannel的doMessage处理。 + +- AbstractWindow.doMessage方法 + +对于一条消息来说,window 首先需要检查是否有窗口实例,如果没有则创建。如果窗口实例已经超过最大的watermark, +数据丢弃,否则进行消息积累 消息会先经历batchAdd 然后flush加入到windowCache中;windowCache定时触发,加入到 +shuffleMsgCache中,shuffleMsgCache中定时发出,用shuffleMsgCache中的producer写出到rocketmq。 + +### ShuffleChannel接收到shuffle数据 +ShuffleChannel#doMessage方法; + +将shuffle消息加入到shuffleCache中 + +最终进入ShuffleCache#batchInsert中 + +WindowOperator#shuffleCalculate中 + +实际窗口计算:WindowValue#calculate + +计算后并不会马上触发窗口,窗口需要定时出发 + +### window触发 + WindowFireSource#startSource启动定时任务,1s检查一次窗口是否触发WindowFireSource#fireWindowInstance +WindowOperator#fireWindowInstance + +windowFireSource.executeMessage + +windowFireSource.executeMessage这个方法里面会执行pipeline的下个节点 \ No newline at end of file diff --git "a/docs/design/6.RocketMQ-streams\345\246\202\344\275\225\345\256\236\347\216\260\345\256\271\351\224\231.md" "b/docs/design/6.RocketMQ-streams\345\246\202\344\275\225\345\256\236\347\216\260\345\256\271\351\224\231.md" new file mode 100644 index 00000000..e69de29b diff --git "a/docs/images/Pipeline\347\261\273\345\233\276.png" "b/docs/images/Pipeline\347\261\273\345\233\276.png" new file mode 100644 index 00000000..dafe81a1 Binary files /dev/null and "b/docs/images/Pipeline\347\261\273\345\233\276.png" differ diff --git a/docs/images/window.png b/docs/images/window.png new file mode 100644 index 00000000..30ba8945 Binary files /dev/null and b/docs/images/window.png differ diff --git "a/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" "b/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" new file mode 100644 index 00000000..5eba9cec Binary files /dev/null and "b/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" differ diff --git "a/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" "b/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" new file mode 100644 index 00000000..7a68947e Binary files /dev/null and "b/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" differ diff --git "a/docs/images/\346\211\251\345\256\271\345\211\215.png" "b/docs/images/\346\211\251\345\256\271\345\211\215.png" new file mode 100644 index 00000000..5232b762 Binary files /dev/null and "b/docs/images/\346\211\251\345\256\271\345\211\215.png" differ diff --git "a/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png" "b/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png" new file mode 100644 index 00000000..a9ec479c Binary files /dev/null and "b/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png" differ diff --git "a/docs/images/\347\212\266\346\200\201.png" "b/docs/images/\347\212\266\346\200\201.png" new file mode 100644 index 00000000..e2fd9b2e Binary files /dev/null and "b/docs/images/\347\212\266\346\200\201.png" differ diff --git "a/docs/images/\347\274\251\345\256\271.png" "b/docs/images/\347\274\251\345\256\271.png" new file mode 100644 index 00000000..05dcee41 Binary files /dev/null and "b/docs/images/\347\274\251\345\256\271.png" differ diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/FileSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/FileSourceExample.java index ecaf0bca..b51ea2eb 100644 --- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/FileSourceExample.java +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/FileSourceExample.java @@ -22,7 +22,7 @@ public class FileSourceExample { public static void main(String[] args) { DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline"); - source.fromFile("/Users/junjie.cheng/jobs/access.log", false) + source.fromFile("data.txt", false) .map(message -> message) .toPrint(1) .start();