1414import org .reactivestreams .Publisher ;
1515import org .reactivestreams .Subscriber ;
1616import org .reactivestreams .Subscription ;
17- import org .reactivestreams .tck .flow .support .SubscriberBufferOverflowException ;
1817import org .reactivestreams .tck .flow .support .Optional ;
18+ import org .reactivestreams .tck .flow .support .SubscriberBufferOverflowException ;
1919
2020import java .util .Collections ;
2121import java .util .LinkedList ;
2424import java .util .concurrent .CopyOnWriteArrayList ;
2525import java .util .concurrent .CountDownLatch ;
2626import java .util .concurrent .TimeUnit ;
27- import java .util .concurrent .atomic .AtomicBoolean ;
2827import java .util .concurrent .atomic .AtomicReference ;
2928
29+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
30+ import static java .util .concurrent .TimeUnit .NANOSECONDS ;
3031import static org .testng .Assert .assertTrue ;
3132import static org .testng .Assert .fail ;
3233
@@ -37,8 +38,10 @@ public class TestEnvironment {
3738 private static final long DEFAULT_TIMEOUT_MILLIS = 100 ;
3839
3940 private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS" ;
41+ private static final String DEFAULT_POLL_TIMEOUT_MILLIS_ENV = "DEFAULT_POLL_TIMEOUT_MILLIS_ENV" ;
4042
4143 private final long defaultTimeoutMillis ;
44+ private final long defaultPollTimeoutMillis ;
4245 private final long defaultNoSignalsTimeoutMillis ;
4346 private final boolean printlnDebug ;
4447
@@ -51,14 +54,46 @@ public class TestEnvironment {
5154 * run the tests.
5255 * @param defaultTimeoutMillis default timeout to be used in all expect* methods
5356 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
57+ * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
58+ * preempted by an asynchronous event.
5459 * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
5560 */
56- public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , boolean printlnDebug ) {
61+ public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , long defaultPollTimeoutMillis ,
62+ boolean printlnDebug ) {
5763 this .defaultTimeoutMillis = defaultTimeoutMillis ;
64+ this .defaultPollTimeoutMillis = defaultPollTimeoutMillis ;
5865 this .defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis ;
5966 this .printlnDebug = printlnDebug ;
6067 }
6168
69+ /**
70+ * Tests must specify the timeout for expected outcome of asynchronous
71+ * interactions. Longer timeout does not invalidate the correctness of
72+ * the implementation, but can in some cases result in longer time to
73+ * run the tests.
74+ * @param defaultTimeoutMillis default timeout to be used in all expect* methods
75+ * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
76+ * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
77+ */
78+ public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , boolean printlnDebug ) {
79+ this (defaultTimeoutMillis , defaultNoSignalsTimeoutMillis , defaultTimeoutMillis , printlnDebug );
80+ }
81+
82+ /**
83+ * Tests must specify the timeout for expected outcome of asynchronous
84+ * interactions. Longer timeout does not invalidate the correctness of
85+ * the implementation, but can in some cases result in longer time to
86+ * run the tests.
87+ *
88+ * @param defaultTimeoutMillis default timeout to be used in all expect* methods
89+ * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
90+ * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
91+ * preempted by an asynchronous event.
92+ */
93+ public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis , long defaultPollTimeoutMillis ) {
94+ this (defaultTimeoutMillis , defaultNoSignalsTimeoutMillis , defaultPollTimeoutMillis , false );
95+ }
96+
6297 /**
6398 * Tests must specify the timeout for expected outcome of asynchronous
6499 * interactions. Longer timeout does not invalidate the correctness of
@@ -69,7 +104,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi
69104 * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
70105 */
71106 public TestEnvironment (long defaultTimeoutMillis , long defaultNoSignalsTimeoutMillis ) {
72- this (defaultTimeoutMillis , defaultNoSignalsTimeoutMillis , false );
107+ this (defaultTimeoutMillis , defaultTimeoutMillis , defaultNoSignalsTimeoutMillis );
73108 }
74109
75110 /**
@@ -81,7 +116,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi
81116 * @param defaultTimeoutMillis default timeout to be used in all expect* methods
82117 */
83118 public TestEnvironment (long defaultTimeoutMillis ) {
84- this (defaultTimeoutMillis , defaultTimeoutMillis , false );
119+ this (defaultTimeoutMillis , defaultTimeoutMillis , defaultTimeoutMillis );
85120 }
86121
87122 /**
@@ -97,7 +132,7 @@ public TestEnvironment(long defaultTimeoutMillis) {
97132 * often helpful to pinpoint simple race conditions etc.
98133 */
99134 public TestEnvironment (boolean printlnDebug ) {
100- this (envDefaultTimeoutMillis (), envDefaultNoSignalsTimeoutMillis (), printlnDebug );
135+ this (envDefaultTimeoutMillis (), envDefaultNoSignalsTimeoutMillis (), envDefaultPollTimeoutMillis (), printlnDebug );
101136 }
102137
103138 /**
@@ -126,6 +161,14 @@ public long defaultNoSignalsTimeoutMillis() {
126161 return defaultNoSignalsTimeoutMillis ;
127162 }
128163
164+ /**
165+ * The default amount of time to poll for events if {@code defaultTimeoutMillis} isn't preempted by an asynchronous
166+ * event.
167+ */
168+ public long defaultPollTimeoutMillis () {
169+ return defaultPollTimeoutMillis ;
170+ }
171+
129172 /**
130173 * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
131174 *
@@ -156,6 +199,21 @@ public static long envDefaultNoSignalsTimeoutMillis() {
156199 }
157200 }
158201
202+ /**
203+ * Tries to parse the env variable {@code DEFAULT_POLL_TIMEOUT_MILLIS_ENV} as long and returns the value if present OR its default value.
204+ *
205+ * @throws java.lang.IllegalArgumentException when unable to parse the env variable
206+ */
207+ public static long envDefaultPollTimeoutMillis () {
208+ final String envMillis = System .getenv (DEFAULT_POLL_TIMEOUT_MILLIS_ENV );
209+ if (envMillis == null ) return envDefaultTimeoutMillis ();
210+ else try {
211+ return Long .parseLong (envMillis );
212+ } catch (NumberFormatException ex ) {
213+ throw new IllegalArgumentException (String .format ("Unable to parse %s env value [%s] as long!" , DEFAULT_POLL_TIMEOUT_MILLIS_ENV , envMillis ), ex );
214+ }
215+ }
216+
159217 /**
160218 * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
161219 * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
@@ -277,7 +335,7 @@ public Throwable dropAsyncError() {
277335 }
278336
279337 /**
280- * Waits for {@link TestEnvironment#defaultTimeoutMillis ()} and then verifies that no asynchronous errors
338+ * Waits for {@link TestEnvironment#defaultNoSignalsTimeoutMillis ()} and then verifies that no asynchronous errors
281339 * were signalled pior to, or during that time (by calling {@code flop()}).
282340 */
283341 public void verifyNoAsyncErrors () {
@@ -519,42 +577,57 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
519577 }
520578
521579 public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , String requiredMessagePart ) throws Exception {
522- expectErrorWithMessage (expected , requiredMessagePart , env .defaultTimeoutMillis ());
580+ expectErrorWithMessage (expected , Collections . singletonList ( requiredMessagePart ) , env .defaultTimeoutMillis (), env . defaultPollTimeoutMillis ());
523581 }
524582 public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , List <String > requiredMessagePartAlternatives ) throws Exception {
525- expectErrorWithMessage (expected , requiredMessagePartAlternatives , env .defaultTimeoutMillis ());
583+ expectErrorWithMessage (expected , requiredMessagePartAlternatives , env .defaultTimeoutMillis (), env . defaultPollTimeoutMillis () );
526584 }
527585
528586 @ SuppressWarnings ("ThrowableResultOfMethodCallIgnored" )
529587 public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , String requiredMessagePart , long timeoutMillis ) throws Exception {
530588 expectErrorWithMessage (expected , Collections .singletonList (requiredMessagePart ), timeoutMillis );
531589 }
590+
532591 public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , List <String > requiredMessagePartAlternatives , long timeoutMillis ) throws Exception {
533- final E err = expectError (expected , timeoutMillis );
592+ expectErrorWithMessage (expected , requiredMessagePartAlternatives , timeoutMillis , timeoutMillis );
593+ }
594+
595+ public <E extends Throwable > void expectErrorWithMessage (Class <E > expected , List <String > requiredMessagePartAlternatives ,
596+ long totalTimeoutMillis , long pollTimeoutMillis ) throws Exception {
597+ final E err = expectError (expected , totalTimeoutMillis , pollTimeoutMillis );
534598 final String message = err .getMessage ();
535-
599+
536600 boolean contains = false ;
537- for (String requiredMessagePart : requiredMessagePartAlternatives )
601+ for (String requiredMessagePart : requiredMessagePartAlternatives )
538602 if (message .contains (requiredMessagePart )) contains = true ; // not short-circuting loop, it is expected to
539603 assertTrue (contains ,
540- String .format ("Got expected exception [%s] but missing message part [%s], was: %s" ,
541- err .getClass (), "anyOf: " + requiredMessagePartAlternatives , err .getMessage ()));
604+ String .format ("Got expected exception [%s] but missing message part [%s], was: %s" ,
605+ err .getClass (), "anyOf: " + requiredMessagePartAlternatives , err .getMessage ()));
542606 }
543607
544608 public <E extends Throwable > E expectError (Class <E > expected ) throws Exception {
545609 return expectError (expected , env .defaultTimeoutMillis ());
546610 }
547611
548612 public <E extends Throwable > E expectError (Class <E > expected , long timeoutMillis ) throws Exception {
549- return expectError (expected , timeoutMillis , String . format ( "Expected onError(%s)" , expected . getName () ));
613+ return expectError (expected , timeoutMillis , env . defaultPollTimeoutMillis ( ));
550614 }
551615
552616 public <E extends Throwable > E expectError (Class <E > expected , String errorMsg ) throws Exception {
553617 return expectError (expected , env .defaultTimeoutMillis (), errorMsg );
554618 }
555619
556620 public <E extends Throwable > E expectError (Class <E > expected , long timeoutMillis , String errorMsg ) throws Exception {
557- return received .expectError (expected , timeoutMillis , errorMsg );
621+ return expectError (expected , timeoutMillis , env .defaultPollTimeoutMillis (), errorMsg );
622+ }
623+
624+ public <E extends Throwable > E expectError (Class <E > expected , long totalTimeoutMillis , long pollTimeoutMillis ) throws Exception {
625+ return expectError (expected , totalTimeoutMillis , pollTimeoutMillis , String .format ("Expected onError(%s)" , expected .getName ()));
626+ }
627+
628+ public <E extends Throwable > E expectError (Class <E > expected , long totalTimeoutMillis , long pollTimeoutMillis ,
629+ String errorMsg ) throws Exception {
630+ return received .expectError (expected , totalTimeoutMillis , pollTimeoutMillis , errorMsg );
558631 }
559632
560633 public void expectNone () throws InterruptedException {
@@ -1025,22 +1098,44 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
10251098 } // else, ok
10261099 }
10271100
1028- @ SuppressWarnings ("unchecked" )
1101+ /**
1102+ * @deprecated Deprecated in favor of {@link #expectError(Class, long, long, String)}.
1103+ */
1104+ @ Deprecated
10291105 public <E extends Throwable > E expectError (Class <E > clazz , long timeoutMillis , String errorMsg ) throws Exception {
1030- Thread .sleep (timeoutMillis );
1031-
1032- if (env .asyncErrors .isEmpty ()) {
1033- return env .flopAndFail (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
1034- } else {
1035- // ok, there was an expected error
1036- Throwable thrown = env .asyncErrors .remove (0 );
1106+ return expectError (clazz , timeoutMillis , timeoutMillis , errorMsg );
1107+ }
10371108
1038- if (clazz .isInstance (thrown )) {
1039- return (E ) thrown ;
1109+ @ SuppressWarnings ("unchecked" )
1110+ final <E extends Throwable > E expectError (Class <E > clazz , final long totalTimeoutMillis ,
1111+ long pollTimeoutMillis ,
1112+ String errorMsg ) throws Exception {
1113+ long totalTimeoutRemainingNs = MILLISECONDS .toNanos (totalTimeoutMillis );
1114+ long timeStampANs = System .nanoTime ();
1115+ long timeStampBNs ;
1116+
1117+ for (;;) {
1118+ Thread .sleep (Math .min (pollTimeoutMillis , NANOSECONDS .toMillis (totalTimeoutRemainingNs )));
1119+
1120+ if (env .asyncErrors .isEmpty ()) {
1121+ timeStampBNs = System .nanoTime ();
1122+ totalTimeoutRemainingNs =- timeStampBNs - timeStampANs ;
1123+ timeStampANs = timeStampBNs ;
1124+
1125+ if (totalTimeoutRemainingNs <= 0 ) {
1126+ return env .flopAndFail (String .format ("%s within %d ms" , errorMsg , totalTimeoutMillis ));
1127+ }
10401128 } else {
1129+ // ok, there was an expected error
1130+ Throwable thrown = env .asyncErrors .remove (0 );
1131+
1132+ if (clazz .isInstance (thrown )) {
1133+ return (E ) thrown ;
1134+ } else {
10411135
1042- return env .flopAndFail (String .format ("%s within %d ms; Got %s but expected %s" ,
1043- errorMsg , timeoutMillis , thrown .getClass ().getCanonicalName (), clazz .getCanonicalName ()));
1136+ return env .flopAndFail (String .format ("%s within %d ms; Got %s but expected %s" ,
1137+ errorMsg , totalTimeoutMillis , thrown .getClass ().getCanonicalName (), clazz .getCanonicalName ()));
1138+ }
10441139 }
10451140 }
10461141 }
0 commit comments