Skip to content

Commit 63b9a6a

Browse files
authored
YARN-11350. [Federation] Router Support DelegationToken With ZK. (apache#5131)
1 parent 4de8791 commit 63b9a6a

File tree

15 files changed

+1223
-40
lines changed

15 files changed

+1223
-40
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationDelegationTokenStateStore.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,39 @@ RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
112112
*/
113113
RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
114114
throws YarnException, IOException;
115+
116+
/**
117+
* The Router Supports incrementDelegationTokenSeqNum.
118+
*
119+
* @return DelegationTokenSeqNum.
120+
*/
121+
int incrementDelegationTokenSeqNum();
122+
123+
/**
124+
* The Router Supports getDelegationTokenSeqNum.
125+
*
126+
* @return DelegationTokenSeqNum.
127+
*/
128+
int getDelegationTokenSeqNum();
129+
130+
/**
131+
* The Router Supports setDelegationTokenSeqNum.
132+
*
133+
* @param seqNum DelegationTokenSeqNum.
134+
*/
135+
void setDelegationTokenSeqNum(int seqNum);
136+
137+
/**
138+
* The Router Supports getCurrentKeyId.
139+
*
140+
* @return CurrentKeyId.
141+
*/
142+
int getCurrentKeyId();
143+
144+
/**
145+
* The Router Supports incrementCurrentKeyId.
146+
*
147+
* @return CurrentKeyId.
148+
*/
149+
int incrementCurrentKeyId();
115150
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Set;
2828
import java.util.TimeZone;
2929
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.atomic.AtomicInteger;
3031
import java.util.stream.Collectors;
3132
import java.util.Comparator;
3233

@@ -110,6 +111,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
110111
private Map<String, SubClusterPolicyConfiguration> policies;
111112
private RouterRMDTSecretManagerState routerRMSecretManagerState;
112113
private int maxAppsInStateStore;
114+
private AtomicInteger sequenceNum;
115+
private AtomicInteger masterKeyId;
113116

114117
private final MonotonicClock clock = new MonotonicClock();
115118

@@ -126,6 +129,8 @@ public void init(Configuration conf) {
126129
maxAppsInStateStore = conf.getInt(
127130
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
128131
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
132+
sequenceNum = new AtomicInteger();
133+
masterKeyId = new AtomicInteger();
129134
}
130135

131136
@Override
@@ -534,6 +539,31 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
534539
return RouterRMTokenResponse.newInstance(resultToken);
535540
}
536541

542+
@Override
543+
public int incrementDelegationTokenSeqNum() {
544+
return sequenceNum.incrementAndGet();
545+
}
546+
547+
@Override
548+
public int getDelegationTokenSeqNum() {
549+
return sequenceNum.get();
550+
}
551+
552+
@Override
553+
public void setDelegationTokenSeqNum(int seqNum) {
554+
sequenceNum.set(seqNum);
555+
}
556+
557+
@Override
558+
public int getCurrentKeyId() {
559+
return masterKeyId.get();
560+
}
561+
562+
@Override
563+
public int incrementCurrentKeyId() {
564+
return masterKeyId.incrementAndGet();
565+
}
566+
537567
private void storeOrUpdateRouterRMDT(RMDelegationTokenIdentifier rmDTIdentifier,
538568
Long renewDate, boolean isUpdate) throws IOException {
539569
Map<RMDelegationTokenIdentifier, Long> rmDTState = routerRMSecretManagerState.getTokenState();

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,4 +1394,29 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
13941394
throws YarnException, IOException {
13951395
throw new NotImplementedException("Code is not implemented");
13961396
}
1397+
1398+
@Override
1399+
public int incrementDelegationTokenSeqNum() {
1400+
return 0;
1401+
}
1402+
1403+
@Override
1404+
public int getDelegationTokenSeqNum() {
1405+
return 0;
1406+
}
1407+
1408+
@Override
1409+
public void setDelegationTokenSeqNum(int seqNum) {
1410+
return;
1411+
}
1412+
1413+
@Override
1414+
public int getCurrentKeyId() {
1415+
return 0;
1416+
}
1417+
1418+
@Override
1419+
public int incrementCurrentKeyId() {
1420+
return 0;
1421+
}
13971422
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZKFederationStateStoreOpDurations.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,27 @@ public final class ZKFederationStateStoreOpDurations implements MetricsSource {
8989
@Metric("Duration for a update reservation homeSubCluster call")
9090
private MutableRate updateReservationHomeSubCluster;
9191

92+
@Metric("Duration for a store new master key call")
93+
private MutableRate storeNewMasterKey;
94+
95+
@Metric("Duration for a remove new master key call")
96+
private MutableRate removeStoredMasterKey;
97+
98+
@Metric("Duration for a get master key by delegation key call")
99+
private MutableRate getMasterKeyByDelegationKey;
100+
101+
@Metric("Duration for a store new token call")
102+
private MutableRate storeNewToken;
103+
104+
@Metric("Duration for a update stored token call")
105+
private MutableRate updateStoredToken;
106+
107+
@Metric("Duration for a remove stored token call")
108+
private MutableRate removeStoredToken;
109+
110+
@Metric("Duration for a get token by router store token call")
111+
private MutableRate getTokenByRouterStoreToken;
112+
92113
protected static final MetricsInfo RECORD_INFO =
93114
info("ZKFederationStateStoreOpDurations", "Durations of ZKFederationStateStore calls");
94115

@@ -187,4 +208,32 @@ public void addDeleteReservationHomeSubClusterDuration(long startTime, long endT
187208
public void addUpdateReservationHomeSubClusterDuration(long startTime, long endTime) {
188209
updateReservationHomeSubCluster.add(endTime - startTime);
189210
}
211+
212+
public void addStoreNewMasterKeyDuration(long startTime, long endTime) {
213+
storeNewMasterKey.add(endTime - startTime);
214+
}
215+
216+
public void removeStoredMasterKeyDuration(long startTime, long endTime) {
217+
removeStoredMasterKey.add(endTime - startTime);
218+
}
219+
220+
public void getMasterKeyByDelegationKeyDuration(long startTime, long endTime) {
221+
getMasterKeyByDelegationKey.add(endTime - startTime);
222+
}
223+
224+
public void getStoreNewTokenDuration(long startTime, long endTime) {
225+
storeNewToken.add(endTime - startTime);
226+
}
227+
228+
public void updateStoredTokenDuration(long startTime, long endTime) {
229+
updateStoredToken.add(endTime - startTime);
230+
}
231+
232+
public void removeStoredTokenDuration(long startTime, long endTime) {
233+
removeStoredToken.add(endTime - startTime);
234+
}
235+
236+
public void getTokenByRouterStoreTokenDuration(long startTime, long endTime) {
237+
getTokenByRouterStoreToken.add(endTime - startTime);
238+
}
190239
}

0 commit comments

Comments
 (0)