Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ public class CoordinatorMetrics {
private static final String RUNNING_APP_NUM = "running_app_num";
private static final String TOTAL_APP_NUM = "total_app_num";
private static final String EXCLUDE_SERVER_NUM = "exclude_server_num";
private static final String UNHEALTHY_SERVER_NUM = "unhealthy_server_num";
private static final String TOTAL_ACCESS_REQUEST = "total_access_request";
private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request";
private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_";

static Gauge gaugeTotalServerNum;
static Gauge gaugeExcludeServerNum;
static Gauge gaugeUnhealthyServerNum;
static Gauge gaugeRunningAppNum;
static Counter counterTotalAppNum;
static Counter counterTotalAccessRequest;
Expand Down Expand Up @@ -95,6 +97,7 @@ public static void updateDynamicGaugeForRemoteStorage(String storageHost, double
private static void setUpMetrics() {
gaugeTotalServerNum = metricsManager.addGauge(TOTAL_SERVER_NUM);
gaugeExcludeServerNum = metricsManager.addGauge(EXCLUDE_SERVER_NUM);
gaugeUnhealthyServerNum = metricsManager.addGauge(UNHEALTHY_SERVER_NUM);
gaugeRunningAppNum = metricsManager.addGauge(RUNNING_APP_NUM);
counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
counterTotalAccessRequest = metricsManager.addCounter(TOTAL_ACCESS_REQUEST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ void nodesCheck() {
try {
long timestamp = System.currentTimeMillis();
Set<String> deleteIds = Sets.newHashSet();
Set<String> unhealthyNode = Sets.newHashSet();
for (ServerNode sn : servers.values()) {
if (timestamp - sn.getTimestamp() > heartbeatTimeout) {
LOG.warn("Heartbeat timeout detect, " + sn + " will be removed from node list.");
deleteIds.add(sn.getId());
} else if (!sn.isHealthy()) {
LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
unhealthyNode.add(sn.getId());
}
}
for (String serverId : deleteIds) {
Expand All @@ -100,6 +104,7 @@ void nodesCheck() {
}
}

CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNode.size());
CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
} catch (Exception e) {
LOG.warn("Error happened in nodesCheck", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void testCoordinatorMetrics() throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
assertEquals(8, actualObj.get("metrics").size());
assertEquals(9, actualObj.get("metrics").size());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,39 @@ public void getServerListTest() throws IOException {
clusterManager.close();
}

@Test
public void testGetCorrectServerNodesWhenOneNodeRemovedAndUnhealthyNodeFound() throws IOException {
CoordinatorConf ssc = new CoordinatorConf();
ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
10, testTags, false);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, testTags, true);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
11, testTags, true);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);

List<ServerNode> serverNodes = clusterManager.getServerList(testTags);
assertEquals(2, serverNodes.size());
assertEquals(0, CoordinatorMetrics.gaugeUnhealthyServerNum.get());
clusterManager.nodesCheck();

List<ServerNode> serverList = clusterManager.getServerList(testTags);
Assertions.assertEquals(2, serverList.size());
assertEquals(1, CoordinatorMetrics.gaugeUnhealthyServerNum.get());

sn3.setTimestamp(System.currentTimeMillis() - 60 * 1000L);
clusterManager.nodesCheck();

List<ServerNode> serverList2 = clusterManager.getServerList(testTags);
Assertions.assertEquals(1, serverList2.size());
assertEquals(1, CoordinatorMetrics.gaugeUnhealthyServerNum.get());
clusterManager.close();
}

@Test
public void heartbeatTimeoutTest() throws Exception {
CoordinatorConf ssc = new CoordinatorConf();
Expand Down