diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java index 1e00f6e0b4..d7d19b8993 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java @@ -35,6 +35,7 @@ public class CoordinatorMetrics { private static final String RUNNING_APP_NUM = "running_app_num"; private static final String TOTAL_APP_NUM = "total_app_num"; private static final String EXCLUDE_SERVER_NUM = "exclude_server_num"; + private static final String UNHEALTHY_SERVER_NUM = "unhealthy_server_num"; private static final String TOTAL_ACCESS_REQUEST = "total_access_request"; private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request"; private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request"; @@ -42,6 +43,7 @@ public class CoordinatorMetrics { static Gauge gaugeTotalServerNum; static Gauge gaugeExcludeServerNum; + static Gauge gaugeUnhealthyServerNum; static Gauge gaugeRunningAppNum; static Counter counterTotalAppNum; static Counter counterTotalAccessRequest; @@ -95,6 +97,7 @@ public static void updateDynamicGaugeForRemoteStorage(String storageHost, double private static void setUpMetrics() { gaugeTotalServerNum = metricsManager.addGauge(TOTAL_SERVER_NUM); gaugeExcludeServerNum = metricsManager.addGauge(EXCLUDE_SERVER_NUM); + gaugeUnhealthyServerNum = metricsManager.addGauge(UNHEALTHY_SERVER_NUM); gaugeRunningAppNum = metricsManager.addGauge(RUNNING_APP_NUM); counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM); counterTotalAccessRequest = metricsManager.addCounter(TOTAL_ACCESS_REQUEST); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java index 83b29c8915..8f81b396e5 
100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java @@ -85,10 +85,14 @@ void nodesCheck() { try { long timestamp = System.currentTimeMillis(); Set deleteIds = Sets.newHashSet(); + Set unhealthyNode = Sets.newHashSet(); for (ServerNode sn : servers.values()) { if (timestamp - sn.getTimestamp() > heartbeatTimeout) { LOG.warn("Heartbeat timeout detect, " + sn + " will be removed from node list."); deleteIds.add(sn.getId()); + } else if (!sn.isHealthy()) { + LOG.warn("Found server {} was unhealthy, will not assign it.", sn); + unhealthyNode.add(sn.getId()); } } for (String serverId : deleteIds) { @@ -100,6 +104,7 @@ void nodesCheck() { } } + CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNode.size()); CoordinatorMetrics.gaugeTotalServerNum.set(servers.size()); } catch (Exception e) { LOG.warn("Error happened in nodesCheck", e); diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java index 0e70092b5f..872d713ca1 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java @@ -87,7 +87,7 @@ public void testCoordinatorMetrics() throws Exception { ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(content); assertEquals(2, actualObj.size()); - assertEquals(8, actualObj.get("metrics").size()); + assertEquals(9, actualObj.get("metrics").size()); } @Test diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java index 5abb4e9d20..e7894f1d2c 100644 --- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -105,6 +105,45 @@ public void getServerListTest() throws IOException {
     clusterManager.close();
   }
 
+  /**
+   * Verifies that {@code nodesCheck()} publishes the number of unhealthy servers through
+   * {@code CoordinatorMetrics.gaugeUnhealthyServerNum}, and that the value is unchanged after a
+   * different node is dropped for exceeding the heartbeat timeout.
+   */
+  @Test
+  public void testGetCorrectServerNodesWhenOneNodeRemovedAndUnhealthyNodeFound() throws IOException {
+    CoordinatorConf ssc = new CoordinatorConf();
+    ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
+    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
+    try {
+      // Only sn1 passes false as the trailing constructor argument (presumably the health flag;
+      // confirm against ServerNode), so it is the one node expected to be counted as unhealthy.
+      ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, testTags, false);
+      ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10, testTags, true);
+      ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, testTags, true);
+      clusterManager.add(sn1);
+      clusterManager.add(sn2);
+      clusterManager.add(sn3);
+
+      // The gauge is written by nodesCheck(), so it is expected to read 0 before the first check.
+      assertEquals(2, clusterManager.getServerList(testTags).size());
+      assertEquals(0, CoordinatorMetrics.gaugeUnhealthyServerNum.get());
+
+      clusterManager.nodesCheck();
+      assertEquals(2, clusterManager.getServerList(testTags).size());
+      assertEquals(1, CoordinatorMetrics.gaugeUnhealthyServerNum.get());
+
+      // Age sn3 beyond the 30s heartbeat timeout so the next check removes it.
+      sn3.setTimestamp(System.currentTimeMillis() - 60 * 1000L);
+      clusterManager.nodesCheck();
+      assertEquals(1, clusterManager.getServerList(testTags).size());
+      assertEquals(1, CoordinatorMetrics.gaugeUnhealthyServerNum.get());
+    } finally {
+      // Close the manager even when an assertion above fails.
+      clusterManager.close();
+    }
+  }
+
   @Test
   public void heartbeatTimeoutTest() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();