
Commit 3c715a2

Stephen O'Donnell authored and Hexiaoqiao committed
HDFS-15150. Introduce read write lock to Datanode. Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit d7c136b)
1 parent a9dbe47 commit 3c715a2

16 files changed (+168, −98 lines)


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock {
   private final Lock readLock;
   private final Lock writeLock;
 
-  InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
+  public InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
       long minLoggingGapMs, long lockWarningThresholdMs) {
     ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
     readLock = new InstrumentedReadLock(name, logger, readWriteLock,
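The constructor is widened from package-private to public so code outside org.apache.hadoop.util (here, the DataNode's dataset implementation) can construct the instrumented lock. A minimal usage sketch; the lock name and timing values below are illustrative assumptions, not taken from the commit:

```java
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.util.InstrumentedReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InstrumentedLockSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(InstrumentedLockSketch.class);

  public static void main(String[] args) {
    // Arguments follow the signature shown in the diff above.
    InstrumentedReadWriteLock rwLock = new InstrumentedReadWriteLock(
        true,          // fair: waiting writers are not starved by readers
        "ExampleLock", // name used in slow-lock warning messages
        LOG,
        10000L,        // minLoggingGapMs: suppress repeated warnings
        300L);         // lockWarningThresholdMs: warn if held longer

    Lock readLock = rwLock.readLock();
    readLock.lock();
    try {
      // read-only work; holding past 300 ms would emit a warning
    } finally {
      readLock.unlock();
    }
  }
}
```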

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 7 additions & 0 deletions
@@ -549,6 +549,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.lock.suppress.warning.interval";
   public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
       10000; //ms
+  public static final String DFS_DATANODE_LOCK_FAIR_KEY =
+      "dfs.datanode.lock.fair";
+  public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+  public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
+      "dfs.datanode.lock-reporting-threshold-ms";
+  public static final long
+      DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L;
 
   public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
   public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
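A sketch of how a component might read the two new keys; the wrapper class is hypothetical, but the constants and defaults are exactly those added above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class DatanodeLockConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Both lookups fall back to the defaults defined in the diff above.
    boolean fairLock = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
        DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT);            // true
    long reportingThresholdMs = conf.getLong(
        DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
        DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT); // 300
    System.out.println("fair=" + fairLock
        + ", thresholdMs=" + reportingThresholdMs);
  }
}
```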

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

Lines changed: 6 additions & 0 deletions
@@ -661,5 +661,11 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
    */
   AutoCloseableLock acquireDatasetLock();
 
+  /***
+   * Acquire the read lock of the data set.
+   * @return The AutoClosable read lock instance.
+   */
+  AutoCloseableLock acquireDatasetReadLock();
+
   Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
 }
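Like acquireDatasetLock(), the new method returns an AutoCloseableLock, so callers can scope the read lock with try-with-resources. A caller-side sketch; the method and its parameters are illustrative, only the two FsDatasetSpi calls come from the source:

```java
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.util.AutoCloseableLock;

public class ReadLockCallerSketch {
  // 'dataset' stands in for any concrete FsDatasetSpi implementation.
  static void readOnlyScan(FsDatasetSpi<?> dataset, String bpid)
      throws IOException {
    try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
      // Read-only inspection of dataset state; multiple readers may hold
      // the lock concurrently, and it is released when the block exits.
      dataset.deepCopyReplica(bpid);
    }
  }
}
```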

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

Lines changed: 2 additions & 2 deletions
@@ -42,6 +42,7 @@
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
 import org.slf4j.Logger;
@@ -66,7 +67,6 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.DiskChecker;
@@ -874,7 +874,7 @@ void shutdown(BlockListAsLongs blocksListToPersist) {
 
   private boolean readReplicasFromCache(ReplicaMap volumeMap,
       final RamDiskReplicaTracker lazyWriteReplicaMap) {
-    ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
+    ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
     File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
     // Check whether the file exists or not.
     if (!replicaFile.exists()) {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

Lines changed: 81 additions & 62 deletions
Large diffs are not rendered by default.
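Since this diff is collapsed, here is a hedged sketch of the wiring it most plausibly contains, assembled from the pieces visible elsewhere in the commit: FsDatasetImpl builds an InstrumentedReadWriteLock from the new config keys and hands out AutoCloseable read and write handles. The field names and exact constructor arguments here are assumptions, not copied from the commit:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.InstrumentedReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DatasetLockWiringSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(DatasetLockWiringSketch.class);

  private final InstrumentedReadWriteLock datasetRWLock;
  private final AutoCloseableLock datasetWriteLock;
  private final AutoCloseableLock datasetReadLock;

  DatasetLockWiringSketch(Configuration conf) {
    // Fairness and the warning threshold come from the new keys; the
    // suppression interval reuses the existing dfs.lock.suppress key.
    datasetRWLock = new InstrumentedReadWriteLock(
        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
            DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT),
        "FsDatasetRWLock", LOG,
        conf.getLong(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT),
        conf.getLong(DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
            DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT));
    datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
    datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
  }

  public AutoCloseableLock acquireDatasetLock() {
    return datasetWriteLock.acquire();   // exclusive, as before
  }

  public AutoCloseableLock acquireDatasetReadLock() {
    return datasetReadLock.acquire();    // shared among readers
  }
}
```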

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java

Lines changed: 5 additions & 5 deletions
@@ -28,6 +28,7 @@
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -53,18 +54,17 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
-import org.apache.hadoop.util.Timer;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
 import org.codehaus.jackson.map.ObjectWriter;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Time;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES;
 
@@ -135,7 +135,7 @@ static class ProvidedBlockPoolSlice {
     ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
         Configuration conf) {
       this.providedVolume = volume;
-      bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
+      bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock());
       Class<? extends BlockAliasMap> fmt =
           conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
               TextFileRegionAliasMap.class, BlockAliasMap.class);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java

Lines changed: 18 additions & 13 deletions
@@ -20,6 +20,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -31,23 +32,27 @@
  * Maintains the replica map.
  */
 class ReplicaMap {
+  private final ReadWriteLock rwLock;
   // Lock object to synchronize this instance.
-  private final AutoCloseableLock lock;
+  private final AutoCloseableLock readLock;
+  private final AutoCloseableLock writeLock;
 
   // Map of block pool Id to another map of block Id to ReplicaInfo.
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
       new HashMap<>();
 
-  ReplicaMap(AutoCloseableLock lock) {
+  ReplicaMap(ReadWriteLock lock) {
     if (lock == null) {
       throw new HadoopIllegalArgumentException(
           "Lock to synchronize on cannot be null");
     }
-    this.lock = lock;
+    this.rwLock = lock;
+    this.readLock = new AutoCloseableLock(rwLock.readLock());
+    this.writeLock = new AutoCloseableLock(rwLock.writeLock());
   }
 
   String[] getBlockPoolList() {
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);
     }
   }
@@ -92,7 +97,7 @@ ReplicaInfo get(String bpid, Block block) {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -109,7 +114,7 @@ ReplicaInfo get(String bpid, long blockId) {
   ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -127,7 +132,7 @@ ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
   ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -176,7 +181,7 @@ void mergeAll(ReplicaMap other) {
   ReplicaInfo remove(String bpid, Block block) {
     checkBlockPool(bpid);
     checkBlock(block);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         ReplicaInfo replicaInfo = m.get(block);
@@ -198,7 +203,7 @@ ReplicaInfo remove(String bpid, Block block) {
    */
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         return m.remove(new Block(blockId));
@@ -213,7 +218,7 @@ ReplicaInfo remove(String bpid, long blockId) {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
@@ -237,7 +242,7 @@ Collection<ReplicaInfo> replicas(String bpid) {
 
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -249,7 +254,7 @@ void initBlockPool(String bpid) {
 
   void cleanUpBlockPool(String bpid) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       map.remove(bpid);
     }
   }
@@ -259,6 +264,6 @@ void cleanUpBlockPool(String bpid) {
    * @return lock object
    */
   AutoCloseableLock getLock() {
-    return lock;
+    return writeLock;
  }
 }
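Note that in this commit every ReplicaMap accessor, including the read-only get() and size(), still acquires the write lock, and getLock() now returns the write handle, so behavior is unchanged; the read lock exists so call sites can migrate to shared locking incrementally. A minimal sketch of the wrapping pattern the constructor now performs:

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.util.AutoCloseableLock;

public class ReplicaMapLockPatternSketch {
  public static void main(String[] args) {
    // Same wrapping as ReplicaMap's constructor in the diff above.
    ReadWriteLock rwLock = new ReentrantReadWriteLock();
    AutoCloseableLock readLock = new AutoCloseableLock(rwLock.readLock());
    AutoCloseableLock writeLock = new AutoCloseableLock(rwLock.writeLock());

    try (AutoCloseableLock l = writeLock.acquire()) {
      // mutation path, e.g. add()/remove() in ReplicaMap
    }
    try (AutoCloseableLock l = readLock.acquire()) {
      // future shared read path; in this commit ReplicaMap still routes
      // reads through writeLock to keep semantics identical
    }
  }
}
```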

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Lines changed: 21 additions & 0 deletions
@@ -2999,6 +2999,27 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.lock.fair</name>
+  <value>true</value>
+  <description>If this is true, the Datanode FsDataset lock will be used in Fair
+    mode, which will help to prevent writer threads from being starved, but can
+    lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock
+    for more information on fair/non-fair locks.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.lock-reporting-threshold-ms</name>
+  <value>300</value>
+  <description>When a thread waits to obtain a lock, or holds a lock, for more
+    than the threshold, a log message will be written. Note that
+    dfs.lock.suppress.warning.interval ensures a single log message is
+    emitted per interval for waiting threads and a single message for holding
+    threads to avoid excessive logging.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.startup.delay.block.deletion.sec</name>
   <value>0</value>
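Operators can override these defaults in hdfs-site.xml; a programmatic equivalent, with illustrative values rather than recommendations:

```java
import org.apache.hadoop.conf.Configuration;

public class LockTuningSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Trade fairness for raw lock throughput (illustrative choice).
    conf.setBoolean("dfs.datanode.lock.fair", false);
    // Only warn when a lock is waited on or held for over a second.
    conf.setLong("dfs.datanode.lock-reporting-threshold-ms", 1000L);
  }
}
```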

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Lines changed: 6 additions & 0 deletions
@@ -1572,6 +1572,12 @@ public AutoCloseableLock acquireDatasetLock() {
     return datasetLock.acquire();
   }
 
+  @Override
+  public AutoCloseableLock acquireDatasetReadLock() {
+    // No RW lock implementation in simulated dataset currently.
+    return datasetLock.acquire();
+  }
+
   @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

Lines changed: 5 additions & 0 deletions
@@ -455,6 +455,11 @@ public AutoCloseableLock acquireDatasetLock() {
     return null;
   }
 
+  @Override
+  public AutoCloseableLock acquireDatasetReadLock() {
+    return null;
+  }
+
   @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
