Skip to content

Commit

Permalink
Reorder/extract methods in S3 CompareAndExchangeOperation (elastic#101299)
Browse files Browse the repository at this point in the history

Shorten `run()` and put the methods it calls into a more logical order.
  • Loading branch information
DaveCTurner committed Oct 25, 2023
1 parent 27956c0 commit 9e135eb
Showing 1 changed file with 114 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -580,111 +580,20 @@ private class CompareAndExchangeOperation {
this.threadPool = threadPool;
}

// Lists the in-flight multipart uploads whose keys start with this blob's key.
// A 404 from S3 (e.g. missing bucket) is treated as "no uploads in progress"
// rather than an error; any other AmazonS3Exception propagates to the caller.
private List<MultipartUpload> listMultipartUploads() {
    final var listRequest = new ListMultipartUploadsRequest(bucket);
    listRequest.setPrefix(blobKey);
    // record this call under the LIST_OBJECTS metric for this purpose
    listRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
    try {
        return SocketAccess.doPrivileged(() -> client.listMultipartUploads(listRequest)).getMultipartUploads();
    } catch (AmazonS3Exception e) {
        if (e.getStatusCode() == 404) {
            return List.of();
        }
        throw e;
    }
}

/**
 * Determines the position of the upload with id {@code targetUploadId} among the competing
 * multipart uploads, ordering uploads by the lexicographic order of their upload ids.
 * <p>
 * As a side effect, when the target upload is found, warns if its initiation timestamp
 * deviates from this node's clock by more than the compare-and-exchange TTL — a hint that
 * clocks between nodes are skewed enough to undermine the TTL-based cleanup.
 *
 * @param targetUploadId   the id of this operation's own upload
 * @param multipartUploads the uploads observed to be in flight for this blob
 * @return the number of other uploads whose id sorts strictly before the target, or -1 if
 *         the target upload does not appear in {@code multipartUploads}
 */
private int getUploadIndex(String targetUploadId, List<MultipartUpload> multipartUploads) {
    var uploadIndex = 0;
    var found = false;
    for (MultipartUpload multipartUpload : multipartUploads) {
        final var observedUploadId = multipartUpload.getUploadId();
        if (observedUploadId.equals(targetUploadId)) {
            final var currentTimeMillis = blobStore.getThreadPool().absoluteTimeInMillis();
            // age may be negative if the S3 clock is ahead of the local clock
            final var ageMillis = currentTimeMillis - multipartUpload.getInitiated().toInstant().toEpochMilli();
            final var expectedAgeRangeMillis = blobStore.getCompareAndExchangeTimeToLive().millis();
            if (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis) {
                logger.warn(
                    """
                        compare-and-exchange of blob [{}:{}] was initiated at [{}={}] \
                        which deviates from local node epoch time [{}] by more than the warn threshold of [{}ms]""",
                    bucket,
                    blobKey,
                    multipartUpload.getInitiated(),
                    multipartUpload.getInitiated().toInstant().toEpochMilli(),
                    currentTimeMillis,
                    expectedAgeRangeMillis
                );
            }
            found = true;
        } else if (observedUploadId.compareTo(targetUploadId) < 0) {
            // another upload sorts before ours, so our index moves up by one
            uploadIndex += 1;
        }
    }

    return found ? uploadIndex : -1;
}

/**
 * Checks for multipart uploads already in flight for this blob's key.
 * <p>
 * Uploads older than the compare-and-exchange TTL are assumed to be leftovers from failed
 * operations: they are aborted (best-effort, via {@link #safeAbortMultipartUpload}) and do
 * not count as "ongoing".
 *
 * @return {@code true} if there are already ongoing uploads, so we should not proceed with the operation
 */
private boolean hasPreexistingUploads() {
    final var uploads = listMultipartUploads();
    if (uploads.isEmpty()) {
        return false;
    }

    // uploads initiated after this instant are considered live; older ones are stale
    final var expiryDate = Date.from(
        Instant.ofEpochMilli(
            blobStore.getThreadPool().absoluteTimeInMillis() - blobStore.getCompareAndExchangeTimeToLive().millis()
        )
    );
    if (uploads.stream().anyMatch(upload -> upload.getInitiated().after(expiryDate))) {
        return true;
    }

    // there are uploads, but they are all older than the TTL, so clean them up before carrying on (should be rare)
    for (final var upload : uploads) {
        logger.warn(
            "cleaning up stale compare-and-swap upload [{}] initiated at [{}]",
            upload.getUploadId(),
            upload.getInitiated()
        );
        safeAbortMultipartUpload(upload.getUploadId());
    }

    return false;
}

void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
BlobContainerUtils.ensureValidRegisterContent(updated);

if (hasPreexistingUploads()) {

// This is a small optimization to improve the liveness properties of this algorithm.
//
// We can safely proceed even if there are other uploads in progress, but that would add to the potential for collisions and
// delays. Thus in this case we prefer avoid disturbing the ongoing attempts and just fail up front.

listener.onResponse(OptionalBytesReference.MISSING);
return;
}

final var initiateRequest = new InitiateMultipartUploadRequest(bucket, blobKey);
initiateRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
final var uploadId = SocketAccess.doPrivileged(() -> client.initiateMultipartUpload(initiateRequest)).getUploadId();

final var uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setBucketName(bucket);
uploadPartRequest.setKey(blobKey);
uploadPartRequest.setUploadId(uploadId);
uploadPartRequest.setPartNumber(1);
uploadPartRequest.setLastPart(true);
uploadPartRequest.setInputStream(updated.streamInput());
uploadPartRequest.setPartSize(updated.length());
uploadPartRequest.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
final var partETag = SocketAccess.doPrivileged(() -> client.uploadPart(uploadPartRequest)).getPartETag();

final var uploadId = initiateMultipartUpload();
final var partETag = uploadPart(updated, uploadId);
final var currentUploads = listMultipartUploads();
final var uploadIndex = getUploadIndex(uploadId, currentUploads);

Expand All @@ -710,16 +619,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
rawKey,
delegate1.delegateFailure((delegate2, currentValue) -> ActionListener.completeWith(delegate2, () -> {
if (currentValue.isPresent() && currentValue.bytesReference().equals(expected)) {
final var completeMultipartUploadRequest = new CompleteMultipartUploadRequest(
bucket,
blobKey,
uploadId,
List.of(partETag)
);
completeMultipartUploadRequest.setRequestMetricCollector(
blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose)
);
SocketAccess.doPrivilegedVoid(() -> client.completeMultipartUpload(completeMultipartUploadRequest));
completeMultipartUpload(uploadId, partETag);
isComplete.set(true);
}
return currentValue;
Expand All @@ -740,15 +640,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
var delayListener = listeners.acquire();
final Runnable cancelConcurrentUpdates = () -> {
try {
for (MultipartUpload currentUpload : currentUploads) {
final var currentUploadId = currentUpload.getUploadId();
if (uploadId.equals(currentUploadId) == false) {
blobStore.getSnapshotExecutor()
.execute(
ActionRunnable.run(listeners.acquire(), () -> abortMultipartUploadIfExists(currentUploadId))
);
}
}
cancelOtherUploads(uploadId, currentUploads, listeners);
} finally {
delayListener.onResponse(null);
}
Expand All @@ -769,6 +661,111 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
}
}

/**
 * Checks whether other multipart uploads for this blob's key are already in flight.
 * Uploads older than the compare-and-exchange TTL are treated as leftovers from failed
 * attempts: they are aborted (best-effort) and do not block this operation.
 *
 * @return {@code true} if there are already ongoing uploads, so we should not proceed with the operation
 */
private boolean hasPreexistingUploads() {
    final var preexistingUploads = listMultipartUploads();
    if (preexistingUploads.isEmpty()) {
        return false;
    }

    // anything initiated after this cutoff is still considered live
    final var cutoffMillis = blobStore.getThreadPool().absoluteTimeInMillis() - blobStore.getCompareAndExchangeTimeToLive()
        .millis();
    final var cutoff = Date.from(Instant.ofEpochMilli(cutoffMillis));
    for (final var upload : preexistingUploads) {
        if (upload.getInitiated().after(cutoff)) {
            return true;
        }
    }

    // all the observed uploads are older than the TTL (should be rare): abort them and carry on
    for (final var staleUpload : preexistingUploads) {
        logger.warn(
            "cleaning up stale compare-and-swap upload [{}] initiated at [{}]",
            staleUpload.getUploadId(),
            staleUpload.getInitiated()
        );
        safeAbortMultipartUpload(staleUpload.getUploadId());
    }

    return false;
}

// Lists the in-flight multipart uploads whose keys start with this blob's key; a 404
// (e.g. missing bucket) simply means there are no uploads, so it yields an empty list.
private List<MultipartUpload> listMultipartUploads() {
    final var request = new ListMultipartUploadsRequest(bucket);
    request.setPrefix(blobKey);
    request.setRequestMetricCollector(blobStore.getMetricCollector(Operation.LIST_OBJECTS, purpose));
    try {
        final var listing = SocketAccess.doPrivileged(() -> client.listMultipartUploads(request));
        return listing.getMultipartUploads();
    } catch (AmazonS3Exception e) {
        if (e.getStatusCode() != 404) {
            throw e;
        }
        return List.of();
    }
}

// Starts a fresh multipart upload for this blob and returns its upload id.
private String initiateMultipartUpload() {
    final var request = new InitiateMultipartUploadRequest(bucket, blobKey);
    request.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
    final var result = SocketAccess.doPrivileged(() -> client.initiateMultipartUpload(request));
    return result.getUploadId();
}

// Uploads the new register value as the sole (and final) part of the given multipart
// upload, returning the part's ETag for use when completing the upload.
private PartETag uploadPart(BytesReference updated, String uploadId) throws IOException {
    final var request = new UploadPartRequest();
    request.setBucketName(bucket);
    request.setKey(blobKey);
    request.setUploadId(uploadId);
    request.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
    // single-part upload: part 1 is also the last part
    request.setPartNumber(1);
    request.setLastPart(true);
    request.setInputStream(updated.streamInput());
    request.setPartSize(updated.length());
    final var response = SocketAccess.doPrivileged(() -> client.uploadPart(request));
    return response.getPartETag();
}

/**
 * Determines the position of the upload with id {@code targetUploadId} among the competing
 * multipart uploads, ordering uploads by the lexicographic order of their upload ids.
 * <p>
 * As a side effect, when the target upload is found, warns if its initiation timestamp
 * deviates from this node's clock by more than the compare-and-exchange TTL — a hint that
 * clocks are skewed enough to undermine the TTL-based cleanup of stale uploads.
 *
 * @param targetUploadId   the id of this operation's own upload
 * @param multipartUploads the uploads observed to be in flight for this blob
 * @return the number of other uploads whose id sorts strictly before the target, or -1 if
 *         the target upload does not appear in {@code multipartUploads}
 */
private int getUploadIndex(String targetUploadId, List<MultipartUpload> multipartUploads) {
    var uploadIndex = 0;
    var found = false;
    for (MultipartUpload multipartUpload : multipartUploads) {
        final var observedUploadId = multipartUpload.getUploadId();
        if (observedUploadId.equals(targetUploadId)) {
            final var currentTimeMillis = blobStore.getThreadPool().absoluteTimeInMillis();
            // age may be negative if the S3 clock is ahead of the local clock
            final var ageMillis = currentTimeMillis - multipartUpload.getInitiated().toInstant().toEpochMilli();
            final var expectedAgeRangeMillis = blobStore.getCompareAndExchangeTimeToLive().millis();
            if (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis) {
                logger.warn(
                    """
                        compare-and-exchange of blob [{}:{}] was initiated at [{}={}] \
                        which deviates from local node epoch time [{}] by more than the warn threshold of [{}ms]""",
                    bucket,
                    blobKey,
                    multipartUpload.getInitiated(),
                    multipartUpload.getInitiated().toInstant().toEpochMilli(),
                    currentTimeMillis,
                    expectedAgeRangeMillis
                );
            }
            found = true;
        } else if (observedUploadId.compareTo(targetUploadId) < 0) {
            // another upload sorts before ours, so our index moves up by one
            uploadIndex += 1;
        }
    }

    return found ? uploadIndex : -1;
}

// Best-effort abort of every observed upload other than our own. Each abort is dispatched
// to the snapshot executor and holds a ref on {@code listeners} until it completes.
private void cancelOtherUploads(String uploadId, List<MultipartUpload> currentUploads, RefCountingListener listeners) {
    for (final var otherUpload : currentUploads) {
        final var otherUploadId = otherUpload.getUploadId();
        if (uploadId.equals(otherUploadId)) {
            continue; // never abort our own upload
        }
        blobStore.getSnapshotExecutor()
            .execute(ActionRunnable.run(listeners.acquire(), () -> abortMultipartUploadIfExists(otherUploadId)));
    }
}

private void safeAbortMultipartUpload(String uploadId) {
try {
abortMultipartUploadIfExists(uploadId);
Expand All @@ -791,6 +788,11 @@ private void abortMultipartUploadIfExists(String uploadId) {
}
}

// Completes the single-part multipart upload, atomically making the updated value visible.
private void completeMultipartUpload(String uploadId, PartETag partETag) {
    final var request = new CompleteMultipartUploadRequest(bucket, blobKey, uploadId, List.of(partETag));
    request.setRequestMetricCollector(blobStore.getMetricCollector(Operation.PUT_MULTIPART_OBJECT, purpose));
    SocketAccess.doPrivilegedVoid(() -> client.completeMultipartUpload(request));
}
}

@Override
Expand Down

0 comments on commit 9e135eb

Please sign in to comment.