3434import org .apache .hadoop .yarn .factory .providers .RecordFactoryProvider ;
3535import org .apache .hadoop .yarn .server .resourcemanager .MockNodes ;
3636import org .apache .hadoop .yarn .server .resourcemanager .MockRM ;
37+ import org .apache .hadoop .yarn .server .resourcemanager .nodelabels .RMNodeLabelsManager ;
3738import org .apache .hadoop .yarn .server .resourcemanager .rmapp .RMAppImpl ;
3839import org .apache .hadoop .yarn .server .resourcemanager .rmapp .attempt .RMAppAttemptImpl ;
3940import org .apache .hadoop .yarn .server .resourcemanager .rmapp .attempt .RMAppAttemptMetrics ;
5859import java .util .PriorityQueue ;
5960
6061import static org .apache .hadoop .yarn .server .resourcemanager .resource .TestResourceProfiles .TEST_CONF_RESET_RESOURCE_TYPES ;
62+ import static org .junit .Assert .assertEquals ;
63+ import static org .junit .Assert .assertFalse ;
64+ import static org .junit .Assert .assertTrue ;
6165import static org .mockito .Mockito .mock ;
6266import static org .mockito .Mockito .when ;
6367
@@ -68,9 +72,22 @@ private String getResourceName(int idx) {
6872 return "resource-" + idx ;
6973 }
7074
75+ // This test is run only when when -DRunCapacitySchedulerPerfTests=true is set
76+ // on the command line. In addition, this test has tunables for the following:
77+ // Number of queues: -DNumberOfQueues (default=100)
78+ // Number of total apps: -DNumberOfApplications (default=200)
79+ // Percentage of queues with apps: -DPercentActiveQueues (default=100)
80+ // E.G.:
81+ // mvn test -Dtest=TestCapacitySchedulerPerf -Dsurefire.fork.timeout=1800 \
82+ // -DRunCapacitySchedulerPerfTests=true -DNumberOfQueues=50 \
83+ // -DNumberOfApplications=200 -DPercentActiveQueues=100
84+ // Note that the surefire.fork.timeout flag is added because these tests could
85+ // take longer than the surefire timeout.
7186 private void testUserLimitThroughputWithNumberOfResourceTypes (
72- int numOfResourceTypes )
87+ int numOfResourceTypes , int numQueues , int pctActiveQueues , int appCount )
7388 throws Exception {
89+ Assume .assumeTrue (Boolean .valueOf (
90+ System .getProperty ("RunCapacitySchedulerPerfTests" )));
7491 if (numOfResourceTypes > 2 ) {
7592 // Initialize resource map
7693 Map <String , ResourceInformation > riMap = new HashMap <>();
@@ -89,22 +106,16 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
89106 ResourceUtils .initializeResourcesFromResourceInformationMap (riMap );
90107 }
91108
92- // Since this is more of a performance unit test, only run if
93- // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
94- Assume .assumeTrue (Boolean .valueOf (
95- System .getProperty ("RunCapacitySchedulerPerfTests" )));
109+ final int activeQueues = (int ) (numQueues * (pctActiveQueues /100f ));
110+ final int totalApps = appCount + activeQueues ;
111+ // extra apps to get started with user limit
96112
97113 CapacitySchedulerConfiguration csconf =
98- new CapacitySchedulerConfiguration ();
99- csconf .setMaximumApplicationMasterResourcePerQueuePercent ("root" , 100.0f );
100- csconf .setMaximumAMResourcePercentPerPartition ("root" , "" , 100.0f );
101- csconf .setMaximumApplicationMasterResourcePerQueuePercent ("root.default" ,
102- 100.0f );
103- csconf .setMaximumAMResourcePercentPerPartition ("root.default" , "" , 100.0f );
104- csconf .setResourceComparator (DominantResourceCalculator .class );
114+ createCSConfWithManyQueues (numQueues );
105115
106116 YarnConfiguration conf = new YarnConfiguration (csconf );
107- // Don't reset resource types since we have already configured resource types
117+ // Don't reset resource types since we have already configured resource
118+ // types
108119 conf .setBoolean (TEST_CONF_RESET_RESOURCE_TYPES , false );
109120 conf .setClass (YarnConfiguration .RM_SCHEDULER , CapacityScheduler .class ,
110121 ResourceScheduler .class );
@@ -113,25 +124,29 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
113124 rm .start ();
114125
115126 CapacityScheduler cs = (CapacityScheduler ) rm .getResourceScheduler ();
116- LeafQueue qb = (LeafQueue )cs .getQueue ("default" );
117127
118- // For now make user limit large so we can activate all applications
119- qb .setUserLimitFactor ((float )100.0 );
120- qb .setupConfigurableCapacities ();
128+ LeafQueue [] lqs = new LeafQueue [numQueues ];
129+ for (int i = 0 ; i < numQueues ; i ++) {
130+ String queueName = String .format ("%03d" , i );
131+ LeafQueue qb = (LeafQueue )cs .getQueue (queueName );
132+ // For now make user limit large so we can activate all applications
133+ qb .setUserLimitFactor ((float )100.0 );
134+ qb .setupConfigurableCapacities ();
135+ lqs [i ] = qb ;
136+ }
121137
122138 SchedulerEvent addAppEvent ;
123139 SchedulerEvent addAttemptEvent ;
124140 Container container = mock (Container .class );
125141 ApplicationSubmissionContext submissionContext =
126142 mock (ApplicationSubmissionContext .class );
127143
128- final int appCount = 100 ;
129- ApplicationId [] appids = new ApplicationId [appCount ];
130- RMAppAttemptImpl [] attempts = new RMAppAttemptImpl [appCount ];
131- ApplicationAttemptId [] appAttemptIds = new ApplicationAttemptId [appCount ];
132- RMAppImpl [] apps = new RMAppImpl [appCount ];
133- RMAppAttemptMetrics [] attemptMetrics = new RMAppAttemptMetrics [appCount ];
134- for (int i =0 ; i <appCount ; i ++) {
144+ ApplicationId [] appids = new ApplicationId [totalApps ];
145+ RMAppAttemptImpl [] attempts = new RMAppAttemptImpl [totalApps ];
146+ ApplicationAttemptId [] appAttemptIds = new ApplicationAttemptId [totalApps ];
147+ RMAppImpl [] apps = new RMAppImpl [totalApps ];
148+ RMAppAttemptMetrics [] attemptMetrics = new RMAppAttemptMetrics [totalApps ];
149+ for (int i =0 ; i <totalApps ; i ++) {
135150 appids [i ] = BuilderUtils .newApplicationId (100 , i );
136151 appAttemptIds [i ] =
137152 BuilderUtils .newApplicationAttemptId (appids [i ], 1 );
@@ -148,34 +163,34 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
148163 when (apps [i ].getCurrentAppAttempt ()).thenReturn (attempts [i ]);
149164
150165 rm .getRMContext ().getRMApps ().put (appids [i ], apps [i ]);
166+ String queueName = lqs [i % activeQueues ].getQueueName ();
151167 addAppEvent =
152- new AppAddedSchedulerEvent (appids [i ], "default" , "user1" );
168+ new AppAddedSchedulerEvent (appids [i ], queueName , "user1" );
153169 cs .handle (addAppEvent );
154170 addAttemptEvent =
155171 new AppAttemptAddedSchedulerEvent (appAttemptIds [i ], false );
156172 cs .handle (addAttemptEvent );
157173 }
158174
159- // add nodes to cluster, so cluster has 20GB and 20 vcores
160- Resource nodeResource = Resource .newInstance (10 * GB , 10 );
175+ // add nodes to cluster with enough resources to satisfy all apps
176+ Resource newResource = Resource .newInstance (totalApps * GB , totalApps );
161177 if (numOfResourceTypes > 2 ) {
162178 for (int i = 2 ; i < numOfResourceTypes ; i ++) {
163- nodeResource .setResourceValue (getResourceName (i ), 10 );
179+ newResource .setResourceValue (getResourceName (i ), totalApps );
164180 }
165181 }
166-
167- RMNode node = MockNodes .newNodeInfo (0 , nodeResource , 1 , "127.0.0.1" );
182+ RMNode node = MockNodes .newNodeInfo (0 , newResource , 1 , "127.0.0.1" );
168183 cs .handle (new NodeAddedSchedulerEvent (node ));
169184
170- RMNode node2 = MockNodes .newNodeInfo (0 , nodeResource , 1 , "127.0.0.2" );
185+ RMNode node2 = MockNodes .newNodeInfo (0 , newResource , 1 , "127.0.0.2" );
171186 cs .handle (new NodeAddedSchedulerEvent (node2 ));
172187
173188 Priority u0Priority = TestUtils .createMockPriority (1 );
174189 RecordFactory recordFactory =
175190 RecordFactoryProvider .getRecordFactory (null );
176191
177- FiCaSchedulerApp [] fiCaApps = new FiCaSchedulerApp [appCount ];
178- for (int i =0 ;i <appCount ;i ++) {
192+ FiCaSchedulerApp [] fiCaApps = new FiCaSchedulerApp [totalApps ];
193+ for (int i =0 ;i <totalApps ;i ++) {
179194 fiCaApps [i ] =
180195 cs .getSchedulerApplications ().get (apps [i ].getApplicationId ())
181196 .getCurrentAppAttempt ();
@@ -193,8 +208,30 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
193208 fiCaApps [i ].updateResourceRequests (
194209 Collections .singletonList (resourceRequest ));
195210 }
196- // Now force everything to be over user limit
197- qb .setUserLimitFactor ((float )0.0 );
211+ // Now force everything to be at user limit
212+ for (int i = 0 ; i < numQueues ; i ++) {
213+ lqs [i ].setUserLimitFactor ((float )0.0 );
214+ }
215+
216+ // allocate one container for each extra apps since
217+ // LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
218+ cs .handle (new NodeUpdateSchedulerEvent (node ));
219+ cs .handle (new NodeUpdateSchedulerEvent (node2 ));
220+
221+ // make sure only the extra apps have allocated containers
222+ for (int i =0 ;i <totalApps ;i ++) {
223+ boolean pending = fiCaApps [i ].getAppSchedulingInfo ().isPending ();
224+ if (i < activeQueues ) {
225+ assertFalse (pending );
226+ assertEquals (0 ,
227+ fiCaApps [i ].getTotalPendingRequestsPerPartition ().size ());
228+ } else {
229+ assertTrue (pending );
230+ assertEquals (1 *GB ,
231+ fiCaApps [i ].getTotalPendingRequestsPerPartition ()
232+ .get (RMNodeLabelsManager .NO_LABEL ).getMemorySize ());
233+ }
234+ }
198235
199236 // Quiet the loggers while measuring throughput
200237 GenericTestUtils .setRootLogLevel (Level .WARN );
@@ -233,27 +270,86 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
233270 }
234271 System .out .println (
235272 "#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
236- + ": " + numerator / (timespent / entries ));
273+ + ": " + numerator / (timespent / entries ) + " ops/sec of "
274+ + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
275+ + " queues." );
276+
277+ // make sure only the extra apps have allocated containers
278+ for (int i =0 ;i <totalApps ;i ++) {
279+ boolean pending = fiCaApps [i ].getAppSchedulingInfo ().isPending ();
280+ if (i < activeQueues ) {
281+ assertFalse (pending );
282+ assertEquals (0 ,
283+ fiCaApps [i ].getTotalPendingRequestsPerPartition ().size ());
284+ } else {
285+ assertTrue (pending );
286+ assertEquals (1 *GB ,
287+ fiCaApps [i ].getTotalPendingRequestsPerPartition ()
288+ .get (RMNodeLabelsManager .NO_LABEL ).getMemorySize ());
289+ }
290+ }
291+
292+ rm .close ();
237293 rm .stop ();
238294 }
239295
240296 @ Test (timeout = 300000 )
241297 public void testUserLimitThroughputForTwoResources () throws Exception {
242- testUserLimitThroughputWithNumberOfResourceTypes (2 );
298+ testUserLimitThroughputWithNumberOfResourceTypes (2 , 1 , 100 , 100 );
243299 }
244300
245301 @ Test (timeout = 300000 )
246302 public void testUserLimitThroughputForThreeResources () throws Exception {
247- testUserLimitThroughputWithNumberOfResourceTypes (3 );
303+ testUserLimitThroughputWithNumberOfResourceTypes (3 , 1 , 100 , 100 );
248304 }
249305
250306 @ Test (timeout = 300000 )
251307 public void testUserLimitThroughputForFourResources () throws Exception {
252- testUserLimitThroughputWithNumberOfResourceTypes (4 );
308+ testUserLimitThroughputWithNumberOfResourceTypes (4 , 1 , 100 , 100 );
253309 }
254310
255311 @ Test (timeout = 300000 )
256312 public void testUserLimitThroughputForFiveResources () throws Exception {
257- testUserLimitThroughputWithNumberOfResourceTypes (5 );
313+ testUserLimitThroughputWithNumberOfResourceTypes (5 , 1 , 100 , 100 );
314+ }
315+
316+ @ Test (timeout = 1800000 )
317+ public void testUserLimitThroughputWithManyQueues () throws Exception {
318+
319+ int numQueues = Integer .getInteger ("NumberOfQueues" , 40 );
320+ int pctActiveQueues = Integer .getInteger ("PercentActiveQueues" , 100 );
321+ int appCount = Integer .getInteger ("NumberOfApplications" , 100 );
322+
323+ testUserLimitThroughputWithNumberOfResourceTypes (
324+ 2 , numQueues , pctActiveQueues , appCount );
325+ }
326+
327+ CapacitySchedulerConfiguration createCSConfWithManyQueues (int numQueues )
328+ throws Exception {
329+ CapacitySchedulerConfiguration csconf =
330+ new CapacitySchedulerConfiguration ();
331+ csconf .setResourceComparator (DominantResourceCalculator .class );
332+ csconf .setMaximumApplicationMasterResourcePerQueuePercent ("root" , 100.0f );
333+ csconf .setMaximumAMResourcePercentPerPartition ("root" , "" , 100.0f );
334+ csconf .setCapacity ("root.default" , 0.0f );
335+ csconf .setOffSwitchPerHeartbeatLimit (numQueues );
336+
337+ float capacity = 100.0f / numQueues ;
338+ String [] subQueues = new String [numQueues ];
339+ for (int i = 0 ; i < numQueues ; i ++) {
340+ String queueName = String .format ("%03d" , i );
341+ String queuePath = "root." + queueName ;
342+ subQueues [i ] = queueName ;
343+ csconf .setMaximumApplicationMasterResourcePerQueuePercent (
344+ queuePath , 100.0f );
345+ csconf .setMaximumAMResourcePercentPerPartition (queuePath , "" , 100.0f );
346+ csconf .setCapacity (queuePath , capacity );
347+ csconf .setUserLimitFactor (queuePath , 100.0f );
348+ csconf .setMaximumCapacity (queuePath , 100.0f );
349+ }
350+
351+ csconf .setQueues ("root" , subQueues );
352+
353+ return csconf ;
258354 }
259355}
0 commit comments