Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ private void relocateAndBalanceGroup() {
continue;
}

Set<Long> unavailableBeIds = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
List<Long> availableBeIds = getAvailableBeIdsInGroup(db.getClusterName(), infoService, unavailableBeIds);
Set<Long> unavailableBeIdsInGroup = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
List<Long> availableBeIds = getAvailableBeIds(db.getClusterName(), infoService);
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
if (relocateAndBalance(groupId, unavailableBeIds, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
if (relocateAndBalance(groupId, unavailableBeIdsInGroup, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
catalog.getEditLog().logColocateBackendsPerBucketSeq(info);
Expand Down Expand Up @@ -484,27 +484,44 @@ private List<Integer> getBeSeqIndexes(List<Long> flatBackendsPerBucketSeq, long
private Set<Long> getUnavailableBeIdsInGroup(SystemInfoService infoService, ColocateTableIndex colocateIndex, GroupId groupId) {
Set<Long> backends = colocateIndex.getBackendsByGroup(groupId);
Set<Long> unavailableBeIds = Sets.newHashSet();
long currTime = System.currentTimeMillis();
for (Long backendId : backends) {
Backend be = infoService.getBackend(backendId);
if (be == null) {
if (!checkBackendAvailable(backendId, infoService)) {
unavailableBeIds.add(backendId);
} else if (!be.isAvailable()) {
// 1. BE is dead for a long time
// 2. BE is under decommission
if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
|| be.isDecommissioned()) {
unavailableBeIds.add(backendId);
}
}
}
return unavailableBeIds;
}

private List<Long> getAvailableBeIdsInGroup(String cluster, SystemInfoService infoService, Set<Long> unavailableBeIds) {
List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, true);
return allBackendIds.stream()
.filter(id -> !unavailableBeIds.contains(id))
.collect(Collectors.toList());
private List<Long> getAvailableBeIds(String cluster, SystemInfoService infoService) {
// get all backends to allBackendIds, and check be availability using checkBackendAvailable
// backend stopped for a short period of time is still considered available
List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, false);
List<Long> availableBeIds = Lists.newArrayList();
for (Long backendId : allBackendIds) {
if (checkBackendAvailable(backendId, infoService)) {
availableBeIds.add(backendId);
}
}
return availableBeIds;
}

/**
* check backend available
* backend stopped for a short period of time is still considered available
*/
private boolean checkBackendAvailable(Long backendId, SystemInfoService infoService) {
long currTime = System.currentTimeMillis();
Backend be = infoService.getBackend(backendId);
if (be == null) {
return false;
} else if (!be.isAvailable()) {
// 1. BE is dead for a long time
// 2. BE is under decommission
if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
|| be.isDecommissioned()) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,18 +362,75 @@ public void testGetUnavailableBeIdsInGroup(@Mocked ColocateTableIndex colocateTa
}

@Test
public void testGetAvailableBeIdsInGroup(@Mocked SystemInfoService infoService) {
List<Long> clusterAliveBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L);
public void testGetAvailableBeIds(@Mocked SystemInfoService infoService,
@Mocked Backend myBackend2,
@Mocked Backend myBackend3,
@Mocked Backend myBackend4,
@Mocked Backend myBackend5) {
List<Long> clusterBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L);
new Expectations(){
{
infoService.getClusterBackendIds("cluster1", true);
result = clusterAliveBackendIds;
infoService.getClusterBackendIds("cluster1", false);
result = clusterBackendIds;
minTimes = 0;

infoService.getBackend(1L);
result = null;
minTimes = 0;

// backend2 is available
infoService.getBackend(2L);
result = myBackend2;
minTimes = 0;
myBackend2.isAvailable();
result = true;
minTimes = 0;

// backend3 not available, and dead for a long time
infoService.getBackend(3L);
result = myBackend3;
minTimes = 0;
myBackend3.isAvailable();
result = false;
minTimes = 0;
myBackend3.isAlive();
result = false;
minTimes = 0;
myBackend3.getLastUpdateMs();
result = System.currentTimeMillis() - Config.tablet_repair_delay_factor_second * 1000 * 20;
minTimes = 0;

// backend4 available, not alive but dead for a short time
infoService.getBackend(4L);
result = myBackend4;
minTimes = 0;
myBackend4.isAvailable();
result = false;
minTimes = 0;
myBackend4.isAlive();
result = false;
minTimes = 0;
myBackend4.getLastUpdateMs();
result = System.currentTimeMillis();
minTimes = 0;

// backend5 not available, and in decommission
infoService.getBackend(5L);
result = myBackend5;
minTimes = 0;
myBackend5.isAvailable();
result = false;
minTimes = 0;
myBackend5.isAlive();
result = true;
minTimes = 0;
myBackend5.isDecommissioned();
result = true;
minTimes = 0;
}
};

Set<Long> unavailableBeIds = Sets.newHashSet(4L, 5L, 6L);
List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIdsInGroup","cluster1", infoService, unavailableBeIds);
Assert.assertArrayEquals(new long[]{1L, 2L, 3L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIds","cluster1", infoService);
Assert.assertArrayEquals(new long[]{2L, 4L}, availableBeIds.stream().mapToLong(i->i).sorted().toArray());
}
}