Skip to content

Commit 4a70d48

Browse files
committed
HDFS-17413. [FGL] CacheReplicationMonitor supports fine-grained lock (#6641)
1 parent 435787c commit 4a70d48

File tree

3 files changed

+40
-38
lines changed

3 files changed

+40
-38
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@
4848
import org.apache.hadoop.hdfs.server.namenode.INode;
4949
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
5050
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
51+
import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
5152
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
5253
import org.apache.hadoop.hdfs.util.ReadOnlyList;
5354
import org.apache.hadoop.util.GSet;
@@ -223,7 +224,7 @@ public void run() {
223224
* after are not atomic.
224225
*/
225226
public void waitForRescanIfNeeded() {
226-
Preconditions.checkArgument(!namesystem.hasWriteLock(),
227+
Preconditions.checkArgument(!namesystem.hasWriteLock(FSNamesystemLockMode.FS),
227228
"Must not hold the FSN write lock when waiting for a rescan.");
228229
Preconditions.checkArgument(lock.isHeldByCurrentThread(),
229230
"Must hold the CRM lock when waiting for a rescan.");
@@ -268,7 +269,7 @@ public void setNeedsRescan() {
268269
*/
269270
@Override
270271
public void close() throws IOException {
271-
Preconditions.checkArgument(namesystem.hasWriteLock());
272+
Preconditions.checkArgument(namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
272273
lock.lock();
273274
try {
274275
if (shutdown) return;
@@ -291,7 +292,7 @@ private void rescan() throws InterruptedException {
291292
scannedBlocks = 0;
292293
lastScanTimeMs = Time.monotonicNow();
293294
try {
294-
namesystem.writeLock();
295+
namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
295296
try {
296297
lock.lock();
297298
if (shutdown) {
@@ -308,7 +309,7 @@ private void rescan() throws InterruptedException {
308309
rescanCachedBlockMap();
309310
blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
310311
} finally {
311-
namesystem.writeUnlock("cacheReplicationMonitorRescan");
312+
namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan");
312313
}
313314
}
314315

@@ -325,11 +326,11 @@ private void reacquireLock(long last) {
325326
long now = Time.monotonicNow();
326327
if (now - last > cacheManager.getMaxLockTimeMs()) {
327328
try {
328-
namesystem.writeUnlock();
329+
namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan");
329330
Thread.sleep(cacheManager.getSleepTimeMs());
330331
} catch (InterruptedException e) {
331332
} finally {
332-
namesystem.writeLock();
333+
namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
333334
}
334335
}
335336
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

Lines changed: 17 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -80,6 +80,7 @@
8080
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
8181
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
8282
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
83+
import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
8384
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
8485
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
8586
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -317,7 +318,7 @@ public void stopMonitorThread() {
317318
}
318319

319320
public void clearDirectiveStats() {
320-
assert namesystem.hasWriteLock();
321+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
321322
for (CacheDirective directive : directivesById.values()) {
322323
directive.resetStatistics();
323324
}
@@ -327,26 +328,26 @@ public void clearDirectiveStats() {
327328
* @return Unmodifiable view of the collection of CachePools.
328329
*/
329330
public Collection<CachePool> getCachePools() {
330-
assert namesystem.hasReadLock();
331+
assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
331332
return Collections.unmodifiableCollection(cachePools.values());
332333
}
333334

334335
/**
335336
* @return Unmodifiable view of the collection of CacheDirectives.
336337
*/
337338
public Collection<CacheDirective> getCacheDirectives() {
338-
assert namesystem.hasReadLock();
339+
assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
339340
return Collections.unmodifiableCollection(directivesById.values());
340341
}
341342

342343
@VisibleForTesting
343344
public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
344-
assert namesystem.hasReadLock();
345+
assert namesystem.hasReadLock(FSNamesystemLockMode.BM);
345346
return cachedBlocks;
346347
}
347348

348349
private long getNextDirectiveId() throws IOException {
349-
assert namesystem.hasWriteLock();
350+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
350351
if (nextDirectiveId >= Long.MAX_VALUE - 1) {
351352
throw new IOException("No more available IDs.");
352353
}
@@ -574,7 +575,7 @@ CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
574575
public CacheDirectiveInfo addDirective(
575576
CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
576577
throws IOException {
577-
assert namesystem.hasWriteLock();
578+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
578579
CacheDirective directive;
579580
try {
580581
CachePool pool = getCachePool(validatePoolName(info));
@@ -652,7 +653,7 @@ void modifyDirectiveFromEditLog(CacheDirectiveInfo info)
652653

653654
public void modifyDirective(CacheDirectiveInfo info,
654655
FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
655-
assert namesystem.hasWriteLock();
656+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
656657
String idString =
657658
(info.getId() == null) ?
658659
"(null)" : info.getId().toString();
@@ -703,7 +704,7 @@ public void modifyDirective(CacheDirectiveInfo info,
703704

704705
private void removeInternal(CacheDirective directive)
705706
throws InvalidRequestException {
706-
assert namesystem.hasWriteLock();
707+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
707708
// Remove the corresponding entry in directivesByPath.
708709
String path = directive.getPath();
709710
if (!directivesByPath.remove(path, directive)) {
@@ -724,7 +725,7 @@ private void removeInternal(CacheDirective directive)
724725

725726
public void removeDirective(long id, FSPermissionChecker pc)
726727
throws IOException {
727-
assert namesystem.hasWriteLock();
728+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
728729
try {
729730
CacheDirective directive = getById(id);
730731
checkWritePermission(pc, directive.getPool());
@@ -740,7 +741,7 @@ public void removeDirective(long id, FSPermissionChecker pc)
740741
listCacheDirectives(long prevId,
741742
CacheDirectiveInfo filter,
742743
FSPermissionChecker pc) throws IOException {
743-
assert namesystem.hasReadLock();
744+
assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
744745
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
745746
String filterPath = null;
746747
if (filter.getPath() != null) {
@@ -815,7 +816,7 @@ public void removeDirective(long id, FSPermissionChecker pc)
815816
*/
816817
public CachePoolInfo addCachePool(CachePoolInfo info)
817818
throws IOException {
818-
assert namesystem.hasWriteLock();
819+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
819820
CachePool pool;
820821
try {
821822
CachePoolInfo.validate(info);
@@ -845,7 +846,7 @@ public CachePoolInfo addCachePool(CachePoolInfo info)
845846
*/
846847
public void modifyCachePool(CachePoolInfo info)
847848
throws IOException {
848-
assert namesystem.hasWriteLock();
849+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
849850
StringBuilder bld = new StringBuilder();
850851
try {
851852
CachePoolInfo.validate(info);
@@ -915,7 +916,7 @@ public void modifyCachePool(CachePoolInfo info)
915916
*/
916917
public void removeCachePool(String poolName)
917918
throws IOException {
918-
assert namesystem.hasWriteLock();
919+
assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
919920
try {
920921
CachePoolInfo.validateName(poolName);
921922
CachePool pool = cachePools.remove(poolName);
@@ -941,7 +942,7 @@ public void removeCachePool(String poolName)
941942

942943
public BatchedListEntries<CachePoolEntry>
943944
listCachePools(FSPermissionChecker pc, String prevKey) {
944-
assert namesystem.hasReadLock();
945+
assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
945946
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
946947
ArrayList<CachePoolEntry> results =
947948
new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -1008,7 +1009,7 @@ public final void processCacheReport(final DatanodeID datanodeID,
10081009
datanodeID, DFS_NAMENODE_CACHING_ENABLED_KEY, blockIds.size());
10091010
return;
10101011
}
1011-
namesystem.writeLock();
1012+
namesystem.writeLock(FSNamesystemLockMode.BM);
10121013
final long startTime = Time.monotonicNow();
10131014
final long endTime;
10141015
try {
@@ -1022,7 +1023,7 @@ public final void processCacheReport(final DatanodeID datanodeID,
10221023
processCacheReportImpl(datanode, blockIds);
10231024
} finally {
10241025
endTime = Time.monotonicNow();
1025-
namesystem.writeUnlock("processCacheReport");
1026+
namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCacheReport");
10261027
}
10271028

10281029
// Log the block report processing stats from Namenode perspective

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 16 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -7802,7 +7802,7 @@ long addCacheDirective(CacheDirectiveInfo directive,
78027802
checkOperation(OperationCategory.WRITE);
78037803
FSPermissionChecker.setOperationType(operationName);
78047804
try {
7805-
writeLock();
7805+
writeLock(FSNamesystemLockMode.FS);
78067806
try {
78077807
checkOperation(OperationCategory.WRITE);
78087808
checkNameNodeSafeMode("Cannot add cache directive");
@@ -7811,7 +7811,7 @@ long addCacheDirective(CacheDirectiveInfo directive,
78117811
} finally {
78127812
effectiveDirectiveStr = effectiveDirective != null ?
78137813
effectiveDirective.toString() : null;
7814-
writeUnlock(operationName,
7814+
writeUnlock(FSNamesystemLockMode.FS, operationName,
78157815
getLockReportInfoSupplier(effectiveDirectiveStr));
78167816
}
78177817
} catch (AccessControlException ace) {
@@ -7835,14 +7835,14 @@ void modifyCacheDirective(CacheDirectiveInfo directive,
78357835
FSPermissionChecker.setOperationType(operationName);
78367836
checkOperation(OperationCategory.WRITE);
78377837
try {
7838-
writeLock();
7838+
writeLock(FSNamesystemLockMode.FS);
78397839
try {
78407840
checkOperation(OperationCategory.WRITE);
78417841
checkNameNodeSafeMode("Cannot add cache directive");
78427842
FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags,
78437843
logRetryCache);
78447844
} finally {
7845-
writeUnlock(operationName,
7845+
writeUnlock(FSNamesystemLockMode.FS, operationName,
78467846
getLockReportInfoSupplier(idStr, directive.toString()));
78477847
}
78487848
} catch (AccessControlException ace) {
@@ -7861,14 +7861,14 @@ void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
78617861
checkOperation(OperationCategory.WRITE);
78627862
FSPermissionChecker.setOperationType(operationName);
78637863
try {
7864-
writeLock();
7864+
writeLock(FSNamesystemLockMode.FS);
78657865
try {
78667866
checkOperation(OperationCategory.WRITE);
78677867
checkNameNodeSafeMode("Cannot remove cache directives");
78687868
FSNDNCacheOp.removeCacheDirective(this, cacheManager, id,
78697869
logRetryCache);
78707870
} finally {
7871-
writeUnlock(operationName, getLockReportInfoSupplier(idStr));
7871+
writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(idStr));
78727872
}
78737873
} catch (AccessControlException ace) {
78747874
logAuditEvent(false, operationName, idStr, null, null);
@@ -7886,13 +7886,13 @@ BatchedListEntries<CacheDirectiveEntry> listCacheDirectives(
78867886
BatchedListEntries<CacheDirectiveEntry> results;
78877887
cacheManager.waitForRescanIfNeeded();
78887888
try {
7889-
readLock();
7889+
readLock(FSNamesystemLockMode.FS);
78907890
try {
78917891
checkOperation(OperationCategory.READ);
78927892
results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId,
78937893
filter);
78947894
} finally {
7895-
readUnlock(operationName,
7895+
readUnlock(FSNamesystemLockMode.FS, operationName,
78967896
getLockReportInfoSupplier(filter.toString()));
78977897
}
78987898
} catch (AccessControlException ace) {
@@ -7911,7 +7911,7 @@ void addCachePool(CachePoolInfo req, boolean logRetryCache)
79117911
String poolName = req == null ? null : req.getPoolName();
79127912
checkSuperuserPrivilege(operationName, poolName);
79137913
try {
7914-
writeLock();
7914+
writeLock(FSNamesystemLockMode.FS);
79157915
try {
79167916
checkOperation(OperationCategory.WRITE);
79177917
checkNameNodeSafeMode("Cannot add cache pool"
@@ -7920,7 +7920,7 @@ void addCachePool(CachePoolInfo req, boolean logRetryCache)
79207920
logRetryCache);
79217921
poolInfoStr = info.toString();
79227922
} finally {
7923-
writeUnlock(operationName, getLockReportInfoSupplier(poolInfoStr));
7923+
writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolInfoStr));
79247924
}
79257925
} catch (AccessControlException ace) {
79267926
logAuditEvent(false, operationName, poolInfoStr);
@@ -7938,14 +7938,14 @@ void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
79387938
(req == null ? null : req.getPoolName()) + "}";
79397939
checkSuperuserPrivilege(operationName, poolNameStr);
79407940
try {
7941-
writeLock();
7941+
writeLock(FSNamesystemLockMode.FS);
79427942
try {
79437943
checkOperation(OperationCategory.WRITE);
79447944
checkNameNodeSafeMode("Cannot modify cache pool"
79457945
+ (req == null ? null : req.getPoolName()));
79467946
FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache);
79477947
} finally {
7948-
writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr,
7948+
writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr,
79497949
req == null ? null : req.toString()));
79507950
}
79517951
} catch (AccessControlException ace) {
@@ -7965,14 +7965,14 @@ void removeCachePool(String cachePoolName, boolean logRetryCache)
79657965
String poolNameStr = "{poolName: " + cachePoolName + "}";
79667966
checkSuperuserPrivilege(operationName, poolNameStr);
79677967
try {
7968-
writeLock();
7968+
writeLock(FSNamesystemLockMode.FS);
79697969
try {
79707970
checkOperation(OperationCategory.WRITE);
79717971
checkNameNodeSafeMode("Cannot modify cache pool" + cachePoolName);
79727972
FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName,
79737973
logRetryCache);
79747974
} finally {
7975-
writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr));
7975+
writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr));
79767976
}
79777977
} catch (AccessControlException ace) {
79787978
logAuditEvent(false, operationName, poolNameStr);
@@ -7990,12 +7990,12 @@ BatchedListEntries<CachePoolEntry> listCachePools(String prevKey)
79907990
FSPermissionChecker.setOperationType(operationName);
79917991
cacheManager.waitForRescanIfNeeded();
79927992
try {
7993-
readLock();
7993+
readLock(FSNamesystemLockMode.FS);
79947994
try {
79957995
checkOperation(OperationCategory.READ);
79967996
results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey);
79977997
} finally {
7998-
readUnlock(operationName, getLockReportInfoSupplier(null));
7998+
readUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(null));
79997999
}
80008000
} catch (AccessControlException ace) {
80018001
logAuditEvent(false, operationName, null);

0 commit comments

Comments
 (0)