
Commit 05ddaff

jojochuang and liubingxing authored and committed
HDFS-16423. Balancer should not get blocks on stale storages (apache#3883) (apache#3924)
Reviewed-by: litao <tomleescut@gmail.com>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
(cherry picked from commit db2c320)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java

Co-authored-by: qinyuren <1476659627@qq.com>
1 parent a70a53e commit 05ddaff

File tree

6 files changed: +128, -4 lines changed

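The heart of the change is the first file below: getBlocksWithLocations() now builds its block iterator only from storages whose block contents are not stale (a storage is stale, for example, right after an HA failover until a full block report arrives), so the Balancer no longer plans moves from replicas the NameNode cannot yet trust. A minimal, self-contained sketch of the filtering idiom the patch uses, with simplified stand-in types rather than the real Hadoop classes:

import java.util.Arrays;

public class StaleStorageFilterSketch {

  /** Stand-in for DatanodeStorageInfo: only the staleness flag matters here. */
  static class Storage {
    final String id;
    final boolean blockContentsStale;

    Storage(String id, boolean blockContentsStale) {
      this.id = id;
      this.blockContentsStale = blockContentsStale;
    }

    boolean areBlockContentsStale() {
      return blockContentsStale;
    }
  }

  /** Same stream/filter/toArray idiom the BlockManager change uses below. */
  static Storage[] nonStaleStorages(Storage[] all) {
    return Arrays.stream(all)
        .filter(s -> !s.areBlockContentsStale())
        .toArray(Storage[]::new);
  }

  public static void main(String[] args) {
    Storage[] storages = {
        new Storage("DS-1", false),
        new Storage("DS-2", true),   // stale, e.g. right after an HA failover
        new Storage("DS-3", false)
    };
    for (Storage s : nonStaleStorages(storages)) {
      System.out.println(s.id);      // prints DS-1 and DS-3 only
    }
  }
}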

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Lines changed: 10 additions & 3 deletions
@@ -1642,9 +1642,16 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
     if(numBlocks == 0) {
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
+
+    // skip stale storage
+    DatanodeStorageInfo[] storageInfos = Arrays
+        .stream(node.getStorageInfos())
+        .filter(s -> !s.areBlockContentsStale())
+        .toArray(DatanodeStorageInfo[]::new);
+
     // starting from a random block
     int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
-    Iterator<BlockInfo> iter = node.getBlockIterator(startBlock);
+    Iterator<BlockInfo> iter = node.getBlockIterator(startBlock, storageInfos);
     List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
     long totalSize = 0;
     BlockInfo curBlock;
@@ -1657,8 +1664,8 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
       totalSize += addBlock(curBlock, results);
     }
     if(totalSize<size) {
-      iter = node.getBlockIterator(); // start from the beginning
-      for(int i=0; i<startBlock&&totalSize<size; i++) {
+      iter = node.getBlockIterator(0, storageInfos); // start from the beginning
+      for(int i = 0; i < startBlock && totalSize < size && iter.hasNext(); i++) {
         curBlock = iter.next();
         if(!curBlock.isComplete()) continue;
         if (curBlock.getNumBytes() < minBlockSize) {
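One consequence of the filtering: the iterator now covers only the non-stale storages, so it may hold fewer blocks than numBlocks. That is why the wrap-around pass in the second hunk gains the extra iter.hasNext() guard; without it, the loop bound startBlock (computed from the unfiltered count) could run the iterator past its end. A standalone sketch of that random-start, wrap-around scan (simplified types, not the real BlockManager code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class WrapAroundScanSketch {

  /** Visit up to 'wanted' items starting at 'startBlock', wrapping around once. */
  static List<String> pick(List<String> blocks, int startBlock, int wanted) {
    List<String> results = new ArrayList<>();
    Iterator<String> iter = blocks.listIterator(startBlock);
    while (iter.hasNext() && results.size() < wanted) {
      results.add(iter.next());
    }
    if (results.size() < wanted) {
      // start from the beginning, but never step past the end of the
      // (possibly shorter, filtered) iterator: this is what hasNext() guards
      iter = blocks.iterator();
      for (int i = 0; i < startBlock && results.size() < wanted && iter.hasNext(); i++) {
        results.add(iter.next());
      }
    }
    return results;
  }

  public static void main(String[] args) {
    List<String> blocks = Arrays.asList("b1", "b2", "b3", "b4", "b5");
    System.out.println(pick(blocks, 3, 4));   // [b4, b5, b1, b2]
  }
}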

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

Lines changed: 11 additions & 0 deletions
@@ -636,6 +636,17 @@ Iterator<BlockInfo> getBlockIterator(final int startBlock) {
     return new BlockIterator(startBlock, getStorageInfos());
   }
 
+  /**
+   * Get iterator, which starts iterating from the specified block and storages.
+   *
+   * @param startBlock on which blocks are start iterating
+   * @param storageInfos specified storages
+   */
+  Iterator<BlockInfo> getBlockIterator(
+      final int startBlock, final DatanodeStorageInfo[] storageInfos) {
+    return new BlockIterator(startBlock, storageInfos);
+  }
+
   @VisibleForTesting
   public void incrementPendingReplicationWithoutTargets() {
     pendingReplicationWithoutTargets++;
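The new overload passes a caller-chosen set of storages to the existing BlockIterator, which, as used here, walks the block lists of the supplied storages one after another and skips the first startBlock entries. A rough standalone equivalent of that chaining behaviour, for illustration only:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ChainedBlockIteratorSketch {

  /** Chain the per-storage block lists and skip the first startBlock entries. */
  static <T> Iterator<T> blockIterator(int startBlock, List<List<T>> storages) {
    List<T> chained = new ArrayList<>();
    for (List<T> storage : storages) {
      chained.addAll(storage);
    }
    Iterator<T> it = chained.iterator();
    for (int i = 0; i < startBlock && it.hasNext(); i++) {
      it.next();                       // advance to the starting position
    }
    return it;
  }

  public static void main(String[] args) {
    List<List<String>> storages = Arrays.asList(
        Arrays.asList("b1", "b2"),     // storage 0
        Arrays.asList("b3", "b4"));    // storage 1
    blockIterator(1, storages).forEachRemaining(System.out::println); // b2 b3 b4
  }
}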

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java

Lines changed: 5 additions & 0 deletions
@@ -168,6 +168,11 @@ public boolean areBlockContentsStale() {
     return blockContentsStale;
   }
 
+  @VisibleForTesting
+  public void setBlockContentsStale(boolean value) {
+    blockContentsStale = value;
+  }
+
   void markStaleAfterFailover() {
     heartbeatedSinceFailover = false;
     blockContentsStale = true;
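For orientation: blockContentsStale is the flag the BlockManager filter keys on. The context lines show markStaleAfterFailover() setting it, and the flag is presumably cleared again once the storage has heartbeated and delivered a full block report after the failover; the new @VisibleForTesting setter simply lets tests force either state without a failover. A simplified sketch of that lifecycle (not the real class; the block-report behaviour is an assumption of this sketch):

/** Simplified stand-in for the staleness bookkeeping in DatanodeStorageInfo. */
class StorageStalenessSketch {
  private boolean heartbeatedSinceFailover = false;
  private boolean blockContentsStale = true;

  void markStaleAfterFailover() {      // mirrors the method shown in the diff
    heartbeatedSinceFailover = false;
    blockContentsStale = true;
  }

  void receivedHeartbeat() {
    heartbeatedSinceFailover = true;
  }

  void receivedBlockReport() {
    // assumption: contents are trusted again only after a post-failover
    // heartbeat followed by a full block report
    if (heartbeatedSinceFailover) {
      blockContentsStale = false;
    }
  }

  boolean areBlockContentsStale() {
    return blockContentsStale;
  }
}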

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java

Lines changed: 64 additions & 0 deletions
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@@ -396,4 +397,67 @@ public void testBlockKey() {
     }
   }
 
+  @Test
+  public void testReadSkipStaleStorage() throws Exception {
+    final short repFactor = (short) 1;
+    final int blockNum = 64;
+    final int storageNum = 2;
+    final int fileLen = BLOCK_SIZE * blockNum;
+    final Path path = new Path("testReadSkipStaleStorage");
+    final Configuration conf = new HdfsConfiguration();
+
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .storagesPerDatanode(storageNum)
+        .build();
+    cluster.waitActive();
+
+    FileSystem fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, path, false, 1024, fileLen,
+        BLOCK_SIZE, repFactor, 0, true);
+
+    // get datanode info
+    ClientProtocol client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(),
+        ClientProtocol.class).getProxy();
+    DatanodeInfo[] dataNodes = client.getDatanodeReport(DatanodeReportType.ALL);
+
+    // get storage info
+    BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
+    DatanodeStorageInfo[] storageInfos = bm0.getDatanodeManager()
+        .getDatanode(dataNodes[0].getDatanodeUuid()).getStorageInfos();
+
+    InetSocketAddress addr = new InetSocketAddress("localhost",
+        cluster.getNameNodePort());
+    NamenodeProtocol namenode = NameNodeProxies.createProxy(conf,
+        DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
+
+    // check blocks count equals to blockNum
+    BlockWithLocations[] blocks = namenode.getBlocks(
+        dataNodes[0], fileLen*2, 0).getBlocks();
+    assertEquals(blockNum, blocks.length);
+
+    // calculate the block count on storage[0]
+    int count = 0;
+    for (BlockWithLocations b : blocks) {
+      for (String s : b.getStorageIDs()) {
+        if (s.equals(storageInfos[0].getStorageID())) {
+          count++;
+        }
+      }
+    }
+
+    // set storage[0] stale
+    storageInfos[0].setBlockContentsStale(true);
+    blocks = namenode.getBlocks(
+        dataNodes[0], fileLen*2, 0).getBlocks();
+    assertEquals(blockNum - count, blocks.length);
+
+    // set all storage stale
+    bm0.getDatanodeManager().markAllDatanodesStale();
+    blocks = namenode.getBlocks(
+        dataNodes[0], fileLen*2, 0).getBlocks();
+    assertEquals(0, blocks.length);
+  }
 }
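The nested loops above count how many of the 64 blocks live on storage[0]; the assertions then rely on simple arithmetic: marking storage[0] stale must remove exactly count blocks from the getBlocks() result, and marking every storage stale must leave none. An alternative, stream-based way to compute the same per-storage counts, sketched with a stand-in type that only mimics the getStorageIDs() accessor used in the test:

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class BlocksPerStorageSketch {

  /** Stand-in for BlockWithLocations: only the storage IDs matter here. */
  static class BlockWithStorages {
    final String[] storageIds;
    BlockWithStorages(String... storageIds) { this.storageIds = storageIds; }
    String[] getStorageIDs() { return storageIds; }
  }

  public static void main(String[] args) {
    BlockWithStorages[] blocks = {
        new BlockWithStorages("DS-0"), new BlockWithStorages("DS-1"),
        new BlockWithStorages("DS-0"), new BlockWithStorages("DS-0")
    };
    Map<String, Long> perStorage = Arrays.stream(blocks)
        .flatMap(b -> Arrays.stream(b.getStorageIDs()))
        .collect(Collectors.groupingBy(id -> id, Collectors.counting()));
    System.out.println(perStorage);  // 3 blocks on DS-0, 1 on DS-1 (map order may vary)
    // Marking DS-0 stale should shrink getBlocks() by 3; marking all stale leaves 0.
  }
}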

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java

Lines changed: 2 additions & 0 deletions
@@ -123,6 +123,7 @@ public void testBalancerServiceBalanceTwice() throws Exception {
     TestBalancer.initConf(conf);
     try {
       setupCluster(conf);
+      TestBalancerWithHANameNodes.waitStoragesNoStale(cluster, client, 0);
       long totalCapacity = addOneDataNode(conf); // make cluster imbalanced
 
       Thread balancerThread =
@@ -174,6 +175,7 @@ public void testBalancerServiceOnError() throws Exception {
       cluster.transitionToActive(0);
       cluster.waitActive();
 
+      TestBalancerWithHANameNodes.waitStoragesNoStale(cluster, client, 0);
       long totalCapacity = addOneDataNode(conf);
       TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
           cluster, BalancerParameters.DEFAULT);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java

Lines changed: 36 additions & 1 deletion
@@ -44,11 +44,16 @@
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
@@ -71,6 +76,26 @@ public class TestBalancerWithHANameNodes {
     TestBalancer.initTestSetup();
   }
 
+  public static void waitStoragesNoStale(MiniDFSCluster cluster,
+      ClientProtocol client, int nnIndex) throws Exception {
+    // trigger a full block report and wait all storages out of stale
+    cluster.triggerBlockReports();
+    DatanodeInfo[] dataNodes = client.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
+    GenericTestUtils.waitFor(() -> {
+      BlockManager bm = cluster.getNamesystem(nnIndex).getBlockManager();
+      for (DatanodeInfo dn : dataNodes) {
+        DatanodeStorageInfo[] storageInfos = bm.getDatanodeManager()
+            .getDatanode(dn.getDatanodeUuid()).getStorageInfos();
+        for (DatanodeStorageInfo s : storageInfos) {
+          if (s.areBlockContentsStale()) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, 300, 60000);
+  }
+
   /**
    * Test a cluster with even distribution, then a new empty node is added to
    * the cluster. Test start a cluster with specified number of nodes, and fills
@@ -99,13 +124,17 @@ public void testBalancerWithHANameNodes() throws Exception {
       client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
           ClientProtocol.class).getProxy();
 
-      doTest(conf);
+      doTest(conf, true);
     } finally {
       cluster.shutdown();
     }
   }
 
   void doTest(Configuration conf) throws Exception {
+    doTest(conf, false);
+  }
+
+  void doTest(Configuration conf, boolean withHA) throws Exception {
     int numOfDatanodes = TEST_CAPACITIES.length;
     long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
     // fill up the cluster to be 30% full
@@ -119,6 +148,12 @@ void doTest(Configuration conf) throws Exception {
       HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
           cluster.getNameNode(1));
     }
+
+    // all storages are stale after HA
+    if (withHA) {
+      waitStoragesNoStale(cluster, client, 0);
+    }
+
     // start up an empty node with the same capacity and on the same rack
     long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
     String newNodeRack = TestBalancer.RACK2; // new node's rack
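waitStoragesNoStale() is what the balancer tests above lean on: after an HA transition every storage is stale, getBlocks() returns nothing, and the Balancer would find no blocks to move, so the tests first trigger block reports and poll until the flag has cleared on every storage (checking every 300 ms for up to 60 s via GenericTestUtils.waitFor). A minimal standalone version of that poll-until-true pattern, without the Hadoop test utilities:

import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public class WaitForSketch {

  /** Poll 'condition' every checkEveryMillis until it is true or waitForMillis elapses. */
  static void waitFor(BooleanSupplier condition, long checkEveryMillis,
      long waitForMillis) throws InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + waitForMillis;
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("condition not met within " + waitForMillis + " ms");
      }
      Thread.sleep(checkEveryMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    long start = System.currentTimeMillis();
    // Stand-in condition: pretend storages stop being stale after about one second.
    waitFor(() -> System.currentTimeMillis() - start > 1000, 300, 60000);
    System.out.println("no storage is stale any more");
  }
}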
