Skip to content

Commit 6288e15

Browse files
committed
HADOOP-16828. Zookeeper Delegation Token Manager fetch sequence number by batch. Contributed by Fengnan Li.
1 parent ed83c86 commit 6288e15

File tree

2 files changed

+93
-13
lines changed

2 files changed

+93
-13
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,16 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
9898
+ "kerberos.keytab";
9999
public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = ZK_CONF_PREFIX
100100
+ "kerberos.principal";
101+
public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX
102+
+ "token.seqnum.batch.size";
101103

102104
public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
103105
public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
104106
public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
105107
public static final int ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT = 10000;
106108
public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
109+
// by default it is still incrementing seq number by 1 each time
110+
public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1;
107111

108112
private static Logger LOG = LoggerFactory
109113
.getLogger(ZKDelegationTokenSecretManager.class);
@@ -135,6 +139,9 @@ public static void setCurator(CuratorFramework curator) {
135139
private PathChildrenCache tokenCache;
136140
private ExecutorService listenerThreadPool;
137141
private final long shutdownTimeout;
142+
private final int seqNumBatchSize;
143+
private int currentSeqNum;
144+
private int currentMaxSeqNum;
138145

139146
public ZKDelegationTokenSecretManager(Configuration conf) {
140147
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
@@ -147,6 +154,8 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
147154
DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
148155
shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
149156
ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
157+
seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
158+
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
150159
if (CURATOR_TL.get() != null) {
151160
zkClient =
152161
CURATOR_TL.get().usingNamespace(
@@ -322,6 +331,12 @@ public void startThreads() throws IOException {
322331
if (delTokSeqCounter != null) {
323332
delTokSeqCounter.start();
324333
}
334+
// the first batch range should be allocated during this starting window
335+
// by calling the incrSharedCount
336+
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
337+
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
338+
LOG.info("Fetched initial range of seq num, from {} to {} ",
339+
currentSeqNum+1, currentMaxSeqNum);
325340
} catch (Exception e) {
326341
throw new IOException("Could not start Sequence Counter", e);
327342
}
@@ -562,28 +577,41 @@ protected int getDelegationTokenSeqNum() {
562577
return delTokSeqCounter.getCount();
563578
}
564579

565-
private void incrSharedCount(SharedCount sharedCount) throws Exception {
580+
private int incrSharedCount(SharedCount sharedCount, int batchSize)
581+
throws Exception {
566582
while (true) {
567583
// Loop until we successfully increment the counter
568584
VersionedValue<Integer> versionedValue = sharedCount.getVersionedValue();
569-
if (sharedCount.trySetCount(versionedValue, versionedValue.getValue() + 1)) {
570-
break;
585+
if (sharedCount.trySetCount(
586+
versionedValue, versionedValue.getValue() + batchSize)) {
587+
return versionedValue.getValue();
571588
}
572589
}
573590
}
574591

575592
@Override
576593
protected int incrementDelegationTokenSeqNum() {
577-
try {
578-
incrSharedCount(delTokSeqCounter);
579-
} catch (InterruptedException e) {
580-
// The ExpirationThread is just finishing.. so dont do anything..
581-
LOG.debug("Thread interrupted while performing token counter increment", e);
582-
Thread.currentThread().interrupt();
583-
} catch (Exception e) {
584-
throw new RuntimeException("Could not increment shared counter !!", e);
594+
// The secret manager will keep a local range of seq num which won't be
595+
// seen by peers, so only when the range is exhausted it will ask zk for
596+
// another range again
597+
if (currentSeqNum >= currentMaxSeqNum) {
598+
try {
599+
// after a successful batch request, we can get the range starting point
600+
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
601+
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
602+
LOG.info("Fetched new range of seq num, from {} to {} ",
603+
currentSeqNum+1, currentMaxSeqNum);
604+
} catch (InterruptedException e) {
605+
// The ExpirationThread is just finishing.. so dont do anything..
606+
LOG.debug(
607+
"Thread interrupted while performing token counter increment", e);
608+
Thread.currentThread().interrupt();
609+
} catch (Exception e) {
610+
throw new RuntimeException("Could not increment shared counter !!", e);
611+
}
585612
}
586-
return delTokSeqCounter.getCount();
613+
614+
return ++currentSeqNum;
587615
}
588616

589617
@Override
@@ -603,7 +631,7 @@ protected int getCurrentKeyId() {
603631
@Override
604632
protected int incrementCurrentKeyId() {
605633
try {
606-
incrSharedCount(keyIdSeqCounter);
634+
incrSharedCount(keyIdSeqCounter, 1);
607635
} catch (InterruptedException e) {
608636
// The ExpirationThread is just finishing.. so dont do anything..
609637
LOG.debug("Thread interrupted while performing keyId increment", e);

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,58 @@ public void testNodeUpAferAWhile() throws Exception {
217217
}
218218
}
219219

220+
@SuppressWarnings("unchecked")
221+
@Test
222+
public void testMultiNodeCompeteForSeqNum() throws Exception {
223+
DelegationTokenManager tm1, tm2 = null;
224+
String connectString = zkServer.getConnectString();
225+
Configuration conf = getSecretConf(connectString);
226+
conf.setInt(
227+
ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE, 1000);
228+
tm1 = new DelegationTokenManager(conf, new Text("bla"));
229+
tm1.init();
230+
231+
Token<DelegationTokenIdentifier> token1 =
232+
(Token<DelegationTokenIdentifier>) tm1.createToken(
233+
UserGroupInformation.getCurrentUser(), "foo");
234+
Assert.assertNotNull(token1);
235+
AbstractDelegationTokenIdentifier id1 =
236+
tm1.getDelegationTokenSecretManager().decodeTokenIdentifier(token1);
237+
Assert.assertEquals(
238+
"Token seq should be the same", 1, id1.getSequenceNumber());
239+
Token<DelegationTokenIdentifier> token2 =
240+
(Token<DelegationTokenIdentifier>) tm1.createToken(
241+
UserGroupInformation.getCurrentUser(), "foo");
242+
Assert.assertNotNull(token2);
243+
AbstractDelegationTokenIdentifier id2 =
244+
tm1.getDelegationTokenSecretManager().decodeTokenIdentifier(token2);
245+
Assert.assertEquals(
246+
"Token seq should be the same", 2, id2.getSequenceNumber());
247+
248+
tm2 = new DelegationTokenManager(conf, new Text("bla"));
249+
tm2.init();
250+
251+
Token<DelegationTokenIdentifier> token3 =
252+
(Token<DelegationTokenIdentifier>) tm2.createToken(
253+
UserGroupInformation.getCurrentUser(), "foo");
254+
Assert.assertNotNull(token3);
255+
AbstractDelegationTokenIdentifier id3 =
256+
tm2.getDelegationTokenSecretManager().decodeTokenIdentifier(token3);
257+
Assert.assertEquals(
258+
"Token seq should be the same", 1001, id3.getSequenceNumber());
259+
Token<DelegationTokenIdentifier> token4 =
260+
(Token<DelegationTokenIdentifier>) tm2.createToken(
261+
UserGroupInformation.getCurrentUser(), "foo");
262+
Assert.assertNotNull(token4);
263+
AbstractDelegationTokenIdentifier id4 =
264+
tm2.getDelegationTokenSecretManager().decodeTokenIdentifier(token4);
265+
Assert.assertEquals(
266+
"Token seq should be the same", 1002, id4.getSequenceNumber());
267+
268+
verifyDestroy(tm1, conf);
269+
verifyDestroy(tm2, conf);
270+
}
271+
220272
@SuppressWarnings("unchecked")
221273
@Test
222274
public void testRenewTokenSingleManager() throws Exception {

0 commit comments

Comments
 (0)