diff --git a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java index 6e88f0e0deb54..4c3fa615937e7 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/plugin/repository/url/URLRepositoryPlugin.java @@ -19,6 +19,7 @@ package org.elasticsearch.plugin.repository.url; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; @@ -26,7 +27,6 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.url.URLRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -46,8 +46,8 @@ public List> getSettings() { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap(URLRepository.TYPE, - metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool)); + metadata -> new URLRepository(metadata, env, namedXContentRegistry, clusterService)); } } diff --git a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java index 6e759756a7289..8fbe4752dc83c 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/repositories/url/URLRepository.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; @@ -33,7 +34,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.net.MalformedURLException; import java.net.URISyntaxException; @@ -81,8 +81,8 @@ public class URLRepository extends BlobStoreRepository { * Constructs a read-only URL-based repository */ public URLRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath()); + NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, BlobPath.cleanPath()); if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) { throw new RepositoryException(metadata.name(), "missing url"); diff --git a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java index 96a82ee0b9d24..823571462d243 100644 --- a/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java +++ b/modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLRepositoryTests.java @@ -25,8 +25,8 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -35,13 +35,12 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; -import static org.mockito.Mockito.mock; public class URLRepositoryTests extends ESTestCase { private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) { return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings), - new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) { + new NamedXContentRegistry(Collections.emptyList()), BlobStoreTestUtil.mockClusterService()) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index b5c6ed70ad0d2..7b7a9108c1ef5 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; @@ -32,7 +33,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Locale; import java.util.function.Function; @@ -79,8 +79,8 @@ public AzureRepository( final RepositoryMetaData metadata, final NamedXContentRegistry namedXContentRegistry, final AzureStorageService storageService, - final ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, buildBasePath(metadata)); + final ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, buildBasePath(metadata)); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.storageService = storageService; diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index 8815e738f9cdd..d98a7c3cbd717 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.azure; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; @@ -31,7 +32,6 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ScalingExecutorBuilder; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -60,9 +60,9 @@ AzureStorageService createAzureStoreService(final Settings settings) { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap(AzureRepository.TYPE, - (metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, threadPool)); + (metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, clusterService)); } @Override diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java index 341a1d1436deb..6f5373dab98cf 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositorySettingsTests.java @@ -26,8 +26,8 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -42,8 +42,7 @@ private AzureRepository azureRepository(Settings settings) { .put(settings) .build(); final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings), - NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), - mock(ThreadPool.class)); + NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), BlobStoreTestUtil.mockClusterService()); assertThat(azureRepository.getBlobStore(), is(nullValue())); return azureRepository; } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java index 70c4dcf3a9889..c4a065270be69 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStoragePlugin.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.gcs; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -27,7 +28,6 @@ import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -52,9 +52,9 @@ protected GoogleCloudStorageService createStorageService() { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap(GoogleCloudStorageRepository.TYPE, - metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, threadPool)); + metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, clusterService)); } @Override diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java index 4b17fd6bef3ea..852b147346e06 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.settings.Setting; @@ -30,7 +31,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Function; @@ -64,8 +64,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository { final RepositoryMetaData metadata, final NamedXContentRegistry namedXContentRegistry, final GoogleCloudStorageService storageService, - final ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, buildBasePath(metadata)); + final ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, buildBasePath(metadata)); this.storageService = storageService; this.chunkSize = getSetting(CHUNK_SIZE, metadata); diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index ee0b59eb9d365..7a2c3d780123a 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -27,6 +27,7 @@ import fixture.gcs.FakeOAuth2HttpHandler; import fixture.gcs.GoogleCloudStorageHttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.MockSecureSettings; @@ -38,7 +39,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.threeten.bp.Duration; import java.io.IOException; @@ -170,9 +170,10 @@ StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clien } @Override - public Map getRepositories(Environment env, NamedXContentRegistry registry, ThreadPool threadPool) { + public Map getRepositories(Environment env, NamedXContentRegistry registry, + ClusterService clusterService) { return Collections.singletonMap(GoogleCloudStorageRepository.TYPE, - metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, threadPool) { + metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, clusterService) { @Override protected GoogleCloudStorageBlobStore createBlobStore() { return new GoogleCloudStorageBlobStore("bucket", "test", storageService) { diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java index a6dc5fe7db140..1d93aed9cc308 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsPlugin.java @@ -30,13 +30,13 @@ import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.SecurityUtil; import org.elasticsearch.SpecialPermission; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.threadpool.ThreadPool; public final class HdfsPlugin extends Plugin implements RepositoryPlugin { @@ -112,7 +112,7 @@ private static Void eagerInit() { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { - return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool)); + ClusterService clusterService) { + return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, clusterService)); } } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index 72430bcd36631..776c97422e5ee 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -31,6 +31,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.SpecialPermission; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobPath; @@ -40,7 +41,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.UncheckedIOException; @@ -67,8 +67,8 @@ public final class HdfsRepository extends BlobStoreRepository { private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB); public HdfsRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath()); + NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, BlobPath.cleanPath()); this.environment = environment; this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null); diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index af895758723f5..db6968ea71059 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; @@ -32,7 +33,6 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Function; @@ -152,8 +152,8 @@ class S3Repository extends BlobStoreRepository { final RepositoryMetaData metadata, final NamedXContentRegistry namedXContentRegistry, final S3Service service, - final ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, buildBasePath(metadata)); + final ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, buildBasePath(metadata)); this.service = service; // Parse and validate the user's S3 Storage Class setting diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index 461eb9e9592cf..0661d40ec804f 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -22,6 +22,7 @@ import com.amazonaws.util.json.Jackson; import org.elasticsearch.SpecialPermission; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -30,7 +31,6 @@ import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.security.AccessController; @@ -79,14 +79,14 @@ public S3RepositoryPlugin(final Settings settings) { protected S3Repository createRepository( final RepositoryMetaData metadata, final NamedXContentRegistry registry, - final ThreadPool threadPool) { - return new S3Repository(metadata, registry, service, threadPool); + final ClusterService clusterService) { + return new S3Repository(metadata, registry, service, clusterService); } @Override public Map getRepositories(final Environment env, final NamedXContentRegistry registry, - final ThreadPool threadPool) { - return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, threadPool)); + final ClusterService clusterService) { + return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, clusterService)); } @Override diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index f7d5ba021e25c..45a79af102df4 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -32,7 +33,6 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; import java.util.List; @@ -146,8 +146,8 @@ public ProxyS3RepositoryPlugin(Settings settings) { @Override protected S3Repository createRepository(RepositoryMetaData metadata, - NamedXContentRegistry registry, ThreadPool threadPool) { - return new S3Repository(metadata, registry, service, threadPool) { + NamedXContentRegistry registry, ClusterService clusterService) { + return new S3Repository(metadata, registry, service, clusterService) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 16dd4cdc27e0b..3c26c99f8a528 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -23,6 +23,7 @@ import com.sun.net.httpserver.HttpHandler; import fixture.s3.S3HttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; @@ -35,7 +36,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper; -import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Collection; @@ -109,8 +109,9 @@ public List> getSettings() { } @Override - protected S3Repository createRepository(RepositoryMetaData metadata, NamedXContentRegistry registry, ThreadPool threadPool) { - return new S3Repository(metadata, registry, service, threadPool) { + protected S3Repository createRepository(RepositoryMetaData metadata, NamedXContentRegistry registry, + ClusterService clusterService) { + return new S3Repository(metadata, registry, service, clusterService) { @Override public BlobStore blobStore() { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index 45177dbebbf48..3466fd89ebbf9 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -26,8 +26,8 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.util.Map; @@ -36,7 +36,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Mockito.mock; public class S3RepositoryTests extends ESTestCase { @@ -120,7 +119,7 @@ public void testDefaultBufferSize() { } private S3Repository createS3Repo(RepositoryMetaData metadata) { - return new S3Repository(metadata, NamedXContentRegistry.EMPTY, new DummyS3Service(), mock(ThreadPool.class)) { + return new S3Repository(metadata, NamedXContentRegistry.EMPTY, new DummyS3Service(), BlobStoreTestUtil.mockClusterService()) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java index f9be2d326980d..7c43c03ae6d40 100644 --- a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.repositories.RepositoryOperation; import java.io.IOException; import java.util.Arrays; @@ -94,7 +95,7 @@ public Version getMinimalSupportedVersion() { return Version.V_7_4_0; } - public static final class Entry implements Writeable { + public static final class Entry implements Writeable, RepositoryOperation { private final String repository; @@ -110,6 +111,12 @@ public Entry(String repository, long repositoryStateId) { this.repositoryStateId = repositoryStateId; } + @Override + public long repositoryStateId() { + return repositoryStateId; + } + + @Override public String repository() { return repository; } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java index 2ac12d3e93922..9c25c5578f315 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.repositories.RepositoryOperation; import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; @@ -164,7 +165,7 @@ public String toString() { /** * A class representing a snapshot deletion request entry in the cluster state. */ - public static final class Entry implements Writeable { + public static final class Entry implements Writeable, RepositoryOperation { private final Snapshot snapshot; private final long startTime; private final long repositoryStateId; @@ -195,13 +196,6 @@ public long getStartTime() { return startTime; } - /** - * The repository state id at the time the snapshot deletion began. - */ - public long getRepositoryStateId() { - return repositoryStateId; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -227,5 +221,15 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(startTime); out.writeLong(repositoryStateId); } + + @Override + public String repository() { + return snapshot.getRepository(); + } + + @Override + public long repositoryStateId() { + return repositoryStateId; + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index b40d771c64739..7ddecd8b2fdb3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryOperation; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotsService; @@ -81,7 +82,7 @@ public String toString() { return builder.append("]").toString(); } - public static class Entry implements ToXContent { + public static class Entry implements ToXContent, RepositoryOperation { private final State state; private final Snapshot snapshot; private final boolean includeGlobalState; @@ -153,6 +154,11 @@ public Entry(Entry entry, ImmutableOpenMap shards) this(entry, entry.state, shards, entry.failure); } + @Override + public String repository() { + return snapshot.getRepository(); + } + public Snapshot snapshot() { return this.snapshot; } @@ -189,7 +195,8 @@ public long startTime() { return startTime; } - public long getRepositoryStateId() { + @Override + public long repositoryStateId() { return repositoryStateId; } diff --git a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java index ede5c5e3611f9..1ac61b27fd1ae 100644 --- a/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java @@ -22,10 +22,10 @@ import java.util.Collections; import java.util.Map; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.threadpool.ThreadPool; /** * An extension point for {@link Plugin} implementations to add custom snapshot repositories. @@ -41,7 +41,7 @@ public interface RepositoryPlugin { * the value is a factory to construct the {@link Repository} interface. */ default Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.emptyMap(); } @@ -55,7 +55,7 @@ default Map getRepositories(Environment env, NamedXC * the value is a factory to construct the {@link Repository} interface. */ default Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 7f4d5ec5c1fda..76645834fb112 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -133,6 +134,11 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In return in.getShardSnapshotStatus(snapshotId, indexId, shardId); } + @Override + public void updateState(ClusterState state) { + in.updateState(state); + } + @Override public Lifecycle.State lifecycleState() { return in.lifecycleState(); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java index 783e3ff78fc4d..f87aab460fcbc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java @@ -43,10 +43,10 @@ public final class RepositoriesModule { public RepositoriesModule(Environment env, List repoPlugins, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry) { Map factories = new HashMap<>(); - factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, threadPool)); + factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService)); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, threadPool); + Map newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, clusterService); for (Map.Entry entry : newRepoTypes.entrySet()) { if (factories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered"); @@ -56,7 +56,7 @@ public RepositoriesModule(Environment env, List repoPlugins, T Map internalFactories = new HashMap<>(); for (RepositoryPlugin repoPlugin : repoPlugins) { - Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, threadPool); + Map newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, clusterService); for (Map.Entry entry : newRepoTypes.entrySet()) { if (internalFactories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered"); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 20f083dcff0fe..03b283c4aafe5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -287,8 +287,9 @@ protected void doRun() { @Override public void applyClusterState(ClusterChangedEvent event) { try { + final ClusterState state = event.state(); RepositoriesMetaData oldMetaData = event.previousState().getMetaData().custom(RepositoriesMetaData.TYPE); - RepositoriesMetaData newMetaData = event.state().getMetaData().custom(RepositoriesMetaData.TYPE); + RepositoriesMetaData newMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE); // Check if repositories got changed if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equals(newMetaData))) { @@ -344,6 +345,9 @@ public void applyClusterState(ClusterChangedEvent event) { } } } + for (Repository repo : builder.values()) { + repo.updateState(state); + } repositories = Collections.unmodifiableMap(builder); } catch (Exception ex) { logger.warn("failure updating cluster state ", ex); @@ -411,11 +415,13 @@ private Repository createRepository(RepositoryMetaData repositoryMetaData, Map Math.max(known, finalBestGen)); + } + + private long bestGeneration(Collection operations) { + final String repoName = metadata.name(); + assert operations.size() <= 1 : "Assumed one or no operations but received " + operations; + return operations.stream().filter(e -> e.repository().equals(repoName)).mapToLong(RepositoryOperation::repositoryStateId) + .max().orElse(RepositoryData.EMPTY_REPO_GEN); + } + public ThreadPool threadPool() { return threadPool; } diff --git a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index 61558e4f42efa..efe095eb9b6c2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.fs.FsBlobStore; @@ -32,7 +33,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.function.Function; @@ -70,8 +70,8 @@ public class FsRepository extends BlobStoreRepository { * Constructs a shared file system repository. */ public FsRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { - super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath()); + ClusterService clusterService) { + super(metadata, namedXContentRegistry, clusterService, BlobPath.cleanPath()); this.environment = environment; String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings()); if (location.isEmpty()) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 0ac0204760ea3..c38462e240747 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -573,7 +573,7 @@ private void cleanupAfterError(Exception exception) { ExceptionsHelper.stackTrace(exception), 0, Collections.emptyList(), - snapshot.getRepositoryStateId(), + snapshot.repositoryStateId(), snapshot.includeGlobalState(), metaDataForSnapshot(snapshot, clusterService.state().metaData()), snapshot.userMetadata(), @@ -782,7 +782,7 @@ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterState state) { if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); - deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId(), + deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.repositoryStateId(), state.nodes().getMinNodeVersion()); } } @@ -852,7 +852,7 @@ public void onResponse(Void aVoid) { public void onFailure(Exception e) { logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); } - }, updatedSnapshot.getRepositoryStateId(), false); + }, updatedSnapshot.repositoryStateId(), false); } assert updatedSnapshot.shards().size() == snapshot.shards().size() : "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]"; @@ -1037,7 +1037,7 @@ protected void doRun() { failure, entry.shards().size(), unmodifiableList(shardFailures), - entry.getRepositoryStateId(), + entry.repositoryStateId(), entry.includeGlobalState(), metaDataForSnapshot(entry, metaData), entry.userMetadata(), @@ -1163,7 +1163,7 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam if (matchedInProgress.isPresent()) { matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId()); // Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes - repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L; + repoGenId = matchedInProgress.get().repositoryStateId() + 1L; } } if (matchedEntry.isPresent() == false) { diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java index cd31ce121b245..1fad6f62773af 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesModuleTests.java @@ -44,6 +44,7 @@ public class RepositoriesModuleTests extends ESTestCase { private RepositoryPlugin plugin2; private Repository.Factory factory; private ThreadPool threadPool; + private ClusterService clusterService; @Override public void setUp() throws Exception { @@ -51,6 +52,7 @@ public void setUp() throws Exception { environment = mock(Environment.class); contentRegistry = mock(NamedXContentRegistry.class); threadPool = mock(ThreadPool.class); + clusterService = mock(ClusterService.class); plugin1 = mock(RepositoryPlugin.class); plugin2 = mock(RepositoryPlugin.class); factory = mock(Repository.Factory.class); @@ -60,8 +62,8 @@ public void setUp() throws Exception { } public void testCanRegisterTwoRepositoriesWithDifferentTypes() { - when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type2", factory)); + when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type2", factory)); // Would throw new RepositoriesModule( @@ -69,37 +71,37 @@ public void testCanRegisterTwoRepositoriesWithDifferentTypes() { } public void testCannotRegisterTwoRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, threadPool, contentRegistry)); assertEquals("Repository type [type1] is already registered", ex.getMessage()); } public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() { - when(plugin1.getInternalRepositories(environment, contentRegistry, threadPool)) + when(plugin1.getInternalRepositories(environment, contentRegistry, clusterService)) .thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool)) + when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService)) .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, threadPool, contentRegistry)); assertEquals("Internal repository type [type1] is already registered", ex.getMessage()); } public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() { - when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory)); - when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool)) + when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory)); + when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService)) .thenReturn(Collections.singletonMap("type1", factory)); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), - threadPool, contentRegistry)); + () -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, threadPool, + contentRegistry)); assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage()); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 0893a2568df17..2daa4afe3e149 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -214,6 +215,10 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In return null; } + @Override + public void updateState(final ClusterState state) { + } + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 59a977efe5f5d..b5d99db0a880f 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -193,7 +193,8 @@ public void testSnapshotWithConflictingName() throws IOException { private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool) { + final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), + BlobStoreTestUtil.mockClusterService()) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 425ccdfe8ccc2..6d6248c446df1 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -41,7 +42,6 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESSingleNodeTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.Arrays; @@ -71,9 +71,9 @@ public static class FsLikeRepoPlugin extends Plugin implements RepositoryPlugin @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap(REPO_TYPE, - (metadata) -> new FsRepository(metadata, env, namedXContentRegistry, threadPool) { + (metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we access blobStore on test/main threads diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 5d5dc1d23e639..2b687fbcac228 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.DummyShardLock; @@ -90,7 +91,8 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { int numDocs = indexDocs(directory); RepositoryMetaData metaData = new RepositoryMetaData("test", "fs", settings); - FsRepository repository = new FsRepository(metaData, new Environment(settings, null), NamedXContentRegistry.EMPTY, threadPool); + FsRepository repository = new FsRepository(metaData, new Environment(settings, null), NamedXContentRegistry.EMPTY, + BlobStoreTestUtil.mockClusterService()); repository.start(); final Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_INDEX_UUID, "myindexUUID").build(); IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("myindex", indexSettings); diff --git a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java index 364d2efe311b9..3d65782d4824d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; @@ -35,7 +36,6 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.mockstore.MockRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; @@ -185,8 +185,8 @@ public static class CountingMockRepository extends MockRepository { public CountingMockRepository(final RepositoryMetaData metadata, final Environment environment, - final NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { - super(metadata, environment, namedXContentRegistry, threadPool); + final NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { + super(metadata, environment, namedXContentRegistry, clusterService); } @Override @@ -206,9 +206,9 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind public static class CountingMockRepositoryPlugin extends MockRepository.Plugin { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap("coutingmock", - metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, threadPool)); + metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, clusterService)); } } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index c4d27ff13fca8..124bf290677bb 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1226,23 +1226,15 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon private Repository.Factory getRepoFactory(Environment environment) { // Run half the tests with the eventually consistent repository if (blobStoreContext == null) { - return metaData -> { - final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) { - @Override - protected void assertSnapshotOrGenericThread() { - // eliminate thread name check as we create repo in the test thread - } - }; - repository.start(); - return repository; + return metaData -> new FsRepository(metaData, environment, xContentRegistry(), clusterService) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo in the test thread + } }; } else { - return metaData -> { - final Repository repository = new MockEventuallyConsistentRepository( - metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext, random()); - repository.start(); - return repository; - }; + return metaData -> + new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random()); } } public void restart() { diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 9727dc5f28283..0931ceb494827 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -21,6 +21,7 @@ import org.apache.lucene.codecs.CodecUtil; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -37,7 +38,6 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -74,10 +74,10 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository { public MockEventuallyConsistentRepository( final RepositoryMetaData metadata, final NamedXContentRegistry namedXContentRegistry, - final ThreadPool threadPool, + final ClusterService clusterService, final Context context, final Random random) { - super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath()); + super(metadata, namedXContentRegistry, clusterService, BlobPath.cleanPath()); this.context = context; this.namedXContentRegistry = namedXContentRegistry; this.random = random; diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 5fdfbe9a93a4d..f1cf314e3158a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.snapshots.mockstore; -import org.apache.lucene.util.SameThreadExecutorService; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -27,10 +26,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -41,8 +40,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.startsWith; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class MockEventuallyConsistentRepositoryTests extends ESTestCase { @@ -50,7 +47,7 @@ public void testReadAfterWriteConsistently() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -70,7 +67,7 @@ public void testReadAfterWriteAfterReadThrows() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -86,7 +83,7 @@ public void testReadAfterDeleteAfterWriteThrows() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -104,7 +101,7 @@ public void testOverwriteRandomBlobFails() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath()); final String blobName = randomAlphaOfLength(10); @@ -121,7 +118,7 @@ public void testOverwriteShardSnapBlobFails() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); final BlobContainer container = repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0")); @@ -137,13 +134,9 @@ public void testOverwriteShardSnapBlobFails() throws IOException { public void testOverwriteSnapshotInfoBlob() { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); - final ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); - when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( - new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), threadPool, blobStoreContext, random())) { + xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { repository.start(); // We create a snap- blob for snapshot "foo" in the first generation diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 5744504845aa6..f0118d3c0b699 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -142,4 +143,8 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In @Override public void verify(String verificationToken, DiscoveryNode localNode) { } + + @Override + public void updateState(final ClusterState state) { + } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index 12b926f93a059..66c49db542dab 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -18,9 +18,16 @@ */ package org.elasticsearch.repositories.blobstore; +import org.apache.lucene.util.SameThreadExecutorService; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -52,9 +59,11 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.elasticsearch.test.ESTestCase.randomIntBetween; @@ -66,6 +75,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public final class BlobStoreTestUtil { @@ -275,4 +289,40 @@ public static void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath } } } + + /** + * Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary + * functionality to make {@link BlobStoreRepository} work. + * + * @return Mock ClusterService + */ + public static ClusterService mockClusterService() { + final ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); + when(threadPool.generic()).thenReturn(new SameThreadExecutorService()); + when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( + new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))); + final ClusterService clusterService = mock(ClusterService.class); + final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); + when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); + final AtomicReference currentState = new AtomicReference<>(ClusterState.EMPTY_STATE); + when(clusterService.state()).then(invocationOnMock -> currentState.get()); + final List appliers = new CopyOnWriteArrayList<>(); + doAnswer(invocation -> { + final ClusterStateUpdateTask task = ((ClusterStateUpdateTask) invocation.getArguments()[1]); + final ClusterState current = currentState.get(); + final ClusterState next = task.execute(current); + currentState.set(next); + appliers.forEach(applier -> applier.applyClusterState( + new ClusterChangedEvent((String) invocation.getArguments()[0], next, current))); + task.clusterStateProcessed((String) invocation.getArguments()[0], current, next); + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class)); + doAnswer(invocation -> { + appliers.add((ClusterStateApplier) invocation.getArguments()[0]); + return null; + }).when(clusterService).addStateApplier(any(ClusterStateApplier.class)); + when(clusterApplierService.threadPool()).thenReturn(threadPool); + return clusterService; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 9a157e11c7227..218c6f4eecac7 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; @@ -40,7 +41,6 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.fs.FsRepository; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; @@ -71,8 +71,9 @@ public static class Plugin extends org.elasticsearch.plugins.Plugin implements R @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { - return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry, threadPool)); + ClusterService clusterService) { + return Collections.singletonMap("mock", (metadata) -> + new MockRepository(metadata, env, namedXContentRegistry, clusterService)); } @Override @@ -113,8 +114,8 @@ public long getFailureCount() { private volatile boolean blocked = false; public MockRepository(RepositoryMetaData metadata, Environment environment, - NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) { - super(overrideSettings(metadata, environment), environment, namedXContentRegistry, threadPool); + NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) { + super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService); randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0); randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0); useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 9bf15e2ab030c..0fd087a47c44e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -339,9 +339,10 @@ public List> getExecutorBuilders(Settings settings) { @Override public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { Repository.Factory repositoryFactory = - (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool); + (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), + clusterService.getClusterApplierService().threadPool()); return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 820a965e3b134..20c098ee7f82a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -420,6 +421,10 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } + @Override + public void updateState(ClusterState state) { + } + private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion, Client followerClient, Index followerIndex) { final PlainActionFuture indexMetadataFuture = new PlainActionFuture<>(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index 1b20ceae9233e..02fd885b3e350 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -312,7 +312,7 @@ public static Path resolveConfigFile(Environment env, String name) { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java index d6e94fc8ce112..899c300493ab4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -35,7 +36,6 @@ import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.io.IOException; @@ -70,7 +70,7 @@ protected boolean addMockInternalEngine() { public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin { @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index f5f8287f2c5c1..03ca8d5cfff28 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.ShardGenerations; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.threadpool.ThreadPool; @@ -348,10 +349,10 @@ private Environment createEnvironment() { } /** Create a {@link Repository} with a random name **/ - private Repository createRepository() throws IOException { + private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool); + return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), BlobStoreTestUtil.mockClusterService()); } private static void runAsSnapshot(ThreadPool pool, Runnable runnable) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 6952342040c5d..accea03f9b68a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -406,19 +406,21 @@ public List> getPersistentTasksExecutor(ClusterServic @Override public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { - HashMap repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry, threadPool)); - filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, threadPool))); + ClusterService clusterService) { + HashMap repositories = + new HashMap<>(super.getRepositories(env, namedXContentRegistry, clusterService)); + filterPlugins(RepositoryPlugin.class).forEach( + r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, clusterService))); return repositories; } @Override public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, - ThreadPool threadPool) { + ClusterService clusterService) { HashMap internalRepositories = - new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, threadPool)); + new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, clusterService)); filterPlugins(RepositoryPlugin.class).forEach(r -> - internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, threadPool))); + internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, clusterService))); return internalRepositories; }