Skip to content

Commit a6990d9

Browse files
committed
Fix QueueResizableOpenSearchThreadPoolExecutorTests
There was a race condition in testResizeQueueDown() where depending on random parameters we could submit up to 1002 tasks into an executor with a queue size of 900. That introduced a race condition where if the tasks didn't execute fast enough then a rejected execution exception could happen and fail the test. And finally I refactored the tests to reduce duplication of code and ensure the executor gets shutdown properly even in case of a test failure. Signed-off-by: Andrew Ross <andrross@amazon.com>
1 parent d907511 commit a6990d9

File tree

1 file changed

+38
-115
lines changed

1 file changed

+38
-115
lines changed

server/src/test/java/org/opensearch/common/util/concurrent/QueueResizableOpenSearchThreadPoolExecutorTests.java

Lines changed: 38 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010

1111
import org.opensearch.common.settings.Settings;
1212
import org.opensearch.test.OpenSearchTestCase;
13+
import org.opensearch.threadpool.ThreadPool;
14+
import org.junit.After;
1315

16+
import java.util.Objects;
1417
import java.util.concurrent.TimeUnit;
1518
import java.util.function.Function;
1619

@@ -23,14 +26,19 @@
2326
* based on the time taken for each event.
2427
*/
2528
public class QueueResizableOpenSearchThreadPoolExecutorTests extends OpenSearchTestCase {
26-
public void testResizeQueueSameSize() throws Exception {
27-
ThreadContext context = new ThreadContext(Settings.EMPTY);
28-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
29+
private QueueResizableOpenSearchThreadPoolExecutor executor;
30+
private ResizableBlockingQueue<Runnable> queue;
31+
private int measureWindow;
2932

33+
private void createExecutor(int queueSize) {
3034
int threads = randomIntBetween(1, 10);
31-
int measureWindow = randomIntBetween(100, 200);
32-
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
33-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
35+
measureWindow = randomIntBetween(100, 200);
36+
ThreadContext context = new ThreadContext(Settings.EMPTY);
37+
this.queue = new ResizableBlockingQueue<>(
38+
ConcurrentCollections.newBlockingQueue(),
39+
Objects.requireNonNull(queueSize, "All tests must set a queue size")
40+
);
41+
this.executor = new QueueResizableOpenSearchThreadPoolExecutor(
3442
"test-threadpool",
3543
threads,
3644
threads,
@@ -44,152 +52,67 @@ public void testResizeQueueSameSize() throws Exception {
4452
);
4553
executor.prestartAllCoreThreads();
4654
logger.info("--> executor: {}", executor);
55+
}
56+
57+
@After
58+
public void stopExecutor() {
59+
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
60+
}
61+
62+
public void testResizeQueueSameSize() throws Exception {
63+
createExecutor(2000);
4764

4865
// Execute a task multiple times that takes 1ms
4966
assertThat(executor.resize(1000), equalTo(1000));
5067
executeTask(executor, (measureWindow * 5) + 2);
5168

52-
assertBusy(() -> { assertThat(queue.capacity(), lessThanOrEqualTo(1000)); });
53-
executor.shutdown();
54-
executor.awaitTermination(10, TimeUnit.SECONDS);
69+
assertBusy(() -> assertThat(queue.capacity(), lessThanOrEqualTo(1000)));
5570
}
5671

5772
public void testResizeQueueUp() throws Exception {
58-
ThreadContext context = new ThreadContext(Settings.EMPTY);
59-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
60-
61-
int threads = randomIntBetween(1, 10);
62-
int measureWindow = randomIntBetween(100, 200);
63-
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
64-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
65-
"test-threadpool",
66-
threads,
67-
threads,
68-
1000,
69-
TimeUnit.MILLISECONDS,
70-
queue,
71-
fastWrapper(),
72-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
73-
new OpenSearchAbortPolicy(),
74-
context
75-
);
76-
executor.prestartAllCoreThreads();
77-
logger.info("--> executor: {}", executor);
78-
73+
createExecutor(2000);
7974
// Execute a task multiple times that takes 1ms
8075
assertThat(executor.resize(3000), equalTo(3000));
8176
executeTask(executor, (measureWindow * 5) + 2);
8277

83-
assertBusy(() -> { assertThat(queue.capacity(), greaterThanOrEqualTo(2000)); });
84-
executor.shutdown();
85-
executor.awaitTermination(10, TimeUnit.SECONDS);
78+
assertBusy(() -> assertThat(queue.capacity(), greaterThanOrEqualTo(2000)));
8679
}
8780

8881
public void testResizeQueueDown() throws Exception {
89-
ThreadContext context = new ThreadContext(Settings.EMPTY);
90-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 2000);
91-
92-
int threads = randomIntBetween(1, 10);
93-
int measureWindow = randomIntBetween(100, 200);
94-
logger.info("--> auto-queue with a measurement window of {} tasks", measureWindow);
95-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
96-
"test-threadpool",
97-
threads,
98-
threads,
99-
1000,
100-
TimeUnit.MILLISECONDS,
101-
queue,
102-
fastWrapper(),
103-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
104-
new OpenSearchAbortPolicy(),
105-
context
106-
);
107-
executor.prestartAllCoreThreads();
108-
logger.info("--> executor: {}", executor);
109-
82+
createExecutor(2000);
11083
// Execute a task multiple times that takes 1ms
111-
assertThat(executor.resize(900), equalTo(900));
84+
assertThat(executor.resize(1500), equalTo(1500));
11285
executeTask(executor, (measureWindow * 5) + 2);
11386

114-
assertBusy(() -> { assertThat(queue.capacity(), lessThanOrEqualTo(900)); });
115-
executor.shutdown();
116-
executor.awaitTermination(10, TimeUnit.SECONDS);
87+
assertBusy(() -> assertThat(queue.capacity(), lessThanOrEqualTo(1500)));
11788
}
11889

11990
public void testExecutionEWMACalculation() throws Exception {
120-
ThreadContext context = new ThreadContext(Settings.EMPTY);
121-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 100);
122-
123-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
124-
"test-threadpool",
125-
1,
126-
1,
127-
1000,
128-
TimeUnit.MILLISECONDS,
129-
queue,
130-
fastWrapper(),
131-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
132-
new OpenSearchAbortPolicy(),
133-
context
134-
);
135-
executor.prestartAllCoreThreads();
136-
logger.info("--> executor: {}", executor);
137-
91+
createExecutor(100);
13892
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
13993
executeTask(executor, 1);
140-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)); });
94+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)));
14195
executeTask(executor, 1);
142-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)); });
96+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)));
14397
executeTask(executor, 1);
144-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)); });
98+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)));
14599
executeTask(executor, 1);
146-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)); });
100+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)));
147101
executeTask(executor, 1);
148-
assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)); });
149-
150-
executor.shutdown();
151-
executor.awaitTermination(10, TimeUnit.SECONDS);
102+
assertBusy(() -> assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)));
152103
}
153104

154105
/** Use a runnable wrapper that simulates a task with unknown failures. */
155-
public void testExceptionThrowingTask() throws Exception {
156-
ThreadContext context = new ThreadContext(Settings.EMPTY);
157-
ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 100);
158-
159-
QueueResizableOpenSearchThreadPoolExecutor executor = new QueueResizableOpenSearchThreadPoolExecutor(
160-
"test-threadpool",
161-
1,
162-
1,
163-
1000,
164-
TimeUnit.MILLISECONDS,
165-
queue,
166-
exceptionalWrapper(),
167-
OpenSearchExecutors.daemonThreadFactory("queuetest"),
168-
new OpenSearchAbortPolicy(),
169-
context
170-
);
171-
executor.prestartAllCoreThreads();
172-
logger.info("--> executor: {}", executor);
173-
106+
public void testExceptionThrowingTask() {
107+
createExecutor(100);
174108
assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L));
175109
executeTask(executor, 1);
176-
executor.shutdown();
177-
executor.awaitTermination(10, TimeUnit.SECONDS);
178110
}
179111

180112
private Function<Runnable, WrappedRunnable> fastWrapper() {
181113
return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100), false);
182114
}
183115

184-
/**
185-
* The returned function outputs a WrappedRunnabled that simulates the case
186-
* where {@link TimedRunnable#getTotalExecutionNanos()} returns -1 because
187-
* the job failed or was rejected before it finished.
188-
*/
189-
private Function<Runnable, WrappedRunnable> exceptionalWrapper() {
190-
return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(-1), true);
191-
}
192-
193116
/** Execute a blank task {@code times} times for the executor */
194117
private void executeTask(QueueResizableOpenSearchThreadPoolExecutor executor, int times) {
195118
logger.info("--> executing a task [{}] times", times);
@@ -198,7 +121,7 @@ private void executeTask(QueueResizableOpenSearchThreadPoolExecutor executor, in
198121
}
199122
}
200123

201-
public class SettableTimedRunnable extends TimedRunnable {
124+
private static class SettableTimedRunnable extends TimedRunnable {
202125
private final long timeTaken;
203126
private final boolean testFailedOrRejected;
204127

0 commit comments

Comments
 (0)