Skip to content

Commit

Permalink
Async Snapshot Repository Deletes (#40144) (#41571)
Browse files Browse the repository at this point in the history
Motivated by slow snapshot deletes reported in e.g. #39656 and the fact that these likely are a contributing factor to repositories accumulating stale files over time when deletes fail to finish in time and are interrupted before they can complete.

* Makes snapshot deletion async and parallelizes some steps of the delete process that can be safely run concurrently via the snapshot thread poll
   * I did not take the biggest potential speedup step here and parallelize the shard file deletion because that's probably better handled by moving to bulk deletes where possible (and can still be parallelized via the snapshot pool where it isn't). Also, I wanted to keep the size of the PR manageable.
* See #39656 (comment)
* Also, as a side effect this gives the `SnapshotResiliencyTests` a little more coverage for master failover scenarios (since parallel access to a blob store repository during deletes is now possible since a delete isn't a single task anymore).
* By adding a `ThreadPool` reference to the repository this also lays the groundwork to parallelizing shard snapshot uploads to improve the situation reported in #39657
  • Loading branch information
original-brownbear authored Apr 26, 2019
1 parent 4127d68 commit aad3312
Show file tree
Hide file tree
Showing 39 changed files with 248 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.url.URLRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -44,7 +45,9 @@ public List<Setting<?>> getSettings() {
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap(URLRepository.TYPE, metadata -> new URLRepository(metadata, env, namedXContentRegistry));
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap(URLRepository.TYPE,
metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.net.MalformedURLException;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -82,8 +83,8 @@ public class URLRepository extends BlobStoreRepository {
* Constructs a read-only URL-based repository
*/
public URLRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), false, namedXContentRegistry);
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
super(metadata, environment.settings(), false, namedXContentRegistry, threadPool);

if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
throw new RepositoryException(metadata.name(), "missing url");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
Expand All @@ -34,12 +35,13 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.mockito.Mockito.mock;

public class URLRepositoryTests extends ESTestCase {

private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) {
return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
new NamedXContentRegistry(Collections.emptyList())) {
new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool;

import java.net.URISyntaxException;
import java.util.List;
Expand Down Expand Up @@ -86,8 +87,8 @@ public static final class Repository {
private final boolean readonly;

public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
AzureStorageService storageService) {
super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry);
AzureStorageService storageService, ThreadPool threadPool) {
super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.environment = environment;
this.storageService = storageService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -47,9 +49,10 @@ public AzureRepositoryPlugin(Settings settings) {
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap(AzureRepository.TYPE,
(metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService));
(metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService, threadPool));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -42,7 +43,8 @@ private AzureRepository azureRepository(Settings settings) {
.put(settings)
.build();
final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings),
TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class));
TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class),
mock(ThreadPool.class));
assertThat(azureRepository.getBlobStore(), is(nullValue()));
return azureRepository;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -49,9 +51,10 @@ protected GoogleCloudStorageService createStorageService() {
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
(metadata) -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService));
metadata -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService, threadPool));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.function.Function;

Expand Down Expand Up @@ -59,7 +59,6 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
byteSizeSetting("chunk_size", MAX_CHUNK_SIZE, MIN_CHUNK_SIZE, MAX_CHUNK_SIZE, Property.NodeScope, Property.Dynamic);
static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());

private final Settings settings;
private final GoogleCloudStorageService storageService;
private final BlobPath basePath;
private final ByteSizeValue chunkSize;
Expand All @@ -68,9 +67,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {

GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry,
GoogleCloudStorageService storageService) {
super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry);
this.settings = environment.settings();
GoogleCloudStorageService storageService, ThreadPool threadPool) {
super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry, threadPool);
this.storageService = storageService;

String basePath = BASE_PATH.get(metadata.settings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

public final class HdfsPlugin extends Plugin implements RepositoryPlugin {

Expand Down Expand Up @@ -110,7 +111,8 @@ private static Void eagerInit() {
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry));
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -67,8 +68,8 @@ public final class HdfsRepository extends BlobStoreRepository {
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);

public HdfsRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry);
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, threadPool);

this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.function.Function;

Expand Down Expand Up @@ -171,8 +172,8 @@ class S3Repository extends BlobStoreRepository {
S3Repository(final RepositoryMetaData metadata,
final Settings settings,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service) {
super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry);
final S3Service service, final ThreadPool threadPool) {
super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
this.service = service;

this.repositoryMetaData = metadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.security.AccessController;
Expand Down Expand Up @@ -77,13 +78,15 @@ public S3RepositoryPlugin(final Settings settings) {
// proxy method for testing
protected S3Repository createRepository(final RepositoryMetaData metadata,
final Settings settings,
final NamedXContentRegistry registry) {
return new S3Repository(metadata, settings, registry, service);
final NamedXContentRegistry registry, final ThreadPool threadPool) {
return new S3Repository(metadata, settings, registry, service, threadPool);
}

@Override
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry) {
return Collections.singletonMap(S3Repository.TYPE, (metadata) -> createRepository(metadata, env.settings(), registry));
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
final ThreadPool threadPool) {
return Collections.singletonMap(S3Repository.TYPE,
metadata -> createRepository(metadata, env.settings(), registry, threadPool));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;

import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;

@SuppressForbidden(reason = "test fixture requires System.setProperty")
public class RepositoryCredentialsTests extends ESTestCase {
Expand All @@ -61,9 +63,9 @@ static final class ClientAndCredentials extends AmazonS3Wrapper {
}

static final class ProxyS3Service extends S3Service {

private static final Logger logger = LogManager.getLogger(ProxyS3Service.class);

@Override
AmazonS3 buildClient(final S3ClientSettings clientSettings) {
final AmazonS3 client = super.buildClient(clientSettings);
Expand All @@ -77,8 +79,9 @@ AmazonS3 buildClient(final S3ClientSettings clientSettings) {
}

@Override
protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry) {
return new S3Repository(metadata, settings, registry, service){
protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry,
ThreadPool threadPool) {
return new S3Repository(metadata, settings, registry, service, threadPool){
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
Expand Down Expand Up @@ -106,7 +109,7 @@ public void testRepositoryCredentialsOverrideSecureCredentials() throws IOExcept
.put(S3Repository.ACCESS_KEY_SETTING.getKey(), "insecure_aws_key")
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret").build());
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings);
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin);
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class));
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials();
assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key"));
Expand All @@ -129,7 +132,7 @@ public void testRepositoryCredentialsOnly() throws IOException {
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret")
.build());
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(Settings.EMPTY);
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin);
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class));
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials();
assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key"));
Expand All @@ -144,8 +147,8 @@ public void testRepositoryCredentialsOnly() throws IOException {
+ " See the breaking changes documentation for the next major version.");
}

private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin) {
final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY);
private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin, ThreadPool threadPool) {
final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, threadPool);
repository.start();
return repository;
}
Expand All @@ -168,7 +171,7 @@ public void testReinitSecureCredentials() throws IOException {
}
final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", builder.build());
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings);
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin)) {
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class))) {
try (AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials
.getCredentials();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.AfterClass;
import org.junit.BeforeClass;

Expand Down Expand Up @@ -114,14 +115,15 @@ public TestS3RepositoryPlugin(final Settings settings) {
}

@Override
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry) {
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
final ThreadPool threadPool) {
return Collections.singletonMap(S3Repository.TYPE,
(metadata) -> new S3Repository(metadata, env.settings(), registry, new S3Service() {
metadata -> new S3Repository(metadata, env.settings(), registry, new S3Service() {
@Override
AmazonS3 buildClient(S3ClientSettings clientSettings) {
return new MockAmazonS3(blobs, bucket, serverSideEncryption, cannedACL, storageClass);
}
}));
}, threadPool));
}
}

Expand Down
Loading

0 comments on commit aad3312

Please sign in to comment.