Skip to content

Commit 7cd0902

Browse files
authored
Reduce usage of SAME threadpool name (#106279)
These days in most places where we mention `SAME` we're immediately looking up the corresponding executor, and therefore can just mention `EsExecutors#DIRECT_EXECUTOR_SERVICE` directly instead.
1 parent e0087e9 commit 7cd0902

File tree

16 files changed

+81
-119
lines changed

16 files changed

+81
-119
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.elasticsearch.rest.RestStatus;
4343
import org.elasticsearch.rest.action.RestToXContentListener;
4444
import org.elasticsearch.test.ESIntegTestCase;
45-
import org.elasticsearch.threadpool.ThreadPool;
4645
import org.elasticsearch.xcontent.ToXContentObject;
4746

4847
import java.io.IOException;
@@ -243,9 +242,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
243242
if (failAfterBytes < 0) {
244243
throw new IllegalArgumentException("[" + FAIL_AFTER_BYTES_PARAM + "] must be present and non-negative");
245244
}
246-
return channel -> client.threadPool()
247-
.executor(randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC))
248-
.execute(() -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() {
245+
return channel -> randomExecutor(client.threadPool()).execute(
246+
() -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() {
249247
int bytesRemaining = failAfterBytes;
250248

251249
@Override
@@ -270,7 +268,8 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
270268
public String getResponseContentTypeString() {
271269
return RestResponse.TEXT_CONTENT_TYPE;
272270
}
273-
}, null)));
271+
}, null))
272+
);
274273
}
275274
});
276275
}

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.common.inject.Inject;
2929
import org.elasticsearch.common.regex.Regex;
3030
import org.elasticsearch.common.util.Maps;
31+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3132
import org.elasticsearch.core.Nullable;
3233
import org.elasticsearch.core.Tuple;
3334
import org.elasticsearch.index.shard.ShardId;
@@ -91,14 +92,8 @@ public TransportFieldCapabilitiesAction(
9192
IndicesService indicesService,
9293
IndexNameExpressionResolver indexNameExpressionResolver
9394
) {
94-
// TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
95-
super(
96-
NAME,
97-
transportService,
98-
actionFilters,
99-
FieldCapabilitiesRequest::new,
100-
transportService.getThreadPool().executor(ThreadPool.Names.SAME)
101-
);
95+
// TODO replace DIRECT_EXECUTOR_SERVICE when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
96+
super(NAME, transportService, actionFilters, FieldCapabilitiesRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
10297
this.threadPool = threadPool;
10398
this.searchCoordinationExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION);
10499
this.transportService = transportService;

server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.cluster.ClusterState;
1616
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1717
import org.elasticsearch.cluster.service.ClusterService;
18-
import org.elasticsearch.common.Randomness;
1918
import org.elasticsearch.common.UUIDs;
2019
import org.elasticsearch.common.settings.Settings;
2120
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -31,14 +30,12 @@
3130
import org.junit.Before;
3231
import org.junit.BeforeClass;
3332

34-
import java.util.Arrays;
3533
import java.util.Collections;
3634
import java.util.HashSet;
3735
import java.util.IdentityHashMap;
38-
import java.util.List;
3936
import java.util.Queue;
4037
import java.util.Set;
41-
import java.util.concurrent.ExecutorService;
38+
import java.util.concurrent.Executor;
4239
import java.util.concurrent.TimeUnit;
4340
import java.util.concurrent.atomic.AtomicInteger;
4441
import java.util.concurrent.atomic.AtomicLong;
@@ -136,9 +133,7 @@ private TransportMultiSearchAction createTransportMultiSearchAction(boolean cont
136133

137134
final int availableProcessors = Runtime.getRuntime().availableProcessors();
138135
AtomicInteger counter = new AtomicInteger();
139-
final List<String> threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME);
140-
Randomness.shuffle(threadPoolNames);
141-
final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0));
136+
final Executor commonExecutor = randomExecutor(threadPool);
142137
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));
143138

144139
NodeClient client = new NodeClient(settings, threadPool) {

server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.Randomness;
2424
import org.elasticsearch.common.UUIDs;
2525
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2627
import org.elasticsearch.search.SearchResponseUtils;
2728
import org.elasticsearch.tasks.Task;
2829
import org.elasticsearch.telemetry.metric.MeterRegistry;
@@ -37,7 +38,7 @@
3738
import java.util.List;
3839
import java.util.Set;
3940
import java.util.concurrent.ExecutionException;
40-
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.Executor;
4142
import java.util.concurrent.atomic.AtomicInteger;
4243
import java.util.concurrent.atomic.AtomicReference;
4344

@@ -143,10 +144,10 @@ public void testBatchExecute() throws ExecutionException, InterruptedException {
143144
AtomicInteger counter = new AtomicInteger();
144145
AtomicReference<AssertionError> errorHolder = new AtomicReference<>();
145146
// randomize whether or not requests are executed asynchronously
146-
final List<String> threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME);
147-
Randomness.shuffle(threadPoolNames);
148-
final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0));
149-
final ExecutorService rarelyExecutor = threadPool.executor(threadPoolNames.get(1));
147+
final List<Executor> executorServices = Arrays.asList(threadPool.generic(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
148+
Randomness.shuffle(executorServices);
149+
final Executor commonExecutor = executorServices.get(0);
150+
final Executor rarelyExecutor = executorServices.get(1);
150151
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));
151152
NodeClient client = new NodeClient(settings, threadPool) {
152153
@Override
@@ -164,7 +165,7 @@ public void search(final SearchRequest request, final ActionListener<SearchRespo
164165
)
165166
);
166167
}
167-
final ExecutorService executorService = rarely() ? rarelyExecutor : commonExecutor;
168+
final Executor executorService = rarely() ? rarelyExecutor : commonExecutor;
168169
executorService.execute(() -> {
169170
counter.decrementAndGet();
170171
var response = SearchResponseUtils.emptyWithTotalHits(

server/src/test/java/org/elasticsearch/cluster/coordination/JoinValidationServiceTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,7 @@ public TransportVersion getTransportVersion() {
9999
@Override
100100
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
101101
throws TransportException {
102-
final var executor = threadPool.executor(
103-
randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC, ThreadPool.Names.CLUSTER_COORDINATION)
104-
);
102+
final var executor = randomExecutor(threadPool, ThreadPool.Names.CLUSTER_COORDINATION);
105103
executor.execute(new AbstractRunnable() {
106104
@Override
107105
public void onFailure(Exception e) {

server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,7 @@ protected ExecutorService createThreadPoolExecutor() {
175175
masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
176176
clusterStateRef.set(clusterStatePublicationEvent.getNewState());
177177
ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
178-
threadPool.executor(randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC))
179-
.execute(() -> publishListener.onResponse(null));
178+
randomExecutor(threadPool).execute(() -> publishListener.onResponse(null));
180179
});
181180
masterService.setClusterStateSupplier(clusterStateRef::get);
182181
masterService.start();

server/src/test/java/org/elasticsearch/index/CompositeIndexEventListenerTests.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.index.shard.IndexShard;
1919
import org.elasticsearch.index.shard.IndexShardTestCase;
2020
import org.elasticsearch.test.MockLogAppender;
21-
import org.elasticsearch.threadpool.ThreadPool;
2221
import org.hamcrest.Matchers;
2322

2423
import java.util.concurrent.TimeUnit;
@@ -60,9 +59,7 @@ public void beforeIndexShardRecovery(
6059
listener.onResponse(null);
6160
} else {
6261
// fails the listener sometimes
63-
shard.getThreadPool()
64-
.executor(randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME))
65-
.execute(ActionRunnable.run(listener, this::runStep));
62+
randomExecutor(shard.getThreadPool()).execute(ActionRunnable.run(listener, this::runStep));
6663
}
6764
}
6865

@@ -129,9 +126,7 @@ public void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void>
129126
listener.onResponse(null);
130127
} else {
131128
// fails the listener sometimes
132-
shard.getThreadPool()
133-
.executor(randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME))
134-
.execute(ActionRunnable.run(listener, this::runStep));
129+
randomExecutor(shard.getThreadPool()).execute(ActionRunnable.run(listener, this::runStep));
135130
}
136131
}
137132

server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ public void testConcurrentConnectsAndDisconnects() throws Exception {
469469

470470
final ConnectionManager.ConnectionValidator validator = (c, p, l) -> {
471471
assertTrue(validatorPermits.tryAcquire());
472-
threadPool.executor(randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME)).execute(() -> {
472+
randomExecutor(threadPool).execute(() -> {
473473
try {
474474
l.onResponse(null);
475475
} finally {
@@ -547,7 +547,7 @@ public void testConcurrentConnectsAndCloses() throws Exception {
547547

548548
final ConnectionManager.ConnectionValidator validator = (c, p, l) -> {
549549
assertTrue(validatorPermits.tryAcquire());
550-
threadPool.executor(randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME)).execute(() -> {
550+
randomExecutor(threadPool).execute(() -> {
551551
try {
552552
l.onResponse(null);
553553
} finally {

server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -266,22 +266,17 @@ public void testSendLocalRequest() throws Exception {
266266
final CountDownLatch latch = new CountDownLatch(2);
267267

268268
final boolean cancellable = randomBoolean();
269-
serviceB.registerRequestHandler(
270-
"internal:test",
271-
threadPool.executor(randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC)),
272-
SimpleTestRequest::new,
273-
(request, channel, task) -> {
274-
try {
275-
assertThat(task instanceof CancellableTask, equalTo(cancellable));
276-
assertEquals(request.sourceNode, "TS_A");
277-
final SimpleTestResponse responseB = new SimpleTestResponse("TS_B");
278-
channel.sendResponse(responseB);
279-
response.set(responseB);
280-
} finally {
281-
latch.countDown();
282-
}
269+
serviceB.registerRequestHandler("internal:test", randomExecutor(threadPool), SimpleTestRequest::new, (request, channel, task) -> {
270+
try {
271+
assertThat(task instanceof CancellableTask, equalTo(cancellable));
272+
assertEquals(request.sourceNode, "TS_A");
273+
final SimpleTestResponse responseB = new SimpleTestResponse("TS_B");
274+
channel.sendResponse(responseB);
275+
response.set(responseB);
276+
} finally {
277+
latch.countDown();
283278
}
284-
);
279+
});
285280
TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new);
286281
AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB);
287282

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.elasticsearch.common.util.CollectionUtils;
7474
import org.elasticsearch.common.util.Maps;
7575
import org.elasticsearch.common.util.MockBigArrays;
76+
import org.elasticsearch.common.util.concurrent.EsExecutors;
7677
import org.elasticsearch.common.util.concurrent.ThreadContext;
7778
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
7879
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -168,6 +169,7 @@
168169
import java.util.concurrent.CountDownLatch;
169170
import java.util.concurrent.CyclicBarrier;
170171
import java.util.concurrent.ExecutionException;
172+
import java.util.concurrent.Executor;
171173
import java.util.concurrent.ExecutorService;
172174
import java.util.concurrent.Semaphore;
173175
import java.util.concurrent.TimeUnit;
@@ -1196,6 +1198,21 @@ public static String randomDateFormatterPattern() {
11961198
return randomFrom(FormatNames.values()).getName();
11971199
}
11981200

1201+
/**
1202+
* Randomly choose between {@link EsExecutors#DIRECT_EXECUTOR_SERVICE} (which does not fork), {@link ThreadPool#generic}, and one of the
1203+
* other named threadpool executors.
1204+
*/
1205+
public static Executor randomExecutor(ThreadPool threadPool, String... otherExecutorNames) {
1206+
final var choice = between(0, otherExecutorNames.length + 1);
1207+
if (choice < otherExecutorNames.length) {
1208+
return threadPool.executor(otherExecutorNames[choice]);
1209+
} else if (choice == otherExecutorNames.length) {
1210+
return threadPool.generic();
1211+
} else {
1212+
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
1213+
}
1214+
}
1215+
11991216
/**
12001217
* helper to randomly perform on <code>consumer</code> with <code>value</code>
12011218
*/

0 commit comments

Comments
 (0)