2424import java .util .concurrent .CountDownLatch ;
2525import java .util .concurrent .ExecutorService ;
2626import java .util .concurrent .Executors ;
27+ import java .util .concurrent .Semaphore ;
2728import java .util .concurrent .atomic .AtomicReference ;
2829import java .util .concurrent .locks .ReentrantLock ;
2930import java .util .function .Consumer ;
@@ -291,6 +292,7 @@ public void testLeaderElectionCaptureException() throws ApiException, Interrupte
291292 () -> {
292293 leaderElector .run (() -> {}, () -> {});
293294 });
295+ // TODO: Remove this sleep
294296 Thread .sleep (Duration .ofSeconds (2 ).toMillis ());
295297 assertEquals (expectedException , actualException .get ().getCause ());
296298 }
@@ -308,7 +310,18 @@ public void testLeaderElectionReportLeaderOnStart() throws ApiException, Interru
308310 setLeaderTransitions (1 );
309311 setLeaseDurationSeconds (60 );
310312 }
313+ })
314+ .thenReturn (
315+ new LeaderElectionRecord () {
316+ {
317+ setHolderIdentity ("foo3" );
318+ setAcquireTime (new Date ());
319+ setRenewTime (new Date ());
320+ setLeaderTransitions (1 );
321+ setLeaseDurationSeconds (60 );
322+ }
311323 });
324+
312325 List <String > notifications = new ArrayList <>();
313326 LeaderElectionConfig leaderElectionConfig = new LeaderElectionConfig ();
314327 leaderElectionConfig .setLock (lock );
@@ -317,25 +330,21 @@ public void testLeaderElectionReportLeaderOnStart() throws ApiException, Interru
317330 leaderElectionConfig .setRenewDeadline (Duration .ofMillis (700 ));
318331 LeaderElector leaderElector = new LeaderElector (leaderElectionConfig );
319332 ExecutorService leaderElectionWorker = Executors .newFixedThreadPool (1 );
333+ final Semaphore s = new Semaphore (2 );
334+ s .acquire (2 );
320335 leaderElectionWorker .submit (
321336 () -> {
322- leaderElector .run (() -> {}, () -> {}, (id ) -> notifications .add (id ));
337+ leaderElector .run (
338+ () -> {},
339+ () -> {},
340+ (id ) -> {
341+ notifications .add (id );
342+ s .release ();
343+ });
323344 });
324345
325- Thread .sleep (Duration .ofSeconds (2 ).toMillis ());
326-
327- when (lock .get ())
328- .thenReturn (
329- new LeaderElectionRecord () {
330- {
331- setHolderIdentity ("foo3" );
332- setAcquireTime (new Date ());
333- setRenewTime (new Date ());
334- setLeaderTransitions (1 );
335- setLeaseDurationSeconds (60 );
336- }
337- });
338- Thread .sleep (Duration .ofSeconds (2 ).toMillis ());
346+ // wait for two notifications to occur.
347+ s .acquire (2 );
339348
340349 assertEquals (2 , notifications .size ());
341350 assertEquals ("foo2" , notifications .get (0 ));
@@ -365,12 +374,20 @@ public void testLeaderElectionShouldReportLeaderItAcquiresOnStart()
365374 leaderElectionConfig .setRenewDeadline (Duration .ofMillis (700 ));
366375 LeaderElector leaderElector = new LeaderElector (leaderElectionConfig );
367376 ExecutorService leaderElectionWorker = Executors .newFixedThreadPool (1 );
377+ Semaphore s = new Semaphore (1 );
378+ s .acquire ();
368379 leaderElectionWorker .submit (
369380 () -> {
370- leaderElector .run (() -> {}, () -> {}, (id ) -> notifications .add (id ));
381+ leaderElector .run (
382+ () -> {},
383+ () -> {},
384+ (id ) -> {
385+ notifications .add (id );
386+ s .release ();
387+ });
371388 });
372389
373- Thread . sleep ( Duration . ofSeconds ( 2 ). toMillis () );
390+ s . acquire ( );
374391 assertEquals (1 , notifications .size ());
375392 assertEquals ("foo1" , notifications .get (0 ));
376393 }
0 commit comments