diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java index 3a02abf2..190b06f5 100644 --- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java +++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -70,6 +71,7 @@ public class RocketMQSource extends AbstractSupportShuffleSource { private transient DefaultLitePullConsumer pullConsumer; private transient ExecutorService executorService; private transient PullTask[] pullTasks; + private transient volatile AtomicBoolean committing = new AtomicBoolean(false); public RocketMQSource() { } @@ -112,9 +114,9 @@ protected boolean startSource() { this.pullConsumer.start(); return true; - } catch (MQClientException e) { + } catch (Throwable t) { setInitSuccess(false); - throw new RuntimeException("start rocketmq channel error " + topic, e); + throw new RuntimeException("start rocketmq channel error " + topic, t); } } @@ -242,11 +244,29 @@ public void destroyConsumer() { this.executorService.shutdown(); //关闭消费实例 - this.pullConsumer.shutdown(); + try { + synchronized (committing) { + while (committing.get()) { + committing.wait(); + } + } + + this.pullConsumer.shutdown(); + } catch (Throwable t) { + LOG.error(t); + } + } public void commit(Set messageQueues) { - this.pullConsumer.commit(messageQueues, true); + if (this.pullConsumer.isRunning()) { + synchronized (committing) { + committing.set(true); + this.pullConsumer.commit(messageQueues, true); + committing.set(false); + committing.notifyAll(); + } + } } @Override @@ -302,57 +322,65 @@ public void run() { } while (!this.isStopped) { - - if (this.delegator.needSync()) { - synchronized (this.pullConsumer) { - if (this.delegator.needSync()) { - afterRebalance(); + try { + if (this.delegator.needSync()) { + synchronized (this.pullConsumer) { + if (this.delegator.needSync()) { + afterRebalance(); + } } } - } - List msgs = pullConsumer.poll(pullTimeout); + List msgs = pullConsumer.poll(pullTimeout); - int i = 0; - for (MessageExt msg : msgs) { - JSONObject jsonObject = create(msg.getBody(), msg.getProperties()); + int i = 0; + for (MessageExt msg : msgs) { + JSONObject jsonObject = create(msg.getBody(), msg.getProperties()); - String topic = msg.getTopic(); - int queueId = msg.getQueueId(); - String brokerName = msg.getBrokerName(); - MessageQueue queue = new MessageQueue(topic, brokerName, queueId); - String unionQueueId = RocketMQMessageQueue.getQueueId(queue); + String topic = msg.getTopic(); + int queueId = msg.getQueueId(); + String brokerName = msg.getBrokerName(); + MessageQueue queue = new MessageQueue(topic, brokerName, queueId); + String unionQueueId = RocketMQMessageQueue.getQueueId(queue); - String offset = msg.getQueueOffset() + ""; - org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, unionQueueId, offset, false); - message.getHeader().setOffsetIsLong(true); + String offset = msg.getQueueOffset() + ""; + org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, unionQueueId, offset, false); + message.getHeader().setOffsetIsLong(true); - if (i == msgs.size() - 1) { - message.getHeader().setNeedFlush(true); + if (i == msgs.size() - 1) { + message.getHeader().setNeedFlush(true); + } + executeMessage(message); + i++; } - executeMessage(message); - i++; - } - //拉取的批量消息处理完成以后判断是否提交位点; - synchronized (this.pullConsumer) { - if (System.currentTimeMillis() - lastCommit >= commitInternalMs || isStopped) { - lastCommit = System.currentTimeMillis(); - //向broker提交消费位点,todo 从consumer那里拿不到正在消费哪些messageQueue - commit(this.delegator.getLastDivided()); + //拉取的批量消息处理完成以后判断是否提交位点; + synchronized (this.pullConsumer) { + if (System.currentTimeMillis() - lastCommit >= commitInternalMs && !isStopped) { + lastCommit = System.currentTimeMillis(); + //向broker提交消费位点,todo 从consumer那里拿不到正在消费哪些messageQueue + commit(this.delegator.getLastDivided()); + } } + } catch (Throwable t) { + LOG.error(t); } } } public void shutdown() { + Set lastDivided = this.delegator.getLastDivided(); + if (lastDivided != null && lastDivided.size() != 0) { + commit(lastDivided); + } + this.isStopped = true; } } - private void newRebalance(Set allQueueInLastRebalance){ + private void newRebalance(Set allQueueInLastRebalance) { Set temp = new HashSet<>(); for (MessageQueue queue : allQueueInLastRebalance) { String unionQueueId = RocketMQMessageQueue.getQueueId(queue); diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java index c59c0477..d68c41e9 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java @@ -16,11 +16,14 @@ */ package org.apache.rocketmq.streams.common.channel.impl.memory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.channel.source.AbstractBatchSource; import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; public class MemorySource extends AbstractBatchSource implements IAfterConfigurableRefreshListener { + private static final Log logger = LogFactory.getLog(MemorySource.class); protected String cacheName; protected transient MemoryCache memoryCache; @@ -51,8 +54,9 @@ public void run() { Thread.sleep(1000); } - } catch (Exception e) { - throw new RuntimeException(e); + } catch (Throwable t) { + logger.error("MemorySource error:"); + t.printStackTrace(); } } diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/service/impl/ScanFunctionService.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/service/impl/ScanFunctionService.java index a92b279a..9063cead 100644 --- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/service/impl/ScanFunctionService.java +++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/service/impl/ScanFunctionService.java @@ -67,7 +67,7 @@ public ScanFunctionService(boolean scanDipper) { if (scanDipper) { scan.scanPackage("org.apache.rocketmq.streams.script.function.impl"); scan.scanPackage("org.apache.rocketmq.streams.filter.function"); - scan.scanPackage("org.apache.rocketmq.streams.dim.function"); +// scan.scanPackage("org.apache.rocketmq.streams.dim.function"); } }