Skip to content

Commit 59e36cc

Browse files
committed
HDFS-17218. Modify patch based on comments
1 parent 77c1342 commit 59e36cc

File tree

2 files changed

+26
-37
lines changed

2 files changed

+26
-37
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.BitSet;
3131
import java.util.Collection;
3232
import java.util.Collections;
33+
import java.util.Comparator;
3334
import java.util.EnumSet;
3435
import java.util.HashMap;
3536
import java.util.HashSet;
@@ -50,6 +51,7 @@
5051
import java.util.concurrent.ConcurrentLinkedQueue;
5152

5253
import java.util.concurrent.atomic.AtomicLong;
54+
import java.util.stream.Collectors;
5355
import javax.management.ObjectName;
5456

5557
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -3098,15 +3100,18 @@ void processTimedOutExcessBlocks() {
30983100
long now = Time.monotonicNow();
30993101
int processed = 0;
31003102
try {
3101-
Iterator<Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>>> iter =
3103+
Iterator<Map.Entry<String, LightWeightHashSet<Block>>> iter =
31023104
excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator();
31033105
while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) {
3104-
Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>> entry = iter.next();
3106+
Map.Entry<String, LightWeightHashSet<Block>> entry = iter.next();
31053107
String datanodeUuid = entry.getKey();
3106-
LightWeightHashSet<ExcessBlockInfo> blocks = entry.getValue();
3107-
List<ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks);
3108+
LightWeightHashSet<Block> blocks = entry.getValue();
31083109
// Sort blocks by timestamp in descending order.
3109-
Collections.sort(sortedBlocks);
3110+
List<ExcessBlockInfo> sortedBlocks = blocks.stream()
3111+
.filter(block -> block instanceof ExcessBlockInfo)
3112+
.map(block -> (ExcessBlockInfo) block)
3113+
.sorted(Comparator.comparingLong(ExcessBlockInfo::getTimeStamp))
3114+
.collect(Collectors.toList());
31103115

31113116
for (ExcessBlockInfo excessBlockInfo : sortedBlocks) {
31123117
if (processed >= excessRedundancyTimeoutCheckLimit) {
@@ -3132,11 +3137,11 @@ void processTimedOutExcessBlocks() {
31323137
DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor();
31333138
if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid) &&
31343139
datanodeStorageInfo.getState().equals(State.NORMAL)) {
3135-
final Block block = getBlockOnStorage(blockInfo, datanodeStorageInfo);
3136-
if (!containsInvalidateBlock(datanodeDescriptor, block)) {
3137-
addToInvalidates(block, datanodeDescriptor);
3140+
final Block b = getBlockOnStorage(blockInfo, datanodeStorageInfo);
3141+
if (!containsInvalidateBlock(datanodeDescriptor, b)) {
3142+
addToInvalidates(b, datanodeDescriptor);
31383143
LOG.debug("Excess block timeout ({}, {}) is added to invalidated.",
3139-
block, datanodeDescriptor);
3144+
b, datanodeDescriptor);
31403145
}
31413146
excessBlockInfo.setTimeStamp();
31423147
break;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessRedundancyMap.java

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Map;
2222
import java.util.concurrent.atomic.AtomicLong;
2323

24+
import org.apache.hadoop.hdfs.protocol.Block;
2425
import org.apache.hadoop.hdfs.server.namenode.NameNode;
2526
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
2627
import org.slf4j.Logger;
@@ -37,7 +38,7 @@
3738
class ExcessRedundancyMap {
3839
public static final Logger blockLog = NameNode.blockStateChangeLog;
3940

40-
private final Map<String, LightWeightHashSet<ExcessBlockInfo>> map = new HashMap<>();
41+
private final Map<String, LightWeightHashSet<Block>> map = new HashMap<>();
4142
private final AtomicLong size = new AtomicLong(0L);
4243

4344
/**
@@ -52,7 +53,7 @@ long size() {
5253
*/
5354
@VisibleForTesting
5455
synchronized int getSize4Testing(String dnUuid) {
55-
final LightWeightHashSet<ExcessBlockInfo> set = map.get(dnUuid);
56+
final LightWeightHashSet<Block> set = map.get(dnUuid);
5657
return set == null? 0: set.size();
5758
}
5859

@@ -66,7 +67,7 @@ synchronized void clear() {
6667
* datanode and the given block?
6768
*/
6869
synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
69-
final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
70+
final LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
7071
return set != null && set.contains(blk);
7172
}
7273

@@ -77,7 +78,7 @@ synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
7778
* @return true if the block is added.
7879
*/
7980
synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
80-
LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
81+
LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
8182
if (set == null) {
8283
set = new LightWeightHashSet<>();
8384
map.put(dn.getDatanodeUuid(), set);
@@ -97,11 +98,10 @@ synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
9798
* @return true if the block is removed.
9899
*/
99100
synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
100-
final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
101+
final LightWeightHashSet<Block> set = map.get(dn.getDatanodeUuid());
101102
if (set == null) {
102103
return false;
103104
}
104-
105105
final boolean removed = set.remove(blk);
106106
if (removed) {
107107
size.decrementAndGet();
@@ -114,19 +114,20 @@ synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
114114
return removed;
115115
}
116116

117-
synchronized Map<String, LightWeightHashSet<ExcessBlockInfo>> getExcessRedundancyMap() {
117+
synchronized Map<String, LightWeightHashSet<Block>> getExcessRedundancyMap() {
118118
return map;
119119
}
120120

121121
/**
122122
* An object that contains information about a block that has excess redundancy.
123123
* It records the timestamp at which this block was added to the excess redundancy map.
124124
*/
125-
static class ExcessBlockInfo implements Comparable<ExcessBlockInfo> {
125+
static class ExcessBlockInfo extends Block {
126126
private long timeStamp;
127-
private BlockInfo blockInfo;
127+
private final BlockInfo blockInfo;
128128

129129
ExcessBlockInfo(BlockInfo blockInfo) {
130+
super(blockInfo.getBlockId(), blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
130131
this.timeStamp = monotonicNow();
131132
this.blockInfo = blockInfo;
132133
}
@@ -145,29 +146,12 @@ void setTimeStamp() {
145146

146147
@Override
147148
public int hashCode() {
148-
return blockInfo.hashCode();
149+
return super.hashCode();
149150
}
150151

151152
@Override
152153
public boolean equals(Object obj) {
153-
if (this == obj) {
154-
return true;
155-
}
156-
157-
if (obj instanceof ExcessBlockInfo) {
158-
ExcessBlockInfo other = (ExcessBlockInfo) obj;
159-
return this.blockInfo.equals(other.blockInfo);
160-
}
161-
162-
if (obj instanceof BlockInfo) {
163-
return this.blockInfo.equals(obj);
164-
}
165-
return false;
166-
}
167-
168-
@Override
169-
public int compareTo(ExcessBlockInfo o) {
170-
return Long.compare(this.timeStamp, o.timeStamp);
154+
return super.equals(obj);
171155
}
172156
}
173157
}

0 commit comments

Comments
 (0)