Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,26 +327,21 @@ private boolean relocateAndBalance(GroupId groupId, Set<Long> unavailableBeIds,
// sort backends with replica num in desc order
List<Map.Entry<Long, Long>> backendWithReplicaNum =
getSortedBackendReplicaNumPairs(availableBeIds, unavailableBeIds, statistic, flatBackendsPerBucketSeq);
if (seqIndexes == null || seqIndexes.size() <= 0) {
// if there is only one available backend and no unavailable bucketId to relocate, end the outer loop
if (backendWithReplicaNum.size() <= 1) {
break;
}

// if there is only one available backend and no unavailable bucketId to relocate, end the outer loop
if (backendWithReplicaNum.size() <= 1) {
break;
}

if (seqIndexes == null || seqIndexes.size() <= 0) {
// choose max bucketId num be as src be
Preconditions.checkState(backendsPerBucketSeq.size() > 0);
srcBeId = backendWithReplicaNum.get(0).getKey();
seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, srcBeId);
}

int i;
if (hasUnavailableBe) {
i = -1;
} else {
i = 0;
}
int j = backendWithReplicaNum.size() - 1;
while (i < j) {
boolean isThisRoundChanged = false;
boolean isThisRoundChanged = false;
for (int j = backendWithReplicaNum.size() - 1; j >= 0; j--) {
// we try to use a low backend to replace the src backend.
// if replace failed(eg: both backends are on some host), select next low backend and try(j--)
Map.Entry<Long, Long> lowBackend = backendWithReplicaNum.get(j);
Expand All @@ -362,6 +357,11 @@ private boolean relocateAndBalance(GroupId groupId, Set<Long> unavailableBeIds,
return false;
}

// if we found src_id == dst_id we skip to next
if (srcBeId == destBeId) {
continue;
}

for (int seqIndex : seqIndexes) {
// the bucket index.
// eg: 0 / 3 = 0, so that the bucket index of the 4th backend id in flatBackendsPerBucketSeq is 0.
Expand All @@ -381,23 +381,23 @@ private boolean relocateAndBalance(GroupId groupId, Set<Long> unavailableBeIds,
}
}

if (!isThisRoundChanged) {
LOG.info("unable to replace backend {} with backend {} in colocate group {}",
srcBeId, destBeId, groupId);
if (--j == i) {
// if all backends are checked but this round is not changed,
// we should end the outer loop to avoid endless loops
LOG.info("all backends are checked but this round is not changed, " +
"end outer loop in colocate group {}", groupId);
break OUT;
} else {
// select another low backend and try again
continue;
}
if (isThisRoundChanged) {
// we found a change
break;
}
// we use next node as dst node
LOG.info("unable to replace backend {} with backend {} in colocate group {}",
srcBeId, destBeId, groupId);
}

if (!isThisRoundChanged) {
// if all backends are checked but this round is not changed,
// we should end the loop
LOG.info("all backends are checked but this round is not changed, " +
"end outer loop in colocate group {}", groupId);
break;
} // end inner loop
}
// end inner loop
}

if (isChanged) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,37 @@ public void testFixBalanceEndlessLoop(@Mocked SystemInfoService infoService,
Assert.assertFalse(changed);
}

@Test
public void testFixBalanceEndlessLoop2(@Mocked SystemInfoService infoService,
@Mocked ClusterLoadStatistic statistic) {
new Expectations() {
{
statistic.getBackendLoadStatistic(anyLong);
result = new Delegate<BackendLoadStatistic>() {
BackendLoadStatistic delegate(Long beId) {
return new FakeBackendLoadStatistic(beId, null, null, null);
}
};
minTimes = 0;
}
};
GroupId groupId = new GroupId(10000, 10001);
List<Column> distributionCols = Lists.newArrayList();
ColocateGroupSchema groupSchema = new ColocateGroupSchema(groupId, distributionCols, 5, (short) 1);
Map<GroupId, ColocateGroupSchema> group2Schema = Maps.newHashMap();
group2Schema.put(groupId, groupSchema);

ColocateTableIndex colocateTableIndex = createColocateIndex(groupId, Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema);

List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
Set<Long> unAvailBackendIds = Sets.newHashSet(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
List<Long> availBackendIds = Lists.newArrayList();
boolean changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, unAvailBackendIds, availBackendIds,
colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
Assert.assertFalse(changed);
}

@Test
public void testGetSortedBackendReplicaNumPairs(@Mocked ClusterLoadStatistic statistic) {
new Expectations() {
Expand Down