Skip to content

Commit 213f478

Browse files
ZanderXuzengqiang.xu
authored andcommitted
HDFS-17413. [FGL] CacheReplicationMonitor supports fine-grained lock (#6641)
1 parent 3214795 commit 213f478

File tree

3 files changed

+40
-38
lines changed

3 files changed

+40
-38
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.hadoop.hdfs.server.namenode.INode;
4949
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
5050
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
51+
import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
5152
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
5253
import org.apache.hadoop.hdfs.util.ReadOnlyList;
5354
import org.apache.hadoop.util.GSet;
@@ -223,7 +224,7 @@ public void run() {
223224
* after are not atomic.
224225
*/
225226
public void waitForRescanIfNeeded() {
226-
Preconditions.checkArgument(!namesystem.hasWriteLock(),
227+
Preconditions.checkArgument(!namesystem.hasWriteLock(FSNamesystemLockMode.FS),
227228
"Must not hold the FSN write lock when waiting for a rescan.");
228229
Preconditions.checkArgument(lock.isHeldByCurrentThread(),
229230
"Must hold the CRM lock when waiting for a rescan.");
@@ -268,7 +269,7 @@ public void setNeedsRescan() {
268269
*/
269270
@Override
270271
public void close() throws IOException {
271-
Preconditions.checkArgument(namesystem.hasWriteLock());
272+
Preconditions.checkArgument(namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
272273
lock.lock();
273274
try {
274275
if (shutdown) return;
@@ -291,7 +292,7 @@ private void rescan() throws InterruptedException {
291292
scannedBlocks = 0;
292293
lastScanTimeMs = Time.monotonicNow();
293294
try {
294-
namesystem.writeLock();
295+
namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
295296
try {
296297
lock.lock();
297298
if (shutdown) {
@@ -308,7 +309,7 @@ private void rescan() throws InterruptedException {
308309
rescanCachedBlockMap();
309310
blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
310311
} finally {
311-
namesystem.writeUnlock("cacheReplicationMonitorRescan");
312+
namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan");
312313
}
313314
}
314315

@@ -325,11 +326,11 @@ private void reacquireLock(long last) {
325326
long now = Time.monotonicNow();
326327
if (now - last > cacheManager.getMaxLockTimeMs()) {
327328
try {
328-
namesystem.writeUnlock();
329+
namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan");
329330
Thread.sleep(cacheManager.getSleepTimeMs());
330331
} catch (InterruptedException e) {
331332
} finally {
332-
namesystem.writeLock();
333+
namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
333334
}
334335
}
335336
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
8181
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
8282
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
83+
import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
8384
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
8485
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
8586
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -317,7 +318,7 @@ public void stopMonitorThread() {
317318
}
318319

319320
public void clearDirectiveStats() {
320-
assert namesystem.hasWriteLock();
321+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
321322
for (CacheDirective directive : directivesById.values()) {
322323
directive.resetStatistics();
323324
}
@@ -327,26 +328,26 @@ public void clearDirectiveStats() {
327328
* @return Unmodifiable view of the collection of CachePools.
328329
*/
329330
public Collection<CachePool> getCachePools() {
330-
assert namesystem.hasReadLock();
331+
assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
331332
return Collections.unmodifiableCollection(cachePools.values());
332333
}
333334

334335
/**
335336
* @return Unmodifiable view of the collection of CacheDirectives.
336337
*/
337338
public Collection<CacheDirective> getCacheDirectives() {
338-
assert namesystem.hasReadLock();
339+
assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
339340
return Collections.unmodifiableCollection(directivesById.values());
340341
}
341342

342343
@VisibleForTesting
343344
public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
344-
assert namesystem.hasReadLock();
345+
assert namesystem.hasReadLock(FSNamesystemLockMode.BM);
345346
return cachedBlocks;
346347
}
347348

348349
private long getNextDirectiveId() throws IOException {
349-
assert namesystem.hasWriteLock();
350+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
350351
if (nextDirectiveId >= Long.MAX_VALUE - 1) {
351352
throw new IOException("No more available IDs.");
352353
}
@@ -574,7 +575,7 @@ CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
574575
public CacheDirectiveInfo addDirective(
575576
CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
576577
throws IOException {
577-
assert namesystem.hasWriteLock();
578+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
578579
CacheDirective directive;
579580
try {
580581
CachePool pool = getCachePool(validatePoolName(info));
@@ -652,7 +653,7 @@ void modifyDirectiveFromEditLog(CacheDirectiveInfo info)
652653

653654
public void modifyDirective(CacheDirectiveInfo info,
654655
FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
655-
assert namesystem.hasWriteLock();
656+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
656657
String idString =
657658
(info.getId() == null) ?
658659
"(null)" : info.getId().toString();
@@ -703,7 +704,7 @@ public void modifyDirective(CacheDirectiveInfo info,
703704

704705
private void removeInternal(CacheDirective directive)
705706
throws InvalidRequestException {
706-
assert namesystem.hasWriteLock();
707+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
707708
// Remove the corresponding entry in directivesByPath.
708709
String path = directive.getPath();
709710
if (!directivesByPath.remove(path, directive)) {
@@ -724,7 +725,7 @@ private void removeInternal(CacheDirective directive)
724725

725726
public void removeDirective(long id, FSPermissionChecker pc)
726727
throws IOException {
727-
assert namesystem.hasWriteLock();
728+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
728729
try {
729730
CacheDirective directive = getById(id);
730731
checkWritePermission(pc, directive.getPool());
@@ -740,7 +741,7 @@ public void removeDirective(long id, FSPermissionChecker pc)
740741
listCacheDirectives(long prevId,
741742
CacheDirectiveInfo filter,
742743
FSPermissionChecker pc) throws IOException {
743-
assert namesystem.hasReadLock();
744+
assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
744745
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
745746
String filterPath = null;
746747
if (filter.getPath() != null) {
@@ -815,7 +816,7 @@ public void removeDirective(long id, FSPermissionChecker pc)
815816
*/
816817
public CachePoolInfo addCachePool(CachePoolInfo info)
817818
throws IOException {
818-
assert namesystem.hasWriteLock();
819+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
819820
CachePool pool;
820821
try {
821822
CachePoolInfo.validate(info);
@@ -845,7 +846,7 @@ public CachePoolInfo addCachePool(CachePoolInfo info)
845846
*/
846847
public void modifyCachePool(CachePoolInfo info)
847848
throws IOException {
848-
assert namesystem.hasWriteLock();
849+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
849850
StringBuilder bld = new StringBuilder();
850851
try {
851852
CachePoolInfo.validate(info);
@@ -915,7 +916,7 @@ public void modifyCachePool(CachePoolInfo info)
915916
*/
916917
public void removeCachePool(String poolName)
917918
throws IOException {
918-
assert namesystem.hasWriteLock();
919+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
919920
try {
920921
CachePoolInfo.validateName(poolName);
921922
CachePool pool = cachePools.remove(poolName);
@@ -941,7 +942,7 @@ public void removeCachePool(String poolName)
941942

942943
public BatchedListEntries<CachePoolEntry>
943944
listCachePools(FSPermissionChecker pc, String prevKey) {
944-
assert namesystem.hasReadLock();
945+
assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
945946
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
946947
ArrayList<CachePoolEntry> results =
947948
new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -1008,7 +1009,7 @@ public final void processCacheReport(final DatanodeID datanodeID,
10081009
datanodeID, DFS_NAMENODE_CACHING_ENABLED_KEY, blockIds.size());
10091010
return;
10101011
}
1011-
namesystem.writeLock();
1012+
namesystem.writeLock(FSNamesystemLockMode.BM);
10121013
final long startTime = Time.monotonicNow();
10131014
final long endTime;
10141015
try {
@@ -1022,7 +1023,7 @@ public final void processCacheReport(final DatanodeID datanodeID,
10221023
processCacheReportImpl(datanode, blockIds);
10231024
} finally {
10241025
endTime = Time.monotonicNow();
1025-
namesystem.writeUnlock("processCacheReport");
1026+
namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCacheReport");
10261027
}
10271028

10281029
// Log the block report processing stats from Namenode perspective

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7767,7 +7767,7 @@ long addCacheDirective(CacheDirectiveInfo directive,
77677767
checkOperation(OperationCategory.WRITE);
77687768
FSPermissionChecker.setOperationType(operationName);
77697769
try {
7770-
writeLock();
7770+
writeLock(FSNamesystemLockMode.FS);
77717771
try {
77727772
checkOperation(OperationCategory.WRITE);
77737773
checkNameNodeSafeMode("Cannot add cache directive");
@@ -7776,7 +7776,7 @@ long addCacheDirective(CacheDirectiveInfo directive,
77767776
} finally {
77777777
effectiveDirectiveStr = effectiveDirective != null ?
77787778
effectiveDirective.toString() : null;
7779-
writeUnlock(operationName,
7779+
writeUnlock(FSNamesystemLockMode.FS, operationName,
77807780
getLockReportInfoSupplier(effectiveDirectiveStr));
77817781
}
77827782
} catch (AccessControlException ace) {
@@ -7800,14 +7800,14 @@ void modifyCacheDirective(CacheDirectiveInfo directive,
78007800
FSPermissionChecker.setOperationType(operationName);
78017801
checkOperation(OperationCategory.WRITE);
78027802
try {
7803-
writeLock();
7803+
writeLock(FSNamesystemLockMode.FS);
78047804
try {
78057805
checkOperation(OperationCategory.WRITE);
78067806
checkNameNodeSafeMode("Cannot add cache directive");
78077807
FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags,
78087808
logRetryCache);
78097809
} finally {
7810-
writeUnlock(operationName,
7810+
writeUnlock(FSNamesystemLockMode.FS, operationName,
78117811
getLockReportInfoSupplier(idStr, directive.toString()));
78127812
}
78137813
} catch (AccessControlException ace) {
@@ -7826,14 +7826,14 @@ void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
78267826
checkOperation(OperationCategory.WRITE);
78277827
FSPermissionChecker.setOperationType(operationName);
78287828
try {
7829-
writeLock();
7829+
writeLock(FSNamesystemLockMode.FS);
78307830
try {
78317831
checkOperation(OperationCategory.WRITE);
78327832
checkNameNodeSafeMode("Cannot remove cache directives");
78337833
FSNDNCacheOp.removeCacheDirective(this, cacheManager, id,
78347834
logRetryCache);
78357835
} finally {
7836-
writeUnlock(operationName, getLockReportInfoSupplier(idStr));
7836+
writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(idStr));
78377837
}
78387838
} catch (AccessControlException ace) {
78397839
logAuditEvent(false, operationName, idStr, null, null);
@@ -7851,13 +7851,13 @@ BatchedListEntries<CacheDirectiveEntry> listCacheDirectives(
78517851
BatchedListEntries<CacheDirectiveEntry> results;
78527852
cacheManager.waitForRescanIfNeeded();
78537853
try {
7854-
readLock();
7854+
readLock(FSNamesystemLockMode.FS);
78557855
try {
78567856
checkOperation(OperationCategory.READ);
78577857
results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId,
78587858
filter);
78597859
} finally {
7860-
readUnlock(operationName,
7860+
readUnlock(FSNamesystemLockMode.FS, operationName,
78617861
getLockReportInfoSupplier(filter.toString()));
78627862
}
78637863
} catch (AccessControlException ace) {
@@ -7876,7 +7876,7 @@ void addCachePool(CachePoolInfo req, boolean logRetryCache)
78767876
String poolName = req == null ? null : req.getPoolName();
78777877
checkSuperuserPrivilege(operationName, poolName);
78787878
try {
7879-
writeLock();
7879+
writeLock(FSNamesystemLockMode.FS);
78807880
try {
78817881
checkOperation(OperationCategory.WRITE);
78827882
checkNameNodeSafeMode("Cannot add cache pool"
@@ -7885,7 +7885,7 @@ void addCachePool(CachePoolInfo req, boolean logRetryCache)
78857885
logRetryCache);
78867886
poolInfoStr = info.toString();
78877887
} finally {
7888-
writeUnlock(operationName, getLockReportInfoSupplier(poolInfoStr));
7888+
writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolInfoStr));
78897889
}
78907890
} catch (AccessControlException ace) {
78917891
logAuditEvent(false, operationName, poolInfoStr);
@@ -7903,14 +7903,14 @@ void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
79037903
(req == null ? null : req.getPoolName()) + "}";
79047904
checkSuperuserPrivilege(operationName, poolNameStr);
79057905
try {
7906-
writeLock();
7906+
writeLock(FSNamesystemLockMode.FS);
79077907
try {
79087908
checkOperation(OperationCategory.WRITE);
79097909
checkNameNodeSafeMode("Cannot modify cache pool"
79107910
+ (req == null ? null : req.getPoolName()));
79117911
FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache);
79127912
} finally {
7913-
writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr,
7913+
writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr,
79147914
req == null ? null : req.toString()));
79157915
}
79167916
} catch (AccessControlException ace) {
@@ -7930,14 +7930,14 @@ void removeCachePool(String cachePoolName, boolean logRetryCache)
79307930
String poolNameStr = "{poolName: " + cachePoolName + "}";
79317931
checkSuperuserPrivilege(operationName, poolNameStr);
79327932
try {
7933-
writeLock();
7933+
writeLock(FSNamesystemLockMode.FS);
79347934
try {
79357935
checkOperation(OperationCategory.WRITE);
79367936
checkNameNodeSafeMode("Cannot modify cache pool" + cachePoolName);
79377937
FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName,
79387938
logRetryCache);
79397939
} finally {
7940-
writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr));
7940+
writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr));
79417941
}
79427942
} catch (AccessControlException ace) {
79437943
logAuditEvent(false, operationName, poolNameStr);
@@ -7955,12 +7955,12 @@ BatchedListEntries<CachePoolEntry> listCachePools(String prevKey)
79557955
FSPermissionChecker.setOperationType(operationName);
79567956
cacheManager.waitForRescanIfNeeded();
79577957
try {
7958-
readLock();
7958+
readLock(FSNamesystemLockMode.FS);
79597959
try {
79607960
checkOperation(OperationCategory.READ);
79617961
results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey);
79627962
} finally {
7963-
readUnlock(operationName, getLockReportInfoSupplier(null));
7963+
readUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(null));
79647964
}
79657965
} catch (AccessControlException ace) {
79667966
logAuditEvent(false, operationName, null);

0 commit comments

Comments
 (0)