Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix check failed issue #102

Merged
merged 1 commit into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions README-chinese.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ StreamBuilder 用于构建流任务的源; 内部包含```dataStream()```和``
DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据;

+ ```fromFile``` 从文件中读取数据, 该方法包含俩个参数
+ ```filePath``` 文件路径,必填参数
+ ```isJsonData``` 是否json数据, 非必填参数, 默认为```true```
+ ```filePath``` 文件路径,必填参数
+ ```isJsonData``` 是否json数据, 非必填参数, 默认为```true```


+ ```fromRocketmq``` 从rocketmq中获取数据,包含四个参数
+ ```topic``` rocketmq消息队列的topic名称,必填参数
+ ```groupName``` 消费者组的名称,必填参数
+ ```isJson``` 是否json格式,非必填参数
+ ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
+ ```topic``` rocketmq消息队列的topic名称,必填参数
+ ```groupName``` 消费者组的名称,必填参数
+ ```isJson``` 是否json格式,非必填参数
+ ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数

+ ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源

Expand All @@ -80,12 +80,12 @@ DataStream实现了一系列常见的流计算算子
+ ```toRocketmq``` 将结果输出到rocketmq
+ ```to``` 将结果经过自定义的ISink接口输出到指定的存储
+ ```window``` 在窗口内进行相关的统计分析,一般会与```groupBy```连用, ```window()```用来定义窗口的大小, ```groupBy()```用来定义统计分析的主key,可以指定多个
+ ```count``` 在窗口内计数
+ ```min``` 获取窗口内统计值的最小值
+ ```max``` 获取窗口内统计值得最大值
+ ```avg``` 获取窗口内统计值的平均值
+ ```sum``` 获取窗口内统计值的加和值
+ ```reduce``` 在窗口内进行自定义的汇总运算
+ ```count``` 在窗口内计数
+ ```min``` 获取窗口内统计值的最小值
+ ```max``` 获取窗口内统计值得最大值
+ ```avg``` 获取窗口内统计值的平均值
+ ```sum``` 获取窗口内统计值的加和值
+ ```reduce``` 在窗口内进行自定义的汇总运算
+ ```join``` 根据条件将将俩个流进行关联, 合并为一个大流进行相关的运算
+ ```union``` 将俩个流进行合并
+ ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算
Expand Down Expand Up @@ -115,11 +115,11 @@ Rocketmq-Streams 核心就是一个独立的jar包, 用户可以在自己的
1. 通过```mvn clean install``` 构建工程
2. 从```rocketmq-streams-runner/target/rocket-streams-1.0.0-SNAPSHOT-distribution.tar.gz``` 中获取tar.gz包, 并解压
3. ```rocketmq-streams```目录架构如下:
+ ```bin``` 指令目录,包括启动和停止指令
+ ```conf``` 配置目录,包括日志配置以及应用的相关配置文件
+ ```jobs``` 任务目录, 独立打包后的rocketmq-streams jar包
+ ```lib``` 依赖包
+ ```log``` 日志目录
+ ```bin``` 指令目录,包括启动和停止指令
+ ```conf``` 配置目录,包括日志配置以及应用的相关配置文件
+ ```jobs``` 任务目录, 独立打包后的rocketmq-streams jar包
+ ```lib``` 依赖包
+ ```log``` 日志目录

### 发布应用
用户依赖rocketmq-streams,开发流处理程序,独立打包后, 将jar包拷贝到jobs目录, 通过指令即可完成任务的启动和运行;可以通过启动多个独立的应用程序;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.IChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package org.apache.rocketmq.streams.db.sink;

import com.alibaba.fastjson.JSONObject;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.IChannel;
Expand All @@ -38,11 +42,6 @@
import org.apache.rocketmq.streams.db.sink.sqltemplate.ISqlTemplate;
import org.apache.rocketmq.streams.db.sink.sqltemplate.SqlTemplateFactory;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
* @description enhance db sink, support atomic sink and multiple sink
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.streams.db.sink.sqltemplate;

import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.utils.SQLUtil;

import java.util.List;
import java.util.Map;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.utils.SQLUtil;

/**
* @description create insert into sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.streams.db.sink.sqltemplate;

import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.utils.SQLUtil;

import java.util.List;
import java.util.Map;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.utils.SQLUtil;

/**
* @author zengyu.cw
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
*/
package org.apache.rocketmq.streams.db.sink.sqltemplate;

import org.apache.rocketmq.streams.common.metadata.MetaData;

import java.util.Arrays;
import org.apache.rocketmq.streams.common.metadata.MetaData;

/**
* @description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
package org.apache.rocketmq.streams.db.sink.db;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.db.sink.sqltemplate.MysqlInsertIgnoreIntoSqlTemplate;
import org.apache.rocketmq.streams.db.sink.sqltemplate.MysqlInsertIntoSqlTemplate;
import org.apache.rocketmq.streams.db.sink.sqltemplate.MysqlInsertIntoWithDuplicateKeySqlTemplate;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
* @description
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;
import java.util.Properties;
import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
Expand All @@ -26,8 +27,6 @@
import org.apache.rocketmq.streams.common.model.ServiceName;
import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;

import java.util.Properties;

@AutoService(IChannelBuilder.class)
@ServiceName(value = ESSinkBuilder.TYPE, aliasName = "elasticsearch")
public class ESSinkBuilder extends AbstractSupportShuffleChannelBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.rocketmq.streams.es.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
Expand All @@ -34,11 +38,6 @@
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ESSinkOnlyChannel extends AbstractSink {
private static final Log LOG = LogFactory.getLog(ESSinkOnlyChannel.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
*/
package org.apache.rocketmq.streams.db.sink.es;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSONObject;

import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;

import java.util.Properties;

import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
package org.apache.rocketmq.streams.source;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
Expand Down Expand Up @@ -49,17 +59,6 @@
import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class RocketMQSource extends AbstractSupportShuffleSource {

protected static final Log LOG = LogFactory.getLog(RocketMQSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.streams.syslog;

import org.apache.rocketmq.streams.common.channel.IChannel;

public interface ISyslogRouter {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
*/
package org.apache.rocketmq.streams.syslog;

import com.alibaba.fastjson.JSONObject;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.AbstractChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
Expand All @@ -36,10 +38,6 @@
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.utils.IPUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.graylog2.syslog4j.Syslog;
import org.graylog2.syslog4j.SyslogConfigIF;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.syslog;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.graylog2.syslog4j.SyslogConstants;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
*/
package org.apache.rocketmq.streams.syslog;

import java.util.Date;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.IPUtil;

import java.util.Date;

public class SyslogParser {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,24 @@
*/
package org.apache.rocketmq.streams.syslog;

import com.alibaba.fastjson.JSONObject;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.alibaba.fastjson.JSONObject;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReferenceCache;
import org.apache.rocketmq.streams.common.channel.source.AbstractUnreliableSource;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.IPUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.graylog2.syslog4j.server.SyslogServerConfigIF;
import org.graylog2.syslog4j.server.SyslogServerEventIF;
import org.graylog2.syslog4j.server.SyslogServerIF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
*/
package org.apache.rocketmq.streams.syslog;

import java.util.Date;

import com.alibaba.fastjson.JSONObject;

import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.syslog.SyslogChannel;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import java.util.Date;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.IPUtil;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,16 @@
*/
package org.apache.rocketmq.streams.checkpoint.db;

import com.google.auto.service.AutoService;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.checkpoint.AbstractCheckPointStorage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
import org.apache.rocketmq.streams.common.checkpoint.ICheckPointStorage;
import org.apache.rocketmq.streams.common.checkpoint.SourceSnapShot;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;

import java.util.List;

/**
* @description
*/
Expand Down
Loading