diff --git a/.gitignore b/.gitignore
index 1a9bb4f6..b4ff0d6b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@ target/
.DS_Store
!.mvn/wrapper/maven-wrapper.jar
*.versionsBackup
+.gradle/
### STS ###
.apt_generated
diff --git a/build_without_test.sh b/build_without_test.sh
new file mode 100755
index 00000000..584c56bf
--- /dev/null
+++ b/build_without_test.sh
@@ -0,0 +1 @@
+mvn clean package -Dmaven.test.skip=true
diff --git a/pom.xml b/pom.xml
index c2db4ebe..3eaad960 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
rocketmq-streams-channel-rocketmq
rocketmq-streams-channel-db
rocketmq-streams-channel-http
+ rocketmq-streams-examples
@@ -173,6 +174,11 @@
rocketmq-streams-commons
${project.version}
+
+ org.apache.rocketmq
+ rocketmq-streams-clients
+ ${project.version}
+
org.apache.rocketmq
rocketmq-streams-configurable
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index e496f3e1..d4b9b04c 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -150,7 +150,6 @@ protected DefaultMQPushConsumer startConsumer() {
});
setOffsetStore(consumer);
- addRebalanceCallback(consumer);
consumer.start();
return consumer;
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
index 48f23a79..a40fff96 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
@@ -18,7 +18,9 @@
package org.apache.rocketmq.streams.client.source;
import com.google.common.collect.Sets;
+
import java.util.Set;
+
import org.apache.rocketmq.streams.client.transform.DataStream;
import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
@@ -49,20 +51,21 @@ public DataStream fromFile(String filePath, Boolean isJsonData) {
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
}
- public DataStream fromRocketmq(String topic, String groupName) {
- return fromRocketmq(topic, groupName, null, false);
+ public DataStream fromRocketmq(String topic, String groupName, String namesrvAddress) {
+ return fromRocketmq(topic, groupName, false, namesrvAddress);
}
- public DataStream fromRocketmq(String topic, String groupName, boolean isJson) {
- return fromRocketmq(topic, groupName, null, isJson);
+ public DataStream fromRocketmq(String topic, String groupName, boolean isJson, String namesrvAddress) {
+ return fromRocketmq(topic, groupName, "*", isJson, namesrvAddress);
}
- public DataStream fromRocketmq(String topic, String groupName, String tags, boolean isJson) {
+ public DataStream fromRocketmq(String topic, String groupName, String tags, boolean isJson, String namesrvAddress) {
RocketMQSource rocketMQSource = new RocketMQSource();
rocketMQSource.setTopic(topic);
rocketMQSource.setTags(tags);
rocketMQSource.setGroupName(groupName);
rocketMQSource.setJsonData(isJson);
+ rocketMQSource.setNamesrvAddr(namesrvAddress);
this.mainPipelineBuilder.setSource(rocketMQSource);
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
}
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
index 9ba44f1a..6af3ee8f 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
@@ -18,11 +18,8 @@
package org.apache.rocketmq.streams.client;
import com.alibaba.fastjson.JSONObject;
-import netscape.javascript.JSObject;
import org.apache.rocketmq.streams.client.source.DataStreamSource;
import org.apache.rocketmq.streams.client.strategy.CheckpointStrategy;
-import org.apache.rocketmq.streams.client.strategy.StateStrategy;
-import org.apache.rocketmq.streams.client.strategy.Strategy;
import org.apache.rocketmq.streams.client.transform.window.Time;
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
import org.apache.rocketmq.streams.common.functions.MapFunction;
@@ -45,42 +42,40 @@ public void init() {
@Test
public void testFromFile() {
dataStream
- .fromFile("/Users/junjie.cheng/text.txt", false)
- .map(message -> message + "--")
- .toPrint(1)
- .start();
+ .fromFile("/Users/junjie.cheng/text.txt", false)
+ .map(message -> message + "--")
+ .toPrint(1)
+ .start();
}
@Test
public void testRocketmq() {
-
-
DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
dataStream
- .fromRocketmq("TOPIC_EVENT_SAS_SECURITY_EVENT", "111")
- .map(message -> message + "--")
- .toPrint(1)
- .start();
+ .fromRocketmq("topic_xxxx01", "consumer_xxxx01", "127.0.0.1:9876")
+ .map(message -> message + "--")
+ .toPrint(1)
+ .start();
}
@Test
public void testDBCheckPoint() {
dataStream
- .fromRocketmq("TSG_META_INFO", "")
- .map(message -> message + "--")
- .toPrint(1)
- .with(CheckpointStrategy.db("", "", "", 0L))
- .start();
+ .fromRocketmq("topic_xxxx02", "consumer_xxxx02", "127.0.0.1:9876")
+ .map(message -> message + "--")
+ .toPrint(1)
+ .with(CheckpointStrategy.db("", "", "", 0L))
+ .start();
}
@Test
public void testFileCheckPoint() {
dataStream
- .fromRocketmq("TSG_META_INFO", "")
- .map(message -> message + "--")
- .toPrint(1)
- .with(CheckpointStrategy.mem(0L))
- .start();
+ .fromFile("/Users/junjie.cheng/text.txt", false)
+ .map(message -> message + "--")
+ .toPrint(1)
+ .with(CheckpointStrategy.mem(0L))
+ .start();
}
@@ -88,33 +83,33 @@ public void testFileCheckPoint() {
public void testWindow() {
DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
dataStream
- .fromRocketmq("TSG_META_INFO", "")
- .map(new MapFunction() {
-
- @Override
- public JSONObject map(String message) throws Exception {
- JSONObject msg=JSONObject.parseObject(message);
- return msg;
- }
- })
- .window(TumblingWindow.of(Time.seconds(5)))
- .groupBy("name","age")
- .count("c")
- .sum("score","scoreValue")
- .toDataSteam()
- .toPrint(1)
- .with(CheckpointStrategy.db("","","",1000L))
- .start();
+ .fromRocketmq("topic_xxxx03", "consumer_xxxx03", "127.0.0.1:9876")
+ .map(new MapFunction() {
+
+ @Override
+ public JSONObject map(String message) throws Exception {
+ JSONObject msg = JSONObject.parseObject(message);
+ return msg;
+ }
+ })
+ .window(TumblingWindow.of(Time.seconds(5)))
+ .groupBy("name", "age")
+ .count("c")
+ .sum("score", "scoreValue")
+ .toDataSteam()
+ .toPrint(1)
+ .with(CheckpointStrategy.db("", "", "", 1000L))
+ .start();
}
@Test
public void testBothStrategy() {
dataStream
- .fromRocketmq("TSG_META_INFO", "")
- .map(message -> message + "--")
- .toPrint(1)
- .with()
- .start();
+ .fromRocketmq("topic_xxxx04", "consumer_xxxx04", "127.0.0.1:9876")
+ .map(message -> message + "--")
+ .toPrint(1)
+ .with()
+ .start();
}
@Test
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowFromMetaq.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowFromMetaq.java
index a93352a4..e890a96e 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowFromMetaq.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowFromMetaq.java
@@ -56,7 +56,7 @@ public void testWindowToMetaq() throws InterruptedException {
protected DataStream createSourceDataStream(){
return StreamBuilder.dataStream("namespace", "name1")
- .fromRocketmq(topic,"chris1",true);
+ .fromRocketmq(topic,"chris1","");
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
index f5ff47e4..035a99d6 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java
@@ -93,6 +93,16 @@ protected void setJsonObject(JSONObject jsonObject) {
jsonObject.put("source", Base64Utils.encode(InstantiationUtil.serializeObject(source)));
}
+ @Override
+ public void removeSplit(Set splitIds) {
+ source.removeSplit(splitIds);
+ }
+
+ @Override
+ public void addNewSplit(Set splitIds) {
+ source.addNewSplit(splitIds);
+ }
+
@Override
public Map getFinishedQueueIdAndOffsets(CheckPointMessage checkPointMessage) {
return sink.getFinishedQueueIdAndOffsets(checkPointMessage);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java
index 4b9f55fe..2aef6006 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java
@@ -74,4 +74,8 @@ public interface ISource extends IConfigurable, IStageBuilder
*/
long getCheckpointTime();
+ void removeSplit(Set splitIds);
+
+ void addNewSplit(Set splitIds);
+
}
diff --git a/rocketmq-streams-examples/pom.xml b/rocketmq-streams-examples/pom.xml
new file mode 100644
index 00000000..ee6bbf63
--- /dev/null
+++ b/rocketmq-streams-examples/pom.xml
@@ -0,0 +1,50 @@
+
+
+
+
+ rocketmq-streams
+ org.apache.rocketmq
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ rocketmq-streams-examples
+ ROCKETMQ STREAMS :: examples
+
+
+ org.apache.rocketmq
+ rocketmq-streams-clients
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+
+ jar
+
+
+ UTF-8
+ ${file_encoding}
+ 8
+ 8
+
+
\ No newline at end of file
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
new file mode 100644
index 00000000..d568d5ff
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/filesource/FileSourceExample.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.examples.filesource;
+
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
+
+public class FileSourceExample {
+ public static void main(String[] args) {
+ DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+ source.fromFile("/your/file/path", false)
+ .map(message -> message)
+ .toPrint(1)
+ .start();
+ }
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java
new file mode 100644
index 00000000..c2e6bd13
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample1.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.examples.rocketmqsource;
+
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
+
+public class RocketMQSourceExample1 {
+ public static void main(String[] args) {
+ DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+
+ source.fromRocketmq(
+ RocketMQSourceExample2.RMQ_TOPIC,
+ RocketMQSourceExample2.RMQ_CONSUMER_GROUP_NAME,
+ RocketMQSourceExample2.NAMESRV_ADDRESS
+ )
+ .map(message -> message)
+ .toPrint(1)
+ .start();
+
+ }
+}
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
new file mode 100644
index 00000000..93f13c3c
--- /dev/null
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.examples.rocketmqsource;
+
+import org.apache.rocketmq.streams.client.StreamBuilder;
+import org.apache.rocketmq.streams.client.source.DataStreamSource;
+import org.apache.rocketmq.streams.source.RocketMQSource;
+
+public class RocketMQSourceExample2 {
+ public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
+ public static final String RMQ_TOPIC = "topic_tiger_0822_01";
+ public static final String RMQ_CONSUMER_GROUP_NAME = "consumer_tiger_0822_01";
+ public static final String TAGS = "*";
+
+ public static void main(String[] args) {
+ DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
+
+ source.from(new RocketMQSource(
+ RMQ_TOPIC,
+ TAGS,
+ RMQ_CONSUMER_GROUP_NAME,
+ "",
+ NAMESRV_ADDRESS,
+ "",
+ "",
+ ""
+ ))
+ .map(message -> message)
+ .toPrint(1)
+ .start();
+
+ }
+}