- 
                Notifications
    
You must be signed in to change notification settings  - Fork 951
 
Closed
Labels
bugThis issue is a bug.This issue is a bug.closed-for-stalenessp2This is a standard priority issueThis is a standard priority issueresponse-requestedWaiting on additional info and feedback. Will move to "closing-soon" in 10 days.Waiting on additional info and feedback. Will move to "closing-soon" in 10 days.
Description
Describe the bug
Using the S3Client in a multithreaded spring boot app when the context gets shutdown the S3Client doesn't shutdown gracefully.
Expected Behavior
The S3Client finish sending all the pending request.
Current Behavior
An exception is thrown:
Exception in thread "pool-1-thread-2" Exception in thread "pool-1-thread-3" Exception in thread "pool-1-thread-1" java.lang.RuntimeException: java.lang.InterruptedException
	at software.amazon.awssdk.http.crt.AwsCrtHttpClient$CrtHttpRequest.call(AwsCrtHttpClient.java:132)
	at software.amazon.awssdk.http.crt.AwsCrtHttpClient$CrtHttpRequest.call(AwsCrtHttpClient.java:104)
	at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:99)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:79)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:57)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
Reproduction Steps
Execute this code in a Spring Boot Basic App
@Component
public class Producer implements Closeable {
    private static Logger LOG = LoggerFactory
            .getLogger(Producer .class);
    private final S3Client s3Client;
    private volatile Set<Integer> queued = ConcurrentHashMap.newKeySet();
    private volatile boolean running = true;
    private final ThreadPoolExecutor s3SenderThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
    private final Thread processor = new Thread(this::run,
            "s3sink-processor");
    private SdkHttpClient httpClient;
    public Producer(@Value("${key}") String key,
                    @Value("${secret}") String secret) {
        this.s3Client = makeS3Client(key, secret);
    }
    @PostConstruct
    void start() {
        processor.start();
    }
    public void run() {
        LOG.info("start producing ");
        int i = 0;
        while (running) {
            final int fileCount = i;
            LOG.info("Writing = {}, queued={} - [{}]  - poolQueue {}",
                    fileCount, queued.size(),
                    queued, s3SenderThreadPool.getQueue().size());
            queued.add(fileCount);
            s3SenderThreadPool.execute(() -> {
                String str = RandomStringUtils.random(1_000_000, true, true);
                long initTime = System.nanoTime();
                PutObjectRequest objectRequest = PutObjectRequest.builder()
                        .bucket("reporting-events-dev")
                        .key("close_test/" + fileCount)
                        .build();
                s3Client.putObject(objectRequest, RequestBody.fromBytes(str.getBytes()));
                queued.remove(fileCount);
                LOG.info("Written = {} in {}s", fileCount, (System.nanoTime() - initTime) / 1_000_000_000.0);
            });
            try {
                Thread.sleep(200);
            } catch (Exception e) {
                LOG.error("exception", e);
            }
            i++;
        }
        LOG.info("end of loop");
    }
    @PreDestroy
    public void destroy() {
        LOG.info("Predestroy");
        close();
    }
    @Override
    public void close() {
        LOG.info("Wait for the processor thread to finish.");
        try {
            running = false;
            this.processor.join(8_000);
            LOG.info("Pending to finish: {}, threadPool isShutdown {} isTerminated {} poolQueue: {}",
                    queued.size(),
                    s3SenderThreadPool.isShutdown(),
                    s3SenderThreadPool.isTerminated(),
                    s3SenderThreadPool.getQueue().size());
            this.s3SenderThreadPool.shutdown();
            try {
                LOG.info("awaitTermination, pending {}", queued.size());
                if (!this.s3SenderThreadPool.awaitTermination(355_000, TimeUnit.MILLISECONDS)) {
                    LOG.error("Force shutdown occurred! queuedSize={} poolSize={}", queued.size(), s3SenderThreadPool.getQueue().size());
                    this.s3SenderThreadPool.shutdownNow();
                }
            } catch (InterruptedException ie) {
                // Re-interrupt the current thread if awaitTermination was interrupted
                LOG.info("Failed, pending to finish: queueSize={} poolSize={}", queued.size(), s3SenderThreadPool.getQueue().size(), ie);
                Thread.currentThread().interrupt();
            }
        } catch (InterruptedException e) {
            // It's a good practice to re-interrupt the thread
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted waiting for processor thread to finish queueSize=" + queued.size() + " poolSize="+s3SenderThreadPool.getQueue().size(), e);
        }
        LOG.info("Clean finish");
    }
    public S3Client makeS3Client(String key, String secret) {
        AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(key, secret);
        // Create a synchronous S3 client with custom configuration and credentials
        // https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration-crt.html
        httpClient = AwsCrtHttpClient.builder()
                .connectionTimeout(Duration.ofSeconds(20))
                .maxConcurrency(100).build();
        return S3Client.builder()
                .httpClient(httpClient)
                .credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials))
                .region(Region.of("us-east-2"))
                .build();
    }
}
Possible Solution
No response
Additional Information/Context
Spring Boot Used 2.4.5
AWS Java SDK version used
2.22.0
JDK version used
openjdk version "17.0.10" 2024-01-16
Operating System and version
5.15.146.1-microsoft-standard-WSL2 x86_64 x86_64 x86_64 GNU/Linux
Metadata
Metadata
Assignees
Labels
bugThis issue is a bug.This issue is a bug.closed-for-stalenessp2This is a standard priority issueThis is a standard priority issueresponse-requestedWaiting on additional info and feedback. Will move to "closing-soon" in 10 days.Waiting on additional info and feedback. Will move to "closing-soon" in 10 days.