hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+ import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.util.GSet;
@@ -223,7 +224,7 @@ public void run() {
* after are not atomic.
*/
public void waitForRescanIfNeeded() {
- Preconditions.checkArgument(!namesystem.hasWriteLock(),
+ Preconditions.checkArgument(!namesystem.hasWriteLock(FSNamesystemLockMode.FS),
"Must not hold the FSN write lock when waiting for a rescan.");
Preconditions.checkArgument(lock.isHeldByCurrentThread(),
"Must hold the CRM lock when waiting for a rescan.");
@@ -268,7 +269,7 @@ public void setNeedsRescan() {
*/
@Override
public void close() throws IOException {
- Preconditions.checkArgument(namesystem.hasWriteLock());
+ Preconditions.checkArgument(namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
lock.lock();
try {
if (shutdown) return;
@@ -291,7 +292,7 @@ private void rescan() throws InterruptedException {
scannedBlocks = 0;
lastScanTimeMs = Time.monotonicNow();
try {
- namesystem.writeLock();
+ namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
try {
lock.lock();
if (shutdown) {
@@ -308,7 +309,7 @@ private void rescan() throws InterruptedException {
rescanCachedBlockMap();
blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
} finally {
- namesystem.writeUnlock("cacheReplicationMonitorRescan");
+ namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan");
}
}

@@ -325,11 +326,11 @@ private void reacquireLock(long last) {
long now = Time.monotonicNow();
if (now - last > cacheManager.getMaxLockTimeMs()) {
try {
- namesystem.writeUnlock();
+ namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "cacheReplicationMonitorRescan");
Thread.sleep(cacheManager.getSleepTimeMs());
} catch (InterruptedException e) {
} finally {
- namesystem.writeLock();
+ namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
}
}
}
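For context, `reacquireLock` periodically drops the (now GLOBAL-mode) write lock so other namesystem operations can make progress during a long cache rescan, then takes it back. Below is a minimal, self-contained sketch of that lock-yield idea using a plain `ReentrantReadWriteLock` rather than the real FSNamesystem lock; the class, field, and constant names are illustrative, not HDFS APIs.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only -- not the HDFS implementation.
public class LockYieldSketch {
  private final ReentrantReadWriteLock nsLock = new ReentrantReadWriteLock(true);
  private static final long MAX_HOLD_MS = 1000; // stands in for cacheManager.getMaxLockTimeMs()
  private static final long SLEEP_MS = 100;     // stands in for cacheManager.getSleepTimeMs()

  /** Drop the held write lock briefly if it has been held longer than MAX_HOLD_MS, then retake it. */
  long reacquireIfHeldTooLong(long lastAcquireMs) {
    long now = System.currentTimeMillis();
    if (now - lastAcquireMs > MAX_HOLD_MS) {
      try {
        nsLock.writeLock().unlock();   // let queued readers and writers run
        Thread.sleep(SLEEP_MS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        nsLock.writeLock().lock();     // resume the long-running scan under the lock
      }
      return System.currentTimeMillis();
    }
    return lastAcquireMs;
  }
}
```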
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -80,6 +80,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
+ import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -317,7 +318,7 @@ public void stopMonitorThread() {
}

public void clearDirectiveStats() {
- assert namesystem.hasWriteLock();
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
for (CacheDirective directive : directivesById.values()) {
directive.resetStatistics();
}
@@ -327,26 +328,26 @@ public void clearDirectiveStats() {
* @return Unmodifiable view of the collection of CachePools.
*/
public Collection<CachePool> getCachePools() {
- assert namesystem.hasReadLock();
+ assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
return Collections.unmodifiableCollection(cachePools.values());
}

/**
* @return Unmodifiable view of the collection of CacheDirectives.
*/
public Collection<CacheDirective> getCacheDirectives() {
- assert namesystem.hasReadLock();
+ assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
return Collections.unmodifiableCollection(directivesById.values());
}

@VisibleForTesting
public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
- assert namesystem.hasReadLock();
+ assert namesystem.hasReadLock(FSNamesystemLockMode.BM);
return cachedBlocks;
}

private long getNextDirectiveId() throws IOException {
- assert namesystem.hasWriteLock();
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
if (nextDirectiveId >= Long.MAX_VALUE - 1) {
throw new IOException("No more available IDs.");
}
@@ -574,7 +575,7 @@ CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
public CacheDirectiveInfo addDirective(
CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
throws IOException {
- assert namesystem.hasWriteLock();
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
CacheDirective directive;
try {
CachePool pool = getCachePool(validatePoolName(info));
@@ -652,7 +653,7 @@ void modifyDirectiveFromEditLog(CacheDirectiveInfo info)

public void modifyDirective(CacheDirectiveInfo info,
FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
- assert namesystem.hasWriteLock();
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
String idString =
(info.getId() == null) ?
"(null)" : info.getId().toString();
@@ -703,7 +704,7 @@ public void modifyDirective(CacheDirectiveInfo info,

private void removeInternal(CacheDirective directive)
throws InvalidRequestException {
- assert namesystem.hasWriteLock();
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
// Remove the corresponding entry in directivesByPath.
String path = directive.getPath();
if (!directivesByPath.remove(path, directive)) {
@@ -724,7 +725,7 @@ private void removeInternal(CacheDirective directive)

public void removeDirective(long id, FSPermissionChecker pc)
throws IOException {
- assert namesystem.hasWriteLock();
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
try {
CacheDirective directive = getById(id);
checkWritePermission(pc, directive.getPool());
@@ -740,7 +741,7 @@ public void removeDirective(long id, FSPermissionChecker pc)
listCacheDirectives(long prevId,
CacheDirectiveInfo filter,
FSPermissionChecker pc) throws IOException {
- assert namesystem.hasReadLock();
+ assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
String filterPath = null;
if (filter.getPath() != null) {
@@ -815,7 +816,7 @@ public void removeDirective(long id, FSPermissionChecker pc)
*/
public CachePoolInfo addCachePool(CachePoolInfo info)
throws IOException {
- assert namesystem.hasWriteLock();
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
CachePool pool;
try {
CachePoolInfo.validate(info);
@@ -845,7 +846,7 @@ public CachePoolInfo addCachePool(CachePoolInfo info)
*/
public void modifyCachePool(CachePoolInfo info)
throws IOException {
- assert namesystem.hasWriteLock();
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
StringBuilder bld = new StringBuilder();
try {
CachePoolInfo.validate(info);
@@ -915,7 +916,7 @@ public void modifyCachePool(CachePoolInfo info)
*/
public void removeCachePool(String poolName)
throws IOException {
- assert namesystem.hasWriteLock();
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.FS);
try {
CachePoolInfo.validateName(poolName);
CachePool pool = cachePools.remove(poolName);
@@ -941,7 +942,7 @@ public void removeCachePool(String poolName)

public BatchedListEntries<CachePoolEntry>
listCachePools(FSPermissionChecker pc, String prevKey) {
- assert namesystem.hasReadLock();
+ assert namesystem.hasReadLock(FSNamesystemLockMode.FS);
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
ArrayList<CachePoolEntry> results =
new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -1008,7 +1009,7 @@ public final void processCacheReport(final DatanodeID datanodeID,
datanodeID, DFS_NAMENODE_CACHING_ENABLED_KEY, blockIds.size());
return;
}
- namesystem.writeLock();
+ namesystem.writeLock(FSNamesystemLockMode.BM);
final long startTime = Time.monotonicNow();
final long endTime;
try {
@@ -1022,7 +1023,7 @@ public final void processCacheReport(final DatanodeID datanodeID,
processCacheReportImpl(datanode, blockIds);
} finally {
endTime = Time.monotonicNow();
- namesystem.writeUnlock("processCacheReport");
+ namesystem.writeUnlock(FSNamesystemLockMode.BM, "processCacheReport");
}

// Log the block report processing stats from Namenode perspective
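Taken together, the CacheManager assertions above now name which half of the namesystem lock each operation depends on: directive and pool bookkeeping is checked against the FS lock, the cached-block map and cache-report processing use the BM lock, and only the rescan path takes GLOBAL. Below is a toy, self-contained model of that kind of split lock; the enum and method names mirror the calls in this diff, but the class itself is invented for illustration and the real FSNamesystemLockMode/lock manager is more involved.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Toy model of a fine-grained namesystem lock: FS and BM are independent
// read-write locks, and GLOBAL means "both". Illustrative only.
public class FineGrainedLockSketch {
  public enum LockMode { FS, BM, GLOBAL }

  private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);
  private final ReentrantReadWriteLock bmLock = new ReentrantReadWriteLock(true);

  public void writeLock(LockMode mode) {
    // GLOBAL takes both locks in a fixed order so callers cannot deadlock each other.
    if (mode == LockMode.FS || mode == LockMode.GLOBAL) {
      fsLock.writeLock().lock();
    }
    if (mode == LockMode.BM || mode == LockMode.GLOBAL) {
      bmLock.writeLock().lock();
    }
  }

  public void writeUnlock(LockMode mode, String opName) {
    // Release in reverse acquisition order; opName mirrors the lock-report tag in the diff.
    if (mode == LockMode.BM || mode == LockMode.GLOBAL) {
      bmLock.writeLock().unlock();
    }
    if (mode == LockMode.FS || mode == LockMode.GLOBAL) {
      fsLock.writeLock().unlock();
    }
  }

  public boolean hasWriteLock(LockMode mode) {
    boolean fs = fsLock.isWriteLockedByCurrentThread();
    boolean bm = bmLock.isWriteLockedByCurrentThread();
    if (mode == LockMode.FS) {
      return fs;
    } else if (mode == LockMode.BM) {
      return bm;
    }
    return fs && bm; // GLOBAL
  }
}
```

With a model like this, an `assert hasWriteLock(LockMode.FS)` in directive code no longer requires holding the block-management side of the lock, which is the point of the narrower assertions above.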
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -7767,7 +7767,7 @@ long addCacheDirective(CacheDirectiveInfo directive,
checkOperation(OperationCategory.WRITE);
FSPermissionChecker.setOperationType(operationName);
try {
- writeLock();
+ writeLock(FSNamesystemLockMode.FS);
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot add cache directive");
@@ -7776,7 +7776,7 @@ long addCacheDirective(CacheDirectiveInfo directive,
} finally {
effectiveDirectiveStr = effectiveDirective != null ?
effectiveDirective.toString() : null;
- writeUnlock(operationName,
+ writeUnlock(FSNamesystemLockMode.FS, operationName,
getLockReportInfoSupplier(effectiveDirectiveStr));
}
} catch (AccessControlException ace) {
@@ -7800,14 +7800,14 @@ void modifyCacheDirective(CacheDirectiveInfo directive,
FSPermissionChecker.setOperationType(operationName);
checkOperation(OperationCategory.WRITE);
try {
- writeLock();
+ writeLock(FSNamesystemLockMode.FS);
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot add cache directive");
FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags,
logRetryCache);
} finally {
- writeUnlock(operationName,
+ writeUnlock(FSNamesystemLockMode.FS, operationName,
getLockReportInfoSupplier(idStr, directive.toString()));
}
} catch (AccessControlException ace) {
@@ -7826,14 +7826,14 @@ void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
FSPermissionChecker.setOperationType(operationName);
try {
- writeLock();
+ writeLock(FSNamesystemLockMode.FS);
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove cache directives");
FSNDNCacheOp.removeCacheDirective(this, cacheManager, id,
logRetryCache);
} finally {
- writeUnlock(operationName, getLockReportInfoSupplier(idStr));
+ writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(idStr));
}
} catch (AccessControlException ace) {
logAuditEvent(false, operationName, idStr, null, null);
@@ -7851,13 +7851,13 @@ BatchedListEntries<CacheDirectiveEntry> listCacheDirectives(
BatchedListEntries<CacheDirectiveEntry> results;
cacheManager.waitForRescanIfNeeded();
try {
- readLock();
+ readLock(FSNamesystemLockMode.FS);
try {
checkOperation(OperationCategory.READ);
results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId,
filter);
} finally {
- readUnlock(operationName,
+ readUnlock(FSNamesystemLockMode.FS, operationName,
getLockReportInfoSupplier(filter.toString()));
}
} catch (AccessControlException ace) {
@@ -7876,7 +7876,7 @@ void addCachePool(CachePoolInfo req, boolean logRetryCache)
String poolName = req == null ? null : req.getPoolName();
checkSuperuserPrivilege(operationName, poolName);
try {
- writeLock();
+ writeLock(FSNamesystemLockMode.FS);
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot add cache pool"
Expand All @@ -7885,7 +7885,7 @@ void addCachePool(CachePoolInfo req, boolean logRetryCache)
logRetryCache);
poolInfoStr = info.toString();
} finally {
- writeUnlock(operationName, getLockReportInfoSupplier(poolInfoStr));
+ writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolInfoStr));
}
} catch (AccessControlException ace) {
logAuditEvent(false, operationName, poolInfoStr);
@@ -7903,14 +7903,14 @@ void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
(req == null ? null : req.getPoolName()) + "}";
checkSuperuserPrivilege(operationName, poolNameStr);
try {
- writeLock();
+ writeLock(FSNamesystemLockMode.FS);
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot modify cache pool"
+ (req == null ? null : req.getPoolName()));
FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache);
} finally {
- writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr,
+ writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr,
req == null ? null : req.toString()));
}
} catch (AccessControlException ace) {
@@ -7930,14 +7930,14 @@ void removeCachePool(String cachePoolName, boolean logRetryCache)
String poolNameStr = "{poolName: " + cachePoolName + "}";
checkSuperuserPrivilege(operationName, poolNameStr);
try {
- writeLock();
+ writeLock(FSNamesystemLockMode.FS);
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot modify cache pool" + cachePoolName);
FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName,
logRetryCache);
} finally {
- writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr));
+ writeUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(poolNameStr));
}
} catch (AccessControlException ace) {
logAuditEvent(false, operationName, poolNameStr);
@@ -7955,12 +7955,12 @@ BatchedListEntries<CachePoolEntry> listCachePools(String prevKey)
FSPermissionChecker.setOperationType(operationName);
cacheManager.waitForRescanIfNeeded();
try {
- readLock();
+ readLock(FSNamesystemLockMode.FS);
try {
checkOperation(OperationCategory.READ);
results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey);
} finally {
- readUnlock(operationName, getLockReportInfoSupplier(null));
+ readUnlock(FSNamesystemLockMode.FS, operationName, getLockReportInfoSupplier(null));
}
} catch (AccessControlException ace) {
logAuditEvent(false, operationName, null);
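The FSNamesystem call sites in this file all follow the same shape: take the narrowest mode that covers the operation, do the work, and pair the unlock with the same mode plus an operation name in a finally block. Here is a hedged caller-side sketch against the toy lock above; the directive store and operation name are invented for illustration and are not FSNamesystem internals.

```java
import java.util.ArrayList;
import java.util.List;

// Caller-side pattern sketched against FineGrainedLockSketch above; illustrative only.
public class CacheOpsSketch {
  private final FineGrainedLockSketch lock = new FineGrainedLockSketch();
  private final List<String> cacheDirectives = new ArrayList<>();

  long addCacheDirective(String path) {
    final String operationName = "addCacheDirective";
    // Directive metadata lives on the FS side, so the BM lock stays free for block work.
    lock.writeLock(FineGrainedLockSketch.LockMode.FS);
    try {
      cacheDirectives.add(path);
      return cacheDirectives.size(); // stand-in for the assigned directive id
    } finally {
      // Unlock with the same mode and tag the hold for lock-duration reporting.
      lock.writeUnlock(FineGrainedLockSketch.LockMode.FS, operationName);
    }
  }
}
```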