diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java index 88da0a9a9fe3e..d0f93b373cab1 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java @@ -11,10 +11,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; @@ -24,28 +22,19 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; -import java.io.IOException; import java.time.Instant; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.DATA_TIERS_PREFERENCE; public class BlobStoreCacheService { @@ -53,151 +42,16 @@ public class BlobStoreCacheService { public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(4); - private final ClusterService clusterService; private final ThreadPool threadPool; private final Client client; private final String index; - public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) { + public BlobStoreCacheService(ThreadPool threadPool, Client client, String index) { this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN); - this.clusterService = clusterService; this.threadPool = threadPool; this.index = index; } - private void createIndexIfNecessary(ActionListener listener) { - if (clusterService.state().routingTable().hasIndex(index)) { - listener.onResponse(index); - return; - } - try { - client.admin() - .indices() - .prepareCreate(index) - .setSettings(indexSettings()) - .addMapping(SINGLE_MAPPING_NAME, mappings()) - .execute(new ActionListener() { - @Override - public void onResponse(CreateIndexResponse createIndexResponse) { - assert createIndexResponse.index().equals(index); - listener.onResponse(createIndexResponse.index()); - } - - @Override - public void onFailure(Exception e) { - if (e instanceof ResourceAlreadyExistsException - || ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { - listener.onResponse(index); - } else { - listener.onFailure(e); - } - } - }); - } catch (Exception e) { - listener.onFailure(e); - } - } - - private static Settings indexSettings() { - return Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1") - .put(IndexMetadata.SETTING_PRIORITY, "900") - .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC) - .put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, DATA_TIERS_PREFERENCE) - .build(); - } - - private static XContentBuilder mappings() throws IOException { - final XContentBuilder builder = jsonBuilder(); - { - builder.startObject(); - { - builder.startObject(SINGLE_MAPPING_NAME); - builder.field("dynamic", "strict"); - { - builder.startObject("_meta"); - builder.field("version", Version.CURRENT); - builder.endObject(); - } - { - builder.startObject("properties"); - { - builder.startObject("type"); - builder.field("type", "keyword"); - builder.endObject(); - } - { - builder.startObject("creation_time"); - builder.field("type", "date"); - builder.field("format", "epoch_millis"); - builder.endObject(); - } - { - builder.startObject("version"); - builder.field("type", "integer"); - builder.endObject(); - } - { - builder.startObject("repository"); - builder.field("type", "keyword"); - builder.endObject(); - } - { - builder.startObject("blob"); - builder.field("type", "object"); - { - builder.startObject("properties"); - { - builder.startObject("name"); - builder.field("type", "keyword"); - builder.endObject(); - builder.startObject("path"); - builder.field("type", "keyword"); - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - } - { - builder.startObject("data"); - builder.field("type", "object"); - { - builder.startObject("properties"); - { - builder.startObject("content"); - builder.field("type", "binary"); - builder.endObject(); - } - { - builder.startObject("length"); - builder.field("type", "long"); - builder.endObject(); - } - { - builder.startObject("from"); - builder.field("type", "long"); - builder.endObject(); - } - { - builder.startObject("to"); - builder.field("type", "long"); - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - } - return builder; - } - public CachedBlob get(String repository, String name, String path, long offset) { assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SYSTEM_READ + ']') == false : "must not block [" + Thread.currentThread().getName() @@ -267,52 +121,37 @@ private static boolean isExpectedCacheGetException(Exception e) { } public void putAsync(String repository, String name, String path, long offset, BytesReference content, ActionListener listener) { - createIndexIfNecessary(new ActionListener() { - @Override - public void onResponse(String s) { - final IndexRequest request; - try { - final CachedBlob cachedBlob = new CachedBlob( - Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()), - Version.CURRENT, - repository, - name, - path, - content, - offset - ); - request = new IndexRequest(index).id(cachedBlob.generatedId()); - try (XContentBuilder builder = jsonBuilder()) { - request.source(cachedBlob.toXContent(builder, ToXContent.EMPTY_PARAMS)); - } + try { + final CachedBlob cachedBlob = new CachedBlob( + Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()), + Version.CURRENT, + repository, + name, + path, + content, + offset + ); + final IndexRequest request = new IndexRequest(index).id(cachedBlob.generatedId()); + try (XContentBuilder builder = jsonBuilder()) { + request.source(cachedBlob.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } - client.index(request, new ActionListener() { - @Override - public void onResponse(IndexResponse indexResponse) { - logger.trace("cache fill ({}): [{}]", indexResponse.status(), request.id()); - listener.onResponse(null); - } + client.index(request, new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + logger.trace("cache fill ({}): [{}]", indexResponse.status(), request.id()); + listener.onResponse(null); + } - @Override - public void onFailure(Exception e) { - logger.debug(new ParameterizedMessage("failure in cache fill: [{}]", request.id()), e); - listener.onFailure(e); - } - }); - } catch (Exception e) { - logger.warn( - new ParameterizedMessage("cache fill failure: [{}]", CachedBlob.generateId(repository, name, path, offset)), - e - ); + @Override + public void onFailure(Exception e) { + logger.debug(new ParameterizedMessage("failure in cache fill: [{}]", request.id()), e); listener.onFailure(e); } - } - - @Override - public void onFailure(Exception e) { - logger.error(() -> new ParameterizedMessage("failed to create blob cache system index [{}]", index), e); - listener.onFailure(e); - } - }); + }); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("cache fill failure: [{}]", CachedBlob.generateId(repository, name, path, offset)), e); + listener.onFailure(e); + } } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index dc4f697d7d5d3..a3e868d5caf58 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -6,10 +6,12 @@ package org.elasticsearch.xpack.searchablesnapshots; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.blobstore.cache.BlobStoreCacheService; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -28,6 +30,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexModule; @@ -37,6 +40,7 @@ import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState; @@ -56,6 +60,7 @@ import org.elasticsearch.threadpool.ScalingExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; import org.elasticsearch.xpack.core.DataTier; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; @@ -75,6 +80,8 @@ import org.elasticsearch.xpack.searchablesnapshots.rest.RestRepositoryStatsAction; import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -86,6 +93,9 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME; @@ -229,12 +239,7 @@ public Collection createComponents( final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment)); this.cacheService.set(cacheService); components.add(cacheService); - final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( - clusterService, - threadPool, - client, - SNAPSHOT_BLOB_CACHE_INDEX - ); + final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX); this.blobStoreCacheService.set(blobStoreCacheService); components.add(blobStoreCacheService); } else { @@ -273,7 +278,15 @@ public List getIndexFoldersDeletionListeners() { @Override public Collection getSystemIndexDescriptors(Settings settings) { return org.elasticsearch.common.collect.List.of( - new SystemIndexDescriptor(SNAPSHOT_BLOB_CACHE_INDEX, "Contains cached data of blob store repositories") + SystemIndexDescriptor.builder() + .setIndexPattern(SNAPSHOT_BLOB_CACHE_INDEX) + .setDescription("Contains cached data of blob store repositories") + .setPrimaryIndex(SNAPSHOT_BLOB_CACHE_INDEX) + .setMappings(getIndexMappings()) + .setSettings(getIndexSettings()) + .setOrigin(SEARCHABLE_SNAPSHOTS_ORIGIN) + .setVersionMetaKey("version") + .build() ); } @@ -407,4 +420,108 @@ public CacheService get() { return cacheService; } } + + private Settings getIndexSettings() { + return Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + .put(IndexMetadata.SETTING_PRIORITY, "900") + .put(DataTierAllocationDecider.INDEX_ROUTING_PREFER, DATA_TIERS_PREFERENCE) + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC) + .build(); + } + + private XContentBuilder getIndexMappings() { + try { + final XContentBuilder builder = jsonBuilder(); + { + builder.startObject(); + { + builder.startObject(SINGLE_MAPPING_NAME); + builder.field("dynamic", "strict"); + { + builder.startObject("_meta"); + builder.field("version", Version.CURRENT); + builder.endObject(); + } + { + builder.startObject("properties"); + { + builder.startObject("type"); + builder.field("type", "keyword"); + builder.endObject(); + } + { + builder.startObject("creation_time"); + builder.field("type", "date"); + builder.field("format", "epoch_millis"); + builder.endObject(); + } + { + builder.startObject("version"); + builder.field("type", "integer"); + builder.endObject(); + } + { + builder.startObject("repository"); + builder.field("type", "keyword"); + builder.endObject(); + } + { + builder.startObject("blob"); + builder.field("type", "object"); + { + builder.startObject("properties"); + { + builder.startObject("name"); + builder.field("type", "keyword"); + builder.endObject(); + builder.startObject("path"); + builder.field("type", "keyword"); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + { + builder.startObject("data"); + builder.field("type", "object"); + { + builder.startObject("properties"); + { + builder.startObject("content"); + builder.field("type", "binary"); + builder.endObject(); + } + { + builder.startObject("length"); + builder.field("type", "long"); + builder.endObject(); + } + { + builder.startObject("from"); + builder.field("type", "long"); + builder.endObject(); + } + { + builder.startObject("to"); + builder.field("type", "long"); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + return builder; + } catch (IOException e) { + throw new UncheckedIOException("Failed to build " + SNAPSHOT_BLOB_CACHE_INDEX + " index mappings", e); + } + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index f0499daa0ac0f..14472bafbb6ec 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -308,7 +308,7 @@ private static Client mockClient() { public static class NoopBlobStoreCacheService extends BlobStoreCacheService { public NoopBlobStoreCacheService() { - super(null, null, mockClient(), null); + super(null, mockClient(), null); } @Override