Skip to content

Commit

Permalink
YARN-11158. Support getDelegationToken, renewDelegationToken, cancelD…
Browse files Browse the repository at this point in the history
…elegationToken API's for Federation.
  • Loading branch information
slfan1989 committed Jul 21, 2022
1 parent 479371e commit f277b75
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,11 @@
<name>nfs3.mountd.port</name>
<value>4272</value>
</property>

<property>
<name>hadoop.zk.address</name>
<value>127.0.0.1:2181</value>
<description>Host:Port of the ZooKeeper server to be used.
</description>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public class RouterClientRMService extends AbstractService
// and remove the oldest used ones.
private Map<String, RequestInterceptorChainWrapper> userPipelineMap;

private ZKCuratorManager zkManager;

public RouterClientRMService() {
super(RouterClientRMService.class.getName());
}
Expand All @@ -161,7 +163,7 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
this.routerDTSecretManager = new RouterDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, 3600000);
ZKCuratorManager zkManager = new ZKCuratorManager(conf);
zkManager = new ZKCuratorManager(conf);
zkManager.start();
this.tokenFetcher = new ZKDelegationTokenFetcher(conf, zkManager, routerDTSecretManager);
super.serviceInit(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,19 @@ public DelegationTokenFetcher(RouterDelegationTokenSecretManager secretManager)
this.secretManager = secretManager;
}

protected void updateToken(RMDelegationTokenIdentifier identifier, long renewDate) throws IOException {
protected void updateToken(RMDelegationTokenIdentifier identifier, long renewDate)
throws IOException {
secretManager.addPersistedDelegationToken(identifier, renewDate);
}

protected void removeToken(Token<RMDelegationTokenIdentifier> token, String user) throws IOException {
protected void removeToken(Token<RMDelegationTokenIdentifier> token, String user)
throws IOException {
secretManager.cancelToken(token, user);
}

protected void updateMasterKey(DelegationKey key) throws IOException {
secretManager.addKey(key);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@

public class RouterDelegationKey extends DelegationKey {
private String clusterId;

public RouterDelegationKey(String subClusterId, DelegationKey key) {
super(key.getKeyId(), key.getExpiryDate(), key.getEncodedKey());
this.clusterId = subClusterId;
}

public String getClusterId() {
return clusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public synchronized void addKey(DelegationKey key) throws IOException {
this.allKeys.put(key.getKeyId(), key);
}

@Override
protected void removeStoredMasterKey(DelegationKey key) {
super.removeStoredMasterKey(key);
}

@Override
public synchronized void addPersistedDelegationToken(
RMDelegationTokenIdentifier identifier, long renewDate) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,12 @@ public RouterRMDelegationTokenIdentifier(String cluster,
setMaxDate(rmDTIdentifier.getMaxDate());
this.cluster = cluster;
}

public String getCluster() {
return cluster;
}

public void setCluster(String cluster) {
this.cluster = cluster;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@
*/
package org.apache.hadoop.yarn.server.router.security;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
Expand All @@ -34,6 +31,7 @@

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class ZKDelegationTokenFetcher extends DelegationTokenFetcher {

Expand All @@ -53,102 +51,70 @@ public class ZKDelegationTokenFetcher extends DelegationTokenFetcher {
private String rootPath;

public ZKDelegationTokenFetcher(Configuration conf, ZKCuratorManager zkcuratorManager,
RouterDelegationTokenSecretManager secretManager) throws Exception {
RouterDelegationTokenSecretManager secretManager) {
super(secretManager);
this.zkManager = zkcuratorManager;
this.rootPath = conf.get("yarn.resourcemanager.zk-state-store.rootpath", "/federation");
}

@Override
public void start() throws Exception {
PathChildrenCache subClusterCache =
new PathChildrenCache(zkManager.getCurator(), rootPath, true);
subClusterCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
subClusterCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
case CHILD_REMOVED:
default:
break;
}
}
});

for (ChildData data : subClusterCache.getCurrentData()) {
processSubCluster(data.getPath(), data.getData());
}
CuratorCache curatorCache = CuratorCache.build(zkManager.getCurator(), rootPath);
curatorCache.start();
CuratorCacheListener listener = CuratorCacheListener.builder()
.forInitialized(() -> LOG.info("Cache initialized."))
.build();
curatorCache.listenable().addListener(listener);
curatorCache.stream().forEach(data -> processSubCluster(data.getPath()));
}

private void processSubCluster(String path, byte[] data) throws Exception {
private void processSubCluster(String path) {
LOG.info("Monitor SubCluster path: {}.", path);
rootPath = path + "/" + ROOT_ZNODE_NAME + "/" + RM_DT_SECRET_MANAGER_ROOT;
zkManager.createRootDirRecursively(rootPath);
monitorMasterKey(rootPath + "/" + RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
monitorDelegationToken(rootPath + "/" + RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
try {
zkManager.createRootDirRecursively(rootPath);
monitorMasterKey(rootPath + "/" + RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
monitorDelegationToken(rootPath + "/" + RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void monitorDelegationToken(String path) throws Exception {
if (!zkManager.exists(path)) {
zkManager.create(path);
}
LOG.info("Monitor DelegationToken path: {}.", path);
PathChildrenCache delegationTokenCache =
new PathChildrenCache(zkManager.getCurator(), path, true);
delegationTokenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

PathChildrenCacheListener listener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
LOG.info("Path: {}, Type: {}.", event.getData().getPath(), event.getType());
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
processDTNode(event.getData().getPath(), event.getData().getData(), true);
break;
case CHILD_REMOVED:
processDTNode(event.getData().getPath(), event.getData().getData(), false);
break;
default:
break;
}
}
};
delegationTokenCache.getListenable().addListener(listener);
for (ChildData data : delegationTokenCache.getCurrentData()) {
processDTNode(data.getPath(), data.getData(), true);
}
CuratorCache curatorCache = CuratorCache.build(zkManager.getCurator(), path);
curatorCache.start();
CuratorCacheListener listener =
CuratorCacheListener.builder().
forCreatesAndChanges((oldNode, node) ->
processDTNode(node.getPath(), node.getData(), true)).
forDeletes(node ->
processDTNode(node.getPath(), node.getData(), true)).build();

curatorCache.listenable().addListener(listener);
curatorCache.stream().forEach(data -> processDTNode(data.getPath(), data.getData(), true));
}

private void monitorMasterKey(String path) throws Exception {
if (!zkManager.exists(path)) {
zkManager.create(path);
}
LOG.info("Monitor MasterKey path: {}.", path);
PathChildrenCache masterKeyCache = new PathChildrenCache(zkManager.getCurator(), path, true);
masterKeyCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
PathChildrenCacheListener listener = (client, event) -> {
LOG.info("Path: {}, Type: {}.", event.getData().getPath(), event.getType());
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
processKeyNode(event.getData().getPath(), event.getData().getData(), true);
break;
case CHILD_REMOVED:
default:
break;
}
};
masterKeyCache.getListenable().addListener(listener);
for (ChildData data : masterKeyCache.getCurrentData()) {
processKeyNode(data.getPath(), data.getData(), true);
}
CuratorCache masterKeyCache = CuratorCache.build(zkManager.getCurator(), path);
masterKeyCache.start();
CuratorCacheListener listener =
CuratorCacheListener.builder().
forCreatesAndChanges((oldNode, node) ->
processKeyNode(node.getPath(), node.getData(), true)).build();
masterKeyCache.listenable().addListener(listener);
masterKeyCache.stream().forEach(data ->
processDTNode(data.getPath(), data.getData(), true));
}

private void processKeyNode(String path, byte[] data, boolean isUpdate) throws Exception {
private void processKeyNode(String path, byte[] data, boolean isUpdate) {
if (!getChildName(path).startsWith(DELEGATION_KEY_PREFIX)) {
LOG.info("Path: {} is not start with {}.", path, DELEGATION_KEY_PREFIX);
return;
Expand All @@ -169,11 +135,13 @@ private void processKeyNode(String path, byte[] data, boolean isUpdate) throws E
key.getKeyId(), key.getExpiryDate());
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

private void processDTNode(String path, byte[] data, boolean isUpdate) throws Exception {
private void processDTNode(String path, byte[] data, boolean isUpdate) {
if (!getChildName(path).startsWith(DELEGATION_TOKEN_PREFIX)) {
LOG.info("Path: {} is not start with {}.", path, DELEGATION_TOKEN_PREFIX);
return;
Expand All @@ -199,13 +167,16 @@ private void processDTNode(String path, byte[] data, boolean isUpdate) throws Ex
identifier, renewDate);
}
} else {
Token fakeToken = new Token(identifier.getBytes(), null, null, null);
Token<RMDelegationTokenIdentifier> fakeToken =
new Token<>(identifier.getBytes(), null, null, null);
removeToken(fakeToken, identifier.getUser().getUserName());
if (LOG.isInfoEnabled()) {
LOG.info("Removed RMDelegationTokenIdentifier: {}, renewDate: {}.",
identifier, renewDate);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Expand Down

0 comments on commit f277b75

Please sign in to comment.