Skip to content

Commit

Permalink
Parallelize stale blobs deletion during snapshot delete (#3796)
Browse files Browse the repository at this point in the history
* Parallelize stale blobs deletion during snapshot delete

Signed-off-by: Piyush Daftary <pdaftary@amazon.com>

* Adding test which throws exception

Signed-off-by: Piyush Daftary <pdaftary@amazon.com>

* Adusting identation for spotlessJavaCheck

Signed-off-by: Piyush Daftary <pdaftary@amazon.com>

* Adding more description to MAX_SHARD_BLOB_DELETE_BATCH_SIZE

Signed-off-by: Piyush Daftary <pdaftary@amazon.com>

* Renaming max_shard_blob_delete_batch_size to max_snapshot_shard_blob_delete_batch_size

Signed-off-by: Piyush Daftary <pdaftary@amazon.com>
(cherry picked from commit 1c787e8)
  • Loading branch information
piyushdaftary authored and github-actions[bot] committed Jul 22, 2022
1 parent fb4f96f commit d34cc53
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.Metadata;
Expand All @@ -46,6 +48,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryVerificationException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -328,4 +331,125 @@ public void testRepositoryVerification() throws Exception {
assertThat(ex.getMessage(), containsString("is not shared"));
}
}

public void testSnapshotShardBlobDelete() throws Exception {
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String firstSnapshot = "first-snapshot";
final String secondSnapshot = "second-snapshot";
final String indexName = "test-idx";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
int maxShardBlobDeleteBatchSize = randomIntBetween(1, 1000);
createRepository(
"test-repo",
"mock",
Settings.builder()
.put("location", repositoryPath)
.put(BlobStoreRepository.MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.getKey(), maxShardBlobDeleteBatchSize)
);

logger.info("--> creating index-0 and ingest data");
createIndex(indexName);
ensureGreen();
for (int j = 0; j < randomIntBetween(1, 1000); j++) {
index(indexName, "_doc", Integer.toString(j), "foo", "bar" + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, firstSnapshot);

int numberOfFiles = numberOfFiles(repositoryPath);

logger.info("--> adding some more documents to test index");
for (int j = 0; j < randomIntBetween(100, 10000); ++j) {
final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < randomIntBetween(100, 1000); ++i) {
bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
}
client().bulk(bulkRequest).get();
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, secondSnapshot);

// Delete second snapshot
logger.info("--> delete second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get();

logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repositoryPath, numberOfFiles);

logger.info("--> done");
}

public void testSnapshotShardBlobDeletionRepositoryThrowingError() throws Exception {
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String firstSnapshot = "first-snapshot";
final String secondSnapshot = "second-snapshot";
final String indexName = "test-idx";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
int maxShardBlobDeleteBatchSize = randomIntBetween(1, 1000);
createRepository(
"test-repo",
"mock",
Settings.builder()
.put("location", repositoryPath)
.put(BlobStoreRepository.MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.getKey(), maxShardBlobDeleteBatchSize)
);

logger.info("--> creating index-0 and ingest data");
createIndex(indexName);
ensureGreen();
for (int j = 0; j < randomIntBetween(1, 1000); j++) {
index(indexName, "_doc", Integer.toString(j), "foo", "bar" + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, firstSnapshot);

logger.info("--> adding some more documents to test index");
for (int j = 0; j < randomIntBetween(100, 1000); ++j) {
final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < randomIntBetween(100, 1000); ++i) {
bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
}
client().bulk(bulkRequest).get();
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, secondSnapshot);

// Make repository to throw exception when trying to delete stale snapshot shard blobs
String clusterManagerNode = internalCluster().getMasterName();
((MockRepository) internalCluster().getInstance(RepositoriesService.class, clusterManagerNode).repository("test-repo"))
.setThrowExceptionWhileDelete(true);

// Delete second snapshot
logger.info("--> delete second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get();

// Make repository to work normally
((MockRepository) internalCluster().getInstance(RepositoriesService.class, clusterManagerNode).repository("test-repo"))
.setThrowExceptionWhileDelete(false);

// This snapshot should delete last snapshot's residual stale shard blobs as well
logger.info("--> delete first snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, firstSnapshot).get();

// Expect two files to remain in the repository:
// (1) index-(N+1)
// (2) index-latest
assertFileCount(repositoryPath, 2);

logger.info("--> done");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -235,6 +236,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.NodeScope
);

/**
* Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion.
* For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation
* Most cloud storage support upto 1000 key(s) deletion in single operation, thus keeping default value to be 1000.
*/
public static final Setting<Integer> MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE = Setting.intSetting(
"max_snapshot_shard_blob_delete_batch_size",
1000, // the default maximum batch size of stale snapshot shard blobs deletion
Setting.Property.NodeScope
);

/**
* Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a
* url-repository.
Expand All @@ -243,6 +255,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

protected final boolean supportURLRepo;

private final int maxShardBlobDeleteBatch;

private final boolean compress;

private final boolean cacheRepositoryData;
Expand Down Expand Up @@ -358,6 +372,7 @@ protected BlobStoreRepository(
readOnly = metadata.settings().getAsBoolean("readonly", false);
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
}

@Override
Expand Down Expand Up @@ -902,15 +917,57 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
listener.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("{} Failed to delete some blobs during snapshot delete", snapshotIds), e);
throw e;

try {
AtomicInteger counter = new AtomicInteger();
Collection<List<String>> subList = filesToDelete.stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
.values();
final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList);

final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure),
staleFilesToDeleteInBatch.size()
);

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size());
for (int i = 0; i < workers; ++i) {
executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener);
}
}));

} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale shard blobs", snapshotIds), e);
listener.onFailure(e);
}
}

private void executeStaleShardDelete(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
throws InterruptedException {
List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(
() -> new ParameterizedMessage(
"[{}] Failed to delete following blobs during snapshot delete : {}",
metadata.name(),
filesToDelete
),
e
);
l.onFailure(e);
}
executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
}));
}
}

// updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,9 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
if (blockOnDeleteIndexN && blobNames.stream().anyMatch(name -> name.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX))) {
blockExecutionAndMaybeWait("index-{N}");
}
if (setThrowExceptionWhileDelete) {
throw new IOException("Random exception");
}
super.deleteBlobsIgnoringIfNotExists(blobNames);
}

Expand Down

0 comments on commit d34cc53

Please sign in to comment.