From 8e9bc76ac5263bfe198a3e7104462d689213dda6 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 29 Nov 2019 10:14:53 +0100 Subject: [PATCH] Make BlobStoreRepository Aware of ClusterState (#49639) This is a preliminary to #49060. It does not introduce any substantial behavior change to how the blob store repository operates. What it does is to add all the infrastructure changes around passing the cluster service to the blob store, associated test changes and a best effort approach to tracking the latest repository generation on all nodes from cluster state updates. This brings a slight improvement to the consistency by which non-master nodes (or master directly after a failover) will be able to determine the latest repository generation. It does not however do any tricky checks for the situation after a repository operation (create, delete or cleanup) that could theoretically be used to get even greater accuracy to keep this change simple. This change does not in any way alter the behavior of the blobstore repository other than adding a better "guess" for the value of the latest repo generation and is mainly intended to isolate the actual logical change to how the repository operates in #49060 --- .../repository/url/URLRepositoryPlugin.java | 6 +-- .../repositories/url/URLRepository.java | 6 +-- .../repositories/url/URLRepositoryTests.java | 5 +- .../repositories/azure/AzureRepository.java | 6 +-- .../azure/AzureRepositoryPlugin.java | 6 +-- .../azure/AzureRepositorySettingsTests.java | 5 +- .../gcs/GoogleCloudStoragePlugin.java | 6 +-- .../gcs/GoogleCloudStorageRepository.java | 6 +-- ...eCloudStorageBlobStoreRepositoryTests.java | 7 +-- .../repositories/hdfs/HdfsPlugin.java | 6 +-- .../repositories/hdfs/HdfsRepository.java | 6 +-- .../repositories/s3/S3Repository.java | 6 +-- .../repositories/s3/S3RepositoryPlugin.java | 10 ++-- .../s3/RepositoryCredentialsTests.java | 6 +-- .../s3/S3BlobStoreRepositoryTests.java | 7 +-- .../repositories/s3/S3RepositoryTests.java | 5 +- .../cluster/RepositoryCleanupInProgress.java | 9 +++- .../cluster/SnapshotDeletionsInProgress.java | 20 +++++--- .../cluster/SnapshotsInProgress.java | 11 +++- .../plugins/RepositoryPlugin.java | 6 +-- .../repositories/FilterRepository.java | 6 +++ .../repositories/RepositoriesModule.java | 6 +-- .../repositories/RepositoriesService.java | 10 +++- .../repositories/Repository.java | 10 ++++ .../repositories/RepositoryOperation.java | 35 +++++++++++++ .../blobstore/BlobStoreRepository.java | 50 +++++++++++++++++-- .../repositories/fs/FsRepository.java | 6 +-- .../snapshots/SnapshotsService.java | 10 ++-- .../repositories/RepositoriesModuleTests.java | 26 +++++----- .../RepositoriesServiceTests.java | 5 ++ .../BlobStoreRepositoryRestoreTests.java | 3 +- .../blobstore/BlobStoreRepositoryTests.java | 6 +-- .../repositories/fs/FsRepositoryTests.java | 4 +- ...etadataLoadingDuringSnapshotRestoreIT.java | 10 ++-- .../snapshots/SnapshotResiliencyTests.java | 22 +++----- .../MockEventuallyConsistentRepository.java | 6 +-- ...ckEventuallyConsistentRepositoryTests.java | 21 +++----- .../index/shard/RestoreOnlyRepository.java | 5 ++ .../blobstore/BlobStoreTestUtil.java | 50 +++++++++++++++++++ .../snapshots/mockstore/MockRepository.java | 11 ++-- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 5 +- .../xpack/ccr/repository/CcrRepository.java | 5 ++ .../elasticsearch/xpack/core/XPackPlugin.java | 2 +- .../snapshots/SourceOnlySnapshotIT.java | 4 +- .../SourceOnlySnapshotShardTests.java | 5 +- .../core/LocalStateCompositeXPackPlugin.java | 14 +++--- 46 files changed, 329 insertions(+), 153 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java diff --git a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java index 6e88f0e0deb54..4c3fa615937e7 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java @@ -19,6 +19,7 @@ package org.elasticsearch.plugin.repository.url; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; @@ -26,7 +27,6 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.url.URLRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -46,8 +46,8 @@ public List> getSettings() { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap(URLRepository.TYPE, - metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool)); + metadata -> new URLRepository(metadata, env, namedXContentRegistry, clusterService)); } } diff --git a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java index 6e759756a7289..8fbe4752dc83c 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; @@ -33,7 +34,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.net.MalformedURLException; import java.net.URISyntaxException; @@ -81,8 +81,8 @@ public class URLRepository extends BlobStoreRepository { * Constructs a read-only URL-based repository */ public URLRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath()); + NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, BlobPath.cleanPath()); if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) { throw new RepositoryException(metadata.name(), "missing url"); diff --git a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java index 96a82ee0b9d24..823571462d243 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java @@ -25,8 +25,8 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -35,13 +35,12 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; -import static org.mockito.Mockito.mock; public class URLRepositoryTests extends ESTestCase { private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) { return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) { + new NamedXContentRegistry(Collections.emptyList()), BlobStoreTestUtil.mockClusterService()) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index b5c6ed70ad0d2..7b7a9108c1ef5 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; @@ -32,7 +33,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Locale; import java.util.function.Function; @@ -79,8 +79,8 @@ public AzureRepository( final RepositoryMetaData metadata, final NamedXContentRegistry namedXContentRegistry, final AzureStorageService storageService, - final ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, buildBasePath(metadata)); + final ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, buildBasePath(metadata)); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.storageService = storageService; diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index 8815e738f9cdd..d98a7c3cbd717 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.azure; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; @@ -31,7 +32,6 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ScalingExecutorBuilder; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -60,9 +60,9 @@ AzureStorageService createAzureStoreService(final Settings settings) { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap(AzureRepository.TYPE, - (metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, threadPool)); + (metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, clusterService)); } @Override diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java index 341a1d1436deb..6f5373dab98cf 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java @@ -26,8 +26,8 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -42,8 +42,7 @@ private AzureRepository azureRepository(Settings settings) { .put(settings) .build(); final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings), - NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), - mock(ThreadPool.class)); + NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), BlobStoreTestUtil.mockClusterService()); assertThat(azureRepository.getBlobStore(), is(nullValue())); return azureRepository; } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 70c4dcf3a9889..c4a065270be69 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.gcs; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -27,7 +28,6 @@ import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -52,9 +52,9 @@ protected GoogleCloudStorageService createStorageService() { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap(GoogleCloudStorageRepository.TYPE, - metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, threadPool)); + metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, clusterService)); } @Override diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 4b17fd6bef3ea..852b147346e06 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.settings.Setting; @@ -30,7 +31,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Function; @@ -64,8 +64,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { final RepositoryMetaData metadata, final NamedXContentRegistry namedXContentRegistry, final GoogleCloudStorageService storageService, - final ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, buildBasePath(metadata)); + final ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, buildBasePath(metadata)); this.storageService = storageService; this.chunkSize = getSetting(CHUNK_SIZE, metadata); diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index ee0b59eb9d365..7a2c3d780123a 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -27,6 +27,7 @@ import fixture.gcs.FakeOAuth2HttpHandler; import fixture.gcs.GoogleCloudStorageHttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.MockSecureSettings; @@ -38,7 +39,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.threeten.bp.Duration; import java.io.IOException; @@ -170,9 +170,10 @@ StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clien } @Override - public Map getRepositories(Environment env, NamedXContentRegistry registry, ThreadPool threadPool) { + public Map getRepositories(Environment env, NamedXContentRegistry registry, + ClusterService clusterService) { return Collections.singletonMap(GoogleCloudStorageRepository.TYPE, - metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, threadPool) { + metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, clusterService) { @Override protected GoogleCloudStorageBlobStore createBlobStore() { return new GoogleCloudStorageBlobStore("bucket", "test", storageService) { diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java index a6dc5fe7db140..1d93aed9cc308 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java @@ -30,13 +30,13 @@ import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.SecurityUtil; import org.elasticsearch.SpecialPermission; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.threadpool.ThreadPool; public final class HdfsPlugin extends Plugin implements RepositoryPlugin { @@ -112,7 +112,7 @@ private static Void eagerInit() { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { - return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool)); + ClusterService clusterService) { + return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, clusterService)); } } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index 72430bcd36631..776c97422e5ee 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -31,6 +31,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.SpecialPermission; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobPath; @@ -40,7 +41,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.UncheckedIOException; @@ -67,8 +67,8 @@ public final class HdfsRepository extends BlobStoreRepository { private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB); public HdfsRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath()); + NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, BlobPath.cleanPath()); this.environment = environment; this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index af895758723f5..db6968ea71059 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; @@ -32,7 +33,6 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Function; @@ -152,8 +152,8 @@ class S3Repository extends BlobStoreRepository { final RepositoryMetaData metadata, final NamedXContentRegistry namedXContentRegistry, final S3Service service, - final ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, buildBasePath(metadata)); + final ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, buildBasePath(metadata)); this.service = service; // Parse and validate the user's S3 Storage Class setting diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 461eb9e9592cf..0661d40ec804f 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -22,6 +22,7 @@ import com.amazonaws.util.json.Jackson; import org.elasticsearch.SpecialPermission; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -30,7 +31,6 @@ import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.security.AccessController; @@ -79,14 +79,14 @@ public S3RepositoryPlugin(final Settings settings) { protected S3Repository createRepository( final RepositoryMetaData metadata, final NamedXContentRegistry registry, - final ThreadPool threadPool) { - return new S3Repository(metadata, registry, service, threadPool); + final ClusterService clusterService) { + return new S3Repository(metadata, registry, service, clusterService); } @Override public Map getRepositories(final Environment env, final NamedXContentRegistry registry, - final ThreadPool threadPool) { - return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, threadPool)); + final ClusterService clusterService) { + return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, clusterService)); } @Override diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index f7d5ba021e25c..45a79af102df4 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -32,7 +33,6 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; import java.util.List; @@ -146,8 +146,8 @@ public ProxyS3RepositoryPlugin(Settings settings) { @Override protected S3Repository createRepository(RepositoryMetaData metadata, - NamedXContentRegistry registry, ThreadPool threadPool) { - return new S3Repository(metadata, registry, service, threadPool) { + NamedXContentRegistry registry, ClusterService clusterService) { + return new S3Repository(metadata, registry, service, clusterService) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 16dd4cdc27e0b..3c26c99f8a528 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -23,6 +23,7 @@ import com.sun.net.httpserver.HttpHandler; import fixture.s3.S3HttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; @@ -35,7 +36,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper; -import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Collection; @@ -109,8 +109,9 @@ public List> getSettings() { } @Override - protected S3Repository createRepository(RepositoryMetaData metadata, NamedXContentRegistry registry, ThreadPool threadPool) { - return new S3Repository(metadata, registry, service, threadPool) { + protected S3Repository createRepository(RepositoryMetaData metadata, NamedXContentRegistry registry, + ClusterService clusterService) { + return new S3Repository(metadata, registry, service, clusterService) { @Override public BlobStore blobStore() { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index 45177dbebbf48..3466fd89ebbf9 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -26,8 +26,8 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.util.Map; @@ -36,7 +36,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Mockito.mock; public class S3RepositoryTests extends ESTestCase { @@ -120,7 +119,7 @@ public void testDefaultBufferSize() { } private S3Repository createS3Repo(RepositoryMetaData metadata) { - return new S3Repository(metadata, NamedXContentRegistry.EMPTY, new DummyS3Service(), mock(ThreadPool.class)) { + return new S3Repository(metadata, NamedXContentRegistry.EMPTY, new DummyS3Service(), BlobStoreTestUtil.mockClusterService()) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java index f9be2d326980d..7c43c03ae6d40 100644 --- a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.repositories.RepositoryOperation; import java.io.IOException; import java.util.Arrays; @@ -94,7 +95,7 @@ public Version getMinimalSupportedVersion() { return Version.V_7_4_0; } - public static final class Entry implements Writeable { + public static final class Entry implements Writeable, RepositoryOperation { private final String repository; @@ -110,6 +111,12 @@ public Entry(String repository, long repositoryStateId) { this.repositoryStateId = repositoryStateId; } + @Override + public long repositoryStateId() { + return repositoryStateId; + } + + @Override public String repository() { return repository; } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java index 2ac12d3e93922..9c25c5578f315 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.repositories.RepositoryOperation; import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; @@ -164,7 +165,7 @@ public String toString() { /** * A class representing a snapshot deletion request entry in the cluster state. */ - public static final class Entry implements Writeable { + public static final class Entry implements Writeable, RepositoryOperation { private final Snapshot snapshot; private final long startTime; private final long repositoryStateId; @@ -195,13 +196,6 @@ public long getStartTime() { return startTime; } - /** - * The repository state id at the time the snapshot deletion began. - */ - public long getRepositoryStateId() { - return repositoryStateId; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -227,5 +221,15 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(startTime); out.writeLong(repositoryStateId); } + + @Override + public String repository() { + return snapshot.getRepository(); + } + + @Override + public long repositoryStateId() { + return repositoryStateId; + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index b40d771c64739..7ddecd8b2fdb3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryOperation; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotsService; @@ -81,7 +82,7 @@ public String toString() { return builder.append("]").toString(); } - public static class Entry implements ToXContent { + public static class Entry implements ToXContent, RepositoryOperation { private final State state; private final Snapshot snapshot; private final boolean includeGlobalState; @@ -153,6 +154,11 @@ public Entry(Entry entry, ImmutableOpenMap shards) this(entry, entry.state, shards, entry.failure); } + @Override + public String repository() { + return snapshot.getRepository(); + } + public Snapshot snapshot() { return this.snapshot; } @@ -189,7 +195,8 @@ public long startTime() { return startTime; } - public long getRepositoryStateId() { + @Override + public long repositoryStateId() { return repositoryStateId; } diff --git a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java index ede5c5e3611f9..1ac61b27fd1ae 100644 --- a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java @@ -22,10 +22,10 @@ import java.util.Collections; import java.util.Map; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.threadpool.ThreadPool; /** * An extension point for {@link Plugin} implementations to add custom snapshot repositories. @@ -41,7 +41,7 @@ public interface RepositoryPlugin { * the value is a factory to construct the {@link Repository} interface. */ default Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.emptyMap(); } @@ -55,7 +55,7 @@ default Map getRepositories(Environment env, NamedXC * the value is a factory to construct the {@link Repository} interface. */ default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 7f4d5ec5c1fda..76645834fb112 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -133,6 +134,11 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In return in.getShardSnapshotStatus(snapshotId, indexId, shardId); } + @Override + public void updateState(ClusterState state) { + in.updateState(state); + } + @Override public Lifecycle.State lifecycleState() { return in.lifecycleState(); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index 783e3ff78fc4d..f87aab460fcbc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -43,10 +43,10 @@ public final class RepositoriesModule { public RepositoriesModule(Environment env, List repoPlugins, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry) { Map factories = new HashMap<>(); - factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, threadPool)); + factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService)); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, threadPool); + Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, clusterService); for (Map.Entry entry : newRepoTypes.entrySet()) { if (factories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered"); @@ -56,7 +56,7 @@ public RepositoriesModule(Environment env, List repoPlugins, T Map internalFactories = new HashMap<>(); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, threadPool); + Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, clusterService); for (Map.Entry entry : newRepoTypes.entrySet()) { if (internalFactories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered"); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 20f083dcff0fe..03b283c4aafe5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -287,8 +287,9 @@ protected void doRun() { @Override public void applyClusterState(ClusterChangedEvent event) { try { + final ClusterState state = event.state(); RepositoriesMetaData oldMetaData = event.previousState().getMetaData().custom(RepositoriesMetaData.TYPE); - RepositoriesMetaData newMetaData = event.state().getMetaData().custom(RepositoriesMetaData.TYPE); + RepositoriesMetaData newMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE); // Check if repositories got changed if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equals(newMetaData))) { @@ -344,6 +345,9 @@ public void applyClusterState(ClusterChangedEvent event) { } } } + for (Repository repo : builder.values()) { + repo.updateState(state); + } repositories = Collections.unmodifiableMap(builder); } catch (Exception ex) { logger.warn("failure updating cluster state ", ex); @@ -411,11 +415,13 @@ private Repository createRepository(RepositoryMetaData repositoryMetaData, Map Math.max(known, finalBestGen)); + } + + private long bestGeneration(Collection operations) { + final String repoName = metadata.name(); + assert operations.size() <= 1 : "Assumed one or no operations but received " + operations; + return operations.stream().filter(e -> e.repository().equals(repoName)).mapToLong(RepositoryOperation::repositoryStateId) + .max().orElse(RepositoryData.EMPTY_REPO_GEN); + } + public ThreadPool threadPool() { return threadPool; } diff --git a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index 61558e4f42efa..efe095eb9b6c2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.fs.FsBlobStore; @@ -32,7 +33,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.function.Function; @@ -70,8 +70,8 @@ public class FsRepository extends BlobStoreRepository { * Constructs a shared file system repository. */ public FsRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath()); + ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, BlobPath.cleanPath()); this.environment = environment; String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings()); if (location.isEmpty()) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 0ac0204760ea3..c38462e240747 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -573,7 +573,7 @@ private void cleanupAfterError(Exception exception) { ExceptionsHelper.stackTrace(exception), 0, Collections.emptyList(), - snapshot.getRepositoryStateId(), + snapshot.repositoryStateId(), snapshot.includeGlobalState(), metaDataForSnapshot(snapshot, clusterService.state().metaData()), snapshot.userMetadata(), @@ -782,7 +782,7 @@ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterState state) { if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); - deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId(), + deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.repositoryStateId(), state.nodes().getMinNodeVersion()); } } @@ -852,7 +852,7 @@ public void onResponse(Void aVoid) { public void onFailure(Exception e) { logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); } - }, updatedSnapshot.getRepositoryStateId(), false); + }, updatedSnapshot.repositoryStateId(), false); } assert updatedSnapshot.shards().size() == snapshot.shards().size() : "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]"; @@ -1037,7 +1037,7 @@ protected void doRun() { failure, entry.shards().size(), unmodifiableList(shardFailures), - entry.getRepositoryStateId(), + entry.repositoryStateId(), entry.includeGlobalState(), metaDataForSnapshot(entry, metaData), entry.userMetadata(), @@ -1163,7 +1163,7 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam if (matchedInProgress.isPresent()) { matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId()); // Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes - repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L; + repoGenId = matchedInProgress.get().repositoryStateId() + 1L; } } if (matchedEntry.isPresent() == false) { diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java index cd31ce121b245..1fad6f62773af 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java @@ -44,6 +44,7 @@ public class RepositoriesModuleTests extends ESTestCase { private RepositoryPlugin plugin2; private Repository.Factory factory; private ThreadPool threadPool; + private ClusterService clusterService; @Override public void setUp() throws Exception { @@ -51,6 +52,7 @@ public void setUp() throws Exception { environment = mock(Environment.class); contentRegistry = mock(NamedXContentRegistry.class); threadPool = mock(ThreadPool.class); + clusterService = mock(ClusterService.class); plugin1 = mock(RepositoryPlugin.class); plugin2 = mock(RepositoryPlugin.class); factory = mock(Repository.Factory.class); @@ -60,8 +62,8 @@ public void setUp() throws Exception { } public void testCanRegisterTwoRepositoriesWithDifferentTypes() { - when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type2", factory)); + when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type2", factory)); // Would throw new RepositoriesModule( @@ -69,37 +71,37 @@ public void testCanRegisterTwoRepositoriesWithDifferentTypes() { } public void testCannotRegisterTwoRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, threadPool, contentRegistry)); assertEquals("Repository type [type1] is already registered", ex.getMessage()); } public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { - when(plugin1.getInternalRepositories(environment, contentRegistry, threadPool)) + when(plugin1.getInternalRepositories(environment, contentRegistry, clusterService)) .thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool)) + when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService)) .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, threadPool, contentRegistry)); assertEquals("Internal repository type [type1] is already registered", ex.getMessage()); } public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool)) + when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService)) .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - threadPool, contentRegistry)); + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, threadPool, + contentRegistry)); assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage()); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 0893a2568df17..2daa4afe3e149 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -214,6 +215,10 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In return null; } + @Override + public void updateState(final ClusterState state) { + } + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 59a977efe5f5d..b5d99db0a880f 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -193,7 +193,8 @@ public void testSnapshotWithConflictingName() throws IOException { private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool) { + final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), + BlobStoreTestUtil.mockClusterService()) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 425ccdfe8ccc2..6d6248c446df1 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -41,7 +42,6 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.Arrays; @@ -71,9 +71,9 @@ public static class FsLikeRepoPlugin extends Plugin implements RepositoryPlugin @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap(REPO_TYPE, - (metadata) -> new FsRepository(metadata, env, namedXContentRegistry, threadPool) { + (metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we access blobStore on test/main threads diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 5d5dc1d23e639..2b687fbcac228 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.DummyShardLock; @@ -90,7 +91,8 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { int numDocs = indexDocs(directory); RepositoryMetaData metaData = new RepositoryMetaData("test", "fs", settings); - FsRepository repository = new FsRepository(metaData, new Environment(settings, null), NamedXContentRegistry.EMPTY, threadPool); + FsRepository repository = new FsRepository(metaData, new Environment(settings, null), NamedXContentRegistry.EMPTY, + BlobStoreTestUtil.mockClusterService()); repository.start(); final Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_INDEX_UUID, "myindexUUID").build(); IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("myindex", indexSettings); diff --git a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java index 364d2efe311b9..3d65782d4824d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; @@ -35,7 +36,6 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.mockstore.MockRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; @@ -185,8 +185,8 @@ public static class CountingMockRepository extends MockRepository { public CountingMockRepository(final RepositoryMetaData metadata, final Environment environment, - final NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { - super(metadata, environment, namedXContentRegistry, threadPool); + final NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { + super(metadata, environment, namedXContentRegistry, clusterService); } @Override @@ -206,9 +206,9 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind public static class CountingMockRepositoryPlugin extends MockRepository.Plugin { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap("coutingmock", - metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, threadPool)); + metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, clusterService)); } } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index c4d27ff13fca8..124bf290677bb 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1226,23 +1226,15 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon private Repository.Factory getRepoFactory(Environment environment) { // Run half the tests with the eventually consistent repository if (blobStoreContext == null) { - return metaData -> { - final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) { - @Override - protected void assertSnapshotOrGenericThread() { - // eliminate thread name check as we create repo in the test thread - } - }; - repository.start(); - return repository; + return metaData -> new FsRepository(metaData, environment, xContentRegistry(), clusterService) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo in the test thread + } }; } else { - return metaData -> { - final Repository repository = new MockEventuallyConsistentRepository( - metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext, random()); - repository.start(); - return repository; - }; + return metaData -> + new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random()); } } public void restart() { diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 9727dc5f28283..0931ceb494827 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -21,6 +21,7 @@ import org.apache.lucene.codecs.CodecUtil; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -37,7 +38,6 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -74,10 +74,10 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository { public MockEventuallyConsistentRepository( final RepositoryMetaData metadata, final NamedXContentRegistry namedXContentRegistry, - final ThreadPool threadPool, + final ClusterService clusterService, final Context context, final Random random) { - super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath()); + super(metadata, namedXContentRegistry, clusterService, BlobPath.cleanPath()); this.context = context; this.namedXContentRegistry = namedXContentRegistry; this.random = random; diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 5fdfbe9a93a4d..f1cf314e3158a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.snapshots.mockstore; -import org.apache.lucene.util.SameThreadExecutorService; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -27,10 +26,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -41,8 +40,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.startsWith; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class MockEventuallyConsistentRepositoryTests extends ESTestCase { @@ -50,7 +47,7 @@ public void testReadAfterWriteConsistently() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -70,7 +67,7 @@ public void testReadAfterWriteAfterReadThrows() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -86,7 +83,7 @@ public void testReadAfterDeleteAfterWriteThrows() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -104,7 +101,7 @@ public void testOverwriteRandomBlobFails() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -121,7 +118,7 @@ public void testOverwriteShardSnapBlobFails() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0")); @@ -137,13 +134,9 @@ public void testOverwriteShardSnapBlobFails() throws IOException { public void testOverwriteSnapshotInfoBlob() { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); - final ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); - when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( - new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), threadPool, blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); // We create a snap- blob for snapshot "foo" in the first generation diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 5744504845aa6..f0118d3c0b699 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -142,4 +143,8 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In @Override public void verify(String verificationToken, DiscoveryNode localNode) { } + + @Override + public void updateState(final ClusterState state) { + } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index 12b926f93a059..66c49db542dab 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -18,9 +18,16 @@ */ package org.elasticsearch.repositories.blobstore; +import org.apache.lucene.util.SameThreadExecutorService; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -52,9 +59,11 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.elasticsearch.test.ESTestCase.randomIntBetween; @@ -66,6 +75,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public final class BlobStoreTestUtil { @@ -275,4 +289,40 @@ public static void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath } } } + + /** + * Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary + * functionality to make {@link BlobStoreRepository} work. + * + * @return Mock ClusterService + */ + public static ClusterService mockClusterService() { + final ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); + when(threadPool.generic()).thenReturn(new SameThreadExecutorService()); + when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( + new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))); + final ClusterService clusterService = mock(ClusterService.class); + final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); + when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); + final AtomicReference currentState = new AtomicReference<>(ClusterState.EMPTY_STATE); + when(clusterService.state()).then(invocationOnMock -> currentState.get()); + final List appliers = new CopyOnWriteArrayList<>(); + doAnswer(invocation -> { + final ClusterStateUpdateTask task = ((ClusterStateUpdateTask) invocation.getArguments()[1]); + final ClusterState current = currentState.get(); + final ClusterState next = task.execute(current); + currentState.set(next); + appliers.forEach(applier -> applier.applyClusterState( + new ClusterChangedEvent((String) invocation.getArguments()[0], next, current))); + task.clusterStateProcessed((String) invocation.getArguments()[0], current, next); + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class)); + doAnswer(invocation -> { + appliers.add((ClusterStateApplier) invocation.getArguments()[0]); + return null; + }).when(clusterService).addStateApplier(any(ClusterStateApplier.class)); + when(clusterApplierService.threadPool()).thenReturn(threadPool); + return clusterService; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 9a157e11c7227..218c6f4eecac7 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; @@ -40,7 +41,6 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.fs.FsRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; @@ -71,8 +71,9 @@ public static class Plugin extends org.elasticsearch.plugins.Plugin implements R @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { - return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry, threadPool)); + ClusterService clusterService) { + return Collections.singletonMap("mock", (metadata) -> + new MockRepository(metadata, env, namedXContentRegistry, clusterService)); } @Override @@ -113,8 +114,8 @@ public long getFailureCount() { private volatile boolean blocked = false; public MockRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { - super(overrideSettings(metadata, environment), environment, namedXContentRegistry, threadPool); + NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { + super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService); randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 9bf15e2ab030c..0fd087a47c44e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -339,9 +339,10 @@ public List> getExecutorBuilders(Settings settings) { @Override public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { Repository.Factory repositoryFactory = - (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool); + (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), + clusterService.getClusterApplierService().threadPool()); return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 820a965e3b134..20c098ee7f82a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -420,6 +421,10 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } + @Override + public void updateState(ClusterState state) { + } + private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion, Client followerClient, Index followerIndex) { final PlainActionFuture indexMetadataFuture = new PlainActionFuture<>(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index 1b20ceae9233e..02fd885b3e350 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -312,7 +312,7 @@ public static Path resolveConfigFile(Environment env, String name) { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java index d6e94fc8ce112..899c300493ab4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -35,7 +36,6 @@ import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.io.IOException; @@ -70,7 +70,7 @@ protected boolean addMockInternalEngine() { public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index f5f8287f2c5c1..03ca8d5cfff28 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.ShardGenerations; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.threadpool.ThreadPool; @@ -348,10 +349,10 @@ private Environment createEnvironment() { } /** Create a {@link Repository} with a random name **/ - private Repository createRepository() throws IOException { + private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool); + return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), BlobStoreTestUtil.mockClusterService()); } private static void runAsSnapshot(ThreadPool pool, Runnable runnable) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 6952342040c5d..accea03f9b68a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -406,19 +406,21 @@ public List> getPersistentTasksExecutor(ClusterServic @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { - HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry, threadPool)); - filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, threadPool))); + ClusterService clusterService) { + HashMap repositories = + new HashMap<>(super.getRepositories(env, namedXContentRegistry, clusterService)); + filterPlugins(RepositoryPlugin.class).forEach( + r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, clusterService))); return repositories; } @Override public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { HashMap internalRepositories = - new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, threadPool)); + new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, clusterService)); filterPlugins(RepositoryPlugin.class).forEach(r -> - internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, threadPool))); + internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, clusterService))); return internalRepositories; }