Skip to content

Commit

Permalink
Add Bulk Delete Api to BlobStore (#40322) (#41253)
Browse files Browse the repository at this point in the history
* Adds Bulk delete API to blob container
* Implement bulk delete API for S3
* Adjust S3Fixture to accept both path styles for bulk deletes since the S3 SDK uses both during our ITs
* Closes #40250
  • Loading branch information
original-brownbear authored Apr 16, 2019
1 parent c22a2ce commit c4e84e2
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
Expand Down Expand Up @@ -56,6 +57,12 @@

class S3BlobContainer extends AbstractBlobContainer {

/**
* Maximum number of deletes in a {@link DeleteObjectsRequest}.
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
*/
private static final int MAX_BULK_DELETES = 1000;

private final S3BlobStore blobStore;
private final String keyPath;

Expand Down Expand Up @@ -118,6 +125,51 @@ public void deleteBlob(String blobName) throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
if (blobNames.isEmpty()) {
return;
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String blob : blobNames) {
partition.add(buildKey(blob));
if (partition.size() == MAX_BULK_DELETES ) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
}
}
if (partition.isEmpty() == false) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
}
SocketAccess.doPrivilegedVoid(() -> {
AmazonClientException aex = null;
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
try {
clientReference.client().deleteObjects(deleteRequest);
} catch (AmazonClientException e) {
if (aex == null) {
aex = e;
} else {
aex.addSuppressed(e);
}
}
}
if (aex != null) {
throw aex;
}
});
} catch (final AmazonClientException e) {
throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
}
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true);
}

@Override
public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
// Delete Multiple Objects
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), (request) -> {
final RequestHandler bulkDeleteHandler = request -> {
final List<String> deletes = new ArrayList<>();
final List<String> errors = new ArrayList<>();

Expand All @@ -344,7 +344,6 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
if (closingOffset != -1) {
offset = offset + startMarker.length();
final String objectName = requestBody.substring(offset, closingOffset);

boolean found = false;
for (Bucket bucket : buckets.values()) {
if (bucket.objects.containsKey(objectName)) {
Expand All @@ -369,7 +368,9 @@ private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> bucke
}
}
return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
});
};
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler);
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler);

// non-authorized requests

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,7 @@ public DeleteObjectsResult deleteObjects(DeleteObjectsRequest request) throws Sd

final List<DeleteObjectsResult.DeletedObject> deletions = new ArrayList<>();
for (DeleteObjectsRequest.KeyVersion key : request.getKeys()) {
if (blobs.remove(key.getKey()) == null) {
AmazonS3Exception exception = new AmazonS3Exception("[" + key + "] does not exist.");
exception.setStatusCode(404);
throw exception;
} else {
if (blobs.remove(key.getKey()) != null) {
DeleteObjectsResult.DeletedObject deletion = new DeleteObjectsResult.DeletedObject();
deletion.setKey(key.getKey());
deletions.add(deletion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -96,8 +97,9 @@ public interface BlobContainer {
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Deletes a blob with giving name, if the blob exists. If the blob does not exist,
* Deletes the blob with the given name, if the blob exists. If the blob does not exist,
* this method throws a NoSuchFileException.
*
* @param blobName
Expand All @@ -107,6 +109,33 @@ public interface BlobContainer {
*/
void deleteBlob(String blobName) throws IOException;

/**
* Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception
* when one or multiple of the given blobs don't exist and simply ignore this case.
*
* @param blobNames The names of the blob to delete.
* @throws IOException if a subset of blob exists but could not be deleted.
*/
default void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
IOException ioe = null;
for (String blobName : blobNames) {
try {
deleteBlob(blobName);
} catch (NoSuchFileException e) {
// ignored
} catch (IOException e) {
if (ioe == null) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
if (ioe != null) {
throw ioe;
}
}

/**
* Deletes a blob with giving name, ignoring if the blob does not exist.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
Expand Down Expand Up @@ -464,22 +463,16 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
for (final IndexId indexId : indicesToCleanUp) {
try {
indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId());
} catch (DirectoryNotEmptyException dnee) {
// if the directory isn't empty for some reason, it will fail to clean up;
// we'll ignore that and accept that cleanup didn't fully succeed.
// since we are using UUIDs for path names, this won't be an issue for
// snapshotting indices of the same name
logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
"but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee);
indicesBlobContainer.deleteBlobsIgnoringIfNotExists(
indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException ioe) {
// a different IOException occurred while trying to delete - will just log the issue for now
logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
"but failed to clean up its index folder.", metadata.name(), indexId), ioe);
logger.warn(() ->
new ParameterizedMessage(
"[{}] indices {} are no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
}
}
} catch (IOException | ResourceNotFoundException ex) {
throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);
}
Expand Down Expand Up @@ -1016,16 +1009,14 @@ protected void finalize(final List<SnapshotFiles> snapshots,
try {
// Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
// attempt to write an index file with this generation failed mid-way after creating the temporary file.
for (final String blobName : blobs.keySet()) {
if (FsBlobContainer.isTempBlobName(blobName)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
snapshotId, shardId, blobName), e);
throw e;
}
}
final List<String> blobNames =
blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
snapshotId, shardId, blobNames), e);
throw e;
}

// If we deleted all snapshots, we don't need to create a new index file
Expand All @@ -1034,28 +1025,26 @@ protected void finalize(final List<SnapshotFiles> snapshots,
}

// Delete old index files
for (final String blobName : blobs.keySet()) {
if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
snapshotId, shardId, blobName), e);
throw e;
}
}
final List<String> indexBlobs =
blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
snapshotId, shardId, indexBlobs), e);
throw e;
}

// Delete all blobs that don't exist in a snapshot
for (final String blobName : blobs.keySet()) {
if (blobName.startsWith(DATA_BLOB_PREFIX) && (updatedSnapshots.findNameFile(canonicalName(blobName)) == null)) {
try {
blobContainer.deleteBlobIgnoringIfNotExists(blobName);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blob [{}] during finalization",
snapshotId, shardId, blobName), e);
}
}
final List<String> orphanedBlobs = blobs.keySet().stream()
.filter(blobName ->
blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
.collect(Collectors.toList());
try {
blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization",
snapshotId, shardId, orphanedBlobs), e);
}
} catch (IOException e) {
String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
Expand Down Expand Up @@ -136,6 +137,23 @@ public void testDeleteBlob() throws IOException {
}
}

public void testDeleteBlobs() throws IOException {
try (BlobStore store = newBlobStore()) {
final List<String> blobNames = Arrays.asList("foobar", "barfoo");
final BlobContainer container = store.blobContainer(new BlobPath());
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
final BytesArray bytesArray = new BytesArray(data);
for (String blobName : blobNames) {
writeBlob(container, blobName, bytesArray, randomBoolean());
}
assertEquals(container.listBlobs().size(), 2);
container.deleteBlobsIgnoringIfNotExists(blobNames);
assertTrue(container.listBlobs().isEmpty());
container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
}
}

public void testDeleteBlobIgnoringIfNotExists() throws IOException {
try (BlobStore store = newBlobStore()) {
BlobPath blobPath = new BlobPath();
Expand Down

0 comments on commit c4e84e2

Please sign in to comment.