Skip to content

Commit

Permalink
remove offline server ip
Browse files Browse the repository at this point in the history
  • Loading branch information
songxiaosheng committed Sep 1, 2023
1 parent 9afe466 commit 0aaec92
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,49 @@
package org.apache.shardingsphere.elasticjob.lite.internal.server;

import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;

import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* Server service.
*/
public final class ServerService {

private final String jobName;

private final JobNodeStorage jobNodeStorage;

private final ServerNode serverNode;

public ServerService(final CoordinatorRegistryCenter regCenter, final String jobName) {
this.jobName = jobName;
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
serverNode = new ServerNode(jobName);
}

/**
* Persist online status of job server.
*
*
* @param enabled enable server or not
*/
public void persistOnline(final boolean enabled) {
if (!JobRegistry.getInstance().isShutdown(jobName)) {
jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getServerIp()), enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name());
}
}

/**
* Judge has available servers or not.
*
*
* @return has available servers or not
*/
public boolean hasAvailableServers() {
Expand All @@ -68,17 +72,17 @@ public boolean hasAvailableServers() {
}
return false;
}

/**
* Judge is available server or not.
*
*
* @param ip job server IP address
* @return is available server or not
*/
public boolean isAvailableServer(final String ip) {
return isEnableServer(ip) && hasOnlineInstances(ip);
}

private boolean hasOnlineInstances(final String ip) {
for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) {
if (each.startsWith(ip)) {
Expand All @@ -87,7 +91,7 @@ private boolean hasOnlineInstances(final String ip) {
}
return false;
}

/**
* Judge is server enabled or not.
*
Expand All @@ -102,4 +106,35 @@ public boolean isEnableServer(final String ip) {
}
return ServerStatus.ENABLED.name().equals(serverStatus);
}

public int removeOfflineServers() {
AtomicInteger affectNums = new AtomicInteger();
List<String> instances = jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT);
if (instances == null || instances.isEmpty()) {
return affectNums.get();
}
Set<String> instanceIps = instances.stream()
.map(instance -> instance.split("@-@")[0])
.collect(Collectors.toSet());
if (instanceIps == null || instanceIps.isEmpty()) {
return affectNums.get();
}
List<String> serverIps = jobNodeStorage.getJobNodeChildrenKeys(ServerNode.ROOT);
if (serverIps == null || serverIps.isEmpty()) {
return affectNums.get();
}

serverIps.forEach(serverIp -> {
if (instanceIps.contains(serverIp)) {
return;
}
String status = jobNodeStorage.getJobNodeData(serverNode.getServerNode(serverIp));
if (StringUtils.isNotBlank(status)) {
return;
}
jobNodeStorage.removeJobNodeIfExisted(serverNode.getServerNode(serverIp));
affectNums.getAndIncrement();
});
return affectNums.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void registerStartUpInfo(final boolean enabled) {
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
serverService.removeOfflineServers();
}

/**
Expand Down

0 comments on commit 0aaec92

Please sign in to comment.