Skip to content

Commit e472754

Browse files
committed
Attempt to fix ThreadLeakControl issues in S3BlobStoreRepositoryTests
Similar to opensearch-project#18201 but applied via the TestPlugin used in S3BlobStoreRepositoryTests. Closes opensearch-project#14299 Signed-off-by: David Causse <dcausse@wikimedia.org>
1 parent 5baf5d8 commit e472754

File tree

4 files changed

+29
-2
lines changed

4 files changed

+29
-2
lines changed

plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@
7474
import java.util.HashMap;
7575
import java.util.List;
7676
import java.util.Map;
77+
import java.util.concurrent.Executors;
78+
import java.util.concurrent.TimeUnit;
79+
import java.util.stream.Stream;
7780
import java.util.stream.StreamSupport;
7881

7982
import fixture.s3.S3HttpHandler;
@@ -234,9 +237,13 @@ public void testRequestStats() throws Exception {
234237
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
235238
*/
236239
public static class TestS3RepositoryPlugin extends S3RepositoryPlugin {
237-
238240
public TestS3RepositoryPlugin(final Settings settings, final Path configPath) {
239-
super(settings, configPath);
241+
super(
242+
settings,
243+
configPath,
244+
new S3Service(configPath, Executors.newSingleThreadScheduledExecutor()),
245+
new S3AsyncService(configPath, Executors.newSingleThreadScheduledExecutor())
246+
);
240247
}
241248

242249
@Override
@@ -246,6 +253,13 @@ public List<Setting<?>> getSettings() {
246253
return settings;
247254
}
248255

256+
@Override
257+
public void close() throws IOException {
258+
super.close();
259+
Stream.of(service.getClientExecutorService(), s3AsyncService.getClientExecutorService())
260+
.forEach(e -> assertTrue(ThreadPool.terminate(e, 5, TimeUnit.SECONDS)));
261+
}
262+
249263
@Override
250264
protected S3Repository createRepository(
251265
RepositoryMetadata metadata,

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,4 +455,9 @@ public void close() {
455455
releaseCachedClients();
456456

457457
}
458+
459+
@Nullable
460+
ScheduledExecutorService getClientExecutorService() {
461+
return clientExecutorService;
462+
}
458463
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.Map;
7676
import java.util.Objects;
7777
import java.util.concurrent.ExecutorService;
78+
import java.util.concurrent.ScheduledExecutorService;
7879
import java.util.concurrent.TimeUnit;
7980
import java.util.function.Supplier;
8081

@@ -106,6 +107,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
106107
protected SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
107108
protected TransferSemaphoresHolder transferSemaphoresHolder;
108109
protected GenericStatsMetricPublisher genericStatsMetricPublisher;
110+
private ScheduledExecutorService scheduler;
109111

110112
public S3RepositoryPlugin(final Settings settings, final Path configPath) {
111113
this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
@@ -234,6 +236,7 @@ public Collection<Object> createComponents(
234236

235237
this.lowTransferQConsumerService = threadPool.executor(LOW_TRANSFER_QUEUE_CONSUMER);
236238
this.normalTransferQConsumerService = threadPool.executor(NORMAL_TRANSFER_QUEUE_CONSUMER);
239+
this.scheduler = threadPool.scheduler();
237240

238241
// High number of permit allocation because each op acquiring permit performs disk IO, computation and network IO.
239242
int availablePermits = Math.max(allocatedProcessors(clusterService.getSettings()) * 4, 10);

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,4 +527,9 @@ public AwsCredentials resolveCredentials() {
527527
public void close() {
528528
releaseCachedClients();
529529
}
530+
531+
@Nullable
532+
ScheduledExecutorService getClientExecutorService() {
533+
return clientExecutorService;
534+
}
530535
}

0 commit comments

Comments
 (0)