Skip to content

Commit

Permalink
RocketMQSourceExample2 can run now, read form mq and print.
Browse files Browse the repository at this point in the history
  • Loading branch information
ni-ze committed Sep 2, 2021
1 parent 23d5192 commit 2f4a99d
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ public void persist(MessageQueue mq) {

@Override
public void removeOffset(MessageQueue mq) {
Set<String> splitIds = new HashSet<>();
splitIds.add(new RocketMQMessageQueue(mq).getQueueId());

//todo 启动时第一次做rebalance时source中也没有原有消费mq,不做移除,做了会有副作用
//后续整个checkpoint机制都会调整成异步,整块代码都不会保留,目前为了整体跑通,不做修改。
if (!starting.get()) {
source.removeSplit(splitIds);
if (starting.get()) {
starting.set(false);
} else {
Set<String> splitIds = new HashSet<>();
splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
source.removeSplit(splitIds);
}

offsetStore.removeOffset(mq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class RocketMQSource extends AbstractSupportOffsetResetSource {
Expand Down Expand Up @@ -99,7 +100,7 @@ protected boolean initConfigurable() {
protected boolean startSource() {
try {
destroyConsumer();
consumer=startConsumer();
consumer = startConsumer();
return true;
} catch (Exception e) {
setInitSuccess(false);
Expand All @@ -115,7 +116,7 @@ protected DefaultMQPushConsumer startConsumer() {
consumer.setPullInterval(pullIntervalMs);
}

consumer.setPersistConsumerOffsetInterval((int)this.checkpointTime);
consumer.setPersistConsumerOffsetInterval((int) this.checkpointTime);
consumer.setConsumeMessageBatchMaxSize(maxFetchLogGroupSize);
consumer.setNamesrvAddr(this.namesrvAddr);
if (consumeFromWhere != null) {
Expand All @@ -124,10 +125,10 @@ protected DefaultMQPushConsumer startConsumer() {
consumer.setConsumeTimestamp(consumerOffset);
}
}
Map<String,Boolean>isFirstDataForQueue=new HashMap<>();
Map<String, Boolean> isFirstDataForQueue = new HashMap<>();
//consumer.setCommitOffsetWithPullRequestEnable(false);
consumer.subscribe(topic, tags);
consumer.registerMessageListener((MessageListenerOrderly)(msgs, context) -> {
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
try {
int i = 0;
for (MessageExt msg : msgs) {
Expand All @@ -138,15 +139,15 @@ protected DefaultMQPushConsumer startConsumer() {
org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, queueId, offset, false);
message.getHeader().setOffsetIsLong(true);

if(DebugWriter.isOpenDebug()){
Boolean isFirstData=isFirstDataForQueue.get(queueId);
if(isFirstData==null){
synchronized (this){
isFirstData=isFirstDataForQueue.get(queueId);
if(isFirstData==null){
isFirstDataForQueue.put(queueId,true);
if (DebugWriter.isOpenDebug()) {
Boolean isFirstData = isFirstDataForQueue.get(queueId);
if (isFirstData == null) {
synchronized (this) {
isFirstData = isFirstDataForQueue.get(queueId);
if (isFirstData == null) {
isFirstDataForQueue.put(queueId, true);
}
DebugWriter.getInstance(getTopic()).receiveFirstData(queueId,msg.getQueueOffset());
DebugWriter.getInstance(getTopic()).receiveFirstData(queueId, msg.getQueueOffset());
}
}
}
Expand All @@ -169,7 +170,6 @@ protected DefaultMQPushConsumer startConsumer() {
});

setOffsetStore(consumer);
// addRebalanceCallback(consumer);
consumer.start();

return consumer;
Expand All @@ -179,15 +179,16 @@ protected DefaultMQPushConsumer startConsumer() {
throw new RuntimeException("start metaq channel error " + topic, e);
}
}

@Override
public List<ISplit> getAllSplits(){
public List<ISplit> getAllSplits() {
try {
List<ISplit> messageQueues=new ArrayList<>();
List<ISplit> messageQueues = new ArrayList<>();
if (messageQueues == null || messageQueues.size() == 0) {
Set<MessageQueue> metaqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic);
Set<MessageQueue> metaqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic);
for (MessageQueue queue : metaqQueueSet) {
RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(queue);
if(isNotDataSplit(metaqMessageQueue.getQueueId())){
if (isNotDataSplit(metaqMessageQueue.getQueueId())) {
continue;
}

Expand All @@ -198,31 +199,31 @@ public List<ISplit> getAllSplits(){
return messageQueues;
} catch (MQClientException e) {
e.printStackTrace();
throw new RuntimeException("get all splits error ",e);
throw new RuntimeException("get all splits error ", e);
}
}


@Override
public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){
public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExt.setVipChannelEnabled(false);
defaultMQAdminExt.setAdminExtGroup(UUID.randomUUID().toString());
defaultMQAdminExt.setInstanceName(this.consumer.getInstanceName());
try {
defaultMQAdminExt.start();
Map<MessageQueue, String> queue2Instances= getMessageQueueAllocationResult(defaultMQAdminExt,this.groupName);
Map<String,List<ISplit>> instanceOwnerQueues=new HashMap<>();
for(MessageQueue messageQueue:queue2Instances.keySet()){
RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(),messageQueue.getBrokerName(),messageQueue.getQueueId()));
if(isNotDataSplit(metaqMessageQueue.getQueueId())){
Map<MessageQueue, String> queue2Instances = getMessageQueueAllocationResult(defaultMQAdminExt, this.groupName);
Map<String, List<ISplit>> instanceOwnerQueues = new HashMap<>();
for (MessageQueue messageQueue : queue2Instances.keySet()) {
RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(), messageQueue.getBrokerName(), messageQueue.getQueueId()));
if (isNotDataSplit(metaqMessageQueue.getQueueId())) {
continue;
}
String instanceName=queue2Instances.get(messageQueue);
List<ISplit> splits=instanceOwnerQueues.get(instanceName);
if(splits==null){
splits=new ArrayList<>();
instanceOwnerQueues.put(instanceName,splits);
String instanceName = queue2Instances.get(messageQueue);
List<ISplit> splits = instanceOwnerQueues.get(instanceName);
if (splits == null) {
splits = new ArrayList<>();
instanceOwnerQueues.put(instanceName, splits);
}
splits.add(metaqMessageQueue);
}
Expand All @@ -234,21 +235,22 @@ public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){
defaultMQAdminExt.shutdown();
}
}

protected Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt, String groupName) {
HashMap results = new HashMap();

try {
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
Iterator var5 = consumerConnection.getConnectionSet().iterator();

while(var5.hasNext()) {
Connection connection = (Connection)var5.next();
while (var5.hasNext()) {
Connection connection = (Connection) var5.next();
String clientId = connection.getClientId();
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, false);
Iterator var9 = consumerRunningInfo.getMqTable().keySet().iterator();

while(var9.hasNext()) {
MessageQueue messageQueue = (MessageQueue)var9.next();
while (var9.hasNext()) {
MessageQueue messageQueue = (MessageQueue) var9.next();
results.put(messageQueue, clientId.split("@")[1]);
}
}
Expand All @@ -261,7 +263,7 @@ protected Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdm

/**
* 设置offset存储,包装原有的RemoteBrokerOffsetStore,在保存offset前发送系统消息
*
* this method suggest to be removed, use check barrier to achieve checkpoint asynchronous.
* @param consumer
*/
protected void setOffsetStore(DefaultMQPushConsumer consumer) {
Expand All @@ -270,7 +272,7 @@ protected void setOffsetStore(DefaultMQPushConsumer consumer) {
consumer.changeInstanceNameToPID();
}
MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(defaultMQPushConsumer.getDefaultMQPushConsumer());
RemoteBrokerOffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, NamespaceUtil.wrapNamespace(consumer.getNamespace(), consumer.getConsumerGroup())){
RemoteBrokerOffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, NamespaceUtil.wrapNamespace(consumer.getNamespace(), consumer.getConsumerGroup())) {

@Override
public void removeOffset(MessageQueue mq) {
Expand All @@ -283,22 +285,22 @@ public void removeOffset(MessageQueue mq) {

@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
sendCheckpoint(new RocketMQMessageQueue(mq).getQueueId());
if(DebugWriter.isOpenDebug()){
ConcurrentMap<MessageQueue, AtomicLong> offsetTable=ReflectUtil.getDeclaredField(this,"offsetTable");
DebugWriter.getInstance(getTopic()).writeSaveOffset(mq,offsetTable.get(mq));
if (DebugWriter.isOpenDebug()) {
ConcurrentMap<MessageQueue, AtomicLong> offsetTable = ReflectUtil.getDeclaredField(this, "offsetTable");
DebugWriter.getInstance(getTopic()).writeSaveOffset(mq, offsetTable.get(mq));
}
LOG.info("the queue Id is "+new RocketMQMessageQueue(mq).getQueueId()+",rocketmq start save offset,the save time is "+ DateUtil.getCurrentTimeString());
super.updateConsumeOffsetToBroker(mq,offset,isOneway);
LOG.info("the queue Id is " + new RocketMQMessageQueue(mq).getQueueId() + ",rocketmq start save offset,the save time is " + DateUtil.getCurrentTimeString());
super.updateConsumeOffsetToBroker(mq, offset, isOneway);
}
};
consumer.setOffsetStore(offsetStore);//每个一分钟运行一次
}

@Override
protected boolean isNotDataSplit(String queueId) {
return queueId.toUpperCase().startsWith("RETRY")||queueId.toUpperCase().startsWith("%RETRY%");
return queueId.toUpperCase().startsWith("RETRY") || queueId.toUpperCase().startsWith("%RETRY%");
}

@Override
Expand All @@ -318,7 +320,7 @@ public boolean supportOffsetRest() {

public void destroyConsumer() {
List<DefaultMQPushConsumer> oldConsumers = new ArrayList<>();
if(consumer!=null){
if (consumer != null) {
oldConsumers.add(consumer);
}
try {
Expand All @@ -333,31 +335,8 @@ public void destroyConsumer() {
}

}
public static void main(String[] args) throws InterruptedException {
RocketMQSource source=new RocketMQSource("TOPIC_DIPPER_SYSTEM_MSG_6",null,"fdsdf",null);
source.init();
source.start(new IStreamOperator() {
@Override public Object doMessage(IMessage message, AbstractContext context) {
// System.out.println(message.getMessageBody());
return null;
}
});
System.out.println(source.getAllSplits().size());
while (true){
Map<String,List<ISplit>> map=source.getWorkingSplitsGroupByInstances();
List<ISplit> ownerSplits=map.get(RuntimeUtil.getDipperInstanceId());
int count=0;
if(ownerSplits!=null){
count=ownerSplits.size();
}
int sum=0;
for(List<ISplit> splits:map.values()){
sum+=splits.size();
}
System.out.println(count+" "+sum);
Thread.sleep(1000);
}
}


@Override
public void destroy() {
super.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
Expand Down Expand Up @@ -219,21 +221,17 @@ public JSONObject create(String message) {
return createJson(message);
}

/**
* 交给receiver执行后续逻辑
*
* @param channelMessage
* @return
*/

public AbstractContext executeMessage(Message channelMessage) {
AbstractContext context = new Context(channelMessage);
if (isSplitInRemoving(channelMessage)) {
return context;
}
if (!channelMessage.getHeader().isSystemMessage()) {
messageQueueChangedCheck(channelMessage.getHeader());
}

if (isSplitInRemoving(channelMessage)) {
return context;
}

boolean needFlush = !channelMessage.getHeader().isSystemMessage() && channelMessage.getHeader().isNeedFlush();

if (receiver != null) {
Expand Down Expand Up @@ -277,9 +275,6 @@ protected boolean isSplitInRemoving(Message channelMessage) {
* @param header
*/
protected void messageQueueChangedCheck(MessageHeader header) {
if (supportNewSplitFind() && supportRemoveSplitFind()) {
return;
}
Set<String> queueIds = new HashSet<>();
String msgQueueId = header.getQueueId();
if (StringUtil.isNotEmpty(msgQueueId)) {
Expand All @@ -290,7 +285,7 @@ protected void messageQueueChangedCheck(MessageHeader header) {
queueIds.addAll(checkpointQueueIds);
}
Set<String> newQueueIds = new HashSet<>();
Set<String> removeQueueIds = new HashSet<>();

for (String queueId : queueIds) {
if (isNotDataSplit(queueId)) {
continue;
Expand All @@ -306,22 +301,12 @@ protected void messageQueueChangedCheck(MessageHeader header) {
} else {

this.checkPointManager.updateLastUpdate(queueId);
//if(this.checkPointManager.isRemovedSplit(queueId)){
// this.checkPointManager.removeSplit(queueId);
// removeQueueIds.add(queueId);
//}else {
//
//}

}
}
}
//if(!supportRemoveSplitFind()){
// removeSplit(removeQueueIds);
//}
if (!supportNewSplitFind()) {
addNewSplit(newQueueIds);
}

addNewSplit(newQueueIds);
}

protected abstract boolean isNotDataSplit(String queueId);
Expand All @@ -343,12 +328,15 @@ public void removeSplit(Set<String> splitIds) {

}
}
public List<ISplit> getAllSplits(){

public List<ISplit> getAllSplits() {
return null;
}
public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){

public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
return null;
}

public void addNewSplit(Set<String> splitIds) {
if (splitIds == null || splitIds.size() == 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessageProcessor;
import org.apache.rocketmq.streams.common.optimization.SQLLogFingerprintFilter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public class RocketMQSourceExample2 {
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
public static final String RMQ_TOPIC = "topic_tiger_0901_01";
public static final String RMQ_CONSUMER_GROUP_NAME = "consumer_tiger_0901_16";
public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-10";
public static final String TAGS = "*";

public static void main(String[] args) {
Expand Down

0 comments on commit 2f4a99d

Please sign in to comment.