Skip to content

Commit

Permalink
YARN-9425. Make initialDelay configurable for FederationStateStoreSer…
Browse files Browse the repository at this point in the history
…vice#scheduledExecutorService
  • Loading branch information
Ashutosh Gupta committed Aug 11, 2022
1 parent 133e8aa commit affc62b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3920,6 +3920,13 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
"yarnfederation/";

public static final String FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS =
FEDERATION_PREFIX + "state-store.heartbeat.initial-delay-secs";

// 30 secs
public static final int
DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS = 30;

public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3624,6 +3624,13 @@
<name>yarn.federation.enabled</name>
<value>false</value>
</property>
<property>
<description>
Initial delay for federation state-store heartbeat service.
</description>
<name>yarn.federation.state-store.heartbeat.initial-delay-secs</name>
<value>30</value>
</property>
<property>
<description>
Machine list file to be loaded by the FederationSubCluster Resolver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class FederationStateStoreService extends AbstractService
private FederationStateStore stateStoreClient = null;
private SubClusterId subClusterId;
private long heartbeatInterval;
private long heartbeatInitialDelay;
private RMContext rmContext;

public FederationStateStoreService(RMContext rmContext) {
Expand Down Expand Up @@ -120,6 +121,14 @@ protected void serviceInit(Configuration conf) throws Exception {
heartbeatInterval =
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS;
}

heartbeatInitialDelay = conf.getLong(
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS);
if (heartbeatInitialDelay <= 0) {
heartbeatInitialDelay =
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY_SECS;
}
LOG.info("Initialized federation membership service.");

super.serviceInit(conf);
Expand Down Expand Up @@ -196,7 +205,7 @@ private void registerAndInitializeHeartbeat() {
scheduledExecutorService =
HadoopExecutors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat,
heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS);
heartbeatInitialDelay, heartbeatInterval, TimeUnit.SECONDS);
LOG.info("Started federation membership heartbeat with interval: {}",
heartbeatInterval);
}
Expand Down

0 comments on commit affc62b

Please sign in to comment.