Skip to content

Commit

Permalink
Merge pull request apache#44 from ni-ze/private/debug
Browse files Browse the repository at this point in the history
make a runnable example.
  • Loading branch information
duhenglucky authored Sep 2, 2021
2 parents 2e282c6 + 2f4a99d commit c4a069d
Show file tree
Hide file tree
Showing 14 changed files with 125 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
Expand All @@ -36,9 +37,12 @@
public class RocketMQOffset implements OffsetStore {
protected OffsetStore offsetStore;
protected AbstractSupportOffsetResetSource source;
private AtomicBoolean starting;

public RocketMQOffset(OffsetStore offsetStore, AbstractSupportOffsetResetSource source){
this.offsetStore=offsetStore;
this.source=source;
this.starting = new AtomicBoolean(true);
}
@Override
public void load() throws MQClientException {
Expand Down Expand Up @@ -67,9 +71,16 @@ public void persist(MessageQueue mq) {

@Override
public void removeOffset(MessageQueue mq) {
Set<String> splitIds = new HashSet<>();
splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
source.removeSplit(splitIds);
//todo 启动时第一次做rebalance时source中也没有原有消费mq,不做移除,做了会有副作用
//后续整个checkpoint机制都会调整成异步,整块代码都不会保留,目前为了整体跑通,不做修改。
if (starting.get()) {
starting.set(false);
} else {
Set<String> splitIds = new HashSet<>();
splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
source.removeSplit(splitIds);
}

offsetStore.removeOffset(mq);
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Sets;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.streams.client.DataStreamAction;
Expand Down Expand Up @@ -131,7 +133,7 @@ protected <T> T operate(IMessage message, AbstractContext context) {
for(T t:result){
Message subMessage=null;
if (result instanceof JSONObject) {
subMessage=new Message((JSONObject)result);
subMessage=new Message((JSONObject)t);
} else {
subMessage=new Message(new UserDefinedMessage(result));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
Expand Down Expand Up @@ -219,22 +221,18 @@ public JSONObject create(String message) {
return createJson(message);
}

/**
* 交给receiver执行后续逻辑
*
* @param channelMessage
* @return
*/

public AbstractContext executeMessage(Message channelMessage) {
AbstractContext context = new Context(channelMessage);
if (isSplitInRemoving(channelMessage)) {
return context;
}
if (!channelMessage.getHeader().isSystemMessage()) {
messageQueueChangedCheck(channelMessage.getHeader());
}

boolean needFlush = channelMessage.getHeader().isSystemMessage() == false && channelMessage.getHeader().isNeedFlush();
if (isSplitInRemoving(channelMessage)) {
return context;
}

boolean needFlush = !channelMessage.getHeader().isSystemMessage() && channelMessage.getHeader().isNeedFlush();

if (receiver != null) {
receiver.doMessage(channelMessage, context);
Expand Down Expand Up @@ -277,9 +275,6 @@ protected boolean isSplitInRemoving(Message channelMessage) {
* @param header
*/
protected void messageQueueChangedCheck(MessageHeader header) {
if (supportNewSplitFind() && supportRemoveSplitFind()) {
return;
}
Set<String> queueIds = new HashSet<>();
String msgQueueId = header.getQueueId();
if (StringUtil.isNotEmpty(msgQueueId)) {
Expand All @@ -290,7 +285,7 @@ protected void messageQueueChangedCheck(MessageHeader header) {
queueIds.addAll(checkpointQueueIds);
}
Set<String> newQueueIds = new HashSet<>();
Set<String> removeQueueIds = new HashSet<>();

for (String queueId : queueIds) {
if (isNotDataSplit(queueId)) {
continue;
Expand All @@ -306,22 +301,12 @@ protected void messageQueueChangedCheck(MessageHeader header) {
} else {

this.checkPointManager.updateLastUpdate(queueId);
//if(this.checkPointManager.isRemovedSplit(queueId)){
// this.checkPointManager.removeSplit(queueId);
// removeQueueIds.add(queueId);
//}else {
//
//}

}
}
}
//if(!supportRemoveSplitFind()){
// removeSplit(removeQueueIds);
//}
if (!supportNewSplitFind()) {
addNewSplit(newQueueIds);
}

addNewSplit(newQueueIds);
}

protected abstract boolean isNotDataSplit(String queueId);
Expand All @@ -343,12 +328,15 @@ public void removeSplit(Set<String> splitIds) {

}
}
public List<ISplit> getAllSplits(){

public List<ISplit> getAllSplits() {
return null;
}
public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){

public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
return null;
}

public void addNewSplit(Set<String> splitIds) {
if (splitIds == null || splitIds.size() == 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@
*/
package org.apache.rocketmq.streams.common.checkpoint;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
Expand All @@ -32,6 +25,13 @@
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class CheckPointManager {
protected IMessageCache<CheckPointMessage> messageCache;
protected transient Map<String, Long> currentSplitAndLastUpdateTime = new HashMap<>();//保存这个实例处理的分片数
Expand Down Expand Up @@ -138,11 +138,7 @@ protected SourceState createSourceState(CheckPointMessage checkPointMessage) {
* @param sourceStateMap
*/
protected void saveCheckPoint(Map<String, SourceState> sourceStateMap) {
List<CheckPoint> checkPoints = new ArrayList<>();
for (SourceState sourceState : sourceStateMap.values()) {
CheckPoint checkPoint = new CheckPoint();
// checkPoint.setOffset();
}

}

/**
Expand Down Expand Up @@ -176,16 +172,7 @@ public void addCheckPointMessage(CheckPointMessage message) {
this.messageCache.addCache(message);
}

//public boolean isRemovedSplit(String queueId) {
// Long lastUpdateTime=this.currentSplitAndLastUpdateTime.get(queueId);
// if(lastUpdateTime==null){
// return false;
// }
// if(System.currentTimeMillis()-lastUpdateTime>10000*1000){
// return true;
// }
// return false;
//}


public void updateLastUpdate(String queueId) {
addSplit(queueId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ public interface IConfigurable extends IJsonable, IConfigurableIdentification, S
/**
* 把toJson的结果当作一个特殊属性
*/
static final String JSON_PROPERTY = "configurable_json";
String JSON_PROPERTY = "configurable_json";

/**
* 把status当作configurable 的一个特殊属性
*/
static final String STATUS_PROPERTY = "configurable_status";
String STATUS_PROPERTY = "configurable_status";

/**
* 每个配置有一个独立的名字
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.streams.common.context;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -128,9 +129,7 @@ public void addSplitMessages(T... splitMessages) {
if (splitMessages == null) {
return;
}
for (T t : splitMessages) {
this.splitMessages.add(t);
}
this.splitMessages.addAll(Arrays.asList(splitMessages));
}

public void removeSpliteMessage(T message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.streams.common.functions;

public interface MapFunction<T, O> extends Function {
import java.io.Serializable;

public interface MapFunction<T, O> extends Function, Serializable {

T map(O message) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.streams.common.topology.builder;

import java.io.Serializable;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
Expand All @@ -30,13 +29,15 @@
import org.apache.rocketmq.streams.common.utils.NameCreatorUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PipelineBuilder implements Serializable {
private static final long serialVersionUID = 1L;

/**
* 最终产出的pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessageProcessor;
import org.apache.rocketmq.streams.common.optimization.SQLLogFingerprintFilter;
Expand All @@ -33,7 +34,7 @@
public abstract class AbstractStage<T extends IMessage> extends BasedConfigurable
implements IStreamOperator<T, T>, ISystemMessageProcessor {

private static final long serialVersionUID = -143202547707927632L;


private static final Log LOG = LogFactory.getLog(AbstractStage.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.rocketmq.streams.common.topology.model.IStageHandle;
import org.apache.rocketmq.streams.common.topology.stages.AbstractStatelessChainStage;

import java.io.Serializable;

/**
* 给用户提供自定义的抽象类
*/
public abstract class StageBuilder extends AbstractStatelessChainStage<IMessage> implements IStageBuilder<ChainStage>, IAfterConfigurableRefreshListener {

public abstract class StageBuilder extends AbstractStatelessChainStage<IMessage> implements IStageBuilder<ChainStage>, Serializable, IAfterConfigurableRefreshListener {
private static final long serialVersionUID = 1L;
@Override
protected boolean initConfigurable() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@

public class RocketMQSourceExample2 {
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
public static final String RMQ_TOPIC = "topic_tiger_0822_01";
public static final String RMQ_CONSUMER_GROUP_NAME = "consumer_tiger_0822_01";
public static final String RMQ_TOPIC = "topic_tiger_0901_01";
public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-10";
public static final String TAGS = "*";

public static void main(String[] args) {
DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");

source.from(new RocketMQSource(
source.fromRocketmq(
RMQ_TOPIC,
TAGS,
RMQ_CONSUMER_GROUP_NAME,
NAMESRV_ADDRESS
))
false,
NAMESRV_ADDRESS)
.map(message -> message)
.toPrint(1)
.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public void testRelationExpression() {
jsonObject.put("vmip", "1.1.1.1");

boolean value =
ExpressionBuilder.executeExecute("namespace", "(ip,=,1.2.2.3)&((uid,=,1224)|(vmip,=,1.1.11.1))",
jsonObject);
ExpressionBuilder.executeExecute("namespace", "(ip,=,1.2.2.3)&((uid,=,1224)|(vmip,=,1.1.11.1))", jsonObject);
assertTrue(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected boolean initConfigurable() {
if (scriptOptimization.supportOptimize()) {
expressions = scriptOptimization.getScriptOptimizeExprssions();
}
FunctionScript functionScript = this;

//转化成istreamoperator 接口
for (IScriptExpression scriptExpression : expressions) {
receivers.add((message, context) -> {
Expand Down

0 comments on commit c4a069d

Please sign in to comment.