From edccf15f717b75b7de005aad3f663216b6ace96d Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sat, 29 Apr 2023 05:11:13 +0800 Subject: [PATCH] YARN-11469. Refactor FederationStateStoreFacade Cache Code. (#5570) --- .../federation/cache/FederationCache.java | 484 ++++++++++++++++++ .../federation/cache/FederationJCache.java | 155 ++++++ .../server/federation/cache/package-info.java | 17 + .../utils/FederationStateStoreFacade.java | 457 +++++++++-------- .../utils/TestFederationStateStoreFacade.java | 35 ++ 5 files changed, 925 insertions(+), 223 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationCache.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationCache.java new file mode 100644 index 00000000000000..d33361ad407be3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationCache.java @@ -0,0 +1,484 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.federation.cache; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public abstract class FederationCache { + + // ------------------------------------ Constants ------------------------- + + protected static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters"; + + protected static final String GET_POLICIES_CONFIGURATIONS_CACHEID = + "getPoliciesConfigurations"; + protected static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID = + "getApplicationHomeSubCluster"; + + protected static final String POINT = "."; + + private FederationStateStore stateStore; + + /** + * Determine whether to enable cache. + * We judge whether to enable the cache according to the cache time. + * If the cache time is greater than 0, the cache is enabled. + * If the cache time is less than or equal 0, the cache is not enabled. + * + * @return true, enable cache; false, not enable cache. + */ + public abstract boolean isCachingEnabled(); + + /** + * Initialize the cache. + * + * @param pConf Configuration. + * @param pStateStore FederationStateStore. + */ + public abstract void initCache(Configuration pConf, FederationStateStore pStateStore); + + /** + * clear cache. + */ + public abstract void clearCache(); + + /** + * Build CacheKey. + * + * @param className Cache Class Name. + * @param methodName Method Name. + * @return append result. + * Example: className:FederationJCache, methodName:getPoliciesConfigurations. + * We Will Return FederationJCache.getPoliciesConfigurations. + */ + protected String buildCacheKey(String className, String methodName) { + return buildCacheKey(className, methodName, null); + } + + /** + * Build CacheKey. + * + * @param className Cache Class Name. + * @param methodName Method Name. + * @param argName ArgName. + * @return append result. + * Example: + * className:FederationJCache, methodName:getApplicationHomeSubCluster, argName: app_1 + * We Will Return FederationJCache.getApplicationHomeSubCluster.app_1 + */ + protected String buildCacheKey(String className, String methodName, String argName) { + StringBuilder buffer = new StringBuilder(); + buffer.append(className).append(POINT).append(methodName); + if (argName != null) { + buffer.append(POINT); + buffer.append(argName); + } + return buffer.toString(); + } + + /** + * Returns the {@link SubClusterInfo} of all active sub cluster(s). + * + * @param filterInactiveSubClusters whether to filter out inactive + * sub-clusters + * @return the information of all active sub cluster(s) + * @throws YarnException if the call to the state store is unsuccessful + */ + public abstract Map getSubClusters( + boolean filterInactiveSubClusters) throws YarnException; + + /** + * Get the policies that is represented as + * {@link SubClusterPolicyConfiguration} for all currently active queues in + * the system. + * + * @return the policies for all currently active queues in the system + * @throws YarnException if the call to the state store is unsuccessful + */ + public abstract Map getPoliciesConfigurations() + throws Exception; + + /** + * Returns the home {@link SubClusterId} for the specified + * {@link ApplicationId}. + * + * @param appId the identifier of the application + * @return the home sub cluster identifier + * @throws YarnException if the call to the state store is unsuccessful + */ + public abstract SubClusterId getApplicationHomeSubCluster(ApplicationId appId) throws Exception; + + /** + * Remove SubCluster from cache. + * + * @param filterInactiveSubClusters whether to filter out inactive + * sub-clusters. + */ + public abstract void removeSubCluster(boolean filterInactiveSubClusters); + + + // ------------------------------------ SubClustersCache ------------------------- + + /** + * Build GetSubClusters CacheRequest. + * + * @param cacheKey cacheKey. + * @param filterInactiveSubClusters filter Inactive SubClusters. + * @return CacheRequest. + * @throws YarnException exceptions from yarn servers. + */ + protected CacheRequest> buildGetSubClustersCacheRequest( + String cacheKey, final boolean filterInactiveSubClusters) throws YarnException { + CacheResponse response = + buildSubClusterInfoResponse(filterInactiveSubClusters); + CacheRequest> cacheRequest = + new CacheRequest<>(cacheKey, response); + return cacheRequest; + } + + /** + * Build SubClusterInfo Response. + * + * @param filterInactiveSubClusters whether to filter out inactive sub-clusters. + * @return SubClusterInfo Response. + * @throws YarnException exceptions from yarn servers. + */ + private CacheResponse buildSubClusterInfoResponse( + final boolean filterInactiveSubClusters) throws YarnException { + GetSubClustersInfoRequest request = GetSubClustersInfoRequest.newInstance( + filterInactiveSubClusters); + GetSubClustersInfoResponse subClusters = stateStore.getSubClusters(request); + CacheResponse response = new SubClusterInfoCacheResponse(); + response.setList(subClusters.getSubClusters()); + return response; + } + + /** + * According to the response, build SubClusterInfoMap. + * + * @param response GetSubClustersInfoResponse. + * @return SubClusterInfoMap. + */ + public static Map buildSubClusterInfoMap( + final GetSubClustersInfoResponse response) { + List subClusters = response.getSubClusters(); + return buildSubClusterInfoMap(subClusters); + } + + /** + * According to the cacheRequest, build SubClusterInfoMap. + * + * @param cacheRequest CacheRequest. + * @return SubClusterInfoMap. + */ + public static Map buildSubClusterInfoMap( + CacheRequest cacheRequest) { + Object value = cacheRequest.value; + SubClusterInfoCacheResponse response = SubClusterInfoCacheResponse.class.cast(value); + List subClusters = response.getList(); + return buildSubClusterInfoMap(subClusters); + } + + /** + * According to the subClusters, build SubClusterInfoMap. + * + * @param subClusters subCluster List. + * @return SubClusterInfoMap. + */ + private static Map buildSubClusterInfoMap( + List subClusters) { + Map subClustersMap = new HashMap<>(subClusters.size()); + for (SubClusterInfo subCluster : subClusters) { + subClustersMap.put(subCluster.getSubClusterId(), subCluster); + } + return subClustersMap; + } + + // ------------------------------------ ApplicationHomeSubClusterCache ------------------------- + + /** + * Build GetApplicationHomeSubCluster CacheRequest. + * + * @param cacheKey cacheKey. + * @param applicationId applicationId. + * @return CacheRequest. + * @throws YarnException exceptions from yarn servers. + */ + protected CacheRequest> + buildGetApplicationHomeSubClusterRequest(String cacheKey, ApplicationId applicationId) + throws YarnException { + CacheResponse response = buildSubClusterIdResponse(applicationId); + return new CacheRequest<>(cacheKey, response); + } + + /** + * Build SubClusterId Response. + * + * @param applicationId applicationId. + * @return subClusterId + * @throws YarnException exceptions from yarn servers. + */ + private CacheResponse buildSubClusterIdResponse(final ApplicationId applicationId) + throws YarnException { + GetApplicationHomeSubClusterRequest request = + GetApplicationHomeSubClusterRequest.newInstance(applicationId); + GetApplicationHomeSubClusterResponse response = + stateStore.getApplicationHomeSubCluster(request); + ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster(); + SubClusterId subClusterId = appHomeSubCluster.getHomeSubCluster(); + CacheResponse cacheResponse = new ApplicationHomeSubClusterCacheResponse(); + cacheResponse.setItem(subClusterId); + return cacheResponse; + } + + // ------------------------------ SubClusterPolicyConfigurationCache ------------------------- + + /** + * Build GetPoliciesConfigurations CacheRequest. + * + * @param cacheKey cacheKey. + * @return CacheRequest. + * @throws YarnException exceptions from yarn servers. + */ + protected CacheRequest> + buildGetPoliciesConfigurationsCacheRequest(String cacheKey) throws YarnException { + CacheResponse response = + buildSubClusterPolicyConfigurationResponse(); + return new CacheRequest<>(cacheKey, response); + } + + /** + * According to the response, build PolicyConfigMap. + * + * @param response GetSubClusterPoliciesConfigurationsResponse. + * @return PolicyConfigMap. + */ + public static Map buildPolicyConfigMap( + GetSubClusterPoliciesConfigurationsResponse response) { + List policyConfigs = response.getPoliciesConfigs(); + return buildPolicyConfigMap(policyConfigs); + } + + /** + * According to the subClusters, build PolicyConfigMap. + * + * @param policyConfigs SubClusterPolicyConfigurations + * @return PolicyConfigMap. + */ + private static Map buildPolicyConfigMap( + List policyConfigs) { + Map queuePolicyConfigs = new HashMap<>(); + for (SubClusterPolicyConfiguration policyConfig : policyConfigs) { + queuePolicyConfigs.put(policyConfig.getQueue(), policyConfig); + } + return queuePolicyConfigs; + } + + /** + * According to the cacheRequest, build PolicyConfigMap. + * + * @param cacheRequest CacheRequest. + * @return PolicyConfigMap. + */ + public static Map buildPolicyConfigMap( + CacheRequest cacheRequest){ + Object value = cacheRequest.value; + SubClusterPolicyConfigurationCacheResponse response = + SubClusterPolicyConfigurationCacheResponse.class.cast(value); + List subClusters = response.getList(); + return buildPolicyConfigMap(subClusters); + } + + /** + * Build SubClusterPolicyConfiguration Response. + * + * @return SubClusterPolicyConfiguration Response. + * @throws YarnException exceptions from yarn servers. + */ + private CacheResponse buildSubClusterPolicyConfigurationResponse() + throws YarnException { + GetSubClusterPoliciesConfigurationsRequest request = + GetSubClusterPoliciesConfigurationsRequest.newInstance(); + GetSubClusterPoliciesConfigurationsResponse response = + stateStore.getPoliciesConfigurations(request); + List policyConfigs = response.getPoliciesConfigs(); + CacheResponse cacheResponse = + new SubClusterPolicyConfigurationCacheResponse(); + cacheResponse.setList(policyConfigs); + return cacheResponse; + } + + /** + * Internal class that encapsulates the cache key and a function that returns + * the value for the specified key. + */ + public class CacheRequest { + private K key; + private V value; + + CacheRequest(K pKey, V pValue) { + this.key = pKey; + this.value = pValue; + } + + public V getValue() throws Exception { + return value; + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(key).toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (obj instanceof CacheRequest) { + Class cacheRequestClass = CacheRequest.class; + CacheRequest other = cacheRequestClass.cast(obj); + return new EqualsBuilder().append(key, other.key).isEquals(); + } + + return false; + } + } + + public class CacheResponse { + private List list; + + private R item; + + public List getList() { + return list; + } + + public void setList(List list) { + this.list = list; + } + + public R getItem() { + return item; + } + + public void setItem(R pItem) { + this.item = pItem; + } + } + + public class SubClusterInfoCacheResponse extends CacheResponse { + @Override + public List getList() { + return super.getList(); + } + + @Override + public void setList(List list) { + super.setList(list); + } + + @Override + public SubClusterInfo getItem() { + return super.getItem(); + } + + @Override + public void setItem(SubClusterInfo item) { + super.setItem(item); + } + } + + public class SubClusterPolicyConfigurationCacheResponse + extends CacheResponse { + @Override + public List getList() { + return super.getList(); + } + + @Override + public void setList(List list) { + super.setList(list); + } + + @Override + public SubClusterPolicyConfiguration getItem() { + return super.getItem(); + } + + @Override + public void setItem(SubClusterPolicyConfiguration item) { + super.setItem(item); + } + } + + public class ApplicationHomeSubClusterCacheResponse + extends CacheResponse { + @Override + public List getList() { + return super.getList(); + } + + @Override + public void setList(List list) { + super.setList(list); + } + + @Override + public SubClusterId getItem() { + return super.getItem(); + } + + @Override + public void setItem(SubClusterId item) { + super.setItem(item); + } + } + + public FederationStateStore getStateStore() { + return stateStore; + } + + public void setStateStore(FederationStateStore stateStore) { + this.stateStore = stateStore; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java new file mode 100644 index 00000000000000..4b530149b48d01 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.cache; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.Caching; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableConfiguration; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.spi.CachingProvider; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class FederationJCache extends FederationCache { + + private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class); + + private Cache> cache; + + private int cacheTimeToLive; + + private boolean isCachingEnabled = false; + + private String className = this.getClass().getSimpleName(); + + @Override + public boolean isCachingEnabled() { + return isCachingEnabled; + } + + @Override + public void initCache(Configuration pConf, FederationStateStore pStateStore) { + // Picking the JCache provider from classpath, need to make sure there's + // no conflict or pick up a specific one in the future + cacheTimeToLive = pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + if (cacheTimeToLive <= 0) { + isCachingEnabled = false; + return; + } + this.setStateStore(pStateStore); + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + this.cache = jcacheManager.getCache(className); + if (this.cache == null) { + LOG.info("Creating a JCache Manager with name {}.", className); + Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive); + FactoryBuilder.SingletonFactory expiryPolicySingletonFactory = + new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry)); + MutableConfiguration> configuration = + new MutableConfiguration<>(); + configuration.setStoreByValue(false); + configuration.setExpiryPolicyFactory(expiryPolicySingletonFactory); + this.cache = jcacheManager.createCache(className, configuration); + } + isCachingEnabled = true; + } + + @Override + public void clearCache() { + CachingProvider jcacheProvider = Caching.getCachingProvider(); + CacheManager jcacheManager = jcacheProvider.getCacheManager(); + jcacheManager.destroyCache(className); + this.cache = null; + } + + @Override + public Map getSubClusters(boolean filterInactiveSubClusters) + throws YarnException { + final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID, + Boolean.toString(filterInactiveSubClusters)); + CacheRequest cacheRequest = cache.get(cacheKey); + if (cacheRequest == null) { + cacheRequest = buildGetSubClustersCacheRequest(className, filterInactiveSubClusters); + cache.put(cacheKey, cacheRequest); + } + return buildSubClusterInfoMap(cacheRequest); + } + + @Override + public Map getPoliciesConfigurations() + throws Exception { + final String cacheKey = buildCacheKey(className, GET_POLICIES_CONFIGURATIONS_CACHEID); + CacheRequest cacheRequest = cache.get(cacheKey); + if(cacheRequest == null){ + cacheRequest = buildGetPoliciesConfigurationsCacheRequest(className); + cache.put(cacheKey, cacheRequest); + } + return buildPolicyConfigMap(cacheRequest); + } + + @Override + public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) + throws Exception { + final String cacheKey = buildCacheKey(className, GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, + appId.toString()); + CacheRequest cacheRequest = cache.get(cacheKey); + if (cacheRequest == null) { + cacheRequest = buildGetApplicationHomeSubClusterRequest(className, appId); + cache.put(cacheKey, cacheRequest); + } + CacheResponse response = + ApplicationHomeSubClusterCacheResponse.class.cast(cacheRequest.getValue()); + return response.getItem(); + } + + @Override + public void removeSubCluster(boolean flushCache) { + final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID, + Boolean.toString(flushCache)); + cache.remove(cacheKey); + } + + @VisibleForTesting + public Cache> getCache() { + return cache; + } + + @VisibleForTesting + public String getAppHomeSubClusterCacheKey(ApplicationId appId) + throws YarnException { + return buildCacheKey(className, GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, + appId.toString()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/package-info.java new file mode 100644 index 00000000000000..9af501be756e95 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.cache; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index a067a041962453..234f65ce1fa8dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -20,23 +20,16 @@ import java.util.HashMap; import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.Random; +import java.util.Collection; -import javax.cache.Cache; -import javax.cache.CacheManager; -import javax.cache.Caching; -import javax.cache.configuration.CompleteConfiguration; -import javax.cache.configuration.FactoryBuilder; -import javax.cache.configuration.MutableConfiguration; -import javax.cache.expiry.CreatedExpiryPolicy; -import javax.cache.expiry.Duration; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.integration.CacheLoader; import javax.cache.integration.CacheLoaderException; -import javax.cache.spi.CachingProvider; -import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -46,6 +39,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.cache.FederationCache; +import org.apache.hadoop.yarn.server.federation.cache.FederationJCache; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; @@ -57,21 +54,26 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; -import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; -import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.webapp.NotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.VisibleForTesting; import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException; +import static org.apache.hadoop.yarn.server.federation.cache.FederationCache.buildPolicyConfigMap; +import static org.apache.hadoop.yarn.server.federation.cache.FederationCache.buildSubClusterInfoMap; + /** * * The FederationStateStoreFacade is an utility wrapper that provides singleton @@ -83,18 +85,15 @@ public final class FederationStateStoreFacade { private static final Logger LOG = LoggerFactory.getLogger(FederationStateStoreFacade.class); - private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters"; - private static final String GET_POLICIES_CONFIGURATIONS_CACHEID = - "getPoliciesConfigurations"; - private static final FederationStateStoreFacade FACADE = new FederationStateStoreFacade(); + private static Random rand = new Random(System.currentTimeMillis()); + private FederationStateStore stateStore; - private int cacheTimeToLive; private Configuration conf; - private Cache cache; private SubClusterResolver subclusterResolver; + private FederationCache federationCache; private FederationStateStoreFacade() { initializeFacadeInternal(new Configuration()); @@ -115,11 +114,11 @@ private void initializeFacadeInternal(Configuration config) { SubClusterResolver.class); this.subclusterResolver.load(); - initCache(); + federationCache = new FederationJCache(); + federationCache.initCache(config, stateStore); } catch (YarnException ex) { - LOG.error("Failed to initialize the FederationStateStoreFacade object", - ex); + LOG.error("Failed to initialize the FederationStateStoreFacade object", ex); throw new RuntimeException(ex); } } @@ -136,8 +135,8 @@ public synchronized void reinitialize(FederationStateStore store, Configuration config) { this.conf = config; this.stateStore = store; - clearCache(); - initCache(); + federationCache.clearCache(); + federationCache.initCache(config, stateStore); } /** @@ -158,8 +157,7 @@ public static RetryPolicy createRetryPolicy(Configuration conf) { conf.getLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS), TimeUnit.MILLISECONDS); - Map, RetryPolicy> exceptionToPolicyMap = - new HashMap, RetryPolicy>(); + Map, RetryPolicy> exceptionToPolicyMap = new HashMap<>(); exceptionToPolicyMap.put(FederationStateStoreRetriableException.class, basePolicy); exceptionToPolicyMap.put(CacheLoaderException.class, basePolicy); @@ -170,47 +168,6 @@ public static RetryPolicy createRetryPolicy(Configuration conf) { return retryPolicy; } - private boolean isCachingEnabled() { - return (cacheTimeToLive > 0); - } - - private void initCache() { - // Picking the JCache provider from classpath, need to make sure there's - // no conflict or pick up a specific one in the future - cacheTimeToLive = - conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, - YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); - if (isCachingEnabled()) { - CachingProvider jcacheProvider = Caching.getCachingProvider(); - CacheManager jcacheManager = jcacheProvider.getCacheManager(); - this.cache = jcacheManager.getCache(this.getClass().getSimpleName()); - if (this.cache == null) { - LOG.info("Creating a JCache Manager with name " - + this.getClass().getSimpleName()); - Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive); - CompleteConfiguration configuration = - new MutableConfiguration().setStoreByValue(false) - .setReadThrough(true) - .setExpiryPolicyFactory( - new FactoryBuilder.SingletonFactory( - new CreatedExpiryPolicy(cacheExpiry))) - .setCacheLoaderFactory( - new FactoryBuilder.SingletonFactory>( - new CacheLoaderImpl())); - this.cache = jcacheManager.createCache(this.getClass().getSimpleName(), - configuration); - } - } - } - - private void clearCache() { - CachingProvider jcacheProvider = Caching.getCachingProvider(); - CacheManager jcacheManager = jcacheProvider.getCacheManager(); - - jcacheManager.destroyCache(this.getClass().getSimpleName()); - this.cache = null; - } - /** * Returns the singleton instance of the FederationStateStoreFacade object. * @@ -230,7 +187,7 @@ public static FederationStateStoreFacade getInstance() { */ public SubClusterInfo getSubCluster(final SubClusterId subClusterId) throws YarnException { - if (isCachingEnabled()) { + if (federationCache.isCachingEnabled()) { return getSubClusters(false).get(subClusterId); } else { GetSubClusterInfoResponse response = stateStore @@ -254,10 +211,10 @@ public SubClusterInfo getSubCluster(final SubClusterId subClusterId) */ public SubClusterInfo getSubCluster(final SubClusterId subClusterId, final boolean flushCache) throws YarnException { - if (flushCache && isCachingEnabled()) { + if (flushCache && federationCache.isCachingEnabled()) { LOG.info("Flushing subClusters from cache and rehydrating from store," + " most likely on account of RM failover."); - cache.remove(buildGetSubClustersCacheRequest(false)); + federationCache.removeSubCluster(false); } return getSubCluster(subClusterId); } @@ -270,16 +227,15 @@ public SubClusterInfo getSubCluster(final SubClusterId subClusterId, * @return the information of all active sub cluster(s) * @throws YarnException if the call to the state store is unsuccessful */ - @SuppressWarnings("unchecked") - public Map getSubClusters( - final boolean filterInactiveSubClusters) throws YarnException { + public Map getSubClusters(final boolean filterInactiveSubClusters) + throws YarnException { try { - if (isCachingEnabled()) { - return (Map) cache - .get(buildGetSubClustersCacheRequest(filterInactiveSubClusters)); + if (federationCache.isCachingEnabled()) { + return federationCache.getSubClusters(filterInactiveSubClusters); } else { - return buildSubClusterInfoMap(stateStore.getSubClusters( - GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters))); + GetSubClustersInfoRequest request = + GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters); + return buildSubClusterInfoMap(stateStore.getSubClusters(request)); } } catch (Throwable ex) { throw new YarnException(ex); @@ -294,15 +250,15 @@ public Map getSubClusters( * mapping for the queue * @throws YarnException if the call to the state store is unsuccessful */ - public SubClusterPolicyConfiguration getPolicyConfiguration( - final String queue) throws YarnException { - if (isCachingEnabled()) { + public SubClusterPolicyConfiguration getPolicyConfiguration(final String queue) + throws YarnException { + if (federationCache.isCachingEnabled()) { return getPoliciesConfigurations().get(queue); } else { - + GetSubClusterPolicyConfigurationRequest request = + GetSubClusterPolicyConfigurationRequest.newInstance(queue); GetSubClusterPolicyConfigurationResponse response = - stateStore.getPolicyConfiguration( - GetSubClusterPolicyConfigurationRequest.newInstance(queue)); + stateStore.getPolicyConfiguration(request); if (response == null) { return null; } else { @@ -319,16 +275,15 @@ public SubClusterPolicyConfiguration getPolicyConfiguration( * @return the policies for all currently active queues in the system * @throws YarnException if the call to the state store is unsuccessful */ - @SuppressWarnings("unchecked") public Map getPoliciesConfigurations() throws YarnException { try { - if (isCachingEnabled()) { - return (Map) cache - .get(buildGetPoliciesConfigurationsCacheRequest()); + if (federationCache.isCachingEnabled()) { + return federationCache.getPoliciesConfigurations(); } else { - return buildPolicyConfigMap(stateStore.getPoliciesConfigurations( - GetSubClusterPoliciesConfigurationsRequest.newInstance())); + GetSubClusterPoliciesConfigurationsRequest request = + GetSubClusterPoliciesConfigurationsRequest.newInstance(); + return buildPolicyConfigMap(stateStore.getPoliciesConfigurations(request)); } } catch (Throwable ex) { throw new YarnException(ex); @@ -363,7 +318,6 @@ public void updateApplicationHomeSubCluster( ApplicationHomeSubCluster appHomeSubCluster) throws YarnException { stateStore.updateApplicationHomeSubCluster( UpdateApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster)); - return; } /** @@ -376,10 +330,17 @@ public void updateApplicationHomeSubCluster( */ public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) throws YarnException { - GetApplicationHomeSubClusterResponse response = - stateStore.getApplicationHomeSubCluster( + try { + if (federationCache.isCachingEnabled()) { + return federationCache.getApplicationHomeSubCluster(appId); + } else { + GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster( GetApplicationHomeSubClusterRequest.newInstance(appId)); - return response.getApplicationHomeSubCluster().getHomeSubCluster(); + return response.getApplicationHomeSubCluster().getHomeSubCluster(); + } + } catch (Throwable ex) { + throw new YarnException(ex); + } } /** @@ -410,6 +371,7 @@ public Configuration getConf() { * @param defaultValue the default implementation for fallback * @param type the class for which a retry proxy is required * @param retryPolicy the policy for retrying method call failures + * @param The type of the instance. * @return a retry proxy for the specified interface */ public static Object createRetryInstance(Configuration conf, @@ -450,163 +412,212 @@ public static T createInstance(Configuration conf, } } - private Map buildSubClusterInfoMap( - final GetSubClustersInfoResponse response) { - List subClusters = response.getSubClusters(); - Map subClustersMap = - new HashMap<>(subClusters.size()); - for (SubClusterInfo subCluster : subClusters) { - subClustersMap.put(subCluster.getSubClusterId(), subCluster); - } - return subClustersMap; + @VisibleForTesting + public FederationStateStore getStateStore() { + return stateStore; } - private Object buildGetSubClustersCacheRequest( - final boolean filterInactiveSubClusters) { - final String cacheKey = - buildCacheKey(getClass().getSimpleName(), GET_SUBCLUSTERS_CACHEID, - Boolean.toString(filterInactiveSubClusters)); - CacheRequest> cacheRequest = - new CacheRequest>(cacheKey, - new Func>() { - @Override - public Map invoke(String key) - throws Exception { - GetSubClustersInfoResponse subClusters = - stateStore.getSubClusters(GetSubClustersInfoRequest - .newInstance(filterInactiveSubClusters)); - return buildSubClusterInfoMap(subClusters); - } - }); - return cacheRequest; + /** + * Get the number of active cluster nodes. + * + * @return number of active cluster nodes. + * @throws YarnException if the call to the state store is unsuccessful. + */ + public int getActiveSubClustersCount() throws YarnException { + Map activeSubClusters = getSubClusters(true); + if (activeSubClusters == null || activeSubClusters.isEmpty()) { + return 0; + } else { + return activeSubClusters.size(); + } } - private Map buildPolicyConfigMap( - GetSubClusterPoliciesConfigurationsResponse response) { - List policyConfigs = - response.getPoliciesConfigs(); - Map queuePolicyConfigs = - new HashMap<>(); - for (SubClusterPolicyConfiguration policyConfig : policyConfigs) { - queuePolicyConfigs.put(policyConfig.getQueue(), policyConfig); + /** + * Randomly pick ActiveSubCluster. + * During the selection process, we will exclude SubClusters from the blacklist. + * + * @param activeSubClusters List of active subClusters. + * @param blackList blacklist. + * @return Active SubClusterId. + * @throws YarnException When there is no Active SubCluster, + * an exception will be thrown (No active SubCluster available to submit the request.) + */ + public static SubClusterId getRandomActiveSubCluster( + Map activeSubClusters, List blackList) + throws YarnException { + + // Check if activeSubClusters is empty, if it is empty, we need to throw an exception + if (MapUtils.isEmpty(activeSubClusters)) { + throw new FederationPolicyException( + FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); } - return queuePolicyConfigs; - } - private Object buildGetPoliciesConfigurationsCacheRequest() { - final String cacheKey = buildCacheKey(getClass().getSimpleName(), - GET_POLICIES_CONFIGURATIONS_CACHEID, null); - CacheRequest> cacheRequest = - new CacheRequest>( - cacheKey, - new Func>() { - @Override - public Map invoke( - String key) throws Exception { - GetSubClusterPoliciesConfigurationsResponse policyConfigs = - stateStore.getPoliciesConfigurations( - GetSubClusterPoliciesConfigurationsRequest - .newInstance()); - return buildPolicyConfigMap(policyConfigs); - } - }); - return cacheRequest; - } + // Change activeSubClusters to List + List subClusterIds = new ArrayList<>(activeSubClusters.keySet()); + + // If the blacklist is not empty, we need to remove all the subClusters in the blacklist + if (CollectionUtils.isNotEmpty(blackList)) { + subClusterIds.removeAll(blackList); + } - protected String buildCacheKey(String typeName, String methodName, - String argName) { - StringBuilder buffer = new StringBuilder(); - buffer.append(typeName).append(".") - .append(methodName); - if (argName != null) { - buffer.append("::"); - buffer.append(argName); + // Check there are still active subcluster after removing the blacklist + if (CollectionUtils.isEmpty(subClusterIds)) { + throw new FederationPolicyException( + FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); } - return buffer.toString(); + + // Randomly choose a SubCluster + return subClusterIds.get(rand.nextInt(subClusterIds.size())); } /** - * Internal class that implements the CacheLoader interface that can be - * plugged into the CacheManager to load objects into the cache for specified - * keys. + * Get the number of retries. + * + * @param configRetries User-configured number of retries. + * @return number of retries. + * @throws YarnException yarn exception. */ - private static class CacheLoaderImpl implements CacheLoader { - @SuppressWarnings("unchecked") - @Override - public V load(K key) throws CacheLoaderException { - try { - CacheRequest query = (CacheRequest) key; - assert query != null; - return query.getValue(); - } catch (Throwable ex) { - throw new CacheLoaderException(ex); - } + public int getRetryNumbers(int configRetries) throws YarnException { + int activeSubClustersCount = getActiveSubClustersCount(); + int actualRetryNums = Math.min(activeSubClustersCount, configRetries); + // Normally, we don't set a negative number for the number of retries, + // but if the user sets a negative number for the number of retries, + // we will return 0 + if (actualRetryNums < 0) { + return 0; } + return actualRetryNums; + } - @Override - public Map loadAll(Iterable keys) - throws CacheLoaderException { - // The FACADE does not use the Cache's getAll API. Hence this is not - // required to be implemented - throw new NotImplementedException("Code is not implemented"); + /** + * Query SubClusterId By applicationId. + * + * If SubClusterId is not empty, it means it exists and returns true; + * if SubClusterId is empty, it means it does not exist and returns false. + * + * @param applicationId applicationId + * @return true, SubClusterId exists; false, SubClusterId not exists. + */ + public boolean existsApplicationHomeSubCluster(ApplicationId applicationId) { + try { + SubClusterId subClusterId = getApplicationHomeSubCluster(applicationId); + if (subClusterId != null) { + return true; + } + } catch (YarnException e) { + LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e); } + return false; } /** - * Internal class that encapsulates the cache key and a function that returns - * the value for the specified key. + * Add ApplicationHomeSubCluster to FederationStateStore. + * + * @param applicationId applicationId. + * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy. + * @throws YarnException yarn exception. */ - private static class CacheRequest { - private K key; - private Func func; - - public CacheRequest(K key, Func func) { - this.key = key; - this.func = func; + public void addApplicationHomeSubCluster(ApplicationId applicationId, + ApplicationHomeSubCluster homeSubCluster) throws YarnException { + try { + addApplicationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + String msg = String.format( + "Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId); + throw new YarnException(msg, e); } + } - public V getValue() throws Exception { - return func.invoke(key); + /** + * Update ApplicationHomeSubCluster to FederationStateStore. + * + * @param subClusterId homeSubClusterId + * @param applicationId applicationId. + * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy. + * @throws YarnException yarn exception. + */ + public void updateApplicationHomeSubCluster(SubClusterId subClusterId, + ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException { + try { + updateApplicationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + SubClusterId subClusterIdInStateStore = getApplicationHomeSubCluster(applicationId); + if (subClusterId == subClusterIdInStateStore) { + LOG.info("Application {} already submitted on SubCluster {}.", applicationId, subClusterId); + } else { + String msg = String.format( + "Unable to update the ApplicationId %s into the FederationStateStore.", applicationId); + throw new YarnException(msg, e); + } } + } - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((key == null) ? 0 : key.hashCode()); - return result; + /** + * Add or Update ApplicationHomeSubCluster. + * + * @param applicationId applicationId, is the id of the application. + * @param subClusterId homeSubClusterId, this is selected by strategy. + * @param retryCount number of retries. + * @throws YarnException yarn exception. + */ + public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId, + SubClusterId subClusterId, int retryCount) throws YarnException { + Boolean exists = existsApplicationHomeSubCluster(applicationId); + ApplicationHomeSubCluster appHomeSubCluster = + ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); + if (!exists || retryCount == 0) { + // persist the mapping of applicationId and the subClusterId which has + // been selected as its home. + addApplicationHomeSubCluster(applicationId, appHomeSubCluster); + } else { + // update the mapping of applicationId and the home subClusterId to + // the new subClusterId we have selected. + updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster); } + } - @SuppressWarnings("unchecked") - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - CacheRequest other = (CacheRequest) obj; - if (key == null) { - if (other.key != null) { - return false; - } - } else if (!key.equals(other.key)) { - return false; - } - + /** + * Deregister subCluster, Update the subCluster state to + * SC_LOST、SC_DECOMMISSIONED etc. + * + * @param subClusterId subClusterId. + * @param subClusterState The state of the subCluster to be updated. + * @throws YarnException yarn exception. + * @return If Deregister subCluster is successful, return true, otherwise, return false. + */ + public boolean deregisterSubCluster(SubClusterId subClusterId, + SubClusterState subClusterState) throws YarnException { + SubClusterDeregisterRequest deregisterRequest = + SubClusterDeregisterRequest.newInstance(subClusterId, subClusterState); + SubClusterDeregisterResponse response = stateStore.deregisterSubCluster(deregisterRequest); + // If the response is not empty, deregisterSubCluster is successful. + if (response != null) { return true; } + return false; } /** - * Encapsulates a method that has one parameter and returns a value of the - * type specified by the TResult parameter. + * Get active subclusters. + * + * @return We will return a list of active subclusters as a Collection. */ - protected interface Func { - TResult invoke(T input) throws Exception; + public Collection getActiveSubClusters() + throws NotFoundException { + try { + Map subClusterMap = getSubClusters(true); + if (MapUtils.isEmpty(subClusterMap)) { + throw new NotFoundException("Not Found SubClusters."); + } + return subClusterMap.values(); + } catch (Exception e) { + LOG.error("getActiveSubClusters failed.", e); + return null; + } + } + + @VisibleForTesting + public FederationCache getFederationCache() { + return federationCache; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java index 56fa0524a39dca..d4f156310b86da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java @@ -27,6 +27,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.cache.FederationCache; +import org.apache.hadoop.yarn.server.federation.cache.FederationCache.ApplicationHomeSubClusterCacheResponse; +import org.apache.hadoop.yarn.server.federation.cache.FederationCache.CacheRequest; +import org.apache.hadoop.yarn.server.federation.cache.FederationJCache; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; @@ -41,6 +45,8 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import javax.cache.Cache; + /** * Unit tests for FederationStateStoreFacade. */ @@ -64,12 +70,14 @@ public static Collection getParameters() { private FederationStateStoreTestUtil stateStoreTestUtil; private FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(); + private Boolean isCachingEnabled; public TestFederationStateStoreFacade(Boolean isCachingEnabled) { conf = new Configuration(); if (!(isCachingEnabled.booleanValue())) { conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); } + this.isCachingEnabled = isCachingEnabled; } @Before @@ -206,4 +214,31 @@ public void testAddApplicationHomeSubCluster() throws YarnException { Assert.assertEquals(subClusterId1, result); } + @Test + public void testGetApplicationHomeSubClusterCache() throws Exception { + ApplicationId appId = ApplicationId.newInstance(clusterTs, numApps + 1); + SubClusterId subClusterId1 = SubClusterId.newInstance("Home1"); + + ApplicationHomeSubCluster appHomeSubCluster = + ApplicationHomeSubCluster.newInstance(appId, subClusterId1); + SubClusterId subClusterIdAdd = facade.addApplicationHomeSubCluster(appHomeSubCluster); + + SubClusterId subClusterIdByFacade = facade.getApplicationHomeSubCluster(appId); + Assert.assertEquals(subClusterIdByFacade, subClusterIdAdd); + Assert.assertEquals(subClusterId1, subClusterIdAdd); + + if (isCachingEnabled.booleanValue()) { + FederationCache fedCache = facade.getFederationCache(); + assert fedCache instanceof FederationJCache; + FederationJCache jCache = (FederationJCache) fedCache; + String cacheKey = jCache.getAppHomeSubClusterCacheKey(appId); + Cache> cache = jCache.getCache(); + CacheRequest cacheRequest = cache.get(cacheKey); + ApplicationHomeSubClusterCacheResponse response = + ApplicationHomeSubClusterCacheResponse.class.cast(cacheRequest.getValue()); + SubClusterId subClusterIdByCache = response.getItem(); + Assert.assertEquals(subClusterIdByFacade, subClusterIdByCache); + Assert.assertEquals(subClusterId1, subClusterIdByCache); + } + } }