Skip to content

Commit

Permalink
Merge pull request apache#43 from yuanxiaodong/window
Browse files Browse the repository at this point in the history
test window exactly once success and fixed bugs
  • Loading branch information
duhenglucky authored Sep 1, 2021
2 parents f48ae62 + 33ddaf9 commit 2e282c6
Show file tree
Hide file tree
Showing 103 changed files with 103,012 additions and 3,309 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ nbbuild/
dist/
nbdist/

dipper.cs
dipper.properties

22 changes: 19 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
<module>rocketmq-streams-channel-rocketmq</module>
<module>rocketmq-streams-channel-db</module>
<module>rocketmq-streams-channel-http</module>
<module>rocketmq-streams-state</module>
<module>rocketmq-streams-examples</module>

</modules>

<properties>
Expand All @@ -60,7 +62,9 @@
<maven.test.skip>false</maven.test.skip>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

<java.version>1.8</java.version>
<java.encoding>UTF-8</java.encoding>
<project.build.sourceEncoding>${java.encoding}</project.build.sourceEncoding>
<log4j.version>1.2.17</log4j.version>
<commons-logging.version>1.1</commons-logging.version>
<spring.version>3.2.13.RELEASE</spring.version>
Expand All @@ -82,7 +86,7 @@
<java-grok.version>0.1.9</java-grok.version>
<jython.version>2.7.0</jython.version>
<scala-library.version>2.12.4</scala-library.version>
<logback-core.version>1.2.2</logback-core.version>
<logback-core.version>1.2.3</logback-core.version>
<minio.version>3.0.10</minio.version>
</properties>

Expand All @@ -93,6 +97,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
Expand Down Expand Up @@ -180,6 +187,11 @@
<artifactId>rocketmq-streams-clients</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-state</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-configurable</artifactId>
Expand Down Expand Up @@ -443,7 +455,11 @@
<artifactId>logback-core</artifactId>
<version>${logback-core.version}</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback-core.version}</version>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
Expand Down
6 changes: 3 additions & 3 deletions rocketmq-streams-channel-rocketmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,17 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.7</version>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ public ISink createSink(String namespace, String name, Properties properties, Me
}

@Override
public ISink createBySource(ISource piplineSoure) {
return null;
public ISink createBySource(ISource pipelineSource) {
RocketMQSource source = (RocketMQSource)pipelineSource;
String topic = source.getTopic();
RocketMQSink sink = new RocketMQSink();
sink.setTopic(topic);
sink.setTags(source.getTags());
return sink;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.streams.common.channel.source.AbstractSupportOffsetResetSource;
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
import org.apache.rocketmq.streams.debug.DebugWriter;
import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;

public class RocketMQOffset implements OffsetStore {
Expand All @@ -48,22 +52,16 @@ public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {

@Override
public long readOffset(MessageQueue mq, ReadOffsetType type) {
return offsetStore.readOffset(mq,type);
return offsetStore.readOffset(mq,type);
}

@Override
public void persistAll(Set<MessageQueue> mqs) {
Set<String> queueIds=new HashSet<>();
for(MessageQueue mq:mqs){
queueIds.add(new RocketMQMessageQueue(mq).getQueueId());
}
source.sendCheckpoint(queueIds);
offsetStore.persistAll(mqs);
}

@Override
public void persist(MessageQueue mq) {
source.sendCheckpoint(new RocketMQMessageQueue(mq).getQueueId());
offsetStore.persist(mq);
}

Expand All @@ -73,7 +71,6 @@ public void removeOffset(MessageQueue mq) {
splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
source.removeSplit(splitIds);
offsetStore.removeOffset(mq);
offsetStore.removeOffset(mq);
}

@Override
Expand All @@ -84,6 +81,11 @@ public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
offsetStore.updateOffset(mq,offset,isOneway);
source.sendCheckpoint(new RocketMQMessageQueue(mq).getQueueId());
if(DebugWriter.isOpenDebug()){
ConcurrentMap<MessageQueue, AtomicLong>offsetTable=ReflectUtil.getDeclaredField(this.offsetStore,"offsetTable");
DebugWriter.getInstance(source.getTopic()).writeSaveOffset(mq,offsetTable.get(mq));
}
offsetStore.updateConsumeOffsetToBroker(mq,offset,isOneway);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.apache.rocketmq.streams.debug;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.FileUtil;
import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;

public class DebugWriter {


protected String dir="/tmp/rocksmq-streams/mq";
protected static Map<String,DebugWriter> debugWriterMap=new HashMap<>();
public static DebugWriter getInstance(String topic){
DebugWriter debugWriter=debugWriterMap.get(topic);
if(debugWriter==null){
debugWriter=new DebugWriter();
debugWriterMap.put(topic,debugWriter);
}
return debugWriter;
}

public static boolean isOpenDebug(){
return false;
}

public DebugWriter(){}
public DebugWriter(String dir){
this.dir=dir;
}

/**
* write offset 2 file
* @param offsets
*/
public void writeSaveOffset(Map<MessageQueue, AtomicLong> offsets){
if(isOpenDebug()==false){
return;
}
String path=dir+"/offsets/offset.txt";
if(offsets==null||offsets.size()==0){
return;
}
Iterator<Map.Entry<MessageQueue, AtomicLong>> it = offsets.entrySet().iterator();
List<String> rows=new ArrayList<>();
while(it.hasNext()){
Map.Entry<MessageQueue, AtomicLong> entry=it.next();
String queueId=new RocketMQMessageQueue(entry.getKey()).getQueueId();
JSONObject msg=new JSONObject();
Long offset=entry.getValue().get();
msg.put(queueId,offset);
msg.put("saveTime", DateUtil.getCurrentTimeString());
msg.put("queueId", queueId);
rows.add(msg.toJSONString());
}
FileUtil.write(path,rows,true);
}

public void writeSaveOffset(MessageQueue messageQueue, AtomicLong offset){
if(isOpenDebug()==false){
return;
}
Map<MessageQueue, AtomicLong> offsets=new HashMap<>();
offsets.put(messageQueue,offset);
writeSaveOffset(offsets);
}


public void receiveFirstData(String queueId,Long offset){
if(isOpenDebug()==false){
return;
}
Map<String,Long> offsets=load();
Long saveOffset=offsets.get(queueId);
System.out.println("queueId is "+queueId+"current offset "+offset+"===="+saveOffset);
}
/**
* load offsets
* @return
*/
public Map<String,Long> load(){
if(isOpenDebug()==false){
return null;
}
String path=dir+"/offsets/offset.txt";
List<String> lines=FileUtil.loadFileLine(path);
Map<String,Long> offsets=new HashMap<>();
for(String line:lines){
JSONObject row=JSONObject.parseObject(line);
String queueId=row.getString("queueId");
offsets.put(queueId,row.getLong(queueId));
}
return offsets;
}
}
Loading

0 comments on commit 2e282c6

Please sign in to comment.