Skip to content

Commit 0321e9d

Browse files
committed
Attempt to fix ThreadLeakControl issues in S3BlobStoreRepositoryTests
Similar to opensearch-project#18201 but applied via the TestPlugin used in S3BlobStoreRepositoryTests. Closes opensearch-project#14299 Signed-off-by: David Causse <dcausse@wikimedia.org>
1 parent 5baf5d8 commit 0321e9d

File tree

4 files changed

+32
-7
lines changed

4 files changed

+32
-7
lines changed

plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import com.sun.net.httpserver.Headers;
3535
import com.sun.net.httpserver.HttpExchange;
3636
import com.sun.net.httpserver.HttpHandler;
37-
37+
import fixture.s3.S3HttpHandler;
3838
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage;
3939

4040
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
@@ -74,14 +74,15 @@
7474
import java.util.HashMap;
7575
import java.util.List;
7676
import java.util.Map;
77+
import java.util.concurrent.Executors;
78+
import java.util.concurrent.TimeUnit;
79+
import java.util.stream.Stream;
7780
import java.util.stream.StreamSupport;
7881

79-
import fixture.s3.S3HttpHandler;
80-
81-
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
82-
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
8382
import static org.hamcrest.Matchers.containsString;
8483
import static org.hamcrest.Matchers.equalTo;
84+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
85+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
8586

8687
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
8788
// Need to set up a new cluster for each test because cluster settings use randomized authentication settings
@@ -234,9 +235,13 @@ public void testRequestStats() throws Exception {
234235
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
235236
*/
236237
public static class TestS3RepositoryPlugin extends S3RepositoryPlugin {
237-
238238
public TestS3RepositoryPlugin(final Settings settings, final Path configPath) {
239-
super(settings, configPath);
239+
super(
240+
settings,
241+
configPath,
242+
new S3Service(configPath, Executors.newSingleThreadScheduledExecutor()),
243+
new S3AsyncService(configPath, Executors.newSingleThreadScheduledExecutor())
244+
);
240245
}
241246

242247
@Override
@@ -246,6 +251,13 @@ public List<Setting<?>> getSettings() {
246251
return settings;
247252
}
248253

254+
@Override
255+
public void close() throws IOException {
256+
super.close();
257+
Stream.of(service.getClientExecutorService(), s3AsyncService.getClientExecutorService())
258+
.forEach(e -> assertTrue(ThreadPool.terminate(e, 5, TimeUnit.SECONDS)));
259+
}
260+
249261
@Override
250262
protected S3Repository createRepository(
251263
RepositoryMetadata metadata,

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,4 +455,9 @@ public void close() {
455455
releaseCachedClients();
456456

457457
}
458+
459+
@Nullable
460+
ScheduledExecutorService getClientExecutorService() {
461+
return clientExecutorService;
462+
}
458463
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.Map;
7676
import java.util.Objects;
7777
import java.util.concurrent.ExecutorService;
78+
import java.util.concurrent.ScheduledExecutorService;
7879
import java.util.concurrent.TimeUnit;
7980
import java.util.function.Supplier;
8081

@@ -106,6 +107,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
106107
protected SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
107108
protected TransferSemaphoresHolder transferSemaphoresHolder;
108109
protected GenericStatsMetricPublisher genericStatsMetricPublisher;
110+
private ScheduledExecutorService scheduler;
109111

110112
public S3RepositoryPlugin(final Settings settings, final Path configPath) {
111113
this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
@@ -234,6 +236,7 @@ public Collection<Object> createComponents(
234236

235237
this.lowTransferQConsumerService = threadPool.executor(LOW_TRANSFER_QUEUE_CONSUMER);
236238
this.normalTransferQConsumerService = threadPool.executor(NORMAL_TRANSFER_QUEUE_CONSUMER);
239+
this.scheduler = threadPool.scheduler();
237240

238241
// High number of permit allocation because each op acquiring permit performs disk IO, computation and network IO.
239242
int availablePermits = Math.max(allocatedProcessors(clusterService.getSettings()) * 4, 10);

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,4 +527,9 @@ public AwsCredentials resolveCredentials() {
527527
public void close() {
528528
releaseCachedClients();
529529
}
530+
531+
@Nullable
532+
ScheduledExecutorService getClientExecutorService() {
533+
return clientExecutorService;
534+
}
530535
}

0 commit comments

Comments
 (0)