1818package org .apache .hadoop .hdfs .server .balancer ;
1919
2020import static org .junit .Assert .assertEquals ;
21+ import static org .junit .Assert .assertTrue ;
22+ import static org .junit .Assert .assertFalse ;
2123
2224import java .io .IOException ;
2325import java .net .URI ;
4446import org .apache .hadoop .hdfs .protocol .LocatedBlock ;
4547import org .apache .hadoop .hdfs .protocol .LocatedBlocks ;
4648import org .apache .hadoop .hdfs .protocol .HdfsConstants .DatanodeReportType ;
49+ import org .apache .hadoop .hdfs .server .blockmanagement .BlockPlacementPolicy ;
4750import org .apache .hadoop .hdfs .server .blockmanagement .BlockPlacementPolicyWithNodeGroup ;
51+ import org .apache .hadoop .hdfs .server .blockmanagement .BlockPlacementStatus ;
4852import org .apache .hadoop .net .NetworkTopology ;
4953import org .apache .hadoop .net .NetworkTopologyWithNodeGroup ;
5054import org .apache .hadoop .test .LambdaTestUtils ;
51- import org .junit .Assert ;
5255import org .junit .Test ;
5356
5457/**
@@ -84,7 +87,7 @@ static Configuration createConf() {
8487 TestBalancer .initConf (conf );
8588 conf .setLong (DFSConfigKeys .DFS_BLOCK_SIZE_KEY , DEFAULT_BLOCK_SIZE );
8689 conf .setBoolean (DFSConfigKeys .DFS_USE_DFS_NETWORK_TOPOLOGY_KEY , false );
87- conf .set (CommonConfigurationKeysPublic .NET_TOPOLOGY_IMPL_KEY ,
90+ conf .set (CommonConfigurationKeysPublic .NET_TOPOLOGY_IMPL_KEY ,
8891 NetworkTopologyWithNodeGroup .class .getName ());
8992 conf .set (DFSConfigKeys .DFS_BLOCK_REPLICATOR_CLASSNAME_KEY ,
9093 BlockPlacementPolicyWithNodeGroup .class .getName ());
@@ -192,8 +195,8 @@ private void runBalancerCanFinish(Configuration conf,
192195 // start rebalancing
193196 Collection <URI > namenodes = DFSUtil .getInternalNsRpcUris (conf );
194197 final int r = Balancer .run (namenodes , BalancerParameters .DEFAULT , conf );
195- Assert . assertTrue ( r == ExitStatus . SUCCESS . getExitCode () ||
196- ( r == ExitStatus .NO_MOVE_PROGRESS .getExitCode ()) );
198+ assertEquals ( "Balancer did not exit with NO_MOVE_PROGRESS" ,
199+ ExitStatus .NO_MOVE_PROGRESS .getExitCode (), r );
197200 waitForHeartBeat (totalUsedSpace , totalCapacity );
198201 LOG .info ("Rebalancing with default factor." );
199202 }
@@ -211,6 +214,30 @@ private Set<ExtendedBlock> getBlocksOnRack(List<LocatedBlock> blks, String rack)
211214 return ret ;
212215 }
213216
217+ private void verifyNetworkTopology () {
218+ NetworkTopology topology =
219+ cluster .getNamesystem ().getBlockManager ().getDatanodeManager ().
220+ getNetworkTopology ();
221+ assertTrue ("must be an instance of NetworkTopologyWithNodeGroup" ,
222+ topology instanceof NetworkTopologyWithNodeGroup );
223+ }
224+
225+ private void verifyProperBlockPlacement (String file ,
226+ long length , int numOfReplicas ) throws IOException {
227+ BlockPlacementPolicy placementPolicy =
228+ cluster .getNamesystem ().getBlockManager ().getBlockPlacementPolicy ();
229+ List <LocatedBlock > locatedBlocks = client .
230+ getBlockLocations (file , 0 , length ).getLocatedBlocks ();
231+ assertFalse ("No blocks found for file " + file , locatedBlocks .isEmpty ());
232+ for (LocatedBlock locatedBlock : locatedBlocks ) {
233+ BlockPlacementStatus status = placementPolicy .verifyBlockPlacement (
234+ locatedBlock .getLocations (), numOfReplicas );
235+ assertTrue ("Block placement policy was not satisfied for block " +
236+ locatedBlock .getBlock ().getBlockId (),
237+ status .isPlacementPolicySatisfied ());
238+ }
239+ }
240+
214241 /**
215242 * Create a cluster with even distribution, and a new empty node is added to
216243 * the cluster, then test rack locality for balancer policy.
@@ -232,6 +259,7 @@ public void testBalancerWithRackLocality() throws Exception {
232259 cluster = new MiniDFSClusterWithNodeGroup (builder );
233260 try {
234261 cluster .waitActive ();
262+ verifyNetworkTopology ();
235263 client = NameNodeProxies .createProxy (conf ,
236264 cluster .getFileSystem (0 ).getUri (),
237265 ClientProtocol .class ).getProxy ();
@@ -258,12 +286,14 @@ public void testBalancerWithRackLocality() throws Exception {
258286 totalCapacity += newCapacity ;
259287
260288 // run balancer and validate results
261- runBalancerCanFinish (conf , totalUsedSpace , totalCapacity );
289+ runBalancer (conf , totalUsedSpace , totalCapacity );
262290
263291 lbs = client .getBlockLocations (filePath .toUri ().getPath (), 0 , length );
264292 Set <ExtendedBlock > after = getBlocksOnRack (lbs .getLocatedBlocks (), RACK0 );
265293 assertEquals (before , after );
266-
294+
295+ verifyProperBlockPlacement (filePath .toUri ().getPath (), length ,
296+ numOfDatanodes );
267297 } finally {
268298 cluster .shutdown ();
269299 }
@@ -291,15 +321,18 @@ public void testBalancerWithNodeGroup() throws Exception {
291321 cluster = new MiniDFSClusterWithNodeGroup (builder );
292322 try {
293323 cluster .waitActive ();
324+ verifyNetworkTopology ();
294325 client = NameNodeProxies .createProxy (conf ,
295326 cluster .getFileSystem (0 ).getUri (),
296327 ClientProtocol .class ).getProxy ();
297328
298329 long totalCapacity = TestBalancer .sum (capacities );
330+ int numOfReplicas = numOfDatanodes / 2 ;
299331 // fill up the cluster to be 20% full
300332 long totalUsedSpace = totalCapacity * 2 / 10 ;
301- TestBalancer .createFile (cluster , filePath , totalUsedSpace / (numOfDatanodes /2 ),
302- (short ) (numOfDatanodes /2 ), 0 );
333+ long length = totalUsedSpace / numOfReplicas ;
334+ TestBalancer .createFile (cluster , filePath , length ,
335+ (short ) numOfReplicas , 0 );
303336
304337 long newCapacity = CAPACITY ;
305338 String newRack = RACK1 ;
@@ -313,6 +346,9 @@ public void testBalancerWithNodeGroup() throws Exception {
313346 // run balancer and validate results
314347 runBalancer (conf , totalUsedSpace , totalCapacity );
315348
349+ verifyProperBlockPlacement (filePath .toUri ().getPath (), length ,
350+ numOfReplicas );
351+
316352 } finally {
317353 cluster .shutdown ();
318354 }
@@ -345,19 +381,25 @@ public void testBalancerEndInNoMoveProgress() throws Exception {
345381 cluster = new MiniDFSClusterWithNodeGroup (builder );
346382 try {
347383 cluster .waitActive ();
384+ verifyNetworkTopology ();
348385 client = NameNodeProxies .createProxy (conf ,
349386 cluster .getFileSystem (0 ).getUri (),
350387 ClientProtocol .class ).getProxy ();
351388
352389 long totalCapacity = TestBalancer .sum (capacities );
353390 // fill up the cluster to be 60% full
354391 long totalUsedSpace = totalCapacity * 6 / 10 ;
355- TestBalancer .createFile (cluster , filePath , totalUsedSpace / 3 ,
356- (short ) (3 ), 0 );
392+ int numOfReplicas = 3 ;
393+ long length = totalUsedSpace / 3 ;
394+ TestBalancer .createFile (cluster , filePath , length ,
395+ (short ) numOfReplicas , 0 );
357396
358397 // run balancer which can finish in 5 iterations with no block movement.
359398 runBalancerCanFinish (conf , totalUsedSpace , totalCapacity );
360399
400+ verifyProperBlockPlacement (filePath .toUri ().getPath (), length ,
401+ numOfReplicas );
402+
361403 } finally {
362404 cluster .shutdown ();
363405 }
0 commit comments