|
22 | 22 | import java.net.InetSocketAddress; |
23 | 23 | import java.util.Collections; |
24 | 24 | import java.util.Map; |
| 25 | +import java.util.concurrent.TimeUnit; |
25 | 26 |
|
26 | 27 | import org.apache.hadoop.classification.InterfaceAudience.Private; |
27 | 28 | import org.apache.hadoop.conf.Configuration; |
|
105 | 106 | import org.apache.hadoop.yarn.exceptions.YarnException; |
106 | 107 | import org.apache.hadoop.yarn.ipc.YarnRPC; |
107 | 108 | import org.apache.hadoop.yarn.server.router.RouterServerUtil; |
| 109 | +import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; |
108 | 110 | import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider; |
109 | 111 | import org.apache.hadoop.yarn.util.LRUCacheHashMap; |
110 | 112 | import org.slf4j.Logger; |
@@ -136,6 +138,8 @@ public class RouterClientRMService extends AbstractService |
136 | 138 | // and remove the oldest used ones. |
137 | 139 | private Map<String, RequestInterceptorChainWrapper> userPipelineMap; |
138 | 140 |
|
| 141 | + private RouterDelegationTokenSecretManager routerDTSecretManager; |
| 142 | + |
139 | 143 | public RouterClientRMService() { |
140 | 144 | super(RouterClientRMService.class.getName()); |
141 | 145 | } |
@@ -164,8 +168,12 @@ protected void serviceStart() throws Exception { |
164 | 168 | serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, |
165 | 169 | YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT); |
166 | 170 |
|
| 171 | + // Initialize RouterRMDelegationTokenSecretManager. |
| 172 | + routerDTSecretManager = createRouterRMDelegationTokenSecretManager(conf); |
| 173 | + routerDTSecretManager.startThreads(); |
| 174 | + |
167 | 175 | this.server = rpc.getServer(ApplicationClientProtocol.class, this, |
168 | | - listenerEndpoint, serverConf, null, numWorkerThreads); |
| 176 | + listenerEndpoint, serverConf, routerDTSecretManager, numWorkerThreads); |
169 | 177 |
|
170 | 178 | // Enable service authorization? |
171 | 179 | if (conf.getBoolean( |
@@ -508,6 +516,13 @@ private RequestInterceptorChainWrapper initializePipeline(String user) { |
508 | 516 | ClientRequestInterceptor interceptorChain = |
509 | 517 | this.createRequestInterceptorChain(); |
510 | 518 | interceptorChain.init(user); |
| 519 | + |
| 520 | + // We set the RouterDelegationTokenSecretManager instance to the interceptorChain |
| 521 | + // and let the interceptor use it. |
| 522 | + if (routerDTSecretManager != null) { |
| 523 | + interceptorChain.setTokenSecretManager(routerDTSecretManager); |
| 524 | + } |
| 525 | + |
511 | 526 | chainWrapper.init(interceptorChain); |
512 | 527 | } catch (Exception e) { |
513 | 528 | LOG.error("Init ClientRequestInterceptor error for user: {}.", user, e); |
@@ -558,4 +573,42 @@ protected void finalize() { |
558 | 573 | public Map<String, RequestInterceptorChainWrapper> getUserPipelineMap() { |
559 | 574 | return userPipelineMap; |
560 | 575 | } |
| 576 | + |
| 577 | + /** |
| 578 | + * Create RouterRMDelegationTokenSecretManager. |
| 579 | + * In the YARN federation, the Router will replace the RM to |
| 580 | + * manage the RMDelegationToken (generate, update, cancel), |
| 581 | + * so the relevant configuration parameters still obtain the configuration parameters of the RM. |
| 582 | + * |
| 583 | + * @param conf Configuration |
| 584 | + * @return RouterDelegationTokenSecretManager. |
| 585 | + */ |
| 586 | + protected RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecretManager( |
| 587 | + Configuration conf) { |
| 588 | + |
| 589 | + long secretKeyInterval = conf.getLong( |
| 590 | + YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY, |
| 591 | + YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); |
| 592 | + |
| 593 | + long tokenMaxLifetime = conf.getLong( |
| 594 | + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY, |
| 595 | + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); |
| 596 | + |
| 597 | + long tokenRenewInterval = conf.getLong( |
| 598 | + YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, |
| 599 | + YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); |
| 600 | + |
| 601 | + long removeScanInterval = conf.getTimeDuration( |
| 602 | + YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY, |
| 603 | + YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT, |
| 604 | + TimeUnit.MILLISECONDS); |
| 605 | + |
| 606 | + return new RouterDelegationTokenSecretManager(secretKeyInterval, |
| 607 | + tokenMaxLifetime, tokenRenewInterval, removeScanInterval); |
| 608 | + } |
| 609 | + |
| 610 | + @VisibleForTesting |
| 611 | + public RouterDelegationTokenSecretManager getRouterDTSecretManager() { |
| 612 | + return routerDTSecretManager; |
| 613 | + } |
561 | 614 | } |
0 commit comments