Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ public static void waitForCondition(final TestCondition testCondition, final lon
*/
public static void retryOnExceptionWithTimeout(final long timeoutMs,
final ValuelessCallable runnable) throws InterruptedException {
retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, timeoutMs, runnable);
retryOnExceptionWithTimeout(timeoutMs, DEFAULT_POLL_INTERVAL_MS, runnable);
}

/**
Expand All @@ -425,21 +425,21 @@ public static void retryOnExceptionWithTimeout(final long timeoutMs,
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
*/
public static void retryOnExceptionWithTimeout(final ValuelessCallable runnable) throws InterruptedException {
retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, DEFAULT_MAX_WAIT_MS, runnable);
retryOnExceptionWithTimeout(DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, runnable);
}

/**
* Wait for the given runnable to complete successfully, i.e. throw now {@link Exception}s or
* {@link AssertionError}s, or for the given timeout to expire. If the timeout expires then the
* last exception or assertion failure will be thrown thus providing context for the failure.
*
* @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}.
* @param timeoutMs the total time in milliseconds to wait for {@code runnable} to complete successfully.
* @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}.
* @param runnable the code to attempt to execute successfully.
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
*/
public static void retryOnExceptionWithTimeout(final long pollIntervalMs,
final long timeoutMs,
public static void retryOnExceptionWithTimeout(final long timeoutMs,
final long pollIntervalMs,
final ValuelessCallable runnable) throws InterruptedException {
final long expectedEnd = System.currentTimeMillis() + timeoutMs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
}

final ReadOnlyKeyValueStore<Integer, Integer> newActiveStore = kafkaStreams1WasFirstActive ? store2 : store1;
TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
// Assert that after failover we have recovered to the last store write
assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1)));
});
Expand All @@ -146,7 +146,7 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
// Assert that all messages in the second batch were processed in a timely manner
assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));

TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
// Assert that the current value in store reflects all messages being processed
assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
});
Expand Down