diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java index 2d199b54..4134c191 100644 --- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java +++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java @@ -159,61 +159,6 @@ public List getAllSplits() { } } - //todo 计算正在工作的分片? - @Override - public Map> getWorkingSplitsGroupByInstances() { - DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); - defaultMQAdminExt.setVipChannelEnabled(false); - defaultMQAdminExt.setAdminExtGroup(UUID.randomUUID().toString()); - defaultMQAdminExt.setInstanceName(this.pullConsumer.getInstanceName()); - try { - defaultMQAdminExt.start(); - Map queue2Instances = getMessageQueueAllocationResult(defaultMQAdminExt, this.groupName); - Map> instanceOwnerQueues = new HashMap<>(); - for (MessageQueue messageQueue : queue2Instances.keySet()) { - RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(), messageQueue.getBrokerName(), messageQueue.getQueueId())); - if (isNotDataSplit(metaqMessageQueue.getQueueId())) { - continue; - } - String instanceName = queue2Instances.get(messageQueue); - List splits = instanceOwnerQueues.computeIfAbsent(instanceName, k -> new ArrayList<>()); - splits.add(metaqMessageQueue); - } - return instanceOwnerQueues; - - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - defaultMQAdminExt.shutdown(); - } - } - - private Map getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt, - String groupName) { - HashMap results = new HashMap<>(); - - try { - ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName); - Iterator iterator = consumerConnection.getConnectionSet().iterator(); - - while (iterator.hasNext()) { - Connection connection = (Connection) iterator.next(); - String clientId = connection.getClientId(); - ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, false); - Iterator iterator1 = consumerRunningInfo.getMqTable().keySet().iterator(); - - while (iterator1.hasNext()) { - MessageQueue messageQueue = (MessageQueue) iterator1.next(); - results.put(messageQueue, clientId.split("@")[1]); - } - } - } catch (Exception ex) { - ; - } - - return results; - } - @Override protected boolean isNotDataSplit(String queueId) { return false; diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java index a0e35353..a333923e 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java @@ -471,9 +471,6 @@ public List getAllSplits() { return null; } - public Map> getWorkingSplitsGroupByInstances() { - return new HashMap<>(); - } /** * 当新增分片时,需要做的回调 diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java index 797c1c4c..6148ae0f 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java @@ -37,18 +37,11 @@ public class SplitEventTimeManager { protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class); protected static final Map messageSplitId2MaxTime = new HashMap<>(); private AtomicInteger queueIdCount = new AtomicInteger(0); - protected Long lastUpdateTime; protected volatile Integer allSplitSize; - protected volatile Integer workingSplitSize = 0; - protected Map> splitsGroupByInstance; protected ISource source; - - protected volatile boolean isAllSplitReceived = false; protected transient String queueId; - private static Long splitReadyTime; - public SplitEventTimeManager(ISource source, String queueId) { this.source = source; this.queueId = queueId; @@ -83,10 +76,6 @@ public void updateEventTime(IMessage message, AbstractWindow window) { } public Long getMaxEventTime() { - - if (!isSplitsReceiver()) { - return null; - } Long min = null; synchronized (messageSplitId2MaxTime) { @@ -109,68 +98,7 @@ public Long getMaxEventTime() { } - protected boolean isSplitsReceiver() { - if (isAllSplitReceived) { - return true; - } - if (lastUpdateTime == null) { - lastUpdateTime = System.currentTimeMillis(); - } - - if (allSplitSize == -1) { - return true; - } - if (allSplitSize != -1 && allSplitSize > workingSplitSize) { - if (System.currentTimeMillis() - lastUpdateTime > 1000) { - workingSplitSize = calcuteWorkingSplitSize(); - lastUpdateTime = System.currentTimeMillis(); - if (allSplitSize > workingSplitSize) { - return false; - } - } - - if (this.splitsGroupByInstance == null) { - return false; - } - //add time out policy: no necessary waiting for other split - if (splitReadyTime == null) { - synchronized (this) { - if (splitReadyTime == null) { - splitReadyTime = System.currentTimeMillis(); - } - } - } - if (System.currentTimeMillis() - splitReadyTime >= 1000 * 60) { - this.isAllSplitReceived = true; - return true; - } - } - - if (workingSplitSize == messageSplitId2MaxTime.size()) { - this.isAllSplitReceived = true; - return true; - } - return false; - } - - private Integer calcuteWorkingSplitSize() { - if (source instanceof AbstractSource) { - AbstractSource abstractSource = (AbstractSource) source; - Map> splits = abstractSource.getWorkingSplitsGroupByInstances(); - if (splits == null) { - return 0; - } - this.splitsGroupByInstance = splits; - int count = 0; - for (List splitList : splits.values()) { - count += splitList.size(); - } - return count; - } - return 0; - - } public void setSource(ISource source) { this.source = source;