Skip to content

Commit

Permalink
aggregate instance pull
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Sep 18, 2024
1 parent 6381b1f commit 9434d27
Show file tree
Hide file tree
Showing 23 changed files with 589 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand All @@ -38,6 +35,8 @@ public interface OuterClientService extends Ordered{

void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostPort, long noModifySeconds) throws OuterClientException;

/**
 * Applies the status of several instances with one request.
 *
 * @param markInstanceRequest carries the cluster name, the active dc and one
 *                            {@code HostPortDcStatus} (host, port, dc, canRead) per instance
 * @throws OuterClientException declared so implementations can report a failed request
 */
void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException;

boolean clusterMigratePreCheck(String clusterName) throws OuterClientException;

MigrationPublishResult doMigrationPublish(String clusterName, String primaryDcName, List<InetSocketAddress> newMasters) throws OuterClientException;
Expand Down Expand Up @@ -724,6 +723,97 @@ public void setMessage(String message) {
}
}

/**
 * Payload of {@code batchMarkInstance}: the cluster being adjusted, its active dc and the
 * per-instance status entries to apply.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
class MarkInstanceRequest{
    private Set<HostPortDcStatus> hostPortDcStatuses;
    private String clusterName;
    private String activeDc;

    /** Creates an empty request; every field stays {@code null} until its setter is called. */
    public MarkInstanceRequest() {
    }

    /**
     * @param hostPortDcStatuses status entries, one per instance
     * @param clusterName        name of the cluster the instances belong to
     * @param activeDc           active dc of that cluster
     */
    public MarkInstanceRequest(Set<HostPortDcStatus> hostPortDcStatuses, String clusterName, String activeDc) {
        this.hostPortDcStatuses = hostPortDcStatuses;
        this.clusterName = clusterName;
        this.activeDc = activeDc;
    }

    public Set<HostPortDcStatus> getHostPortDcStatuses() {
        return hostPortDcStatuses;
    }

    public void setHostPortDcStatuses(Set<HostPortDcStatus> hostPortDcStatuses) {
        this.hostPortDcStatuses = hostPortDcStatuses;
    }

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public String getActiveDc() {
        return activeDc;
    }

    public void setActiveDc(String activeDc) {
        this.activeDc = activeDc;
    }

    /**
     * Renders all fields. The request is passed to loggers as a {@code {}} argument, which
     * would otherwise print only the default identity string.
     */
    @Override
    public String toString() {
        return "MarkInstanceRequest{clusterName='" + clusterName
                + "', activeDc='" + activeDc
                + "', hostPortDcStatuses=" + hostPortDcStatuses + '}';
    }
}

/**
 * Status entry for one instance: its address, the dc it is reported under and whether it
 * can be read.
 *
 * <p>Instances are collected in {@code Set}s, so equality is value based over all four
 * fields. Because the fields are mutable, do not change them while the object is stored in
 * a hash-based collection.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
class HostPortDcStatus{

    private String host;
    private int port;
    private String dc;
    private boolean canRead;

    /** Creates an empty entry; fields keep their Java defaults until set. */
    public HostPortDcStatus() {
    }

    /**
     * @param host    instance host
     * @param port    instance port
     * @param dc      dc the instance is reported under
     * @param canRead whether the instance can be read
     */
    public HostPortDcStatus(String host, int port, String dc, boolean canRead) {
        this.host = host;
        this.port = port;
        this.dc = dc;
        this.canRead = canRead;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    public boolean isCanRead() {
        return canRead;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getDc() {
        return dc;
    }

    public void setDc(String dc) {
        this.dc = dc;
    }

    public void setCanRead(boolean canRead) {
        this.canRead = canRead;
    }

    /** Two entries are equal when host, port, dc and canRead all match. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        HostPortDcStatus that = (HostPortDcStatus) o;
        return port == that.port
                && canRead == that.canRead
                && Objects.equals(host, that.host)
                && Objects.equals(dc, that.dc);
    }

    /** Consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return Objects.hash(host, port, dc, canRead);
    }

    @Override
    public String toString() {
        return "HostPortDcStatus{host='" + host
                + "', port=" + port
                + ", dc='" + dc
                + "', canRead=" + canRead + '}';
    }
}

enum ClusterType {
SINGEL_DC(0){
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostP

}

/**
 * Empty body: this implementation does nothing with the request.
 *
 * @param markInstanceRequest ignored here
 */
@Override
public void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException {

}

@Override
public OuterClientDataResp<List<ClusterExcludedIdcInfo>> getAllExcludedIdcs() throws Exception {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
import com.google.common.collect.Lists;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand Down Expand Up @@ -119,6 +116,14 @@ public void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostP
instanceStatus.put(clusterShardHostPort.getHostPort(), false);
}

/**
 * Stores the {@code canRead} flag of every entry in the request into {@code instanceStatus},
 * keyed by the entry's host and port.
 *
 * @param markInstanceRequest request to apply; a {@code null} request or a request without
 *                            status entries is logged and otherwise ignored
 */
@Override
public void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException {
    logger.info("[batchMarkInstance]{}", markInstanceRequest);
    // A request built with the no-arg constructor has a null status set; there is nothing to record.
    if (markInstanceRequest == null || markInstanceRequest.getHostPortDcStatuses() == null) {
        return;
    }
    for (HostPortDcStatus hostPortDcStatus : markInstanceRequest.getHostPortDcStatuses()) {
        instanceStatus.put(new HostPort(hostPortDcStatus.getHost(), hostPortDcStatus.getPort()), hostPortDcStatus.isCanRead());
    }
}

@Override
public OuterClientDataResp<List<ClusterExcludedIdcInfo>> getAllExcludedIdcs() throws Exception {
OuterClientDataResp<List<ClusterExcludedIdcInfo>> resp = new OuterClientDataResp<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* @author lishanglin
Expand All @@ -23,4 +25,6 @@ class AllInstanceHealthStatus extends HashMap<HostPort, HealthStatusDesc> {}

Map<HostPort, HealthStatusDesc> getAllInstanceCrossRegionHealthStatus();

/**
 * Returns health status descriptions keyed by instance for the given instances.
 * NOTE(review): presumably the result only covers {@code hostPorts}; confirm against the implementation.
 *
 * @param hostPorts instances to query
 * @return health status description per instance
 */
Map<HostPort, HealthStatusDesc> getAllClusterInstanceHealthStatus(Set<HostPort> hostPorts);

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,8 @@ public interface CheckerConfig {

int getKeeperCheckerIntervalMilli();

/**
 * Instance pull interval; the unit is seconds according to the method name.
 */
int getInstancePullIntervalSeconds();

/**
 * Random part of the instance pull schedule; the unit is seconds according to the method name.
 * NOTE(review): presumably jitter added on top of the pull interval; confirm with the caller.
 */
int getInstancePullRandomSeconds();

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ public class CheckConfigBean extends AbstractConfigBean {

public static final String KEY_KEEPER_CHECKER_INTERVAL = "keeper.checker.interval";

/** Property key read by {@code getInstancePullIntervalSeconds()}. */
public static final String KEY_CHECKER_INSTANCE_PULL_INTERVAL = "checker.instance.pull.interval";

/** Property key read by {@code getInstancePullRandomSeconds()}. */
public static final String KEY_CHECKER_INSTANCE_PULL_RANDOM = "checker.instance.pull.random";

private FoundationService foundationService;

@Autowired
Expand Down Expand Up @@ -342,6 +346,14 @@ public int getKeeperCheckerIntervalMilli() {
return getIntProperty(KEY_KEEPER_CHECKER_INTERVAL, 60 * 1000);
}

/**
 * Reads {@code KEY_CHECKER_INSTANCE_PULL_INTERVAL} as an int, falling back to 5 when unset.
 *
 * @return configured instance pull interval
 */
public int getInstancePullIntervalSeconds() {
    final int fallbackInterval = 5;
    return getIntProperty(KEY_CHECKER_INSTANCE_PULL_INTERVAL, fallbackInterval);
}

/**
 * Reads {@code KEY_CHECKER_INSTANCE_PULL_RANDOM} as an int, falling back to 5 when unset.
 *
 * @return configured random part of the instance pull schedule
 */
public int getInstancePullRandomSeconds() {
    final int fallbackRandom = 5;
    return getIntProperty(KEY_CHECKER_INSTANCE_PULL_RANDOM, fallbackRandom);
}

public int getStableResetAfterRounds() {
return getIntProperty(KEY_CHECKER_STABLE_RESET_AFTER_ROUNDS, 30);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.checker.controller;

import com.ctrip.xpipe.api.codec.Codec;
import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.RedisInfoManager;
import com.ctrip.xpipe.redis.checker.controller.result.ActionContextRetMessage;
Expand All @@ -15,14 +16,13 @@
import com.ctrip.xpipe.redis.checker.healthcheck.stability.StabilityHolder;
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.*;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentMap;

/**
Expand Down Expand Up @@ -55,6 +55,11 @@ public class CheckerHealthController {
@Autowired
private StabilityHolder siteStability;

@Autowired
private MetaCache metaCache;

// Data center of this process, resolved once from the default FoundationService.
private static final String currentDc = FoundationService.DEFAULT.getDataCenter();

@RequestMapping(value = "/health/{ip}/{port}", method = RequestMethod.GET)
public HEALTH_STATE getHealthState(@PathVariable String ip, @PathVariable int port) {
if (siteStability.isSiteStable()) return defaultDelayPingActionCollector.getState(new HostPort(ip, port));
Expand Down Expand Up @@ -149,6 +154,20 @@ public Map<HostPort, HealthStatusDesc> getAllCrossRegionHealthStatusDesc() {
else return Collections.emptyMap();
}

/**
 * Resolves the health state of every posted instance.
 *
 * <p>An instance whose dc equals the current dc, and whose active dc is cross-region relative
 * to the current dc, is answered through {@code getCrossRegionHealthState}; every other
 * instance is answered through {@code getHealthState}.
 *
 * @param hostPorts instances to look up; {@code null} or empty yields an empty map
 * @return health state per requested instance
 */
@RequestMapping(value = "/health/check/instances/status", method = RequestMethod.POST)
public Map<HostPort, HEALTH_STATE> getHealthCheckInstanceCluster(@RequestBody List<HostPort> hostPorts) {
    if (hostPorts == null || hostPorts.isEmpty()) {
        return Collections.emptyMap();
    }

    Map<HostPort, HEALTH_STATE> stateByInstance = new HashMap<>();
    for (HostPort instance : hostPorts) {
        // Short-circuit order is kept: the cross-region lookup only runs for current-dc instances.
        boolean useCrossRegionState = Objects.equals(currentDc, metaCache.getDc(instance))
                && metaCache.isCrossRegion(metaCache.getActiveDc(instance), currentDc);
        HEALTH_STATE state = useCrossRegionState
                ? getCrossRegionHealthState(instance.getHost(), instance.getPort())
                : getHealthState(instance.getHost(), instance.getPort());
        stateByInstance.put(instance, state);
    }
    return stateByInstance;
}

@GetMapping("/health/keeper/status/all")
public ConcurrentMap<String, Map<DcClusterShardKeeper, Long>> getAllKeeperFlows() {
return keeperFlowCollector.getHostPort2InputFlow();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction;

import com.ctrip.xpipe.api.migration.OuterClientException;
import com.ctrip.xpipe.api.migration.OuterClientService.*;
import com.ctrip.xpipe.endpoint.HostPort;

import java.util.Set;

/**
 * Finds instances whose status should be adjusted and marks them.
 */
public interface AggregatorPullService {

/**
 * Determines which of the given instances need a status adjustment.
 *
 * @param instances instances to examine
 * @return one {@code HostPortDcStatus} per instance that needs adjusting
 * @throws Exception declared so implementations can propagate lookup failures
 */
Set<HostPortDcStatus> getNeedAdjustInstances(Set<HostPort> instances) throws Exception;

/**
 * Marks the given instances of a cluster.
 *
 * @param clusterName cluster the instances belong to
 * @param instances   status entries to apply
 * @throws OuterClientException declared so implementations can report a failed marking
 */
void doMarkInstances(String clusterName, Set<HostPortDcStatus> instances) throws OuterClientException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction;

import com.ctrip.xpipe.api.migration.OuterClientException;
import com.ctrip.xpipe.api.migration.OuterClientService;
import com.ctrip.xpipe.api.migration.OuterClientService.*;
import com.ctrip.xpipe.endpoint.ClusterShardHostPort;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.CheckerService;
import com.ctrip.xpipe.redis.checker.RemoteCheckerManager;
import com.ctrip.xpipe.redis.checker.alert.ALERT_TYPE;
import com.ctrip.xpipe.redis.checker.alert.AlertManager;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.TimeUnit;


/**
 * Default {@link AggregatorPullService}. It compares the health status aggregated from all
 * remote checkers with the status reported by the outer client, and sends the differing
 * instances to the outer client as one batch.
 */
@Component
public class DefaultAggregatorPullService implements AggregatorPullService{

    @Autowired
    private RemoteCheckerManager remoteCheckerManager;
    @Autowired
    private AlertManager alertManager;
    @Autowired
    private CheckerConfig checkerConfig;
    @Autowired
    private MetaCache metaCache;
    private final OuterClientService outerClientService = OuterClientService.DEFAULT;
    private static final Logger logger = LoggerFactory.getLogger(DefaultAggregatorPullService.class);

    /**
     * Returns the instances whose aggregated XPipe status is missing from, or differs from,
     * the outer client's status.
     *
     * @param instances instances to compare
     * @return entries carrying host, port, dc (from {@code metaCache}) and the XPipe status
     * @throws Exception propagated from the outer client lookup
     */
    @Override
    public Set<HostPortDcStatus> getNeedAdjustInstances(Set<HostPort> instances) throws Exception{
        Set<HostPortDcStatus> instanceNeedAdjust = new HashSet<>();
        Map<HostPort, Boolean> xpipeAllHealthStatus = getXpipeAllHealthStatus(instances);
        logger.info("[DefaultAggregatorPullService][getNeedAdjustInstances]xpipeAllHealthStatus:{}", xpipeAllHealthStatus);
        Map<HostPort, Boolean> outerClientAllHealthStatus = getOuterClientAllHealthStatus(instances);
        logger.info("[DefaultAggregatorPullService][getNeedAdjustInstances]outerClientAllHealthStatus:{}", outerClientAllHealthStatus);
        for (Map.Entry<HostPort, Boolean> entry : xpipeAllHealthStatus.entrySet()) {
            HostPort instance = entry.getKey();
            Boolean xpipeStatus = entry.getValue();
            // Compare boxed Booleans by value; != on Boolean objects compares references.
            boolean differs = !outerClientAllHealthStatus.containsKey(instance)
                    || !Objects.equals(xpipeStatus, outerClientAllHealthStatus.get(instance));
            if (differs) {
                String dc = metaCache.getDc(new HostPort(instance.getHost(), instance.getPort()));
                instanceNeedAdjust.add(new HostPortDcStatus(instance.getHost(), instance.getPort(), dc, xpipeStatus));
            }
        }
        return instanceNeedAdjust;
    }

    /**
     * Raises one alert per instance, then sends all instances to the outer client in a
     * single {@code batchMarkInstance} request.
     *
     * @param clusterName cluster the instances belong to
     * @param instances   status entries to apply
     * @throws OuterClientException propagated from the outer client
     */
    @Override
    public void doMarkInstances(String clusterName, Set<HostPortDcStatus> instances) throws OuterClientException {
        alertMarkInstance(clusterName, instances);
        MarkInstanceRequest markInstanceRequest = new MarkInstanceRequest(instances, clusterName, metaCache.getActiveDc(clusterName));
        outerClientService.batchMarkInstance(markInstanceRequest);
    }

    /**
     * Collects the health status of the given instances from every remote checker and
     * aggregates it with the configured quorum.
     *
     * @param instances instances to query
     * @return aggregated status per instance
     */
    public Map<HostPort, Boolean> getXpipeAllHealthStatus(Set<HostPort> instances) {
        XPipeInstanceHealthHolder xPipeInstanceHealthHolder = new XPipeInstanceHealthHolder();
        for (CheckerService checkerService : remoteCheckerManager.getAllCheckerServices()) {
            xPipeInstanceHealthHolder.add(checkerService.getAllClusterInstanceHealthStatus(instances));
        }
        return xPipeInstanceHealthHolder.getAllHealthStatus(checkerConfig.getQuorum());
    }

    /**
     * Asks the outer client, one instance at a time, whether each instance is up.
     *
     * @param hostPorts instances to query
     * @return outer client status per instance
     * @throws Exception propagated from {@code isInstanceUp}
     */
    public Map<HostPort, Boolean> getOuterClientAllHealthStatus(Set<HostPort> hostPorts) throws Exception {
        Map<HostPort, Boolean> result = new HashMap<>();
        for (HostPort hostPort : hostPorts) {
            // Cluster and shard are not known here, so only the address is supplied.
            ClusterShardHostPort clusterShardHostPort = new ClusterShardHostPort(null, null, hostPort);
            result.put(hostPort, outerClientService.isInstanceUp(clusterShardHostPort));
        }
        return result;
    }

    /**
     * Raises a MARK_INSTANCE_UP alert for every readable instance and a MARK_INSTANCE_DOWN
     * alert for every other instance.
     *
     * @param clusterName cluster the instances belong to
     * @param instances   status entries about to be applied
     */
    public void alertMarkInstance(String clusterName, Set<HostPortDcStatus> instances) {
        for (HostPortDcStatus instance : instances) {
            HostPort hostPort = new HostPort(instance.getHost(), instance.getPort());
            if (instance.isCanRead()) {
                alertManager.alert(clusterName, null, hostPort, ALERT_TYPE.MARK_INSTANCE_UP, "Mark Instance Up");
            } else {
                alertManager.alert(clusterName, null, hostPort, ALERT_TYPE.MARK_INSTANCE_DOWN, "Mark Instance Down");
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ public UpDownInstances aggregate(Map<String, Set<HostPort>> interested, int quor
return new UpDownInstances(healthyInstances, unhealthyInstances);
}

/**
 * Aggregates the status of every instance that appears in any collected check result.
 *
 * @param quorum quorum passed to {@code aggregate} for each instance
 * @return aggregated status per instance
 */
public Map<HostPort, Boolean> getAllHealthStatus(int quorum) {
    // Union of the instances seen across all collected results.
    Set<HostPort> knownInstances = new HashSet<>();
    healthCheckResult.forEach(singleResult -> knownInstances.addAll(singleResult.keySet()));

    Map<HostPort, Boolean> aggregated = new HashMap<>();
    knownInstances.forEach(instance -> aggregated.put(instance, aggregate(instance, quorum)));
    return aggregated;
}

public List<HealthStatusDesc> getHealthStatus(HostPort hostPort) {
List<HealthStatusDesc> statusList = new ArrayList<>();
healthCheckResult.forEach(result -> {
Expand Down
Loading

0 comments on commit 9434d27

Please sign in to comment.