3333import java .util .HashMap ;
3434import java .util .HashSet ;
3535import java .util .Map ;
36+ import java .util .Set ;
37+ import java .util .concurrent .ConcurrentHashMap ;
3638
3739import javax .crypto .SecretKey ;
3840
7476import org .apache .hadoop .yarn .server .resourcemanager .reservation .ReservationSystemTestUtil ;
7577import org .apache .hadoop .yarn .server .resourcemanager .reservation .ReservationSystemUtil ;
7678import org .apache .hadoop .yarn .server .resourcemanager .rmapp .RMApp ;
79+ import org .apache .hadoop .yarn .server .resourcemanager .rmapp .RMAppEvent ;
7780import org .apache .hadoop .yarn .server .resourcemanager .rmapp .RMAppState ;
7881import org .apache .hadoop .yarn .server .resourcemanager .rmapp .attempt .AggregateAppResourceUsage ;
7982import org .apache .hadoop .yarn .server .resourcemanager .rmapp .attempt .RMAppAttempt ;
@@ -99,9 +102,8 @@ public class RMStateStoreTestBase {
99102
100103 static class TestDispatcher implements Dispatcher , EventHandler <Event > {
101104
102- ApplicationAttemptId attemptId ;
103-
104- boolean notified = false ;
105+ private final Set <ApplicationAttemptId > handledAttempt = ConcurrentHashMap .newKeySet ();
106+ private final Set <ApplicationId > handledApps = ConcurrentHashMap .newKeySet ();
105107
106108 @ SuppressWarnings ("rawtypes" )
107109 @ Override
@@ -113,11 +115,16 @@ public void register(Class<? extends Enum> eventType,
113115 public void handle (Event event ) {
114116 if (event instanceof RMAppAttemptEvent ) {
115117 RMAppAttemptEvent rmAppAttemptEvent = (RMAppAttemptEvent ) event ;
116- assertEquals (attemptId , rmAppAttemptEvent .getApplicationAttemptId ());
117- }
118- notified = true ;
119- synchronized (this ) {
120- notifyAll ();
118+ synchronized (handledAttempt ) {
119+ handledAttempt .add (rmAppAttemptEvent .getApplicationAttemptId ());
120+ handledAttempt .notifyAll ();
121+ }
122+ } else if (event instanceof RMAppEvent ) {
123+ RMAppEvent rmAppEvent = (RMAppEvent ) event ;
124+ synchronized (handledApps ) {
125+ handledApps .add (rmAppEvent .getApplicationId ());
126+ handledApps .notifyAll ();
127+ }
121128 }
122129 }
123130
@@ -127,6 +134,38 @@ public EventHandler<Event> getEventHandler() {
127134 return this ;
128135 }
129136
137+ void waitNotify (ApplicationAttemptId attemptId ) {
138+ long startTime = System .currentTimeMillis ();
139+ while (!handledAttempt .contains (attemptId )) {
140+ synchronized (handledAttempt ) {
141+ try {
142+ handledAttempt .wait (100 );
143+ } catch (InterruptedException e ) {
144+ LOG .trace ("Interrupted" , e );
145+ }
146+ }
147+ if (System .currentTimeMillis () - startTime > 1000 *60 ) {
148+ fail ("Timed out attempt store notification" );
149+ }
150+ }
151+ }
152+
153+ void waitNotify (ApplicationId applicationId ) {
154+ long startTime = System .currentTimeMillis ();
155+ while (!handledApps .contains (applicationId )) {
156+ synchronized (handledApps ) {
157+ try {
158+ handledApps .wait (100 );
159+ } catch (InterruptedException e ) {
160+ LOG .trace ("Interrupted" , e );
161+ }
162+ }
163+ if (System .currentTimeMillis () - startTime > 1000 *60 ) {
164+ fail ("Timed out attempt store notification" );
165+ }
166+ }
167+ }
168+
130169 }
131170
132171 public static class StoreStateVerifier {
@@ -148,23 +187,6 @@ public long getEpochRange() {
148187 return epochRange ;
149188 }
150189
151- void waitNotify (TestDispatcher dispatcher ) {
152- long startTime = System .currentTimeMillis ();
153- while (!dispatcher .notified ) {
154- synchronized (dispatcher ) {
155- try {
156- dispatcher .wait (1000 );
157- } catch (InterruptedException e ) {
158- e .printStackTrace ();
159- }
160- }
161- if (System .currentTimeMillis () - startTime > 1000 *60 ) {
162- fail ("Timed out attempt store notification" );
163- }
164- }
165- dispatcher .notified = false ;
166- }
167-
168190 protected RMApp storeApp (RMStateStore store , ApplicationId appId ,
169191 long submitTime , long startTime ) throws Exception {
170192 ApplicationSubmissionContext context =
@@ -204,17 +226,15 @@ protected RMAppAttempt storeAttempt(RMStateStore store,
204226 .thenReturn (mockRmAppAttemptMetrics );
205227 when (mockRmAppAttemptMetrics .getAggregateAppResourceUsage ())
206228 .thenReturn (new AggregateAppResourceUsage (new HashMap <>()));
207- dispatcher .attemptId = attemptId ;
208229 store .storeNewApplicationAttempt (mockAttempt );
209- waitNotify (dispatcher );
230+ dispatcher . waitNotify (attemptId );
210231 return mockAttempt ;
211232 }
212233
213234 protected void updateAttempt (RMStateStore store , TestDispatcher dispatcher ,
214235 ApplicationAttemptStateData attemptState ) {
215- dispatcher .attemptId = attemptState .getAttemptId ();
216236 store .updateApplicationAttemptState (attemptState );
217- waitNotify (dispatcher );
237+ dispatcher . waitNotify (attemptState . getAttemptId () );
218238 }
219239
220240 void testRMAppStateStore (RMStateStoreHelper stateStoreHelper )
@@ -642,6 +662,8 @@ private ArrayList<RMApp> createAndStoreApps(
642662 public void testDeleteStore (RMStateStoreHelper stateStoreHelper )
643663 throws Exception {
644664 RMStateStore store = stateStoreHelper .getRMStateStore ();
665+ TestDispatcher dispatcher = new TestDispatcher ();
666+ store .setRMDispatcher (dispatcher );
645667 ArrayList <RMApp > appList = createAndStoreApps (stateStoreHelper , store , 5 );
646668 store .deleteStore ();
647669 // verify apps deleted
0 commit comments