|
23 | 23 | import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; |
24 | 24 | import org.apache.hadoop.hdfs.server.datanode.DataNode; |
25 | 25 | import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
| 26 | +import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| 27 | +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; |
26 | 28 | import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; |
27 | 29 | import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; |
28 | 30 | import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; |
@@ -269,4 +271,83 @@ private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages, |
269 | 271 | } |
270 | 272 | return storageBlockReports; |
271 | 273 | } |
| 274 | + |
| 275 | + @Test(timeout = 360000) |
| 276 | + public void testFirstIncompleteBlockReport() throws Exception { |
| 277 | + HdfsConfiguration conf = new HdfsConfiguration(); |
| 278 | + Random rand = new Random(); |
| 279 | + |
| 280 | + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| 281 | + .numDataNodes(1).build()) { |
| 282 | + cluster.waitActive(); |
| 283 | + |
| 284 | + FSNamesystem fsn = cluster.getNamesystem(); |
| 285 | + |
| 286 | + NameNode nameNode = cluster.getNameNode(); |
| 287 | + // Pretend to be in safemode. |
| 288 | + NameNodeAdapter.enterSafeMode(nameNode, false); |
| 289 | + |
| 290 | + BlockManager blockManager = fsn.getBlockManager(); |
| 291 | + BlockManager spyBlockManager = spy(blockManager); |
| 292 | + fsn.setBlockManagerForTesting(spyBlockManager); |
| 293 | + String poolId = cluster.getNamesystem().getBlockPoolId(); |
| 294 | + |
| 295 | + NamenodeProtocols rpcServer = cluster.getNameNodeRpc(); |
| 296 | + |
| 297 | + // Test based on one DataNode report to Namenode. |
| 298 | + DataNode dn = cluster.getDataNodes().get(0); |
| 299 | + DatanodeDescriptor datanodeDescriptor = spyBlockManager |
| 300 | + .getDatanodeManager().getDatanode(dn.getDatanodeId()); |
| 301 | + |
| 302 | + DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId); |
| 303 | + StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId); |
| 304 | + |
| 305 | + // Send heartbeat and request full block report lease. |
| 306 | + HeartbeatResponse hbResponse = rpcServer.sendHeartbeat( |
| 307 | + dnRegistration, storages, 0, 0, 0, 0, 0, null, true, |
| 308 | + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); |
| 309 | + |
| 310 | + DelayAnswer delayer = new DelayAnswer(BlockManager.LOG); |
| 311 | + doAnswer(delayer).when(spyBlockManager).processReport( |
| 312 | + any(DatanodeStorageInfo.class), |
| 313 | + any(BlockListAsLongs.class)); |
| 314 | + |
| 315 | + // Trigger sendBlockReport. |
| 316 | + BlockReportContext brContext = new BlockReportContext(1, 0, |
| 317 | + rand.nextLong(), hbResponse.getFullBlockReportLeaseId()); |
| 318 | + // Build every storage with 100 blocks for sending report. |
| 319 | + DatanodeStorage[] datanodeStorages |
| 320 | + = new DatanodeStorage[storages.length]; |
| 321 | + for (int i = 0; i < storages.length; i++) { |
| 322 | + datanodeStorages[i] = storages[i].getStorage(); |
| 323 | + StorageBlockReport[] reports = createReports(datanodeStorages, 100); |
| 324 | + |
| 325 | + // The first multiple send once, simulating the failure of the first report, |
| 326 | + // only send successfully once. |
| 327 | + if(i == 0){ |
| 328 | + rpcServer.blockReport(dnRegistration, poolId, reports, brContext); |
| 329 | + } |
| 330 | + |
| 331 | + // Send blockReport. |
| 332 | + DatanodeCommand datanodeCommand = rpcServer.blockReport(dnRegistration, poolId, reports, |
| 333 | + brContext); |
| 334 | + |
| 335 | + // Wait until BlockManager calls processReport. |
| 336 | + delayer.waitForCall(); |
| 337 | + |
| 338 | + // Allow blockreport to proceed. |
| 339 | + delayer.proceed(); |
| 340 | + |
| 341 | + // Get result, it will not null if process successfully. |
| 342 | + assertTrue(datanodeCommand instanceof FinalizeCommand); |
| 343 | + assertEquals(poolId, ((FinalizeCommand)datanodeCommand) |
| 344 | + .getBlockPoolId()); |
| 345 | + if(i == 0){ |
| 346 | + assertEquals(2, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount()); |
| 347 | + }else{ |
| 348 | + assertEquals(1, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount()); |
| 349 | + } |
| 350 | + } |
| 351 | + } |
| 352 | + } |
272 | 353 | } |
0 commit comments