Skip to content

Commit 7e471e1

Browse files
committed
Fix QueueResizableOpenSearchThreadPoolExecutorTests
There was a race condition in testResizeQueueDown() where depending on random parameters we could submit up to 1002 tasks into an executor with a queue size of 900. That introduced a race condition where if the tasks didn't execute fast enough then a rejected execution exception could happen and fail the test. Separate, the testResizeQueueSameSize() was not actually resizing the queue to the same size so I fixed that. And finally I refactored the tests to reduce duplication of code and ensure the executor gets shutdown properly even in case of a test failure. Signed-off-by: Andrew Ross <andrross@amazon.com>
1 parent e61b6ab commit 7e471e1

File tree

1 file changed

+38
-116
lines changed

1 file changed

+38
-116
lines changed

server/src/test/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutorTests.java

Lines changed: 38 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@
88

99
package org.opensearch.common.util.concurrent;
1010

11+
import org.junit.After;
12+
import org.junit.Before;
1113
import org.opensearch.common.settings.Settings;
1214
import org.opensearch.test.OpenSearchTestCase;
15+
import org.opensearch.threadpool.ThreadPool;
1316

17+
import java.util.Objects;
1418
import java.util.concurrent.TimeUnit;
1519
import java.util.function.Function;
1620

@@ -23,14 +27,17 @@
2327
* based on the time taken for each event.
2428
*/
2529
public class QueueResizableOpenSearchThreadPoolExecutorTests extends OpenSearchTestCase {
26-
public void testResizeQueueSameSize() throws Exception {
27-
ThreadContext context = new ThreadContext(Settings.EMPTY);
28-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
30+
private QueueResizableOpenSearchThreadPoolExecutor executor;
31+
private ResizableBlockingQueue<Runnable> queue;
32+
private int measureWindow;
2933

34+
private void createExecutor(int queueSize) {
3035
int threads = randomIntBetween(1, 10);
31-
int measureWindow = randomIntBetween(100, 200);
32-
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
33-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
36+
measureWindow = randomIntBetween(100, 200);
37+
ThreadContext context = new ThreadContext(Settings.EMPTY);
38+
this.queue = new ResizableBlockingQueue<>(ConcurrentCollections.newBlockingQueue(),
39+
Objects.requireNonNull(queueSize, "All tests must set a queue size"));
40+
this.executor = new QueueResizableOpenSearchThreadPoolExecutor(
3441
"test-threadpool",
3542
threads,
3643
threads,
@@ -44,152 +51,67 @@ public void testResizeQueueSameSize() throws Exception {
4451
);
4552
executor.prestartAllCoreThreads();
4653
logger.info("--> executor: {}", executor);
54+
}
55+
56+
@After
57+
public void stopExecutor() {
58+
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
59+
}
60+
61+
public void testResizeQueueSameSize() throws Exception {
62+
createExecutor(2000);
4763

4864
// Execute a task multiple times that takes 1ms
49-
assertThat(executor.resize(1000), equalTo(1000));
65+
assertThat(executor.resize(2000), equalTo(2000));
5066
executeTask(executor, (measureWindow * 5) + 2);
5167

52-
assertBusy(() -> { assertThat(queue.capacity(), lessThanOrEqualTo(1000)); });
53-
executor.shutdown();
54-
executor.awaitTermination(10, TimeUnit.SECONDS);
68+
assertBusy(() -> assertThat(queue.capacity(), lessThanOrEqualTo(2000)));
5569
}
5670

5771
public void testResizeQueueUp() throws Exception {
58-
ThreadContext context = new ThreadContext(Settings.EMPTY);
59-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
60-
61-
int threads = randomIntBetween(1, 10);
62-
int measureWindow = randomIntBetween(100, 200);
63-
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
64-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
65-
"test-threadpool",
66-
threads,
67-
threads,
68-
1000,
69-
TimeUnit.MILLISECONDS,
70-
queue,
71-
fastWrapper(),
72-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
73-
new OpenSearchAbortPolicy(),
74-
context
75-
);
76-
executor.prestartAllCoreThreads();
77-
logger.info("--> executor: {}", executor);
78-
72+
createExecutor(2000);
7973
// Execute a task multiple times that takes 1ms
8074
assertThat(executor.resize(3000), equalTo(3000));
8175
executeTask(executor, (measureWindow * 5) + 2);
8276

83-
assertBusy(() -> { assertThat(queue.capacity(), greaterThanOrEqualTo(2000)); });
84-
executor.shutdown();
85-
executor.awaitTermination(10, TimeUnit.SECONDS);
77+
assertBusy(() -> assertThat(queue.capacity(), greaterThanOrEqualTo(2000)));
8678
}
8779

8880
public void testResizeQueueDown() throws Exception {
89-
ThreadContext context = new ThreadContext(Settings.EMPTY);
90-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
91-
92-
int threads = randomIntBetween(1, 10);
93-
int measureWindow = randomIntBetween(100, 200);
94-
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
95-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
96-
"test-threadpool",
97-
threads,
98-
threads,
99-
1000,
100-
TimeUnit.MILLISECONDS,
101-
queue,
102-
fastWrapper(),
103-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
104-
new OpenSearchAbortPolicy(),
105-
context
106-
);
107-
executor.prestartAllCoreThreads();
108-
logger.info("--> executor: {}", executor);
109-
81+
createExecutor(2000);
11082
// Execute a task multiple times that takes 1ms
111-
assertThat(executor.resize(900), equalTo(900));
83+
assertThat(executor.resize(1500), equalTo(1500));
11284
executeTask(executor, (measureWindow * 5) + 2);
11385

114-
assertBusy(() -> { assertThat(queue.capacity(), lessThanOrEqualTo(900)); });
115-
executor.shutdown();
116-
executor.awaitTermination(10, TimeUnit.SECONDS);
86+
assertBusy(() -> assertThat(queue.capacity(), lessThanOrEqualTo(1500)));
11787
}
11888

11989
public void testExecutionEWMACalculation() throws Exception {
120-
ThreadContext context = new ThreadContext(Settings.EMPTY);
121-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 100);
122-
123-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
124-
"test-threadpool",
125-
1,
126-
1,
127-
1000,
128-
TimeUnit.MILLISECONDS,
129-
queue,
130-
fastWrapper(),
131-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
132-
new OpenSearchAbortPolicy(),
133-
context
134-
);
135-
executor.prestartAllCoreThreads();
136-
logger.info("--> executor: {}", executor);
137-
90+
createExecutor(100);
13891
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
13992
executeTask(executor, 1);
140-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)); });
93+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)));
14194
executeTask(executor, 1);
142-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)); });
95+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)));
14396
executeTask(executor, 1);
144-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)); });
97+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)));
14598
executeTask(executor, 1);
146-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)); });
99+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)));
147100
executeTask(executor, 1);
148-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)); });
149-
150-
executor.shutdown();
151-
executor.awaitTermination(10, TimeUnit.SECONDS);
101+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)));
152102
}
153103

154104
/** Use a runnable wrapper that simulates a task with unknown failures. */
155-
public void testExceptionThrowingTask() throws Exception {
156-
ThreadContext context = new ThreadContext(Settings.EMPTY);
157-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 100);
158-
159-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
160-
"test-threadpool",
161-
1,
162-
1,
163-
1000,
164-
TimeUnit.MILLISECONDS,
165-
queue,
166-
exceptionalWrapper(),
167-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
168-
new OpenSearchAbortPolicy(),
169-
context
170-
);
171-
executor.prestartAllCoreThreads();
172-
logger.info("--> executor: {}", executor);
173-
105+
public void testExceptionThrowingTask() {
106+
createExecutor(100);
174107
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
175108
executeTask(executor, 1);
176-
executor.shutdown();
177-
executor.awaitTermination(10, TimeUnit.SECONDS);
178109
}
179110

180111
private Function<Runnable, WrappedRunnable> fastWrapper() {
181112
return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100), false);
182113
}
183114

184-
/**
185-
* The returned function outputs a WrappedRunnabled that simulates the case
186-
* where {@link TimedRunnable#getTotalExecutionNanos()} returns -1 because
187-
* the job failed or was rejected before it finished.
188-
*/
189-
private Function<Runnable, WrappedRunnable> exceptionalWrapper() {
190-
return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(-1), true);
191-
}
192-
193115
/** Execute a blank task {@code times} times for the executor */
194116
private void executeTask(QueueResizableOpenSearchThreadPoolExecutor executor, int times) {
195117
logger.info("--> executing a task [{}] times", times);
@@ -198,7 +120,7 @@ private void executeTask(QueueResizableOpenSearchThreadPoolExecutor executor, in
198120
}
199121
}
200122

201-
public class SettableTimedRunnable extends TimedRunnable {
123+
private static class SettableTimedRunnable extends TimedRunnable {
202124
private final long timeTaken;
203125
private final boolean testFailedOrRejected;
204126

0 commit comments

Comments
 (0)