Skip to content

Commit

Permalink
Making test cases resilient to delayed thread operations (#612)
Browse files Browse the repository at this point in the history
* Making test cases resilient to delayed thread operations

* Setting the initial demand in test cases to be in line with service's coral initial demand.
  • Loading branch information
ashwing authored and micah-jaffe committed Sep 19, 2019
1 parent 3fead19 commit c197d35
Showing 1 changed file with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.SdkBytes;
Expand Down Expand Up @@ -409,6 +410,16 @@ public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServi

@Test
public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception {

CountDownLatch onS2SCallLatch = new CountDownLatch(2);

doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
onS2SCallLatch.countDown();
return null;
}
}).when(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), any());

FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);

ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
Expand All @@ -427,7 +438,7 @@ public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressure

CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
int totalServicePublisherEvents = 1000;
int initialDemand = 10;
int initialDemand = 9;
int triggerCompleteAtNthEvent = 200;
BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher(
servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch,
Expand Down Expand Up @@ -501,6 +512,8 @@ public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressure

assertThat(source.getCurrentSequenceNumber(), equalTo(triggerCompleteAtNthEvent + ""));
// In non-shard end cases, upon successful completion, the publisher would re-subscribe to service.
// Let's wait for sometime to allow the publisher to re-subscribe
onS2SCallLatch.await(5000, TimeUnit.MILLISECONDS);
verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());

}
Expand Down Expand Up @@ -531,8 +544,9 @@ public void testIfShardEndEventAndOnCompleteAreDeliveredInOrderWithBackpressureA
.build());

CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
CountDownLatch onCompleteLatch = new CountDownLatch(1);
int totalServicePublisherEvents = 1000;
int initialDemand = 10;
int initialDemand = 9;
int triggerCompleteAtNthEvent = 200;
BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher(
servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch,
Expand Down Expand Up @@ -578,6 +592,7 @@ public void testIfShardEndEventAndOnCompleteAreDeliveredInOrderWithBackpressureA

@Override public void onComplete() {
isOnCompleteTriggered[0] = true;
onCompleteLatch.countDown();
}
}, source);

Expand Down Expand Up @@ -610,6 +625,7 @@ public void testIfShardEndEventAndOnCompleteAreDeliveredInOrderWithBackpressureA

assertNull(source.getCurrentSequenceNumber());
// With shard end event, onComplete must be propagated to the subscriber.
onCompleteLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue("OnComplete should be triggered", isOnCompleteTriggered[0]);

}
Expand All @@ -633,8 +649,9 @@ public void testIfStreamOfEventsAndOnErrorAreDeliveredInOrderWithBackpressureAdh
.build());

CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2);
CountDownLatch onErrorReceiveLatch = new CountDownLatch(1);
int totalServicePublisherEvents = 1000;
int initialDemand = 10;
int initialDemand = 9;
int triggerErrorAtNthEvent = 241;
BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher(
servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch,
Expand Down Expand Up @@ -675,6 +692,7 @@ public void testIfStreamOfEventsAndOnErrorAreDeliveredInOrderWithBackpressureAdh
@Override public void onError(Throwable t) {
log.error("Caught throwable in subscriber", t);
isOnErrorThrown[0] = true;
onErrorReceiveLatch.countDown();
}

@Override public void onComplete() {
Expand Down Expand Up @@ -709,6 +727,7 @@ public void testIfStreamOfEventsAndOnErrorAreDeliveredInOrderWithBackpressureAdh
});

assertThat(source.getCurrentSequenceNumber(), equalTo(triggerErrorAtNthEvent + ""));
onErrorReceiveLatch.await(5000, TimeUnit.MILLISECONDS);
assertTrue("OnError should have been thrown", isOnErrorThrown[0]);

}
Expand Down

0 comments on commit c197d35

Please sign in to comment.