2525import java .util .List ;
2626import java .util .concurrent .ExecutorService ;
2727import java .util .concurrent .Executors ;
28+ import java .util .concurrent .Semaphore ;
2829import java .util .concurrent .atomic .AtomicBoolean ;
2930import org .junit .After ;
3031import org .junit .Before ;
3132import org .junit .Test ;
3233import org .junit .runner .RunWith ;
3334import org .mockito .Mock ;
35+ import org .mockito .invocation .InvocationOnMock ;
3436import org .mockito .junit .MockitoJUnitRunner ;
37+ import org .mockito .stubbing .Answer ;
3538
3639@ RunWith (MockitoJUnitRunner .class )
3740public class DefaultControllerTest {
@@ -40,16 +43,6 @@ public class DefaultControllerTest {
4043 private RateLimitingQueue <Request > workQueue =
4144 new DefaultRateLimitingQueue <>(Executors .newSingleThreadExecutor ());
4245
43- private final int stepCooldownIntervalInMillis = 500 ;
44-
45- private void cooldown () {
46- try {
47- Thread .sleep (stepCooldownIntervalInMillis );
48- } catch (InterruptedException e ) {
49- e .printStackTrace ();
50- }
51- }
52-
5346 @ Before
5447 public void setUp () throws Exception {}
5548
@@ -67,33 +60,53 @@ public void testStartingStoppingController() throws InterruptedException {
6760 testController .setWorkerThreadPool (Executors .newScheduledThreadPool (1 ));
6861
6962 Request request1 = new Request ("test1" );
70- when (mockReconciler .reconcile (request1 )).thenReturn (new Result (false ));
63+ final Semaphore latch = new Semaphore (1 );
64+ latch .acquire ();
65+ when (mockReconciler .reconcile (request1 ))
66+ .thenAnswer (
67+ new Answer () {
68+ public Object answer (InvocationOnMock invocation ) {
69+ latch .release ();
70+ return new Result (false );
71+ }
72+ });
7173
7274 // emit an event when the controller hasn't started
7375 workQueue .add (request1 );
74- cooldown ();
75- verify (mockReconciler , times (0 )).reconcile (request1 );
76+ latch .acquire ();
7677
78+ verify (mockReconciler , times (0 )).reconcile (request1 );
7779 controllerThead .submit (testController ::run );
7880
79- cooldown ();
81+ latch . acquire ();
8082 verify (mockReconciler , times (1 )).reconcile (request1 );
8183
8284 testController .shutdown ();
8385 Request request2 = new Request ("test2" );
8486
8587 // emit an event after the controller has shutdown
8688 workQueue .add (request2 );
87- cooldown ();
89+ latch .acquire ();
90+
8891 verify (mockReconciler , times (0 )).reconcile (request2 );
8992 }
9093
9194 @ Test
9295 public void testControllerWontStartBeforeReady () throws InterruptedException {
9396
9497 Request request1 = new Request ("test1" );
95- when (mockReconciler .reconcile (request1 )).thenReturn (new Result (false ));
98+ final Semaphore latch = new Semaphore (1 );
99+
100+ when (mockReconciler .reconcile (request1 ))
101+ .thenAnswer (
102+ new Answer () {
103+ public Object answer (InvocationOnMock invocation ) {
104+ latch .release ();
105+ return new Result (false );
106+ }
107+ });
96108
109+ latch .acquire ();
97110 AtomicBoolean ready = new AtomicBoolean (false );
98111 DefaultController testController =
99112 new DefaultController ("" , mockReconciler , workQueue , () -> ready .get ());
@@ -105,12 +118,12 @@ public void testControllerWontStartBeforeReady() throws InterruptedException {
105118
106119 // emit an event when the controller hasn't been ready
107120 workQueue .add (request1 );
108- cooldown ();
121+ latch . acquire ();
109122
110123 verify (mockReconciler , times (0 )).reconcile (request1 );
111124
112125 ready .set (true );
113- cooldown ();
126+ latch . acquire ();
114127 verify (mockReconciler , times (1 )).reconcile (request1 );
115128 }
116129
@@ -120,18 +133,23 @@ public void testControllerKeepsWorkingWhenReconcilerAbortsWithRuntimeException()
120133 AtomicBoolean aborts = new AtomicBoolean (true );
121134 AtomicBoolean resumed = new AtomicBoolean (false );
122135 List <Request > finishedRequests = new ArrayList <>();
136+ final Semaphore latch = new Semaphore (1 );
123137 DefaultController testController =
124138 new DefaultController (
125139 "" ,
126140 new Reconciler () {
127141 @ Override
128142 public Result reconcile (Request request ) {
129- if (aborts .get ()) {
130- throw new RuntimeException ("Oops!!" );
143+ try {
144+ if (aborts .get ()) {
145+ throw new RuntimeException ("Oops!!" );
146+ }
147+ resumed .set (true );
148+ finishedRequests .add (request );
149+ return new Result (false );
150+ } finally {
151+ latch .release ();
131152 }
132- resumed .set (true );
133- finishedRequests .add (request );
134- return new Result (false );
135153 }
136154 },
137155 workQueue );
@@ -142,13 +160,13 @@ public Result reconcile(Request request) {
142160
143161 Request request1 = new Request ("test1" );
144162 workQueue .add (request1 );
145- cooldown ();
163+ latch . acquire ();
146164
147165 aborts .set (false );
148166 // emit another event, the previous one has been backoff'd
149167 Request request2 = new Request ("test2" );
150168 workQueue .add (request2 );
151- cooldown ();
169+ latch . acquire ();
152170 testController .shutdown ();
153171
154172 assertTrue (resumed .get ());
0 commit comments