Skip to content

Commit eff3b8c

Browse files
authored
YARN-10885. Make FederationStateStoreFacade#getApplicationHomeSubCluster use JCache. (#4701)
1 parent b1d4af2 commit eff3b8c

File tree

2 files changed

+69
-3
lines changed

2 files changed

+69
-3
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public final class FederationStateStoreFacade {
9292
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
9393
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
9494
"getPoliciesConfigurations";
95+
private static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
96+
"getApplicationHomeSubCluster";
9597

9698
private static final FederationStateStoreFacade FACADE =
9799
new FederationStateStoreFacade();
@@ -382,10 +384,19 @@ public void updateApplicationHomeSubCluster(
382384
*/
383385
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
384386
throws YarnException {
385-
GetApplicationHomeSubClusterResponse response =
386-
stateStore.getApplicationHomeSubCluster(
387+
try {
388+
if (isCachingEnabled()) {
389+
SubClusterId value = SubClusterId.class.cast(
390+
cache.get(buildGetApplicationHomeSubClusterRequest(appId)));
391+
return value;
392+
} else {
393+
GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster(
387394
GetApplicationHomeSubClusterRequest.newInstance(appId));
388-
return response.getApplicationHomeSubCluster().getHomeSubCluster();
395+
return response.getApplicationHomeSubCluster().getHomeSubCluster();
396+
}
397+
} catch (Throwable ex) {
398+
throw new YarnException(ex);
399+
}
389400
}
390401

391402
/**
@@ -548,6 +559,26 @@ public Map<String, SubClusterPolicyConfiguration> invoke(
548559
return cacheRequest;
549560
}
550561

562+
private Object buildGetApplicationHomeSubClusterRequest(ApplicationId applicationId) {
563+
final String cacheKey = buildCacheKey(getClass().getSimpleName(),
564+
GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString());
565+
CacheRequest<String, SubClusterId> cacheRequest = new CacheRequest<>(
566+
cacheKey,
567+
input -> {
568+
569+
GetApplicationHomeSubClusterRequest request =
570+
GetApplicationHomeSubClusterRequest.newInstance(applicationId);
571+
GetApplicationHomeSubClusterResponse response =
572+
stateStore.getApplicationHomeSubCluster(request);
573+
574+
ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
575+
SubClusterId subClusterId = appHomeSubCluster.getHomeSubCluster();
576+
577+
return subClusterId;
578+
});
579+
return cacheRequest;
580+
}
581+
551582
protected String buildCacheKey(String typeName, String methodName,
552583
String argName) {
553584
StringBuilder buffer = new StringBuilder();
@@ -645,6 +676,15 @@ protected interface Func<T, TResult> {
645676
TResult invoke(T input) throws Exception;
646677
}
647678

679+
@VisibleForTesting
680+
public Cache<Object, Object> getCache() {
681+
return cache;
682+
}
683+
684+
@VisibleForTesting
685+
protected Object getAppHomeSubClusterCacheRequest(ApplicationId applicationId) {
686+
return buildGetApplicationHomeSubClusterRequest(applicationId);
687+
}
648688

649689
@VisibleForTesting
650690
public FederationStateStore getStateStore() {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.junit.runners.Parameterized;
4242
import org.junit.runners.Parameterized.Parameters;
4343

44+
import javax.cache.Cache;
45+
4446
/**
4547
* Unit tests for FederationStateStoreFacade.
4648
*/
@@ -64,12 +66,14 @@ public static Collection<Boolean[]> getParameters() {
6466
private FederationStateStoreTestUtil stateStoreTestUtil;
6567
private FederationStateStoreFacade facade =
6668
FederationStateStoreFacade.getInstance();
69+
private Boolean isCachingEnabled;
6770

6871
public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
6972
conf = new Configuration();
7073
if (!(isCachingEnabled.booleanValue())) {
7174
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
7275
}
76+
this.isCachingEnabled = isCachingEnabled;
7377
}
7478

7579
@Before
@@ -206,4 +210,26 @@ public void testAddApplicationHomeSubCluster() throws YarnException {
206210
Assert.assertEquals(subClusterId1, result);
207211
}
208212

213+
@Test
214+
public void testGetApplicationHomeSubClusterCache() throws YarnException {
215+
ApplicationId appId = ApplicationId.newInstance(clusterTs, numApps + 1);
216+
SubClusterId subClusterId1 = SubClusterId.newInstance("Home1");
217+
218+
ApplicationHomeSubCluster appHomeSubCluster =
219+
ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
220+
SubClusterId subClusterIdAdd = facade.addApplicationHomeSubCluster(appHomeSubCluster);
221+
222+
SubClusterId subClusterIdByFacade = facade.getApplicationHomeSubCluster(appId);
223+
Assert.assertEquals(subClusterIdByFacade, subClusterIdAdd);
224+
Assert.assertEquals(subClusterId1, subClusterIdAdd);
225+
226+
if (isCachingEnabled.booleanValue()) {
227+
Cache<Object, Object> cache = facade.getCache();
228+
Object cacheKey = facade.getAppHomeSubClusterCacheRequest(appId);
229+
Object subClusterIdByCache = cache.get(cacheKey);
230+
Assert.assertEquals(subClusterIdByFacade, subClusterIdByCache);
231+
Assert.assertEquals(subClusterId1, subClusterIdByCache);
232+
}
233+
}
234+
209235
}

0 commit comments

Comments
 (0)