Commit 84e16ad

Stephen O'Donnell authored and Hexiaoqiao committed
HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org> (cherry picked from commit 2a67e2b)
1 parent 3c715a2 · commit 84e16ad

9 files changed (+187, -44 lines)


hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 4 additions & 0 deletions
@@ -552,6 +552,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_LOCK_FAIR_KEY =
       "dfs.datanode.lock.fair";
   public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+  public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
+      "dfs.datanode.lock.read.write.enabled";
+  public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
+      true;
   public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
       "dfs.datanode.lock-reporting-threshold-ms";
   public static final long

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

Lines changed: 1 addition & 1 deletion
@@ -254,7 +254,7 @@ class BlockSender implements java.io.Closeable {
     // the append write.
     ChunkChecksum chunkChecksum = null;
     final long replicaVisibleLength;
-    try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
+    try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
       replica = getReplica(block, datanode);
       replicaVisibleLength = replica.getVisibleLength();
     }
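
For readers unfamiliar with the pattern this hunk (and the ones below) relies on, here is a minimal, self-contained sketch of an auto-closeable lock wrapper. It is a simplified stand-in for org.apache.hadoop.util.AutoCloseableLock, not the real class: acquire() takes the underlying java.util.concurrent lock and returns the wrapper, so try-with-resources releases the lock on every exit path, including exceptions.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified stand-in for org.apache.hadoop.util.AutoCloseableLock; the
// names and behaviour here are illustrative only.
class SimpleAutoCloseableLock implements AutoCloseable {
  private final Lock lock;

  SimpleAutoCloseableLock(Lock lock) {
    this.lock = lock;
  }

  SimpleAutoCloseableLock acquire() {
    lock.lock();   // block until the underlying lock is held
    return this;   // the returned wrapper is the try-with-resources target
  }

  @Override
  public void close() {
    lock.unlock(); // runs automatically when the try block exits
  }

  public static void main(String[] args) {
    ReentrantReadWriteLock rw = new ReentrantReadWriteLock(true);
    SimpleAutoCloseableLock readLock =
        new SimpleAutoCloseableLock(rw.readLock());
    try (SimpleAutoCloseableLock l = readLock.acquire()) {
      System.out.println("read lock held; released when this block exits");
    }
  }
}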

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 1 addition & 1 deletion
@@ -3010,7 +3010,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
     final BlockConstructionStage stage;

     //get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
+    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java

Lines changed: 1 addition & 1 deletion
@@ -504,7 +504,7 @@ private Map<String, String> getStorageIDToVolumeBasePathMap()
     Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
-      try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
+      try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

Lines changed: 6 additions & 2 deletions
@@ -657,12 +657,16 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
       FsVolumeSpi destination) throws IOException;

   /**
-   * Acquire the lock of the data set.
+   * Acquire the lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible. For example modifying the genStamp of a block instance.
    */
   AutoCloseableLock acquireDatasetLock();

   /***
-   * Acquire the read lock of the data set.
+   * Acquire the read lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible. For example modifying the genStamp of a block instance.
    * @return The AutoClosable read lock instance.
    */
   AutoCloseableLock acquireDatasetReadLock();
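
The caveat added to these javadocs is the key invariant of the change: the read lock freezes the volume map's structure, not the fields of individual replicas. A hedged sketch of what that means in practice is below; ReplicaStub and the 42L stamp are hypothetical, not Hadoop classes. A compound check-then-act on per-replica state still needs its own monitor, which is exactly what the synchronized(replica) block added to getBlockLocalPathInfo further down does.

import java.util.concurrent.locks.ReentrantReadWriteLock;

// ReplicaStub is a hypothetical stand-in for a replica object.
class ReplicaStub {
  private long genStamp;

  synchronized long getGenStamp() {
    return genStamp;
  }

  synchronized void setGenStamp(long gs) {
    genStamp = gs;
  }
}

public class ReadLockCaveat {
  private static final ReentrantReadWriteLock DATASET_LOCK =
      new ReentrantReadWriteLock(true);

  public static void main(String[] args) {
    ReplicaStub replica = new ReplicaStub();
    DATASET_LOCK.readLock().lock(); // shared: the volume map cannot change
    try {
      // Per-replica state can still change under the shared lock, so a
      // compound check-then-act must hold the replica's own monitor.
      synchronized (replica) {
        if (replica.getGenStamp() < 42L) {
          replica.setGenStamp(42L);
        }
      }
    } finally {
      DATASET_LOCK.readLock().unlock();
    }
  }
}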

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

Lines changed: 40 additions & 27 deletions
@@ -41,7 +41,6 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;

 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -179,7 +178,7 @@ public StorageReport[] getStorageReports(String bpid)

   @Override
   public FsVolumeImpl getVolume(final ExtendedBlock b) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final ReplicaInfo r =
           volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
       return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -189,7 +188,7 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -206,7 +205,7 @@ public Block getStoredBlock(String bpid, long blkid)
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     Set<? extends Replica> replicas = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       replicas =
           new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
           : volumeMap.replicas(bpid));
@@ -302,7 +301,20 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
         DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
         TimeUnit.MILLISECONDS));
     this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
-    this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    boolean enableRL = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
+    // The read lock can be disabled by the above config key. If it is
+    // disabled then we simply make both the read and write lock variables
+    // hold the write lock. All accesses to the lock are via these variables,
+    // so that effectively disables the read lock.
+    if (enableRL) {
+      LOG.info("The datanode lock is a read write lock");
+      this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    } else {
+      LOG.info("The datanode lock is an exclusive write lock");
+      this.datasetReadLock = this.datasetWriteLock;
+    }
     this.datasetWriteLockCondition = datasetWriteLock.newCondition();

     // The number of volumes required for operation is the total number
@@ -342,7 +354,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
     }

     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(datasetRWLock);
+    volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);

     @SuppressWarnings("unchecked")
@@ -475,7 +487,8 @@ private void addVolume(Storage.StorageDirectory sd) throws IOException {
         .setConf(this.conf)
         .build();
     FsVolumeReference ref = fsVolume.obtainReference();
-    ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
+    ReplicaMap tempVolumeMap =
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);

     activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@@ -515,7 +528,7 @@ public void addVolume(final StorageLocation location,
     final FsVolumeImpl fsVolume =
         createFsVolume(sd.getStorageUuid(), sd, location);
     final ReplicaMap tempVolumeMap =
-        new ReplicaMap(new ReentrantReadWriteLock());
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     ArrayList<IOException> exceptions = Lists.newArrayList();

     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -810,7 +823,7 @@ public InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {

     ReplicaInfo info;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }

@@ -898,7 +911,7 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid)
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1023,7 +1036,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
     }

     FsVolumeReference volumeRef = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
           block.getNumBytes());
     }
@@ -1137,7 +1150,7 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi

     FsVolumeReference volumeRef = null;

-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = destination.obtainReference();
     }

@@ -1891,7 +1904,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
         new HashMap<String, BlockListAsLongs.Builder>();

     List<FsVolumeImpl> curVolumes = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       curVolumes = volumes.getVolumes();
       for (FsVolumeSpi v : curVolumes) {
         builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1954,7 +1967,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
    */
   @Override
   public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
           volumeMap.size(bpid));
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2047,9 +2060,7 @@ private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
   ReplicaInfo validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final ReplicaInfo r;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
-      r = volumeMap.get(bpid, blockId);
-    }
+    r = volumeMap.get(bpid, blockId);
     if (r != null) {
       if (r.blockDataExists()) {
         return r;
@@ -2292,7 +2303,7 @@ public boolean isCached(String bpid, long blockId) {

   @Override // FsDatasetSpi
   public boolean contains(final ExtendedBlock block) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final long blockId = block.getLocalBlock().getBlockId();
       final String bpid = block.getBlockPoolId();
       final ReplicaInfo r = volumeMap.get(bpid, blockId);
@@ -2613,7 +2624,7 @@ public ReplicaInfo getReplica(String bpid, long blockId) {

   @Override
   public String getReplicaString(String bpid, long blockId) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
       return r == null ? "null" : r.toString();
     }
@@ -2833,7 +2844,7 @@ private ReplicaInfo updateReplicaUnderRecovery(
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -2983,18 +2994,20 @@ public void deleteBlockPool(String bpid, boolean force)
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
         throw new ReplicaNotFoundException(block);
       }
-      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-        throw new IOException(
-            "Replica generation stamp < block generation stamp, block="
-            + block + ", replica=" + replica);
-      } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
-        block.setGenerationStamp(replica.getGenerationStamp());
+      synchronized(replica) {
+        if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+          throw new IOException(
+              "Replica generation stamp < block generation stamp, block="
+              + block + ", replica=" + replica);
+        } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
+          block.setGenerationStamp(replica.getGenerationStamp());
+        }
       }
     }
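
The constructor hunk above disables the shared read lock by aliasing: when the config key is false, both lock variables hold the write lock, so every "read" acquisition is exclusive and behaviour matches the old single lock. A minimal sketch of that technique follows; LockModeDemo and enableReadLock are illustrative names, not from the commit.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockModeDemo {
  public static void main(String[] args) {
    // Stands in for the dfs.datanode.lock.read.write.enabled lookup above.
    boolean enableReadLock = false;
    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);

    Lock writeLock = rwLock.writeLock();
    // When the feature is off, the "read" variable aliases the write lock,
    // so every acquisition through it is exclusive.
    Lock readLock = enableReadLock ? rwLock.readLock() : writeLock;

    readLock.lock();
    try {
      // With the alias in place this section is fully exclusive; with
      // enableReadLock = true, many readers could be here at once.
      System.out.println("holding "
          + (enableReadLock ? "shared" : "exclusive") + " lock");
    } finally {
      readLock.unlock();
    }
  }
}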

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java

Lines changed: 22 additions & 9 deletions
@@ -32,7 +32,6 @@
  * Maintains the replica map.
  */
 class ReplicaMap {
-  private final ReadWriteLock rwLock;
   // Lock object to synchronize this instance.
   private final AutoCloseableLock readLock;
   private final AutoCloseableLock writeLock;
@@ -41,18 +40,22 @@ class ReplicaMap {
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
       new HashMap<>();

-  ReplicaMap(ReadWriteLock lock) {
-    if (lock == null) {
+  ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) {
+    if (rLock == null || wLock == null) {
       throw new HadoopIllegalArgumentException(
           "Lock to synchronize on cannot be null");
     }
-    this.rwLock = lock;
-    this.readLock = new AutoCloseableLock(rwLock.readLock());
-    this.writeLock = new AutoCloseableLock(rwLock.writeLock());
+    this.readLock = rLock;
+    this.writeLock = wLock;
+  }
+
+  ReplicaMap(ReadWriteLock lock) {
+    this(new AutoCloseableLock(lock.readLock()),
+        new AutoCloseableLock(lock.writeLock()));
   }

   String[] getBlockPoolList() {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);
     }
   }
@@ -97,7 +100,7 @@ ReplicaInfo get(String bpid, Block block) {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -218,7 +221,7 @@ ReplicaInfo remove(String bpid, long blockId) {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
@@ -266,4 +269,14 @@ void cleanUpBlockPool(String bpid) {
   AutoCloseableLock getLock() {
     return writeLock;
   }
+
+  /**
+   * Get the lock object used for synchronizing the ReplicasMap for read only
+   * operations.
+   * @return The read lock object
+   */
+  AutoCloseableLock getReadLock() {
+    return readLock;
+  }
 }
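
Passing the dataset's own read and write locks into ReplicaMap means map-level and dataset-level sections contend on a single ReentrantReadWriteLock, and a map read nested inside a dataset read section simply re-enters the same lock instead of taking a second, unrelated one. A hedged sketch of that design follows; SharedLockMap is an illustrative class, not the Hadoop one.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SharedLockMap {
  private final Lock readLock;
  private final Lock writeLock;
  private final Map<Long, String> replicas = new HashMap<>();

  SharedLockMap(Lock readLock, Lock writeLock) {
    this.readLock = readLock;
    this.writeLock = writeLock;
  }

  String get(long blockId) {
    readLock.lock();   // shared: many readers may hold this at once
    try {
      return replicas.get(blockId);
    } finally {
      readLock.unlock();
    }
  }

  void put(long blockId, String replica) {
    writeLock.lock();  // exclusive: blocks both readers and writers
    try {
      replicas.put(blockId, replica);
    } finally {
      writeLock.unlock();
    }
  }

  public static void main(String[] args) {
    ReentrantReadWriteLock shared = new ReentrantReadWriteLock(true);
    SharedLockMap map =
        new SharedLockMap(shared.readLock(), shared.writeLock());
    map.put(1L, "replica-1");
    shared.readLock().lock(); // caller holds the "dataset" read lock...
    try {
      // ...and the nested map read re-enters the same lock safely.
      System.out.println(map.get(1L));
    } finally {
      shared.readLock().unlock();
    }
  }
}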

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Lines changed: 13 additions & 0 deletions
@@ -3009,6 +3009,19 @@
   </description>
 </property>

+<property>
+  <name>dfs.datanode.lock.read.write.enabled</name>
+  <value>true</value>
+  <description>If this is true, the FsDataset lock will be a read write lock.
+    If it is false, all locks will be a write lock.
+    Enabling this should give better datanode throughput, as many read only
+    functions can run concurrently under the read lock, when they would
+    previously have required the exclusive write lock. As the feature is
+    experimental, this switch can be used to disable the shared read lock and
+    cause all lock acquisitions to use the exclusive write lock.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.lock-reporting-threshold-ms</name>
   <value>300</value>
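
As the description says, the switch is an escape hatch for an experimental feature. A minimal sketch of flipping it programmatically, assuming a standard Hadoop Configuration on the classpath; the key string matches the property above, and DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY from the first hunk in this commit holds the same value.

import org.apache.hadoop.conf.Configuration;

public class DisableReadLockExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Revert to the pre-HDFS-15160 exclusive-lock behaviour.
    conf.setBoolean("dfs.datanode.lock.read.write.enabled", false);
    System.out.println(
        conf.getBoolean("dfs.datanode.lock.read.write.enabled", true));
  }
}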
