Added close on buffered stream in s3 async upload for additional cleanup
Signed-off-by: vikasvb90 <vikasvb@amazon.com>
vikasvb90 committed Oct 18, 2023
1 parent 3a36c22 commit efb7e64
Showing 3 changed files with 72 additions and 35 deletions.
@@ -29,6 +29,7 @@
 
 import java.io.BufferedInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -140,28 +141,39 @@ private static void uploadPart(
         ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH
             ? priorityExecutorService
             : executorService;
+        // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
+        // data can be retried instead of retrying whole file by the application.
+        InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
         CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
             () -> s3AsyncClient.uploadPart(
                 uploadPartRequest,
-                AsyncRequestBody.fromInputStream(
-                    // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
-                    // data can be retried instead of retrying whole file by the application.
-                    new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)),
-                    inputStreamContainer.getContentLength(),
-                    streamReadExecutor
-                )
+                AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
             )
         );
 
-        CompletableFuture<CompletedPart> convertFuture = uploadPartResponseFuture.thenApply(
-            uploadPartResponse -> convertUploadPartResponse(
-                completedParts,
-                inputStreamContainers,
-                uploadPartResponse,
-                partNumber,
-                uploadRequest.doRemoteDataIntegrityCheck()
-            )
-        );
+        CompletableFuture<CompletedPart> convertFuture = uploadPartResponseFuture.whenComplete((resp, throwable) -> {
+            try {
+                inputStream.close();
+            } catch (IOException ex) {
+                log.error(
+                    () -> new ParameterizedMessage(
+                        "Failed to close stream while uploading a part of idx {} and file {}.",
+                        uploadPartRequest.partNumber(),
+                        uploadPartRequest.key()
+                    ),
+                    ex
+                );
+            }
+        })
+            .thenApply(
+                uploadPartResponse -> convertUploadPartResponse(
+                    completedParts,
+                    inputStreamContainers,
+                    uploadPartResponse,
+                    partNumber,
+                    uploadRequest.doRemoteDataIntegrityCheck()
+                )
+            );
         futures.add(convertFuture);
 
         CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartResponseFuture);
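
In the hunk above, the `BufferedInputStream` is hoisted into a local `inputStream` variable so that a `whenComplete` stage can close it once the part upload finishes, whether it succeeded or failed. The standalone sketch below shows the same close-on-completion pattern in isolation; `uploadAsync` and `uploadAndClose` are hypothetical stand-ins for `s3AsyncClient.uploadPart(...)` and its caller, not part of this commit.

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;

public class CloseOnCompletion {

    // Hypothetical stand-in for s3AsyncClient.uploadPart(...); returns an ETag-like token.
    static CompletableFuture<String> uploadAsync(InputStream body) {
        return CompletableFuture.supplyAsync(() -> "etag-123");
    }

    static CompletableFuture<String> uploadAndClose(InputStream stream) {
        // whenComplete runs on success and on failure, and passes the original
        // result (or exception) through to downstream stages unchanged.
        return uploadAsync(stream).whenComplete((resp, throwable) -> {
            try {
                stream.close();
            } catch (IOException e) {
                System.err.println("Failed to close stream: " + e.getMessage());
            }
        });
    }
}

Because `whenComplete` forwards the original result or exception untouched, the downstream `thenApply` conversion in the diff behaves exactly as it did before the change.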
@@ -40,6 +40,7 @@
 
 import java.io.BufferedInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.List;
@@ -310,17 +311,19 @@ private void uploadInOneChunk(
         ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH
             ? priorityExecutorService
             : executorService;
+        // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
+        // data can be retried instead of retrying whole file by the application.
+        InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
         CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
             () -> s3AsyncClient.putObject(
                 putObjectRequestBuilder.build(),
-                AsyncRequestBody.fromInputStream(
-                    // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
-                    // data can be retried instead of retrying whole file by the application.
-                    new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)),
-                    inputStreamContainer.getContentLength(),
-                    streamReadExecutor
-                )
+                AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
             ).handle((resp, throwable) -> {
+                try {
+                    inputStream.close();
+                } catch (IOException e) {
+                    log.error("Failed to close stream while uploading single chunk.", e);
+                }
                 if (throwable != null) {
                     Throwable unwrappedThrowable = ExceptionsHelper.unwrap(throwable, S3Exception.class);
                     if (unwrappedThrowable != null) {
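
The comment carried over in this hunk explains the buffering choice: `AsyncRequestBody.fromInputStream` needs a stream with `mark`/`reset` support so that after a transient IO error only the buffered bytes are replayed, instead of the application retrying the whole file. A minimal sketch of that mark/reset behavior, using a `ByteArrayInputStream` as an assumed stand-in for the container's stream:

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class MarkResetRetry {
    public static void main(String[] args) throws IOException {
        byte[] part = new byte[] { 1, 2, 3, 4, 5 };
        // Buffer sized one byte past the part, mirroring the (1 MB + 1) sizing in the diff.
        InputStream in = new BufferedInputStream(new ByteArrayInputStream(part), part.length + 1);

        in.mark(part.length + 1);          // remember the current position
        byte[] attempt = new byte[part.length];
        int read = in.read(attempt);       // first transmission attempt (suppose it fails mid-flight)
        in.reset();                        // rewind to the mark; no need to reopen the source
        int retried = in.read(attempt);    // the retry re-reads the same buffered bytes
        System.out.println("first read: " + read + " bytes, retry read: " + retried + " bytes");
        in.close();
    }
}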
@@ -37,9 +37,14 @@
 import org.opensearch.test.OpenSearchTestCase;
 import org.junit.Before;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -71,17 +76,16 @@ public void testOneChunkUpload() {
             putObjectResponseCompletableFuture
         );
 
+        AtomicReference<InputStream> streamRef = new AtomicReference<>();
         CompletableFuture<Void> resultFuture = asyncTransferManager.uploadObject(
             s3AsyncClient,
             new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> {
                 // do nothing
             }, false, null),
-            new StreamContext(
-                (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
-                ByteSizeUnit.MB.toBytes(1),
-                ByteSizeUnit.MB.toBytes(1),
-                1
-            ),
+            new StreamContext((partIdx, partSize, position) -> {
+                streamRef.set(new ZeroInputStream(partSize));
+                return new InputStreamContainer(streamRef.get(), partSize, position);
+            }, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1),
             new StatsMetricPublisher()
         );
 
@@ -92,6 +96,14 @@
         }
 
         verify(s3AsyncClient, times(1)).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));
+
+        boolean closeError = false;
+        try {
+            streamRef.get().available();
+        } catch (IOException e) {
+            closeError = e.getMessage().equals("Stream closed");
+        }
+        assertTrue("InputStream was still open after upload", closeError);
     }
 
     public void testOneChunkUploadCorruption() {
@@ -162,17 +174,17 @@ public void testMultipartUpload() {
             abortMultipartUploadResponseCompletableFuture
         );
 
+        List<InputStream> streams = new ArrayList<>();
         CompletableFuture<Void> resultFuture = asyncTransferManager.uploadObject(
             s3AsyncClient,
             new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> {
                 // do nothing
             }, true, 3376132981L),
-            new StreamContext(
-                (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
-                ByteSizeUnit.MB.toBytes(1),
-                ByteSizeUnit.MB.toBytes(1),
-                5
-            ),
+            new StreamContext((partIdx, partSize, position) -> {
+                InputStream stream = new ZeroInputStream(partSize);
+                streams.add(stream);
+                return new InputStreamContainer(stream, partSize, position);
+            }, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5),
             new StatsMetricPublisher()
         );
 
@@ -182,6 +194,16 @@
             fail("did not expect resultFuture to fail");
         }
 
+        streams.forEach(stream -> {
+            boolean closeError = false;
+            try {
+                stream.available();
+            } catch (IOException e) {
+                closeError = e.getMessage().equals("Stream closed");
+            }
+            assertTrue("InputStream was still open after upload", closeError);
+        });
+
         verify(s3AsyncClient, times(1)).createMultipartUpload(any(CreateMultipartUploadRequest.class));
         verify(s3AsyncClient, times(5)).uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class));
         verify(s3AsyncClient, times(1)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
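
The assertions added in these tests lean on a property of closed streams: once `close()` has run on a `BufferedInputStream` (and, by the look of these assertions, on the test-only `ZeroInputStream` as well), calling `available()` throws an `IOException` with the message "Stream closed". The tests use that as a proxy for "the upload path closed my stream". A self-contained illustration of the probe, with an illustrative class name:

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClosedStreamProbe {
    public static void main(String[] args) throws IOException {
        InputStream stream = new BufferedInputStream(new ByteArrayInputStream(new byte[16]));
        stream.close();
        boolean closeError = false;
        try {
            stream.available();            // legal on an open stream, throws once closed
        } catch (IOException e) {
            // BufferedInputStream throws IOException("Stream closed") after close()
            closeError = "Stream closed".equals(e.getMessage());
        }
        System.out.println("stream observed closed: " + closeError);
    }
}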
