add checkpoint storage #69

Merged · 13 commits · Sep 18, 2021
106 changes: 106 additions & 0 deletions README-chinese.md
@@ -0,0 +1,106 @@
# RocketMQ Streams
## Features

* Lightweight deployment: can run standalone or as a cluster
* Multiple types of input and output: sources support RocketMQ; sinks support DB, RocketMQ, and more

## DataStream Example

```java
// import paths for StreamBuilder and DataStreamSource are taken from the client module
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
import org.apache.rocketmq.streams.client.transform.DataStream;

DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");

source
    .fromFile("~/admin/data/text.txt", false)   // read plain-text (non-JSON) lines from a local file
    .map(message -> message)                    // identity map; replace with real processing
    .toPrint(1)                                 // print results to the console
    .start();
```

## Maven Repository

```xml

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-clients</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
```

# Core API

rocketmq-streams provides a set of high-level APIs that let users easily write stream-processing programs to implement their own business logic.

## StreamBuilder

StreamBuilder is used to build the source of a streaming job. It provides two methods, ```dataStream()``` and ```tableStream()```, which return a DataStreamSource and a TableStreamSource respectively.

+ [dataStream(nameSpaceName,pipelineName)]() returns a DataStreamSource instance, used to build stream-processing jobs with the fluent (segmented) API;
+ [tableStream(nameSpaceName,pipelineName)]() returns a TableStreamSource instance, used to build stream-processing jobs with scripts;

## DataStream API

### Source

DataStreamSource is the entry class for the fluent API. It connects to various data sources and pulls data from the major message queues.

+ ```fromFile``` reads data from a file; it takes two parameters
+ ```filePath``` the file path, required
+ ```isJsonData``` whether the data is JSON, optional, default ```true```


+ ```fromRocketmq``` reads data from RocketMQ; it takes four parameters (see the sketch after this list)
+ ```topic``` the RocketMQ topic name, required
+ ```groupName``` the consumer group name, required
+ ```isJson``` whether the data is JSON, optional
+ ```tags``` the RocketMQ tags used to filter messages, optional

+ ```from``` a custom data source, built by implementing the ISource interface
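
A minimal sketch of consuming from RocketMQ with the fluent API; the namespace, pipeline, topic, and consumer-group names below are placeholders, and the import paths are assumed from the client module:

```java
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;

DataStreamSource source = StreamBuilder.dataStream("demo_namespace", "demo_pipeline");

source
    .fromRocketmq("demo_topic", "demo_consumer_group")  // isJson and tags are optional and omitted here
    .map(message -> message)                            // identity map; replace with real processing
    .toPrint(1)                                         // print each record to the console
    .start();
```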

### transform

transform operators modify the data coming from the source before it moves to the next step. The DataStream API includes several transform classes such as ```DataStream```, ```JoinStream```, ```SplitStream```, and ```WindowStream```.

#### DataStream

DataStream implements a series of common stream-processing operators:

+ ```map``` returns a new DataStream by passing each record of the source through the function func
+ ```flatmap``` similar to map, but one input record maps to zero or more output records
+ ```filter``` returns a new DataStream containing only the records for which func returns true
+ ```forEach``` applies the function func to each record and returns a new DataStream
+ ```selectFields``` returns the selected fields of each record as a new DataStream
+ ```operate``` applies a custom function to each record and returns a new DataStream
+ ```script``` runs a script against the fields of each record to produce new fields, returning a new DataStream
+ ```toPrint``` prints the results to the console and produces a new DataStreamAction instance
+ ```toFile``` saves the results to a file and produces a new DataStreamAction instance
+ ```toDB``` saves the results to a database
+ ```toRocketmq``` writes the results to RocketMQ
+ ```to``` writes the results to a custom sink implemented with the ISink interface
+ ```window``` performs statistics within a window, usually together with ```groupBy```; ```window()``` defines the window size and ```groupBy()``` defines the grouping keys, which may be more than one (see the sketch after this list)
+ ```count``` counts the records within the window
+ ```min``` returns the minimum of the aggregated value within the window
+ ```max``` returns the maximum of the aggregated value within the window
+ ```avg``` returns the average of the aggregated value within the window
+ ```sum``` returns the sum of the aggregated value within the window
+ ```reduce``` performs a custom aggregation within the window
+ ```join``` joins two streams on a condition, merging them into one stream for further processing
+ ```union``` merges two streams
+ ```split``` splits one stream into several streams by tag so that downstream operators can process them separately
+ ```with``` specifies strategies used during processing, such as the checkpoint storage strategy and the state storage strategy
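
A hedged sketch of a windowed count, assuming the client module ships the ```TumblingWindow```/```Time``` window builders and the ```toDataSteam()``` bridge back to a DataStream; the field and pipeline names are placeholders and the exact class names may differ between versions:

```java
import org.apache.rocketmq.streams.client.StreamBuilder;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;

DataStreamSource source = StreamBuilder.dataStream("demo_namespace", "window_pipeline");

source
    .fromFile("~/admin/data/text.txt", true)       // JSON records, one per line
    .window(TumblingWindow.of(Time.seconds(10)))   // 10-second tumbling window
    .groupBy("hostname")                           // "hostname" is a placeholder grouping field
    .count("total")                                // count records per key, emitted as field "total"
    .toDataSteam()                                 // back to a DataStream (method name as spelled in the client API)
    .toPrint(1)
    .start();
```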

# Strategy

The strategy mechanism controls low-level behavior of the engine at runtime, such as how checkpoints and state are stored; control over windows, dual-stream joins, and more will be added later. All strategies are passed in through the ```with``` operator, and several strategy types can be passed at the same time.

```java
// specify the checkpoint storage strategy
source
    .fromRocketmq("TSG_META_INFO","")
    .map(message -> message + "--")
    .toPrint(1)
    .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
    .start();
```
1 change: 1 addition & 0 deletions pom.xml
@@ -52,6 +52,7 @@
<module>rocketmq-streams-channel-http</module>
<module>rocketmq-streams-state</module>
<module>rocketmq-streams-examples</module>
<module>rocketmq-streams-checkpoint</module>

</modules>

66 changes: 66 additions & 0 deletions rocketmq-streams-checkpoint/pom.xml
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-streams</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rocketmq-streams-checkpoint</artifactId>
<name>ROCKETMQ STREAMS :: checkpoint</name>
<packaging>jar</packaging>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-commons</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-db-operator</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

</project>
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.checkpoint.db;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.checkpoint.AbstractCheckPointStorage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
import org.apache.rocketmq.streams.common.checkpoint.SourceSnapShot;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;

import java.util.List;

/**
 * Checkpoint storage backed by a relational database table.
 */
public class DBCheckPointStorage extends AbstractCheckPointStorage {

static final Log logger = LogFactory.getLog(DBCheckPointStorage.class);

static final String STORAGE_NAME = "DB";

public DBCheckPointStorage(){

}

@Override
public String getStorageName() {
return STORAGE_NAME;
}

@Override
public <T> void save(List<T> checkPointState) {
logger.info(String.format("save checkpoint size %d", checkPointState.size()));
ORMUtil.batchReplaceInto(checkPointState);
}

// TODO
@Override
public CheckPoint recover(ISource iSource, String queueId) {
String sourceName = CheckPointManager.createSourceName(iSource, null);
String key = CheckPointManager.createCheckPointKey(sourceName, queueId);
String sql = "select * from source_snap_shot where `key` = " + "'" + key + "';";
SourceSnapShot snapShot = ORMUtil.queryForObject(sql, null, SourceSnapShot.class);

logger.info(String.format("checkpoint recover key is %s, sql is %s, recover sourceSnapShot : %s", key, sql, snapShot == null ? "null snapShot" : snapShot.toString()));
return new CheckPoint().fromSnapShot(snapShot);
}
}
@@ -227,4 +227,14 @@ public void setJsonData(Boolean isJsonData) {
create();
((AbstractSource)source).setJsonData(isJsonData);
}

@Override
public String getTopic(){
return source.getTopic();
}

@Override
public void setTopic(String topic){
source.setTopic(topic);
}
}
@@ -74,4 +74,9 @@ protected boolean startSource() {
}
};
}

@Override
public String createCheckPointName() {
return "memory-source";
}
}
@@ -32,8 +32,8 @@
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager.SourceState;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.checkpoint.SourceState;
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.configurable.IConfigurableIdentification;
import org.apache.rocketmq.streams.common.context.IMessage;
@@ -42,6 +42,7 @@
import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;

/**
Expand Down Expand Up @@ -75,6 +76,7 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc

protected List<String> logFingerprintFields;//log fingerprint to filter msg quickly


/**
* The operator to which the source delivers messages; it receives the source's data and processes it
*/
@@ -223,18 +225,22 @@ public JSONObject create(String message) {
return createJson(message);
}


/**
* Hand the message to the receiver to run the downstream logic
*
* @param channelMessage
* @return
*/
public AbstractContext executeMessage(Message channelMessage) {
AbstractContext context = new Context(channelMessage);
if (!channelMessage.getHeader().isSystemMessage()) {
messageQueueChangedCheck(channelMessage.getHeader());
}

if (isSplitInRemoving(channelMessage)) {
return context;
}
if (!channelMessage.getHeader().isSystemMessage()) {
messageQueueChangedCheck(channelMessage.getHeader());
}

boolean needFlush = !channelMessage.getHeader().isSystemMessage() && channelMessage.getHeader().isNeedFlush();
boolean needFlush = channelMessage.getHeader().isSystemMessage() == false && channelMessage.getHeader().isNeedFlush();

if (receiver != null) {
receiver.doMessage(channelMessage, context);
@@ -277,6 +283,9 @@ protected boolean isSplitInRemoving(Message channelMessage) {
* @param header
*/
protected void messageQueueChangedCheck(MessageHeader header) {
if (supportNewSplitFind() && supportRemoveSplitFind()) {
return;
}
Set<String> queueIds = new HashSet<>();
String msgQueueId = header.getQueueId();
if (StringUtil.isNotEmpty(msgQueueId)) {
@@ -287,7 +296,7 @@ protected void messageQueueChangedCheck(MessageHeader header) {
queueIds.addAll(checkpointQueueIds);
}
Set<String> newQueueIds = new HashSet<>();

Set<String> removeQueueIds = new HashSet<>();
for (String queueId : queueIds) {
if (isNotDataSplit(queueId)) {
continue;
@@ -536,4 +545,34 @@ public boolean isBatchMessage() {
return isBatchMessage;
}

@Override
public String createCheckPointName(){
// Compose a stable checkpoint name from namespace, group, topic and source name,
// falling back to defaults for any part that is missing.
ISource source = this;

String namespace = source.getNameSpace();
String name = source.getConfigureName();
String groupName = source.getGroupName();

if(StringUtil.isEmpty(namespace)){
namespace = "default_namespace";
}

if(StringUtil.isEmpty(name)){
name = "default_name";
}

if(StringUtil.isEmpty(groupName)){
groupName = "default_groupName";
}

String topic = source.getTopic();
if(topic == null || topic.trim().length() == 0){
topic = "default_topic";
}

return MapKeyUtil.createKey(namespace, groupName, topic, name);
}

}