Skip to content

Commit

Permalink
Add logs for data partition allocation (#12090)
Browse files Browse the repository at this point in the history
  • Loading branch information
CRZbulabula authored Feb 28, 2024
1 parent 80857ee commit 7833e6d
Showing 1 changed file with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ public Map<String, DataPartitionTable> allocateDataPartition(

// Filter available DataRegionGroups and
// sort them by the number of allocated DataPartitions
BalanceTreeMap<TConsensusGroupId, Integer> counter = new BalanceTreeMap<>();
BalanceTreeMap<TConsensusGroupId, Integer> availableDataRegionGroupCounter =
new BalanceTreeMap<>();
List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
getPartitionManager()
.getSortedRegionGroupSlotsCounter(database, TConsensusGroupType.DataRegion);
for (Pair<Long, TConsensusGroupId> pair : regionSlotsCounter) {
counter.put(pair.getRight(), pair.getLeft().intValue());
availableDataRegionGroupCounter.put(pair.getRight(), pair.getLeft().intValue());
}

DataPartitionTable dataPartitionTable = new DataPartitionTable();
Expand All @@ -152,18 +153,20 @@ public Map<String, DataPartitionTable> allocateDataPartition(
TConsensusGroupId successor =
getPartitionManager()
.getSuccessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
if (successor != null && counter.containsKey(successor)) {
if (successor != null && availableDataRegionGroupCounter.containsKey(successor)) {
seriesPartitionTable.putDataPartition(timePartitionSlot, successor);
counter.put(successor, counter.get(successor) + 1);
availableDataRegionGroupCounter.put(
successor, availableDataRegionGroupCounter.get(successor) + 1);
continue;
}

// 2. Assign DataPartition base on the DataAllotTable
TConsensusGroupId allotGroupId =
allotTable.getRegionGroupIdOrActivateIfNecessary(seriesPartitionSlot);
if (counter.containsKey(allotGroupId)) {
if (availableDataRegionGroupCounter.containsKey(allotGroupId)) {
seriesPartitionTable.putDataPartition(timePartitionSlot, allotGroupId);
counter.put(allotGroupId, counter.get(allotGroupId) + 1);
availableDataRegionGroupCounter.put(
allotGroupId, availableDataRegionGroupCounter.get(allotGroupId) + 1);
continue;
}

Expand All @@ -172,17 +175,25 @@ public Map<String, DataPartitionTable> allocateDataPartition(
TConsensusGroupId predecessor =
getPartitionManager()
.getPredecessorDataPartition(database, seriesPartitionSlot, timePartitionSlot);
if (predecessor != null && counter.containsKey(predecessor)) {
if (predecessor != null && availableDataRegionGroupCounter.containsKey(predecessor)) {
seriesPartitionTable.putDataPartition(timePartitionSlot, predecessor);
counter.put(predecessor, counter.get(predecessor) + 1);
availableDataRegionGroupCounter.put(
predecessor, availableDataRegionGroupCounter.get(predecessor) + 1);
continue;
}

// 4. Assign the DataPartition to DataRegionGroup with the least DataPartitions
// If the above DataRegionGroups are unavailable
TConsensusGroupId greedyGroupId = counter.getKeyWithMinValue();
TConsensusGroupId greedyGroupId = availableDataRegionGroupCounter.getKeyWithMinValue();
seriesPartitionTable.putDataPartition(timePartitionSlot, greedyGroupId);
counter.put(greedyGroupId, counter.get(greedyGroupId) + 1);
availableDataRegionGroupCounter.put(
greedyGroupId, availableDataRegionGroupCounter.get(greedyGroupId) + 1);
LOGGER.warn(
"[PartitionBalancer] The SeriesSlot: {} in TimeSlot: {} will be allocated to DataRegionGroup: {}, because the original target: {} is currently unavailable.",
seriesPartitionSlot,
timePartitionSlot,
greedyGroupId,
allotGroupId);
}

dataPartitionTable
Expand Down

0 comments on commit 7833e6d

Please sign in to comment.