Skip to content

Commit

Permalink
Fix Creating NOOP Tasks on SNAPSHOT Pool (elastic#62152) (elastic#62157)
Browse files Browse the repository at this point in the history
Fix a few spots where NOOP tasks were needlessly created on the snapshot pool.
Especially on mixed master+data nodes running concurrent snapshots, these NOOP tasks
hurt the performance of delete operations.
  • Loading branch information
original-brownbear authored Sep 9, 2020
1 parent b680d3f commit 6710104
Showing 1 changed file with 33 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ private void doDeleteShardSnapshots(Collection<SnapshotId> snapshotIds, long rep
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(updatedRepoData)), 2);
asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(),
afterCleanupsListener);
}, listener::onFailure);
Expand All @@ -679,7 +679,7 @@ private void doDeleteShardSnapshots(Collection<SnapshotId> snapshotIds, long rep
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(newRepoData)), 2);
asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, newRepoData, afterCleanupsListener);
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, newRepoData, afterCleanupsListener);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep);
writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
Expand All @@ -689,22 +689,25 @@ private void doDeleteShardSnapshots(Collection<SnapshotId> snapshotIds, long rep
}
}

private void asyncCleanupUnlinkedRootAndIndicesBlobs(Collection<SnapshotId> deletedSnapshots, Map<String, BlobContainer> foundIndices,
Map<String, BlobMetadata> rootBlobs, RepositoryData updatedRepoData,
ActionListener<Void> listener) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
listener,
l -> cleanupStaleBlobs(deletedSnapshots, foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null))));
private void cleanupUnlinkedRootAndIndicesBlobs(Collection<SnapshotId> deletedSnapshots, Map<String, BlobContainer> foundIndices,
Map<String, BlobMetadata> rootBlobs, RepositoryData updatedRepoData,
ActionListener<Void> listener) {
cleanupStaleBlobs(deletedSnapshots, foundIndices, rootBlobs, updatedRepoData, ActionListener.map(listener, ignored -> null));
}

private void asyncCleanupUnlinkedShardLevelBlobs(RepositoryData oldRepositoryData, Collection<SnapshotId> snapshotIds,
Collection<ShardSnapshotMetaDeleteResult> deleteResults,
ActionListener<Void> listener) {
final List<String> filesToDelete = resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults);
if (filesToDelete.isEmpty()) {
listener.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
listener,
l -> {
try {
deleteFromContainer(blobContainer(), resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults));
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(
Expand Down Expand Up @@ -848,13 +851,23 @@ private void cleanupStaleBlobs(Collection<SnapshotId> deletedSnapshots, Map<Stri
}, listener::onFailure), 2);

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
executor.execute(ActionRunnable.supply(groupedListener, () -> {
List<String> deletedBlobs = cleanupStaleRootFiles(deletedSnapshots, staleRootBlobs(newRepoData, rootBlobs.keySet()));
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
}));
final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> {
List<String> deletedBlobs =
cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
}));
}

final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
if (foundIndices.keySet().equals(survivingIndexIds)) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
}
}

/**
Expand Down Expand Up @@ -896,7 +909,7 @@ public void cleanup(long repositoryStateId, Version repositoryMetaVersion, Actio
}

// Finds all blobs directly under the repository root path that are not referenced by the current RepositoryData
private List<String> staleRootBlobs(RepositoryData repositoryData, Set<String> rootBlobNames) {
private static List<String> staleRootBlobs(RepositoryData repositoryData, Set<String> rootBlobNames) {
final Set<String> allSnapshotIds =
repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
return rootBlobNames.stream().filter(
Expand Down Expand Up @@ -925,7 +938,8 @@ private List<String> staleRootBlobs(RepositoryData repositoryData, Set<String> r
).collect(Collectors.toList());
}

private List<String> cleanupStaleRootFiles(Collection<SnapshotId> deletedSnapshots, List<String> blobsToDelete) {
private List<String> cleanupStaleRootFiles(long previousGeneration, Collection<SnapshotId> deletedSnapshots,
List<String> blobsToDelete) {
if (blobsToDelete.isEmpty()) {
return blobsToDelete;
}
Expand All @@ -936,7 +950,8 @@ private List<String> cleanupStaleRootFiles(Collection<SnapshotId> deletedSnapsho
// delete would also log a confusing INFO message about "stale blobs".
final Set<String> blobNamesToIgnore = deletedSnapshots.stream().flatMap(
snapshotId -> Stream.of(GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()))).collect(Collectors.toSet());
SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()), INDEX_FILE_PREFIX + previousGeneration))
.collect(Collectors.toSet());
final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false)
.collect(Collectors.toList());
if (blobsToLog.isEmpty() == false) {
Expand Down Expand Up @@ -1011,7 +1026,7 @@ public void finalizeSnapshot(final ShardGenerations shardGenerations,
final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion);

final StepListener<RepositoryData> repoDataListener = new StepListener<>();
executor.execute(ActionRunnable.wrap(repoDataListener, this::getRepositoryData));
getRepositoryData(repoDataListener);
repoDataListener.whenComplete(existingRepositoryData -> {

final Map<IndexId, String> indexMetas;
Expand Down

0 comments on commit 6710104

Please sign in to comment.