Skip to content

Commit

Permalink
Merge pull request #917 from wangqifan/bugfix/beacon-checker-supportdc
Browse files Browse the repository at this point in the history
fix bug for beacon checker
  • Loading branch information
LanternLee authored Dec 4, 2024
2 parents 61194bb + 689ce3f commit ae59b36
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.ServicesUtil;
import com.ctrip.xpipe.utils.StringUtil;
import com.ctrip.xpipe.utils.VisibleForTesting;

import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -42,6 +43,11 @@ protected void doExecute() throws Throwable {
}
}

@VisibleForTesting
public void setMetricProxy(MetricProxy metricProxy) {
this.metricProxy = metricProxy;
}

private void doJob() {
Map<String, Set<String>> allClusters = new HashMap<>();
clustersByBeaconSystemOrg.forEach(((beaconSystem, clustersByOrg) -> {
Expand All @@ -59,16 +65,21 @@ private void doJob() {
Set<String> inconsistencyClusters = new HashSet<>();
Set<String> notfoundCluster = new HashSet<>();

Set<String> supportDcs = getSupportDcs();

Map<String, DcMeta> dcs = metaCache.getXpipeMeta().getDcs();
Set<String> supportZones = config.getBeaconSupportZones();
for (DcMeta dcMeta : dcs.values()) {
if(!supportZones.isEmpty() && supportZones.stream().noneMatch(zone -> StringUtil.trimEquals(dcMeta.getZone(), zone, true))) {
if(!supportDcs.contains(dcMeta.getId().toUpperCase())) {
continue;
}
Map<String, ClusterMeta> clusterMetaMap = dcMeta.getClusters();
for (ClusterMeta cluster : clusterMetaMap.values()) {
String clusterName = cluster.getId();
String activeDc = cluster.getActiveDc();
if(StringUtil.trimEquals(ClusterType.ONE_WAY.toString(), cluster.getType(), true)) {
if(!StringUtil.trimEquals(dcMeta.getId(), activeDc, true)) {
continue;
}
String dc = cluster.getActiveDc().toUpperCase();
if(!allClusters.containsKey(clusterName)) {
notfoundCluster.add(clusterName);
Expand All @@ -87,6 +98,7 @@ private void doJob() {
Set<String> beaconDcSet = getBeaconClusterDcSet(allClusters, clusterName);
Set<String> loaclDcSet = Arrays.stream(cluster.getDcs().split(","))
.map(String::toUpperCase)
.filter(dc -> supportDcs.contains(dc))
.collect(Collectors.toSet());
if(!beaconDcSet.equals(loaclDcSet)) {
inconsistencyClusters.add(clusterName);
Expand All @@ -105,6 +117,10 @@ private void doJob() {
MetricData metricData = getMetricData(clusterName, "NOTFOUND");
sendMetricData(metricData);
}
if(inconsistencyClusters.isEmpty() && notfoundCluster.isEmpty()) {
MetricData metricData = getMetricData("", "CONSISTENT");
sendMetricData(metricData);
}

}

Expand Down Expand Up @@ -132,6 +148,18 @@ private void sendMetricData(MetricData metricData) {
}
}

private Set<String> getSupportDcs() {
Set<String> dcs = metaCache.getXpipeMeta().getDcs().keySet();
Set<String> supportZones = config.getBeaconSupportZones();
Set<String> supportDcs = new HashSet<>();
for(String dc : dcs) {
if(supportZones.stream().anyMatch(zone -> metaCache.isDcInRegion(dc, zone))) {
supportDcs.add(dc.toUpperCase());
}
}
return supportDcs;
}

@Override
protected void doReset() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package com.ctrip.xpipe.redis.console.healthcheck.nonredis.beacon;

import com.ctrip.xpipe.api.migration.auto.MonitorService;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.metric.MetricData;
import com.ctrip.xpipe.metric.MetricProxy;
import com.ctrip.xpipe.redis.console.AbstractConsoleTest;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.config.model.BeaconOrgRoute;
import com.ctrip.xpipe.redis.console.migration.auto.DefaultMonitorManager;
import com.ctrip.xpipe.redis.console.resources.DefaultMetaCache;
import com.ctrip.xpipe.redis.core.config.ConsoleCommonConfig;
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.entity.XpipeMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.redis.core.meta.XpipeMetaManager;
import com.ctrip.xpipe.redis.core.meta.impl.DefaultXpipeMetaManager;
import com.ctrip.xpipe.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.class)
public class BeaconConsistencyCheckTest extends AbstractConsoleTest {

@Mock
ConsoleCommonConfig config;
@Mock
ConsoleConfig consoleConfig;
@Mock
MonitorService monitorService;
@Mock
MetricProxy metricProxy;

private BeaconConsistencyCheckJob consistencyCheckJob;

private Map<Long, List<MonitorService>> buildServices() {
HashMap result = new HashMap<>();
result.put(1L, Collections.singletonList(monitorService));
return result;
}

private ClusterMeta buildClusterMeta() {
ClusterMeta clusterMeta = new ClusterMeta("oneway_test");
clusterMeta.setType(ClusterType.ONE_WAY.toString());
clusterMeta.setActiveDc("AWS");
clusterMeta.setDcs("AWS,PTJQ");
clusterMeta.setOrgId(1);
return clusterMeta;
}

private ClusterMeta buildBiClusterMeta() {
ClusterMeta clusterMeta = new ClusterMeta("bi_test");
clusterMeta.setType(ClusterType.BI_DIRECTION.toString());
clusterMeta.setDcs("AWS,PTJQ");
clusterMeta.setOrgId(1);
return clusterMeta;
}

private ClusterMeta buildOneWaySHA() {
ClusterMeta clusterMeta = new ClusterMeta("oneway_sha");
clusterMeta.setType(ClusterType.ONE_WAY.toString());
clusterMeta.setActiveDc("PTJQ");
clusterMeta.setDcs("AWS,PTJQ");
clusterMeta.setOrgId(1);
return clusterMeta;
}

private MetaCache buildMetaCache() {
ClusterMeta oneWay = buildClusterMeta();
ClusterMeta bi = buildBiClusterMeta();
ClusterMeta oneWaySHA = buildOneWaySHA();

XpipeMeta currentMeta = new XpipeMeta();
DcMeta jqMeta = new DcMeta("PTJQ");
jqMeta.setZone("SHA");
jqMeta.addCluster(oneWay);
jqMeta.addCluster(bi);
jqMeta.addCluster(oneWaySHA);

DcMeta awsMeta = new DcMeta("AWS");
awsMeta.setZone("AWS");
awsMeta.addCluster(oneWay);
awsMeta.addCluster(bi);
awsMeta.addCluster(oneWaySHA);

currentMeta.addDc(jqMeta)
.addDc(awsMeta);
DefaultMetaCache metaCache = new DefaultMetaCache();
Pair<XpipeMeta, XpipeMetaManager> meta = new Pair<>(currentMeta, new DefaultXpipeMetaManager(currentMeta));
metaCache.setMeta(meta);
return metaCache;
}

@Before
public void beforeBeaconConsistencyCheckTest() {

Mockito.when(config.getBeaconSupportZones()).thenReturn(Stream.of("SHA").collect(Collectors.toSet()));
Mockito.when(consoleConfig.getServerMode()).thenReturn("CONSOLE");
Mockito.when(consoleConfig.getClusterHealthCheckInterval()).thenReturn(10000);
List<BeaconOrgRoute> routes = new ArrayList<>();
routes.add(new BeaconOrgRoute(1L, new ArrayList<>(), 1));
Mockito.when(consoleConfig.getBeaconOrgRoutes()).thenReturn(routes);

DefaultMonitorManager defaultMonitorManager = new DefaultMonitorManager(buildMetaCache(), consoleConfig, config);

consistencyCheckJob = new BeaconConsistencyCheckJob(defaultMonitorManager.clustersByBeaconSystemOrg(),
buildServices(),
buildMetaCache(),
config);

}

@Test
public void testCheck() throws Throwable {
consistencyCheckJob.setMetricProxy(metricProxy);
HashMap<String, Set<String>> oneWays = new HashMap<>();
oneWays.put("oneway_sha", Collections.singleton("PTJQ"));
Mockito.when(monitorService.getAllClusterWithDc("xpipe")).thenReturn(oneWays);
HashMap<String, Set<String>> allCluster = new HashMap<>();
allCluster.put("bi_test", Collections.singletonList("PTJQ").stream().collect(Collectors.toSet()));

Mockito.when(monitorService.getAllClusterWithDc("xpipe-bi")).thenReturn(allCluster);
consistencyCheckJob.doExecute();

verify(metricProxy, times(1))
.writeBinMultiDataPoint(
argThat(new ArgumentMatcher<MetricData>() {
@Override
public boolean matches(MetricData argument) {
return argument.getTags().get("consistency").equals("CONSISTENT");
}
}));

}

@Test
public void testCheckInConsistency() throws Throwable {
consistencyCheckJob.setMetricProxy(metricProxy);
HashMap<String, Set<String>> oneWays = new HashMap<>();
oneWays.put("oneway_sha", Collections.singleton("PTJQ"));
Mockito.when(monitorService.getAllClusterWithDc("xpipe")).thenReturn(oneWays);
HashMap<String, Set<String>> allCluster = new HashMap<>();
allCluster.put("bi_test", Arrays.asList("PTJQ", "AWS").stream().collect(Collectors.toSet()));

Mockito.when(monitorService.getAllClusterWithDc("xpipe-bi")).thenReturn(allCluster);
consistencyCheckJob.doExecute();

verify(metricProxy, times(1))
.writeBinMultiDataPoint(
argThat(new ArgumentMatcher<MetricData>() {
@Override
public boolean matches(MetricData argument) {
return argument.getTags().get("consistency").equals("INCONSISTENT")
&& argument.getClusterName().equals("bi_test");
}
}));
}

@Test
public void testCheckInNotFound() throws Throwable {
consistencyCheckJob.setMetricProxy(metricProxy);
HashMap<String, Set<String>> oneWays = new HashMap<>();
Mockito.when(monitorService.getAllClusterWithDc("xpipe")).thenReturn(oneWays);
HashMap<String, Set<String>> allCluster = new HashMap<>();
allCluster.put("bi_test", Collections.singletonList("PTJQ").stream().collect(Collectors.toSet()));

Mockito.when(monitorService.getAllClusterWithDc("xpipe-bi")).thenReturn(allCluster);
consistencyCheckJob.doExecute();

verify(metricProxy, times(1))
.writeBinMultiDataPoint(
argThat(new ArgumentMatcher<MetricData>() {
@Override
public boolean matches(MetricData argument) {
return argument.getTags().get("consistency").equals("NOTFOUND")
&& argument.getClusterName().equals("oneway_sha");
}
}));

}


}

0 comments on commit ae59b36

Please sign in to comment.