From a26425c178e23d69b102bf3ed3dd5cb6b8cf4f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=B4=E7=AB=A0?= Date: Tue, 30 Aug 2022 20:55:08 +0800 Subject: [PATCH 1/7] feat(nest join) support nest join --- .../channel/sinkcache/impl/MessageCache.java | 1 + .../streams/common/context/MessageHeader.java | 10 ++++++++++ .../streams/common/topology/ChainPipeline.java | 1 + .../common/topology/stages/JoinChainStage.java | 10 +++++++--- .../window/operator/join/JoinWindow.java | 17 ++++++++++++++++- .../streams/window/shuffle/ShuffleChannel.java | 5 +++++ 6 files changed, 40 insertions(+), 4 deletions(-) diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java index dcb7abbb..7586d3e6 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.streams.common.channel.sinkcache.DataSourceAutoFlushTask; import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache; import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack; +import org.apache.rocketmq.streams.common.context.Message; import org.apache.rocketmq.streams.common.schedule.ScheduleManager; import org.apache.rocketmq.streams.common.schedule.ScheduleTask; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java index 0b105430..70d93106 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java @@ -99,6 +99,8 @@ public class MessageHeader { protected String msgRouteFromLable;//消息从哪里来的标签,标记上游节点的标记,主要是通过build table name来标记 + private String originTable; + protected String logFingerprintValue;//日志指纹的值 public MessageHeader copy() { @@ -359,4 +361,12 @@ public String getPipelineName() { public void setPipelineName(String pipelineName) { this.pipelineName = pipelineName; } + + public String getOriginTable() { + return originTable; + } + + public void setOriginTable(String originTable) { + this.originTable = originTable; + } } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java index 96416044..87743696 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java @@ -290,6 +290,7 @@ public void doNextStages(AbstractContext context, String msgPrevSourceName, Stri //boolean needFlush = needFlush(msg); if (StringUtil.isNotEmpty(oriMsgPrewSourceName)) { msg.getHeader().setMsgRouteFromLable(oriMsgPrewSourceName); + msg.getHeader().setOriginTable(oriMsgPrewSourceName); } boolean isContinue = executeStage(stage, msg, copyContext); diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java index 9cb9df35..cf4b9e9f 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java @@ -44,8 +44,14 @@ public class JoinChainStage extends AbstractWindowStage { @Override protected IMessage doProcess(IMessage message, AbstractContext context) { String lable = message.getHeader().getMsgRouteFromLable(); + String originTable = message.getHeader().getOriginTable(); + String joinFlag = null; if (lable != null) { + if ((lable.equals("left") || lable.equals("right")) && originTable != null) { + lable = originTable; + } + if (lable.equals(rightDependentTableName)) { joinFlag = MessageHeader.JOIN_RIGHT; } else { @@ -61,9 +67,7 @@ protected IMessage doProcess(IMessage message, AbstractContext context) { } else { rightPipeline.doMessage(message, context); } - //if(!MessageGloableTrace.existFinshBranch(message)){ - // context.setBreak(true); - //} + context.breakExecute(); return message; } diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java index b16002a3..eea5986a 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java @@ -102,7 +102,6 @@ public void shuffleCalculate(List messages, WindowInstance instance, S if (WindowJoinType.left.name().equalsIgnoreCase(routeLabel)) { storage.putWindowBaseValue(queueId, windowInstanceId, WindowType.JOIN_WINDOW, WindowJoinType.left, temp); - } else if (WindowJoinType.right.name().equalsIgnoreCase(routeLabel)) { storage.putWindowBaseValue(queueId, windowInstanceId, WindowType.JOIN_WINDOW, WindowJoinType.right, temp); } else { @@ -490,6 +489,22 @@ protected void sendMessage(JSONObject message, boolean needFlush) { if (needFlush) { nextMessage.getHeader().setNeedFlush(true); } + + String routeLabel = nextMessage.getHeader().getMsgRouteFromLable(); + if (routeLabel == null) { + //嵌套join,内部join后没有routeLabel,需要设置结果的routeLabel + String configureName = this.getConfigureName(); + String[] tempList = configureName.split("_"); + for (int i = tempList.length -1; i > 0; i--) { + if ("left".equalsIgnoreCase(tempList[i]) || "right".equalsIgnoreCase(tempList[i])) { + routeLabel = tempList[i]; + System.out.println("nested join, routeLabel=" + routeLabel); + break; + } + } + nextMessage.getHeader().setMsgRouteFromLable(routeLabel); + } + AbstractContext context = new Context(nextMessage); boolean isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest"); if (isWindowTest) { diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java index bad436fd..7b8175c4 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java @@ -229,6 +229,11 @@ public void addNewSplit(IMessage message, AbstractContext context, NewSplitMessa for (String splitId : splitIds) { this.loadResult.put(splitId, future); } + + if (message.getHeader().isSystemMessage() && window.getFireReceiver() == null) { + return; + } + window.getFireReceiver().doMessage(message, context); } From 18b51850875110101434beec466d25e0a522435d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=B4=E7=AB=A0?= Date: Tue, 30 Aug 2022 20:59:36 +0800 Subject: [PATCH 2/7] maintain(example) remove MqttSourceExample --- .../examples/source/MqttSourceExample.java | 42 ------------------- 1 file changed, 42 deletions(-) delete mode 100644 rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java deleted file mode 100644 index 33b8b0a4..00000000 --- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.streams.examples.source; - -import com.alibaba.fastjson.JSONObject; -import org.apache.rocketmq.streams.client.StreamBuilder; -import org.apache.rocketmq.streams.client.source.DataStreamSource; -import org.apache.rocketmq.streams.client.strategy.ShuffleStrategy; -import org.apache.rocketmq.streams.client.transform.window.Time; -import org.apache.rocketmq.streams.client.transform.window.TumblingWindow; - -public class MqttSourceExample { - - public static void main(String[] args) { - DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline"); - dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "") - .flatMap(message -> ((JSONObject) message).getJSONArray("Data")) - .window(TumblingWindow.of(Time.minutes(1))) - .groupBy("AttributeCode") - .setLocalStorageOnly(true) - .avg("Value", "avg_value") - .toDataStream() - .toPrint() - .with(ShuffleStrategy.shuffleWithMemory()) - .start(); - } - -} From 52d75250fe9a78d80abc2cc87b3f3c60927336dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=B4=E7=AB=A0?= Date: Wed, 31 Aug 2022 11:24:54 +0800 Subject: [PATCH 3/7] add synchronized --- .../window/fire/SplitEventTimeManager.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java index b4daf916..35bf2c63 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java @@ -35,7 +35,7 @@ public class SplitEventTimeManager { protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class); - protected static Map messageSplitId2MaxTime = new HashMap<>(); + protected static final Map messageSplitId2MaxTime = new HashMap<>(); private AtomicInteger queueIdCount = new AtomicInteger(0); protected Long lastUpdateTime; @@ -86,19 +86,23 @@ public Long getMaxEventTime() { return null; } Long min = null; - Set eventTimes = new HashSet<>(messageSplitId2MaxTime.values()); - for (Long eventTime : eventTimes) { - if (eventTime == null) { - return null; - } - if (min == null) { - min = eventTime; - } else { - if (eventTime < min) { + + synchronized (messageSplitId2MaxTime) { + Set eventTimes = new HashSet<>(messageSplitId2MaxTime.values()); + for (Long eventTime : eventTimes) { + if (eventTime == null) { + return null; + } + if (min == null) { min = eventTime; + } else { + if (eventTime < min) { + min = eventTime; + } } } } + return min; } From d4a5daee2ab1285483eedb4245f4907b03a762d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=B4=E7=AB=A0?= Date: Thu, 1 Sep 2022 15:08:14 +0800 Subject: [PATCH 4/7] fix(channel-rocketmq)commit offset when shutdown --- .../org/apache/rocketmq/streams/source/RocketMQSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java index 3a02abf2..9cc32a14 100644 --- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java +++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java @@ -338,7 +338,7 @@ public void run() { //拉取的批量消息处理完成以后判断是否提交位点; synchronized (this.pullConsumer) { - if (System.currentTimeMillis() - lastCommit >= commitInternalMs || isStopped) { + if (System.currentTimeMillis() - lastCommit >= commitInternalMs && !isStopped) { lastCommit = System.currentTimeMillis(); //向broker提交消费位点,todo 从consumer那里拿不到正在消费哪些messageQueue commit(this.delegator.getLastDivided()); @@ -348,6 +348,7 @@ public void run() { } public void shutdown() { + commit(this.delegator.getLastDivided()); this.isStopped = true; } } From b8da8e80eb6a5ce050755b6fce178cefd89f33e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=B4=E7=AB=A0?= Date: Thu, 1 Sep 2022 19:53:56 +0800 Subject: [PATCH 5/7] fix(channel-rocketmq)commit offset before shutdown --- .../streams/source/RocketMQSource.java | 97 ++++++++++++------- 1 file changed, 62 insertions(+), 35 deletions(-) diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java index 9cc32a14..190b06f5 100644 --- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java +++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -70,6 +71,7 @@ public class RocketMQSource extends AbstractSupportShuffleSource { private transient DefaultLitePullConsumer pullConsumer; private transient ExecutorService executorService; private transient PullTask[] pullTasks; + private transient volatile AtomicBoolean committing = new AtomicBoolean(false); public RocketMQSource() { } @@ -112,9 +114,9 @@ protected boolean startSource() { this.pullConsumer.start(); return true; - } catch (MQClientException e) { + } catch (Throwable t) { setInitSuccess(false); - throw new RuntimeException("start rocketmq channel error " + topic, e); + throw new RuntimeException("start rocketmq channel error " + topic, t); } } @@ -242,11 +244,29 @@ public void destroyConsumer() { this.executorService.shutdown(); //关闭消费实例 - this.pullConsumer.shutdown(); + try { + synchronized (committing) { + while (committing.get()) { + committing.wait(); + } + } + + this.pullConsumer.shutdown(); + } catch (Throwable t) { + LOG.error(t); + } + } public void commit(Set messageQueues) { - this.pullConsumer.commit(messageQueues, true); + if (this.pullConsumer.isRunning()) { + synchronized (committing) { + committing.set(true); + this.pullConsumer.commit(messageQueues, true); + committing.set(false); + committing.notifyAll(); + } + } } @Override @@ -302,58 +322,65 @@ public void run() { } while (!this.isStopped) { - - if (this.delegator.needSync()) { - synchronized (this.pullConsumer) { - if (this.delegator.needSync()) { - afterRebalance(); + try { + if (this.delegator.needSync()) { + synchronized (this.pullConsumer) { + if (this.delegator.needSync()) { + afterRebalance(); + } } } - } - List msgs = pullConsumer.poll(pullTimeout); + List msgs = pullConsumer.poll(pullTimeout); - int i = 0; - for (MessageExt msg : msgs) { - JSONObject jsonObject = create(msg.getBody(), msg.getProperties()); + int i = 0; + for (MessageExt msg : msgs) { + JSONObject jsonObject = create(msg.getBody(), msg.getProperties()); - String topic = msg.getTopic(); - int queueId = msg.getQueueId(); - String brokerName = msg.getBrokerName(); - MessageQueue queue = new MessageQueue(topic, brokerName, queueId); - String unionQueueId = RocketMQMessageQueue.getQueueId(queue); + String topic = msg.getTopic(); + int queueId = msg.getQueueId(); + String brokerName = msg.getBrokerName(); + MessageQueue queue = new MessageQueue(topic, brokerName, queueId); + String unionQueueId = RocketMQMessageQueue.getQueueId(queue); - String offset = msg.getQueueOffset() + ""; - org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, unionQueueId, offset, false); - message.getHeader().setOffsetIsLong(true); + String offset = msg.getQueueOffset() + ""; + org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, unionQueueId, offset, false); + message.getHeader().setOffsetIsLong(true); - if (i == msgs.size() - 1) { - message.getHeader().setNeedFlush(true); + if (i == msgs.size() - 1) { + message.getHeader().setNeedFlush(true); + } + executeMessage(message); + i++; } - executeMessage(message); - i++; - } - //拉取的批量消息处理完成以后判断是否提交位点; - synchronized (this.pullConsumer) { - if (System.currentTimeMillis() - lastCommit >= commitInternalMs && !isStopped) { - lastCommit = System.currentTimeMillis(); - //向broker提交消费位点,todo 从consumer那里拿不到正在消费哪些messageQueue - commit(this.delegator.getLastDivided()); + //拉取的批量消息处理完成以后判断是否提交位点; + synchronized (this.pullConsumer) { + if (System.currentTimeMillis() - lastCommit >= commitInternalMs && !isStopped) { + lastCommit = System.currentTimeMillis(); + //向broker提交消费位点,todo 从consumer那里拿不到正在消费哪些messageQueue + commit(this.delegator.getLastDivided()); + } } + } catch (Throwable t) { + LOG.error(t); } } } public void shutdown() { - commit(this.delegator.getLastDivided()); + Set lastDivided = this.delegator.getLastDivided(); + if (lastDivided != null && lastDivided.size() != 0) { + commit(lastDivided); + } + this.isStopped = true; } } - private void newRebalance(Set allQueueInLastRebalance){ + private void newRebalance(Set allQueueInLastRebalance) { Set temp = new HashSet<>(); for (MessageQueue queue : allQueueInLastRebalance) { String unionQueueId = RocketMQMessageQueue.getQueueId(queue); From be73d55a39563e848baa5bf0ec2a4298e0bd3ac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=B4=E7=AB=A0?= Date: Fri, 2 Sep 2022 09:48:20 +0800 Subject: [PATCH 6/7] remove scan dim --- .../script/function/service/impl/ScanFunctionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/service/impl/ScanFunctionService.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/service/impl/ScanFunctionService.java index a92b279a..9063cead 100644 --- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/service/impl/ScanFunctionService.java +++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/service/impl/ScanFunctionService.java @@ -67,7 +67,7 @@ public ScanFunctionService(boolean scanDipper) { if (scanDipper) { scan.scanPackage("org.apache.rocketmq.streams.script.function.impl"); scan.scanPackage("org.apache.rocketmq.streams.filter.function"); - scan.scanPackage("org.apache.rocketmq.streams.dim.function"); +// scan.scanPackage("org.apache.rocketmq.streams.dim.function"); } } From fa74f37ffb9dd90e9cc8a26f8fe68e6bb9fdcb59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=B4=E7=AB=A0?= Date: Fri, 2 Sep 2022 21:48:30 +0800 Subject: [PATCH 7/7] add log --- .../streams/common/channel/impl/memory/MemorySource.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java index c59c0477..d68c41e9 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java @@ -16,11 +16,14 @@ */ package org.apache.rocketmq.streams.common.channel.impl.memory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.channel.source.AbstractBatchSource; import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; public class MemorySource extends AbstractBatchSource implements IAfterConfigurableRefreshListener { + private static final Log logger = LogFactory.getLog(MemorySource.class); protected String cacheName; protected transient MemoryCache memoryCache; @@ -51,8 +54,9 @@ public void run() { Thread.sleep(1000); } - } catch (Exception e) { - throw new RuntimeException(e); + } catch (Throwable t) { + logger.error("MemorySource error:"); + t.printStackTrace(); } }