Skip to content

Commit 4160b57

Browse files
slfan1989KeeProMise
authored and committed
YARN-11711. Clean Up ServiceScheduler Code. (apache#6977) Contributed by Shilun Fan.
Reviewed-by: Steve Loughran <stevel@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
1 parent dc8da8f commit 4160b57

File tree

2 files changed

+58
-57
lines changed

2 files changed

+58
-57
lines changed

hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,4 +725,13 @@
725725
<Match>
726726
<Package name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema" />
727727
</Match>
728+
729+
<!-- The anonymous CacheLoader built in ServiceScheduler#createConfigFileCache overrides the `load` method,
730+
which is not allowed to return null; the resulting FindBugs warning is suppressed here. -->
731+
<Match>
732+
<Class name="org.apache.hadoop.yarn.service.ServiceScheduler"/>
733+
<Method name="$1.load(ConfigFile)" />
734+
<Bug pattern="NP_NONNULL_RETURN_VIOLATION"/>
735+
</Match>
736+
728737
</FindBugsFilter>

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java

Lines changed: 49 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@
123123
.EXIT_FALSE;
124124
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
125125
.EXIT_SUCCESS;
126+
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.HTTPS_PREFIX;
127+
import static org.apache.hadoop.yarn.webapp.util.WebAppUtils.HTTP_PREFIX;
126128

127129
/**
128130
*
@@ -153,10 +155,10 @@ public class ServiceScheduler extends CompositeService {
153155

154156
private boolean timelineServiceEnabled;
155157

156-
// Global diagnostics that will be reported to RM on eRxit.
158+
// Global diagnostics that will be reported to RM on exit.
157159
// The unit is the number of characters. This will be limited to 64 * 1024
158160
// characters.
159-
private BoundedAppender diagnostics = new BoundedAppender(64 * 1024);
161+
private final BoundedAppender diagnostics = new BoundedAppender(64 * 1024);
160162

161163
// A cache for loading config files from remote such as hdfs
162164
public LoadingCache<ConfigFile, Object> configFileCache = null;
@@ -168,7 +170,7 @@ public class ServiceScheduler extends CompositeService {
168170
private NMClientAsync nmClient;
169171
private AsyncDispatcher dispatcher;
170172
private YarnRegistryViewForProviders yarnRegistryOperations;
171-
private ServiceContext context;
173+
private final ServiceContext context;
172174
private ContainerLaunchService containerLaunchService;
173175
private final Map<ContainerId, ComponentInstance> unRecoveredInstances =
174176
new ConcurrentHashMap<>();
@@ -185,10 +187,10 @@ public class ServiceScheduler extends CompositeService {
185187
private volatile FinalApplicationStatus finalApplicationStatus =
186188
FinalApplicationStatus.ENDED;
187189

188-
private Clock systemClock;
190+
private final Clock systemClock;
189191

190192
// For unit test override since we don't want to terminate UT process.
191-
private ServiceUtils.ProcessTerminationHandler
193+
private final ServiceUtils.ProcessTerminationHandler
192194
terminationHandler = new ServiceUtils.ProcessTerminationHandler();
193195

194196
public ServiceScheduler(ServiceContext context) {
@@ -199,10 +201,10 @@ public ServiceScheduler(ServiceContext context) {
199201
}
200202

201203
public void buildInstance(ServiceContext context, Configuration configuration)
202-
throws YarnException, IOException {
204+
throws YarnException {
203205
app = context.service;
204206
executorService = Executors.newScheduledThreadPool(10);
205-
RegistryOperations registryClient = null;
207+
RegistryOperations registryClient;
206208
if (UserGroupInformation.isSecurityEnabled() &&
207209
!StringUtils.isEmpty(context.principal)
208210
&& !StringUtils.isEmpty(context.keytab)) {
@@ -480,7 +482,7 @@ private void recoverComponents(RegisterApplicationMasterResponse response) {
480482
}
481483
});
482484

483-
if (unRecoveredInstances.size() > 0) {
485+
if (!unRecoveredInstances.isEmpty()) {
484486
executorService.schedule(() -> {
485487
synchronized (unRecoveredInstances) {
486488
// after containerRecoveryTimeout, all the containers that haven't be
@@ -532,7 +534,8 @@ private void createConfigFileCache(final FileSystem fileSystem) {
532534
this.configFileCache =
533535
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
534536
.build(new CacheLoader<ConfigFile, Object>() {
535-
@Override public Object load(ConfigFile key) throws Exception {
537+
@Override
538+
public Object load(ConfigFile key) throws Exception {
536539
switch (key.getType()) {
537540
case HADOOP_XML:
538541
try (FSDataInputStream input = fileSystem
@@ -560,34 +563,30 @@ private void createConfigFileCache(final FileSystem fileSystem) {
560563
}
561564

562565
private void registerServiceInstance(ApplicationAttemptId attemptId,
563-
Service service) throws IOException {
564-
LOG.info("Registering " + attemptId + ", " + service.getName()
565-
+ " into registry");
566+
Service service) {
567+
LOG.info("Registering {}, {} into registry.", attemptId, service.getName());
566568
ServiceRecord serviceRecord = new ServiceRecord();
567569
serviceRecord.set(YarnRegistryAttributes.YARN_ID,
568570
attemptId.getApplicationId().toString());
569571
serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE,
570572
PersistencePolicies.APPLICATION);
571573
serviceRecord.description = "YarnServiceMaster";
572574

573-
executorService.submit(new Runnable() {
574-
@Override public void run() {
575-
try {
576-
yarnRegistryOperations.registerSelf(serviceRecord, false);
577-
LOG.info("Registered service under {}; absolute path {}",
578-
yarnRegistryOperations.getSelfRegistrationPath(),
579-
yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
580-
boolean isFirstAttempt = 1 == attemptId.getAttemptId();
581-
// delete the children in case there are any and this is an AM startup.
582-
// just to make sure everything underneath is purged
583-
if (isFirstAttempt) {
584-
yarnRegistryOperations.deleteChildren(
585-
yarnRegistryOperations.getSelfRegistrationPath(), true);
586-
}
587-
} catch (IOException e) {
588-
LOG.error(
589-
"Failed to register app " + app.getName() + " in registry", e);
575+
executorService.submit(() -> {
576+
try {
577+
yarnRegistryOperations.registerSelf(serviceRecord, false);
578+
LOG.info("Registered service under {}; absolute path {}",
579+
yarnRegistryOperations.getSelfRegistrationPath(),
580+
yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
581+
boolean isFirstAttempt = 1 == attemptId.getAttemptId();
582+
// delete the children in case there are any and this is an AM startup.
583+
// just to make sure everything underneath is purged
584+
if (isFirstAttempt) {
585+
yarnRegistryOperations.deleteChildren(
586+
yarnRegistryOperations.getSelfRegistrationPath(), true);
590587
}
588+
} catch (IOException e) {
589+
LOG.error("Failed to register app {} in registry.", app.getName(), e);
591590
}
592591
});
593592
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
@@ -637,7 +636,7 @@ public void handle(ComponentEvent event) {
637636
Component component = componentsByName.get(event.getName());
638637

639638
if (component == null) {
640-
LOG.error("No component exists for " + event.getName());
639+
LOG.error("No component exists for {}.", event.getName());
641640
return;
642641
}
643642
try {
@@ -657,14 +656,14 @@ public void handle(ComponentInstanceEvent event) {
657656
ComponentInstance instance =
658657
liveInstances.get(event.getContainerId());
659658
if (instance == null) {
660-
LOG.error("No component instance exists for " + event.getContainerId());
659+
LOG.error("No component instance exists for {}.", event.getContainerId());
661660
return;
662661
}
663662
try {
664663
instance.handle(event);
665664
} catch (Throwable t) {
666-
LOG.error(instance.getCompInstanceId() +
667-
": Error in handling event type " + event.getType(), t);
665+
LOG.error("{} : Error in handling event type {}.",
666+
instance.getCompInstanceId(), event.getType(), t);
668667
}
669668
}
670669
}
@@ -673,7 +672,7 @@ class AMRMClientCallback extends AMRMClientAsync.AbstractCallbackHandler {
673672

674673
@Override
675674
public void onContainersAllocated(List<Container> containers) {
676-
LOG.info(containers.size() + " containers allocated. ");
675+
LOG.info("{} containers allocated. ", containers.size());
677676
for (Container container : containers) {
678677
Component comp = componentsById.get(container.getAllocationRequestId());
679678
ComponentEvent event =
@@ -684,8 +683,8 @@ public void onContainersAllocated(List<Container> containers) {
684683
Collection<AMRMClient.ContainerRequest> requests = amRMClient
685684
.getMatchingRequests(container.getAllocationRequestId());
686685
LOG.info("[COMPONENT {}]: remove {} outstanding container requests " +
687-
"for allocateId " + container.getAllocationRequestId(),
688-
comp.getName(), requests.size());
686+
"for allocateId {}.", comp.getName(), requests.size(),
687+
container.getAllocationRequestId());
689688
// remove the corresponding request
690689
if (requests.iterator().hasNext()) {
691690
AMRMClient.ContainerRequest request = requests.iterator().next();
@@ -799,7 +798,7 @@ private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler {
799798
Map<String, ByteBuffer> allServiceResponse) {
800799
ComponentInstance instance = liveInstances.get(containerId);
801800
if (instance == null) {
802-
LOG.error("No component instance exists for " + containerId);
801+
LOG.error("No component instance exists for {}.", containerId);
803802
return;
804803
}
805804
ComponentEvent event =
@@ -821,10 +820,10 @@ private class NMClientCallback extends NMClientAsync.AbstractCallbackHandler {
821820
public void onStartContainerError(ContainerId containerId, Throwable t) {
822821
ComponentInstance instance = liveInstances.get(containerId);
823822
if (instance == null) {
824-
LOG.error("No component instance exists for " + containerId);
823+
LOG.error("No component instance exists for {}.", containerId);
825824
return;
826825
}
827-
LOG.error("Failed to start " + containerId, t);
826+
LOG.error("Failed to start {}.", containerId, t);
828827
amRMClient.releaseAssignedContainer(containerId);
829828
// After container released, it'll get CONTAINER_COMPLETED event from RM
830829
// automatically which will trigger stopping COMPONENT INSTANCE
@@ -950,15 +949,14 @@ public boolean hasAtLeastOnePlacementConstraint() {
950949
}
951950

952951
public boolean terminateServiceIfNeeded(Component component) {
953-
boolean serviceIsTerminated =
952+
return
954953
terminateServiceIfDominantComponentFinished(component) ||
955954
terminateServiceIfAllComponentsFinished();
956-
return serviceIsTerminated;
957955
}
958956

959957
/**
960958
* If the service state component is finished, the service is also terminated.
961-
* @param component
959+
* @param component service component.
962960
*/
963961
private boolean terminateServiceIfDominantComponentFinished(Component
964962
component) {
@@ -981,8 +979,7 @@ private boolean terminateServiceIfDominantComponentFinished(Component
981979
state);
982980
component.getComponentSpec().setState(state);
983981
LOG.info("Dominate component {} finished, exiting Service Master... " +
984-
", final status=" + (isSucceeded ? "Succeeded" : "Failed"),
985-
component.getName());
982+
", final status={}.", component.getName(), (isSucceeded ? "Succeeded" : "Failed"));
986983
terminateService(isSucceeded);
987984
}
988985
}
@@ -1042,14 +1039,10 @@ private boolean terminateServiceIfAllComponentsFinished() {
10421039
}
10431040

10441041
if (shouldTerminate) {
1045-
LOG.info("All component finished, exiting Service Master... "
1046-
+ ", final status=" + (failedComponents.isEmpty() ?
1047-
"Succeeded" :
1048-
"Failed"));
1049-
LOG.info("Succeeded components: [" + org.apache.commons.lang3.StringUtils
1050-
.join(succeededComponents, ",") + "]");
1051-
LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils
1052-
.join(failedComponents, ",") + "]");
1042+
LOG.info("All component finished, exiting Service Master... " +
1043+
", final status={}", (failedComponents.isEmpty() ? "Succeeded" : "Failed"));
1044+
LOG.info("Succeeded components: [" + StringUtils.join(succeededComponents, ",") + "]");
1045+
LOG.info("Failed components: [" + StringUtils.join(failedComponents, ",") + "]");
10531046

10541047
terminateService(failedComponents.isEmpty());
10551048
}
@@ -1093,7 +1086,7 @@ public void syncSysFs(Service yarnApp) {
10931086
spec = ServiceApiUtil.jsonSerDeser.toJson(yarnApp);
10941087
for (org.apache.hadoop.yarn.service.api.records.Component c :
10951088
yarnApp.getComponents()) {
1096-
Set<String> nodes = new HashSet<String>();
1089+
Set<String> nodes = new HashSet<>();
10971090
boolean update = Boolean.parseBoolean(c.getConfiguration()
10981091
.getEnv(ApplicationConstants.Environment
10991092
.YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE.name()));
@@ -1109,9 +1102,9 @@ public void syncSysFs(Service yarnApp) {
11091102
for (String bareHost : nodes) {
11101103
StringBuilder requestPath = new StringBuilder();
11111104
if (YarnConfiguration.useHttps(conf)) {
1112-
requestPath.append("https://");
1105+
requestPath.append(HTTPS_PREFIX);
11131106
} else {
1114-
requestPath.append("http://");
1107+
requestPath.append(HTTP_PREFIX);
11151108
}
11161109
requestPath.append(bareHost)
11171110
.append(":")
@@ -1129,8 +1122,7 @@ public void syncSysFs(Service yarnApp) {
11291122
Builder builder = HttpUtil.connect(requestPath.toString());
11301123
ClientResponse response = builder.put(ClientResponse.class, spec);
11311124
if (response.getStatus()!=ClientResponse.Status.OK.getStatusCode()) {
1132-
LOG.warn("Error synchronize YARN sysfs: " +
1133-
response.getEntity(String.class));
1125+
LOG.warn("Error synchronize YARN sysfs: {}.", response.getEntity(String.class));
11341126
success = false;
11351127
}
11361128
}

0 commit comments

Comments
 (0)