Skip to content

Commit

Permalink
[S3 API] Support for S3 AbortMultipartUpload API (#2940)
Browse files Browse the repository at this point in the history
* Support AbortMultipartUpload for S3 API

---------

Co-authored-by: Shan Xu <shaxu@linkedin.com>
  • Loading branch information
alyssaxu333 and alyssaxu33 authored Nov 20, 2024
1 parent c651bcd commit e486d69
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.ambry.frontend.s3.S3GetHandler;
import com.github.ambry.frontend.s3.S3HeadHandler;
import com.github.ambry.frontend.s3.S3ListHandler;
import com.github.ambry.frontend.s3.S3MultipartAbortUploadHandler;
import com.github.ambry.frontend.s3.S3MultipartUploadHandler;
import com.github.ambry.frontend.s3.S3PostHandler;
import com.github.ambry.frontend.s3.S3PutHandler;
Expand Down Expand Up @@ -115,6 +116,7 @@ class FrontendRestRequestService implements RestRequestService {
private S3HeadHandler s3HeadHandler;
private S3PostHandler s3PostHandler;
private S3MultipartUploadHandler s3MultipartUploadHandler;
private S3MultipartAbortUploadHandler s3MultipartAbortHandler;
private S3GetHandler s3GetHandler;
private QuotaManager quotaManager;
private boolean isUp = false;
Expand Down Expand Up @@ -229,12 +231,11 @@ public void start() throws InstantiationException {
postAccountsHandler = new PostAccountsHandler(securityService, accountService, frontendConfig, frontendMetrics);
postDatasetsHandler = new PostDatasetsHandler(securityService, accountService, frontendConfig, frontendMetrics,
accountAndContainerInjector);
s3DeleteHandler = new S3DeleteHandler(deleteBlobHandler, frontendMetrics);
s3HeadHandler = new S3HeadHandler(headBlobHandler, securityService, frontendMetrics, accountService);
s3HeadHandler = new S3HeadHandler(headBlobHandler, securityService, frontendMetrics, accountService);
s3MultipartUploadHandler =
new S3MultipartUploadHandler(securityService, frontendMetrics, accountAndContainerInjector, frontendConfig,
namedBlobDb, idConverter, router, quotaManager);
s3DeleteHandler = new S3DeleteHandler(deleteBlobHandler, s3MultipartUploadHandler, frontendMetrics);
s3PostHandler = new S3PostHandler(s3MultipartUploadHandler);
s3PutHandler = new S3PutHandler(namedBlobPutHandler, s3MultipartUploadHandler, frontendMetrics);
s3ListHandler = new S3ListHandler(namedBlobListHandler, frontendMetrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ public static boolean isMultipartListPartRequest(RestRequest restRequest) {
&& restRequest.getArgs().containsKey(UPLOAD_ID_QUERY_PARAM);
}

public static boolean isMultipartAbortUploadRequest(RestRequest restRequest) {
return restRequest.getRestMethod() == RestMethod.DELETE && restRequest.getArgs().containsKey(S3_REQUEST)
&& restRequest.getArgs().containsKey(UPLOAD_ID_QUERY_PARAM);
}

protected class DefaultCallbackChain {
private final RestRequest restRequest;
private final Callback<ReadableStreamChannel> finalCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package com.github.ambry.frontend.s3;

import com.github.ambry.commons.Callback;
import com.github.ambry.frontend.AccountAndContainerInjector;
import com.github.ambry.frontend.DeleteBlobHandler;
import com.github.ambry.frontend.FrontendMetrics;
import com.github.ambry.frontend.SecurityService;
import com.github.ambry.rest.ResponseStatus;
import com.github.ambry.rest.RestRequest;
import com.github.ambry.rest.RestResponseChannel;
import com.github.ambry.rest.RestServiceException;
import com.github.ambry.router.ReadableStreamChannel;
import com.github.ambry.utils.ThrowingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,16 +37,18 @@
public class S3DeleteHandler extends S3BaseHandler<Void> {

private final S3DeleteObjectHandler objectHandler;
private final S3MultipartUploadHandler s3MultipartUploadHandler;
private final FrontendMetrics metrics;

/**
* Construct a handler for handling S3 DELETE requests.
*
* @param deleteBlobHandler the generic {@link DeleteBlobHandler} delegated to by the underlying delete object handler.
*/
public S3DeleteHandler(DeleteBlobHandler deleteBlobHandler, FrontendMetrics metrics) {
public S3DeleteHandler(DeleteBlobHandler deleteBlobHandler, S3MultipartUploadHandler s3MultipartUploadHandler, FrontendMetrics metrics) {
this.metrics = metrics;
this.objectHandler = new S3DeleteObjectHandler(deleteBlobHandler);
this.s3MultipartUploadHandler = s3MultipartUploadHandler;
}

/**
Expand All @@ -57,7 +62,11 @@ public S3DeleteHandler(DeleteBlobHandler deleteBlobHandler, FrontendMetrics metr
*/
protected void doHandle(RestRequest restRequest, RestResponseChannel restResponseChannel, Callback<Void> callback)
throws RestServiceException {
objectHandler.handle(restRequest, restResponseChannel, callback);
if(S3MultipartUploadHandler.isMultipartAbortUploadRequest(restRequest)) {
s3MultipartUploadHandler.handle(restRequest, restResponseChannel, callback);
} else {
objectHandler.handle(restRequest, restResponseChannel, callback);
}
}

private class S3DeleteObjectHandler {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
*/
package com.github.ambry.frontend.s3;

import com.github.ambry.commons.Callback;
import com.github.ambry.frontend.AccountAndContainerInjector;
import com.github.ambry.frontend.FrontendMetrics;
import com.github.ambry.frontend.SecurityService;
import com.github.ambry.rest.ResponseStatus;
import com.github.ambry.rest.RestRequest;
import com.github.ambry.rest.RestResponseChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.ambry.frontend.FrontendUtils.*;


/**
* Handles a request for s3 AbortMultipartUploads according to the
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html">...</a>
*/
public class S3MultipartAbortUploadHandler<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(S3MultipartListPartsHandler.class);
private final SecurityService securityService;
private final FrontendMetrics frontendMetrics;
private final AccountAndContainerInjector accountAndContainerInjector;

/**
* Construct a handler for handling S3 abort multi uploads.
*
* @param securityService the {@link SecurityService} to use.
* @param frontendMetrics {@link FrontendMetrics} instance where metrics should be recorded.
* @param accountAndContainerInjector helper to resolve account and container for a given request.
*/
public S3MultipartAbortUploadHandler(SecurityService securityService, FrontendMetrics frontendMetrics,
AccountAndContainerInjector accountAndContainerInjector) {
this.securityService = securityService;
this.frontendMetrics = frontendMetrics;
this.accountAndContainerInjector = accountAndContainerInjector;
}

/**
* @param restRequest the {@link RestRequest} that contains the request parameters.
* @param restResponseChannel the {@link RestResponseChannel} where headers should be set.
* @param callback the {@link Callback} to invoke when the response is ready (or if there is an exception).
*/
void handle(RestRequest restRequest, RestResponseChannel restResponseChannel, Callback<R> callback) {
new S3MultipartAbortUploadHandler.CallbackChain(restRequest, restResponseChannel, callback).start();
}

/**
* Represents the chain of actions to take. Keeps request context that is relevant to all callback stages.
*/
private class CallbackChain {
private final RestRequest restRequest;
private final RestResponseChannel restResponseChannel;
private final Callback<Void> finalCallback;
private final String uri;

/**
* @param restRequest the {@link RestRequest}.
* @param restResponseChannel the {@link RestResponseChannel}.
* @param finalCallback the {@link Callback} to call on completion.
*/
private CallbackChain(RestRequest restRequest, RestResponseChannel restResponseChannel,
Callback<Void> finalCallback) {
this.restRequest = restRequest;
this.restResponseChannel = restResponseChannel;
this.finalCallback = finalCallback;
this.uri = restRequest.getUri();
}

/**
* Start the chain by calling {@link SecurityService#processRequest}.
*/
private void start() {
try {
accountAndContainerInjector.injectAccountContainerForNamedBlob(restRequest,
frontendMetrics.deleteBlobMetricsGroup);
securityService.processRequest(restRequest, securityProcessRequestCallback());
} catch (Exception e) {
finalCallback.onCompletion(null, e);
}
}

/**
* After {@link SecurityService#processRequest} finishes, call {@link SecurityService#postProcessRequest} to perform
* request time security checks that rely on the request being fully parsed and any additional arguments set.
* @return a {@link Callback} to be used with {@link SecurityService#processRequest}.
*/
private Callback<Void> securityProcessRequestCallback() {
return buildCallback(frontendMetrics.s3DeleteHandleMetrics, securityCheckResult -> {
securityService.postProcessRequest(restRequest, securityPostProcessRequestCallback());
}, uri, LOGGER, finalCallback);
}

/**
* After {@link SecurityService#postProcessRequest} finishes, return response for the request.
* @return a {@link Callback} to be used with {@link SecurityService#postProcessRequest}.
*/
private Callback<Void> securityPostProcessRequestCallback() {
return buildCallback(frontendMetrics.s3DeleteHandleMetrics, securityCheckResult -> {
restResponseChannel.setStatus(ResponseStatus.NoContent);
finalCallback.onCompletion(null, null);
}, uri, LOGGER, finalCallback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html">...</a>
* TODO [S3] Add support for Abort multipart uploads.
*/
public class S3MultipartCompleteUploadHandler {
public class S3MultipartCompleteUploadHandler<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(S3MultipartCompleteUploadHandler.class);
private static final ObjectMapper objectMapper = new XmlMapper();
private final SecurityService securityService;
Expand Down Expand Up @@ -125,7 +125,7 @@ public S3MultipartCompleteUploadHandler(SecurityService securityService, NamedBl
* @param callback the {@link Callback} to invoke when the response is ready (or if there is an exception).
*/
void handle(RestRequest restRequest, RestResponseChannel restResponseChannel,
Callback<ReadableStreamChannel> callback) {
Callback<R> callback) {
new S3MultipartCompleteUploadHandler.CallbackChain(restRequest, restResponseChannel, callback).start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* Handles a request for s3 CreateMultipartUploads according to the
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html">...</a>
*/
public class S3MultipartCreateUploadHandler {
public class S3MultipartCreateUploadHandler<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(S3MultipartCreateUploadHandler.class);
private static final ObjectMapper objectMapper = new XmlMapper();
private final SecurityService securityService;
Expand All @@ -71,7 +71,7 @@ public S3MultipartCreateUploadHandler(SecurityService securityService, FrontendM
* @param callback the {@link Callback} to invoke when the response is ready (or if there is an exception).
*/
void handle(RestRequest restRequest, RestResponseChannel restResponseChannel,
Callback<ReadableStreamChannel> callback) {
Callback<R> callback) {
new S3MultipartCreateUploadHandler.CallbackChain(restRequest, restResponseChannel, callback).start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* TODO [S3] For now, this API returns empty list. Flink only uses it before starting a new multipart upload. So,
* sending an empty list unblocks them. In future, we may need to fetch the actual part IDs.
*/
public class S3MultipartListPartsHandler {
public class S3MultipartListPartsHandler<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(S3MultipartListPartsHandler.class);
private static final ObjectMapper objectMapper = new XmlMapper();
private final SecurityService securityService;
Expand All @@ -74,7 +74,7 @@ public S3MultipartListPartsHandler(SecurityService securityService, FrontendMetr
* @param callback the {@link Callback} to invoke when the response is ready (or if there is an exception).
*/
void handle(RestRequest restRequest, RestResponseChannel restResponseChannel,
Callback<ReadableStreamChannel> callback) {
Callback<R> callback) {
new S3MultipartListPartsHandler.CallbackChain(restRequest, restResponseChannel, callback).start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
/**
* Handles requests for s3 multipart uploads.
*/
public class S3MultipartUploadHandler extends S3BaseHandler<ReadableStreamChannel> {
public class S3MultipartUploadHandler<R> extends S3BaseHandler<R> {
private final S3MultipartCreateUploadHandler createMultipartUploadHandler;
private final S3MultipartCompleteUploadHandler completeMultipartUploadHandler;
private final S3MultipartUploadPartHandler uploadPartHandler;
private final S3MultipartListPartsHandler listPartsHandler;
private final S3MultipartAbortUploadHandler abortMultipartUploadHandler;

/**
* Construct a handler for handling S3 POST requests during multipart uploads.
Expand All @@ -54,14 +55,16 @@ public S3MultipartUploadHandler(SecurityService securityService, FrontendMetrics
AccountAndContainerInjector accountAndContainerInjector, FrontendConfig frontendConfig, NamedBlobDb namedBlobDb,
IdConverter idConverter, Router router, QuotaManager quotaManager) {
createMultipartUploadHandler =
new S3MultipartCreateUploadHandler(securityService, frontendMetrics, accountAndContainerInjector);
new S3MultipartCreateUploadHandler<ReadableStreamChannel>(securityService, frontendMetrics, accountAndContainerInjector);
completeMultipartUploadHandler =
new S3MultipartCompleteUploadHandler(securityService, namedBlobDb, idConverter, router,
new S3MultipartCompleteUploadHandler<ReadableStreamChannel>(securityService, namedBlobDb, idConverter, router,
accountAndContainerInjector, frontendMetrics, frontendConfig, quotaManager);
uploadPartHandler =
new S3MultipartUploadPartHandler(securityService, idConverter, router, accountAndContainerInjector,
new S3MultipartUploadPartHandler<ReadableStreamChannel>(securityService, idConverter, router, accountAndContainerInjector,
frontendConfig, frontendMetrics, quotaManager);
listPartsHandler = new S3MultipartListPartsHandler(securityService, frontendMetrics, accountAndContainerInjector);
listPartsHandler = new S3MultipartListPartsHandler<ReadableStreamChannel>(securityService, frontendMetrics, accountAndContainerInjector);
abortMultipartUploadHandler = new S3MultipartAbortUploadHandler<Void>(securityService, frontendMetrics, accountAndContainerInjector);

}

/**
Expand All @@ -72,7 +75,7 @@ public S3MultipartUploadHandler(SecurityService securityService, FrontendMetrics
*/
@Override
protected void doHandle(RestRequest restRequest, RestResponseChannel restResponseChannel,
Callback<ReadableStreamChannel> callback) throws RestServiceException {
Callback<R> callback) throws RestServiceException {
if (isMultipartCreateUploadRequest(restRequest)) {
createMultipartUploadHandler.handle(restRequest, restResponseChannel, callback);
} else if (isMultipartUploadPartRequest(restRequest)) {
Expand All @@ -81,6 +84,8 @@ protected void doHandle(RestRequest restRequest, RestResponseChannel restRespons
completeMultipartUploadHandler.handle(restRequest, restResponseChannel, callback);
} else if (isMultipartListPartRequest(restRequest)) {
listPartsHandler.handle(restRequest, restResponseChannel, callback);
} else if(isMultipartAbortUploadRequest(restRequest)) {
abortMultipartUploadHandler.handle(restRequest, restResponseChannel, callback);
} else {
callback.onCompletion(null,
new RestServiceException("Invalid S3 Multipart request", RestServiceErrorCode.BadRequest));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
* Handles a request for S3 Multipart upload part requests according to the
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html">...</a>
*/
public class S3MultipartUploadPartHandler {
public class S3MultipartUploadPartHandler<R> {
private static final Logger logger = LoggerFactory.getLogger(S3MultipartUploadPartHandler.class);
private final SecurityService securityService;
private final IdConverter idConverter;
Expand Down Expand Up @@ -89,7 +89,7 @@ public S3MultipartUploadPartHandler(SecurityService securityService, IdConverter
* @throws RestServiceException exception when the processing fails
*/
void handle(RestRequest restRequest, RestResponseChannel restResponseChannel,
Callback<ReadableStreamChannel> callback) throws RestServiceException {
Callback<R> callback) throws RestServiceException {
// 1. Set headers required by Ambry. These become the blob properties.
NamedBlobPath namedBlobPath = NamedBlobPath.parse(getRequestPath(restRequest), restRequest.getArgs());
String accountName = namedBlobPath.getAccountName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.frontend.s3.S3DeleteHandler;
import com.github.ambry.frontend.s3.S3MultipartAbortUploadHandler;
import com.github.ambry.frontend.s3.S3MultipartUploadHandler;
import com.github.ambry.named.NamedBlobDb;
import com.github.ambry.named.NamedBlobDbFactory;
import com.github.ambry.quota.QuotaTestUtils;
Expand Down Expand Up @@ -114,7 +116,7 @@ private void setup() throws Exception {
DeleteBlobHandler deleteBlobHandler =
new DeleteBlobHandler(router, securityService, ambryIdConverterFactory.getIdConverter(), injector, metrics,
new MockClusterMap(), QuotaTestUtils.createDummyQuotaManager(), ACCOUNT_SERVICE);
s3DeleteHandler = new S3DeleteHandler(deleteBlobHandler, metrics);
s3DeleteHandler = new S3DeleteHandler(deleteBlobHandler, null, metrics);
}

private void putABlob() throws Exception {
Expand Down
Loading

0 comments on commit e486d69

Please sign in to comment.