From affc62b151b64ea8d3a90ec421dfcfb3884ede06 Mon Sep 17 00:00:00 2001 From: Ashutosh Gupta Date: Thu, 11 Aug 2022 02:09:28 +0100 Subject: [PATCH 1/3] YARN-9425. Make initialDelay configurable for FederationStateStoreService#scheduledExecutorService --- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 7 +++++++ .../src/main/resources/yarn-default.xml | 7 +++++++ .../federation/FederationStateStoreService.java | 11 ++++++++++- 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d42562cf6140a..852c563d7ed58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3920,6 +3920,13 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = "yarnfederation/"; + public static final String FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS = + FEDERATION_PREFIX + "state-store.heartbeat.initial-delay-secs"; + + // 30 secs + public static final int + DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS = 30; + public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = FEDERATION_PREFIX + "state-store.heartbeat-interval-secs"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 407ef74d3d062..59301a9fb69ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3624,6 +3624,13 
@@ yarn.federation.enabled false + + + Initial delay for federation state-store heartbeat service. + + yarn.federation.state-store.heartbeat.initial-delay-secs + 30 + Machine list file to be loaded by the FederationSubCluster Resolver diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index c4dae7d4f7d3f..366bb67ff32ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -86,6 +86,7 @@ public class FederationStateStoreService extends AbstractService private FederationStateStore stateStoreClient = null; private SubClusterId subClusterId; private long heartbeatInterval; + private long heartbeatInitialDelay; private RMContext rmContext; public FederationStateStoreService(RMContext rmContext) { @@ -120,6 +121,14 @@ protected void serviceInit(Configuration conf) throws Exception { heartbeatInterval = YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS; } + + heartbeatInitialDelay = conf.getLong( + YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS); + if (heartbeatInitialDelay <= 0) { + heartbeatInitialDelay = + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS; + } LOG.info("Initialized federation membership service."); super.serviceInit(conf); @@ 
-196,7 +205,7 @@ private void registerAndInitializeHeartbeat() { scheduledExecutorService = HadoopExecutors.newSingleThreadScheduledExecutor(); scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat, - heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS); + heartbeatInitialDelay, heartbeatInterval, TimeUnit.SECONDS); LOG.info("Started federation membership heartbeat with interval: {}", heartbeatInterval); } From b07e36739973ddb416365c671021f7df7173a9be Mon Sep 17 00:00:00 2001 From: Ashutosh Gupta Date: Mon, 15 Aug 2022 12:29:22 +0100 Subject: [PATCH 2/3] Added unit test, logs and making configuration seconds bind free --- .../hadoop/yarn/conf/YarnConfiguration.java | 6 ++-- .../src/main/resources/yarn-default.xml | 9 +++-- .../FederationStateStoreService.java | 18 ++++++---- .../TestFederationRMStateStoreService.java | 34 +++++++++++++++++++ 4 files changed, 55 insertions(+), 12 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 852c563d7ed58..2edc61e254206 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3920,12 +3920,12 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = "yarnfederation/"; - public static final String FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS = - FEDERATION_PREFIX + "state-store.heartbeat.initial-delay-secs"; + public static final String FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY = + FEDERATION_PREFIX + "state-store.heartbeat.initial-delay"; // 30 secs public static final int - DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS = 30; + 
DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY = 30; public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = FEDERATION_PREFIX + "state-store.heartbeat-interval-secs"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 59301a9fb69ac..1d4b01c24a5ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3626,10 +3626,13 @@ - Initial delay for federation state-store heartbeat service. + Initial delay for federation state-store heartbeat service. Value is followed by a unit + specifier: ns, us, ms, s, m, h, d for nanoseconds, microseconds, milliseconds, seconds, + minutes, hours, days respectively. Values should provide units, + but seconds are assumed if no unit is given. - yarn.federation.state-store.heartbeat.initial-delay-secs - 30 + yarn.federation.state-store.heartbeat.initial-delay + 30s diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 366bb67ff32ab..08b0b278543b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -117,17 +117,23 @@ protected void serviceInit(Configuration conf) throws 
Exception { heartbeatInterval = conf.getLong( YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS, YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS); + if (heartbeatInterval <= 0) { heartbeatInterval = YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS; } - heartbeatInitialDelay = conf.getLong( - YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS, - YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS); + heartbeatInitialDelay = conf.getTimeDuration( + YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, + TimeUnit.SECONDS); + if (heartbeatInitialDelay <= 0) { + LOG.warn("{} configured value is wrong, must be at <= 0; using default value of {}", + YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY); heartbeatInitialDelay = - YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS; + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY; } LOG.info("Initialized federation membership service."); @@ -206,8 +212,8 @@ private void registerAndInitializeHeartbeat() { HadoopExecutors.newSingleThreadScheduledExecutor(); scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat, heartbeatInitialDelay, heartbeatInterval, TimeUnit.SECONDS); - LOG.info("Started federation membership heartbeat with interval: {}", - heartbeatInterval); + LOG.info("Started federation membership heartbeat with interval: {} and initial delay: {}", + heartbeatInterval, heartbeatInitialDelay); } @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index e5e156dcf76c6..e8ebdd5bedde5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; @@ -173,4 +174,37 @@ private String checkSubClusterInfo(SubClusterState state) return response.getCapability(); } + @Test + public void testFederationStateStoreServiceInitialHeartbeatDelay() throws Exception { + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + + GenericTestUtils.LogCapturer logCapture = + GenericTestUtils.LogCapturer.captureLogs(FederationStateStoreService.LOG); + + final MockRM rm = new MockRM(conf); + + // Initially there should be no entry for the sub-cluster + rm.init(conf); + stateStore = rm.getFederationStateStoreService().getStateStoreClient(); + GetSubClusterInfoResponse response = stateStore.getSubCluster(request); + Assert.assertNull(response); + + // Validate if sub-cluster is registered + rm.start(); + String capability = checkSubClusterInfo(SubClusterState.SC_NEW); + 
Assert.assertTrue(capability.isEmpty()); + + // Heartbeat to see if sub-cluster transitions to running + FederationStateStoreHeartbeat storeHeartbeat = + rm.getFederationStateStoreService().getStateStoreHeartbeatThread(); + storeHeartbeat.run(); + capability = checkSubClusterInfo(SubClusterState.SC_RUNNING); + checkClusterMetricsInfo(capability, 0); + + Assert.assertTrue(logCapture.getOutput().contains( + "Started federation membership heartbeat with interval: 300 and initial delay: 10")); + rm.stop(); + } } From a33d05cfe3273421a514938c5423866684b8cd90 Mon Sep 17 00:00:00 2001 From: Ashutosh Gupta Date: Tue, 16 Aug 2022 14:46:12 +0100 Subject: [PATCH 3/3] fixing log line --- .../resourcemanager/federation/FederationStateStoreService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 08b0b278543b8..91e5763e66b88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -129,7 +129,7 @@ protected void serviceInit(Configuration conf) throws Exception { TimeUnit.SECONDS); if (heartbeatInitialDelay <= 0) { - LOG.warn("{} configured value is wrong, must be at <= 0; using default value of {}", + LOG.warn("{} configured value is wrong, must be > 0; using default value of {}", 
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY); heartbeatInitialDelay =