Skip to content

Commit

Permalink
Merge pull request apache#223 from ni-ze/supportRsqldb
Browse files Browse the repository at this point in the history
fix(window) remove isSplitsReceiver
  • Loading branch information
ni-ze authored Sep 28, 2022
2 parents 6c365c4 + ffb2b6e commit 95a2c78
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,61 +159,6 @@ public List<ISplit> getAllSplits() {
}
}

//todo 计算正在工作的分片?
@Override
public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExt.setVipChannelEnabled(false);
defaultMQAdminExt.setAdminExtGroup(UUID.randomUUID().toString());
defaultMQAdminExt.setInstanceName(this.pullConsumer.getInstanceName());
try {
defaultMQAdminExt.start();
Map<MessageQueue, String> queue2Instances = getMessageQueueAllocationResult(defaultMQAdminExt, this.groupName);
Map<String, List<ISplit>> instanceOwnerQueues = new HashMap<>();
for (MessageQueue messageQueue : queue2Instances.keySet()) {
RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(), messageQueue.getBrokerName(), messageQueue.getQueueId()));
if (isNotDataSplit(metaqMessageQueue.getQueueId())) {
continue;
}
String instanceName = queue2Instances.get(messageQueue);
List<ISplit> splits = instanceOwnerQueues.computeIfAbsent(instanceName, k -> new ArrayList<>());
splits.add(metaqMessageQueue);
}
return instanceOwnerQueues;

} catch (Exception e) {
throw new RuntimeException(e);
} finally {
defaultMQAdminExt.shutdown();
}
}

private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
String groupName) {
HashMap<MessageQueue, String> results = new HashMap<>();

try {
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
Iterator iterator = consumerConnection.getConnectionSet().iterator();

while (iterator.hasNext()) {
Connection connection = (Connection) iterator.next();
String clientId = connection.getClientId();
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, false);
Iterator iterator1 = consumerRunningInfo.getMqTable().keySet().iterator();

while (iterator1.hasNext()) {
MessageQueue messageQueue = (MessageQueue) iterator1.next();
results.put(messageQueue, clientId.split("@")[1]);
}
}
} catch (Exception ex) {
;
}

return results;
}

@Override
protected boolean isNotDataSplit(String queueId) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,6 @@ public List<ISplit> getAllSplits() {
return null;
}

public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
return new HashMap<>();
}

/**
* 当新增分片时,需要做的回调
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,11 @@ public class SplitEventTimeManager {
protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class);
protected static final Map<String, Long> messageSplitId2MaxTime = new HashMap<>();
private AtomicInteger queueIdCount = new AtomicInteger(0);
protected Long lastUpdateTime;

protected volatile Integer allSplitSize;
protected volatile Integer workingSplitSize = 0;
protected Map<String, List<ISplit>> splitsGroupByInstance;
protected ISource source;

protected volatile boolean isAllSplitReceived = false;
protected transient String queueId;

private static Long splitReadyTime;

public SplitEventTimeManager(ISource source, String queueId) {
this.source = source;
this.queueId = queueId;
Expand Down Expand Up @@ -83,10 +76,6 @@ public void updateEventTime(IMessage message, AbstractWindow window) {
}

public Long getMaxEventTime() {

if (!isSplitsReceiver()) {
return null;
}
Long min = null;

synchronized (messageSplitId2MaxTime) {
Expand All @@ -109,68 +98,7 @@ public Long getMaxEventTime() {

}

protected boolean isSplitsReceiver() {
if (isAllSplitReceived) {
return true;
}
if (lastUpdateTime == null) {
lastUpdateTime = System.currentTimeMillis();
}

if (allSplitSize == -1) {
return true;
}

if (allSplitSize != -1 && allSplitSize > workingSplitSize) {
if (System.currentTimeMillis() - lastUpdateTime > 1000) {
workingSplitSize = calcuteWorkingSplitSize();
lastUpdateTime = System.currentTimeMillis();
if (allSplitSize > workingSplitSize) {
return false;
}
}

if (this.splitsGroupByInstance == null) {
return false;
}
//add time out policy: no necessary waiting for other split
if (splitReadyTime == null) {
synchronized (this) {
if (splitReadyTime == null) {
splitReadyTime = System.currentTimeMillis();
}
}
}
if (System.currentTimeMillis() - splitReadyTime >= 1000 * 60) {
this.isAllSplitReceived = true;
return true;
}
}

if (workingSplitSize == messageSplitId2MaxTime.size()) {
this.isAllSplitReceived = true;
return true;
}
return false;
}

private Integer calcuteWorkingSplitSize() {
if (source instanceof AbstractSource) {
AbstractSource abstractSource = (AbstractSource) source;
Map<String, List<ISplit>> splits = abstractSource.getWorkingSplitsGroupByInstances();
if (splits == null) {
return 0;
}
this.splitsGroupByInstance = splits;
int count = 0;
for (List<ISplit> splitList : splits.values()) {
count += splitList.size();
}
return count;
}
return 0;

}

public void setSource(ISource source) {
this.source = source;
Expand Down

0 comments on commit 95a2c78

Please sign in to comment.