From e14d9c9514b07218603ec0d978ab51b14bd8fb2d Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 27 Aug 2020 06:38:32 +0100 Subject: [PATCH] Introduce cache index for searchable snapshots (#61595) If a searchable snapshot shard fails (e.g. its node leaves the cluster) we want to be able to start it up again on a different node as quickly as possible to avoid unnecessarily blocking or failing searches. It isn't feasible to fully restore such shards in an acceptably short time. In particular we would like to be able to deal with the `can_match` phase of a search ASAP so that we can skip unnecessary waiting on shards that may still be warming up but which are not required for the search. This commit solves this problem by introducing a system index that holds much of the data required to start a shard. Today(*) this means it holds the contents of every file with size <8kB, and the first 4kB of every other file in the shard. This system index acts as a second-level cache, behind the first-level node-local disk cache but in front of the blob store itself. Reading chunks from the index is slower than reading them directly from disk, but faster than reading them from the blob store, and is also replicated and accessible to all nodes in the cluster. (*) the exact heuristics for what we should put into the system index are still under investigation and may change in future. This second-level cache is populated when we attempt to read a chunk which is missing from both levels of cache and must therefore be read from the blob store. We also introduce `SearchableSnapshotsBlobStoreCacheIntegTests` which verify that we do not hit the blob store more than necessary when starting up a shard that we've seen before, whether due to a node restart or because a snapshot was mounted multiple times. 
Backport of #60522 Co-authored-by: Tanguy Leroux --- .../test/rest/ESRestTestCase.java | 1 + .../xpack/core/ClientHelper.java | 1 + .../SearchableSnapshotShardStats.java | 56 +- .../SearchableSnapshotsConstants.java | 2 + .../SearchableSnapshotShardStatsTests.java | 5 +- .../resources/rest-api-spec/test/stats.yml | 12 + .../cache/BlobStoreCacheService.java | 349 +++++++++++++ .../blobstore/cache/CachedBlob.java | 189 +++++++ .../BaseSearchableSnapshotIndexInput.java | 55 -- .../index/store/IndexInputStats.java | 34 ++ .../store/SearchableSnapshotDirectory.java | 34 +- .../index/store/cache/CacheFile.java | 45 +- .../cache/CachedBlobContainerIndexInput.java | 491 ++++++++++++++---- .../index/store/cache/SparseFileTracker.java | 91 +++- .../ChecksumBlobContainerIndexInput.java | 26 +- .../direct/DirectBlobContainerIndexInput.java | 1 + .../SearchableSnapshots.java | 35 +- ...ansportSearchableSnapshotsStatsAction.java | 5 +- ...ableSnapshotsBlobStoreCacheIntegTests.java | 458 ++++++++++++++++ ...SearchableSnapshotDirectoryStatsTests.java | 24 +- .../SearchableSnapshotDirectoryTests.java | 6 +- .../CachedBlobContainerIndexInputTests.java | 52 +- .../store/cache/SparseFileTrackerTests.java | 79 ++- .../index/store/cache/TestUtils.java | 37 ++ .../BaseSearchableSnapshotsIntegTestCase.java | 63 +++ ...SnapshotRecoveryStateIntegrationTests.java | 2 + .../SearchableSnapshotsIntegTests.java | 46 +- ...SearchableSnapshotsStatsResponseTests.java | 5 +- .../security/authz/AuthorizationUtils.java | 2 + 29 files changed, 1927 insertions(+), 279 deletions(-) create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java create mode 100644 x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java diff --git 
a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 9b8884d30ef86..a253c7e613fa6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -1261,6 +1261,7 @@ protected static boolean isXPackTemplate(String name) { case "metrics": case "metrics-settings": case "metrics-mappings": + case ".snapshot-blob-cache": return true; default: return false; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java index 8487d4158fd67..0c5b2aee6f629 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java @@ -71,6 +71,7 @@ public static Map filterSecurityHeaders(Map head public static final String ASYNC_SEARCH_ORIGIN = "async_search"; public static final String IDP_ORIGIN = "idp"; public static final String STACK_ORIGIN = "stack"; + public static final String SEARCHABLE_SNAPSHOTS_ORIGIN = "searchable_snapshots"; private ClientHelper() {} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java index 4d66aa351332c..8883fa6364bac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStats.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.searchablesnapshots; +import org.elasticsearch.Version; import 
org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -134,16 +135,20 @@ public static class CacheIndexInputStats implements Writeable, ToXContentObject private final Counter contiguousReads; private final Counter nonContiguousReads; private final Counter cachedBytesRead; + private final Counter indexCacheBytesRead; private final TimedCounter cachedBytesWritten; private final TimedCounter directBytesRead; private final TimedCounter optimizedBytesRead; + private final Counter blobStoreBytesRequested; + private final long currentIndexCacheFills; public CacheIndexInputStats(String fileName, long fileLength, long openCount, long closeCount, Counter forwardSmallSeeks, Counter backwardSmallSeeks, Counter forwardLargeSeeks, Counter backwardLargeSeeks, Counter contiguousReads, Counter nonContiguousReads, - Counter cachedBytesRead, TimedCounter cachedBytesWritten, - TimedCounter directBytesRead, TimedCounter optimizedBytesRead) { + Counter cachedBytesRead, Counter indexCacheBytesRead, + TimedCounter cachedBytesWritten, TimedCounter directBytesRead, TimedCounter optimizedBytesRead, + Counter blobStoreBytesRequested, long currentIndexCacheFills) { this.fileName = fileName; this.fileLength = fileLength; this.openCount = openCount; @@ -155,9 +160,12 @@ public CacheIndexInputStats(String fileName, long fileLength, long openCount, lo this.contiguousReads = contiguousReads; this.nonContiguousReads = nonContiguousReads; this.cachedBytesRead = cachedBytesRead; + this.indexCacheBytesRead = indexCacheBytesRead; this.cachedBytesWritten = cachedBytesWritten; this.directBytesRead = directBytesRead; this.optimizedBytesRead = optimizedBytesRead; + this.blobStoreBytesRequested = blobStoreBytesRequested; + this.currentIndexCacheFills = currentIndexCacheFills; } CacheIndexInputStats(final StreamInput in) throws IOException { @@ -172,9 +180,21 @@ public CacheIndexInputStats(String 
fileName, long fileLength, long openCount, lo this.contiguousReads = new Counter(in); this.nonContiguousReads = new Counter(in); this.cachedBytesRead = new Counter(in); + if (in.getVersion().onOrAfter(Version.V_7_10_0)) { + this.indexCacheBytesRead = new Counter(in); + } else { + this.indexCacheBytesRead = new Counter(0, 0, 0, 0); + } this.cachedBytesWritten = new TimedCounter(in); this.directBytesRead = new TimedCounter(in); this.optimizedBytesRead = new TimedCounter(in); + if (in.getVersion().onOrAfter(Version.V_7_10_0)) { + this.blobStoreBytesRequested = new Counter(in); + this.currentIndexCacheFills = in.readVLong(); + } else { + this.blobStoreBytesRequested = new Counter(0, 0, 0, 0); + this.currentIndexCacheFills = 0; + } } @Override @@ -191,9 +211,16 @@ public void writeTo(StreamOutput out) throws IOException { contiguousReads.writeTo(out); nonContiguousReads.writeTo(out); cachedBytesRead.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_10_0)) { + indexCacheBytesRead.writeTo(out); + } cachedBytesWritten.writeTo(out); directBytesRead.writeTo(out); optimizedBytesRead.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_10_0)) { + blobStoreBytesRequested.writeTo(out); + out.writeVLong(currentIndexCacheFills); + } } public String getFileName() { @@ -240,6 +267,10 @@ public Counter getCachedBytesRead() { return cachedBytesRead; } + public Counter getIndexCacheBytesRead() { + return indexCacheBytesRead; + } + public TimedCounter getCachedBytesWritten() { return cachedBytesWritten; } @@ -252,6 +283,14 @@ public TimedCounter getOptimizedBytesRead() { return optimizedBytesRead; } + public Counter getBlobStoreBytesRequested() { + return blobStoreBytesRequested; + } + + public long getCurrentIndexCacheFills() { + return currentIndexCacheFills; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -263,6 +302,7 @@ public XContentBuilder toXContent(XContentBuilder builder, 
Params params) throws builder.field("contiguous_bytes_read", getContiguousReads()); builder.field("non_contiguous_bytes_read", getNonContiguousReads()); builder.field("cached_bytes_read", getCachedBytesRead()); + builder.field("index_cache_bytes_read", getIndexCacheBytesRead()); builder.field("cached_bytes_written", getCachedBytesWritten()); builder.field("direct_bytes_read", getDirectBytesRead()); builder.field("optimized_bytes_read", getOptimizedBytesRead()); @@ -278,6 +318,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("large", getBackwardLargeSeeks()); builder.endObject(); } + builder.field("blob_store_bytes_requested", getBlobStoreBytesRequested()); + builder.field("current_index_cache_fills", getCurrentIndexCacheFills()); } return builder.endObject(); } @@ -302,9 +344,12 @@ public boolean equals(Object other) { && Objects.equals(contiguousReads, stats.contiguousReads) && Objects.equals(nonContiguousReads, stats.nonContiguousReads) && Objects.equals(cachedBytesRead, stats.cachedBytesRead) + && Objects.equals(indexCacheBytesRead, stats.indexCacheBytesRead) && Objects.equals(cachedBytesWritten, stats.cachedBytesWritten) && Objects.equals(directBytesRead, stats.directBytesRead) - && Objects.equals(optimizedBytesRead, stats.optimizedBytesRead); + && Objects.equals(optimizedBytesRead, stats.optimizedBytesRead) + && Objects.equals(blobStoreBytesRequested, stats.blobStoreBytesRequested) + && currentIndexCacheFills == stats.currentIndexCacheFills; } @Override @@ -313,8 +358,9 @@ public int hashCode() { forwardSmallSeeks, backwardSmallSeeks, forwardLargeSeeks, backwardLargeSeeks, contiguousReads, nonContiguousReads, - cachedBytesRead, cachedBytesWritten, - directBytesRead, optimizedBytesRead); + cachedBytesRead, indexCacheBytesRead, + cachedBytesWritten, directBytesRead, optimizedBytesRead, + blobStoreBytesRequested, currentIndexCacheFills); } } diff --git 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java index e8322010c5d20..d160a91883a85 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java @@ -42,4 +42,6 @@ public static boolean isSearchableSnapshotStore(Settings indexSettings) { public static final String CACHE_PREWARMING_THREAD_POOL_NAME = "searchable_snapshots_cache_prewarming"; public static final String CACHE_PREWARMING_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_prewarming_thread_pool"; + + public static final String SNAPSHOT_BLOB_CACHE_INDEX = ".snapshot-blob-cache"; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java index dade0a0ca4204..d57d0746db417 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/searchablesnapshots/SearchableSnapshotShardStatsTests.java @@ -46,8 +46,9 @@ private CacheIndexInputStats randomCacheIndexInputStats() { randomCounter(), randomCounter(), randomCounter(), randomCounter(), randomCounter(), randomCounter(), - randomCounter(), randomTimedCounter(), - randomTimedCounter(), randomTimedCounter()); + randomCounter(), randomCounter(), randomTimedCounter(), + randomTimedCounter(), randomTimedCounter(), + randomCounter(), randomNonNegativeLong()); } private Counter randomCounter() { diff --git 
a/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml b/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml index 6f3cd6271eefb..a0256f8b483aa 100644 --- a/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml +++ b/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml @@ -164,6 +164,11 @@ teardown: - gte: { indices.docs.shards.0.0.files.0.cached_bytes_read.min: 0 } - gte: { indices.docs.shards.0.0.files.0.cached_bytes_read.max: 0 } + - gte: { indices.docs.shards.0.0.files.0.index_cache_bytes_read.count: 0 } + - gte: { indices.docs.shards.0.0.files.0.index_cache_bytes_read.sum: 0 } + - gte: { indices.docs.shards.0.0.files.0.index_cache_bytes_read.min: 0 } + - gte: { indices.docs.shards.0.0.files.0.index_cache_bytes_read.max: 0 } + - gte: { indices.docs.shards.0.0.files.0.cached_bytes_written.count: 0 } - gte: { indices.docs.shards.0.0.files.0.cached_bytes_written.sum: 0 } - gte: { indices.docs.shards.0.0.files.0.cached_bytes_written.min: 0 } @@ -203,6 +208,13 @@ teardown: - gte: { indices.docs.shards.0.0.files.0.backward_seeks.large.min: 0 } - gte: { indices.docs.shards.0.0.files.0.backward_seeks.large.max: 0 } + - gte: { indices.docs.shards.0.0.files.0.blob_store_bytes_requested.count: 0 } + - gte: { indices.docs.shards.0.0.files.0.blob_store_bytes_requested.sum: 0 } + - gte: { indices.docs.shards.0.0.files.0.blob_store_bytes_requested.min: 0 } + - gte: { indices.docs.shards.0.0.files.0.blob_store_bytes_requested.max: 0 } + + - gte: { indices.docs.shards.0.0.files.0.current_index_cache_fills: 0 } + - do: searchable_snapshots.stats: index: "d*" diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java new file mode 100644 index 
0000000000000..84788f36ee1de --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java @@ -0,0 +1,349 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.blobstore.cache; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import 
org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectTransportException; + +import java.io.IOException; +import java.time.Instant; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; + +/** Service that stores and retrieves {@link CachedBlob} documents in the index named {@code index}. While started it listens to cluster state changes and records in {@code ready} whether all primary shards of that index are active. */ public class BlobStoreCacheService extends AbstractLifecycleComponent implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class); + + /** Four kilobytes, expressed in bytes. */ public static final int DEFAULT_CACHED_BLOB_SIZE = Math.toIntExact(ByteSizeUnit.KB.toBytes(4L)); + + private final ClusterService clusterService; + private final ThreadPool threadPool; + private final AtomicBoolean ready; + private final Client client; + private final String index; + + /** Wraps {@code client} in an {@link OriginSettingClient} configured with {@code SEARCHABLE_SNAPSHOTS_ORIGIN}; the service starts out not ready. */ public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) { + this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN); + this.ready = new AtomicBoolean(false); + this.clusterService = clusterService; + this.threadPool = threadPool; + this.index = index; + } + + /** Registers this service as a cluster state listener. */ @Override + protected void doStart() { + clusterService.addListener(this); + } + + /** Unregisters this service from cluster state notifications. */ @Override + protected void doStop() { + clusterService.removeListener(this); + } + + @Override + protected void doClose() {} + + /** Updates {@code ready} when the routing table of the cache index changed: false if the index has no routing table, otherwise whether all of its primary shards are active. Does nothing unless this component is started and the routing table changed. */ @Override + public void clusterChanged(ClusterChangedEvent event) { + if (lifecycle.started() == false || event.routingTableChanged() == false) { + return; + } + if (event.indexRoutingTableChanged(index)) { + final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index); + if (indexRoutingTable == null) { + ready.set(false); + return; + } + 
ready.set(indexRoutingTable.allPrimaryShardsActive()); + } + } + + /** Notifies {@code listener} with the index name once the cache index exists: immediately if the current routing table already has it, otherwise after a create-index request. A {@link ResourceAlreadyExistsException} (direct or as unwrapped cause) counts as success; any other failure is passed to {@code listener.onFailure}. */ private void createIndexIfNecessary(ActionListener listener) { + if (clusterService.state().routingTable().hasIndex(index)) { + listener.onResponse(index); + return; + } + try { + client.admin() + .indices() + .prepareCreate(index) + .setSettings(indexSettings()) + .addMapping(SINGLE_MAPPING_NAME, mappings()) + .execute(new ActionListener() { + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + assert createIndexResponse.index().equals(index); + listener.onResponse(createIndexResponse.index()); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException + || ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { + listener.onResponse(index); + } else { + listener.onFailure(e); + } + } + }); + } catch (Exception e) { + listener.onFailure(e); + } + } + + /** Settings used when creating the cache index: one shard, replicas auto-expanded over "0-1", priority "900". */ private static Settings indexSettings() { + return Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + .put(IndexMetadata.SETTING_PRIORITY, "900") + .build(); + } + + /** Strict mappings used when creating the cache index; the declared fields (type, creation_time, version, repository, blob.name, blob.path, data.content, data.length, data.from, data.to) are the ones written by {@link CachedBlob#toXContent}. */ private static XContentBuilder mappings() throws IOException { + final XContentBuilder builder = jsonBuilder(); + { + builder.startObject(); + { + builder.startObject(SINGLE_MAPPING_NAME); + builder.field("dynamic", "strict"); + { + builder.startObject("_meta"); + builder.field("version", Version.CURRENT); + builder.endObject(); + } + { + builder.startObject("properties"); + { + builder.startObject("type"); + builder.field("type", "keyword"); + builder.endObject(); + } + { + builder.startObject("creation_time"); + builder.field("type", "date"); + builder.field("format", "epoch_millis"); + builder.endObject(); + } + { + builder.startObject("version"); + builder.field("type", "integer"); + builder.endObject(); + } + { + builder.startObject("repository"); + builder.field("type", "keyword"); + builder.endObject(); + } + 
{ + builder.startObject("blob"); + builder.field("type", "object"); + { + builder.startObject("properties"); + { + builder.startObject("name"); + builder.field("type", "keyword"); + builder.endObject(); + builder.startObject("path"); + builder.field("type", "keyword"); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + { + builder.startObject("data"); + builder.field("type", "object"); + { + builder.startObject("properties"); + { + builder.startObject("content"); + builder.field("type", "binary"); + builder.endObject(); + } + { + builder.startObject("length"); + builder.field("type", "long"); + builder.endObject(); + } + { + builder.startObject("from"); + builder.field("type", "long"); + builder.endObject(); + } + { + builder.startObject("to"); + builder.field("type", "long"); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + return builder; + } + + /** Blocking variant of {@link #getAsync}: waits up to 5 seconds for the result and returns {@link CachedBlob#CACHE_NOT_READY} if that wait times out. Asserts that the calling thread's name does not contain the SYSTEM_READ thread pool name. */ public CachedBlob get(String repository, String name, String path, long offset) { + assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SYSTEM_READ + ']') == false : "must not block [" + + Thread.currentThread().getName() + + "] for a cache read"; + + final PlainActionFuture future = PlainActionFuture.newFuture(); + getAsync(repository, name, path, offset, future); + try { + return future.actionGet(5, TimeUnit.SECONDS); + } catch (ElasticsearchTimeoutException e) { + if (logger.isDebugEnabled()) { + logger.warn( + () -> new ParameterizedMessage( + "get from cache index timed out after [5s], retrieving from blob store instead [id={}]", + CachedBlob.generateId(repository, name, path, offset) + ), + e + ); + } else { + logger.warn("get from cache index timed out after [5s], retrieving from blob store instead"); + } + return CachedBlob.CACHE_NOT_READY; + } + } + + /** Looks up the document whose id is {@link CachedBlob#generateId} of the arguments. Responds with {@link CachedBlob#CACHE_NOT_READY} when the service is not started or not ready, or when the GET request fails; with {@link CachedBlob#CACHE_MISS} when the document does not exist; otherwise with the blob parsed from the document source. */ protected void getAsync(String repository, String name, String path, long offset, ActionListener 
listener) { + if ((lifecycle.started() && ready.get()) == false) { + // TODO TBD can we just execute the GET request and let it fail if the index isn't ready yet? + // We might get lucky and hit a started shard anyway. + logger.debug("not ready : [{}]", CachedBlob.generateId(repository, name, path, offset)); + listener.onResponse(CachedBlob.CACHE_NOT_READY); + return; + } + final GetRequest request = new GetRequest(index).id(CachedBlob.generateId(repository, name, path, offset)); + client.get(request, new ActionListener() { + @Override + public void onResponse(GetResponse response) { + if (response.isExists()) { + logger.debug("cache hit : [{}]", request.id()); + assert response.isSourceEmpty() == false; + + final CachedBlob cachedBlob = CachedBlob.fromSource(response.getSource()); + assert response.getId().equals(cachedBlob.generatedId()); + listener.onResponse(cachedBlob); + } else { + logger.debug("cache miss: [{}]", request.id()); + listener.onResponse(CachedBlob.CACHE_MISS); + } + } + + @Override + public void onFailure(Exception e) { + // In case the blob cache system index is unavailable, we indicate it's not ready and move on. We do not fail the request: + // a failure here is not fatal since the data exists in the blob store, so we can simply indicate the cache is not ready. 
+ if (isExpectedCacheGetException(e)) { + logger.debug(() -> new ParameterizedMessage("failed to retrieve cached blob from system index [{}]", index), e); + } else { + logger.warn(() -> new ParameterizedMessage("failed to retrieve cached blob from system index [{}]", index), e); + assert false : e; + } + listener.onResponse(CachedBlob.CACHE_NOT_READY); + } + }); + } + + /** Returns true for GET failures that are logged at debug level only: shard-not-available exceptions, {@link ConnectTransportException}, or an unwrapped cause that is a {@link NodeClosedException}. */ private static boolean isExpectedCacheGetException(Exception e) { + return TransportActions.isShardNotAvailableException(e) + || e instanceof ConnectTransportException + || ExceptionsHelper.unwrapCause(e) instanceof NodeClosedException; + } + + /** Ensures the cache index exists, then indexes a {@link CachedBlob} built from the arguments, the thread pool's absolute time and {@code Version.CURRENT}, under its generated id. {@code listener} receives {@code null} on success and the exception if index creation, document building or indexing fails. */ public void putAsync(String repository, String name, String path, long offset, BytesReference content, ActionListener listener) { + createIndexIfNecessary(new ActionListener() { + @Override + public void onResponse(String s) { + final IndexRequest request; + try { + final CachedBlob cachedBlob = new CachedBlob( + Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()), + Version.CURRENT, + repository, + name, + path, + content, + offset + ); + request = new IndexRequest(index).id(cachedBlob.generatedId()); + try (XContentBuilder builder = jsonBuilder()) { + request.source(cachedBlob.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } + + client.index(request, new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + logger.trace("cache fill ({}): [{}]", indexResponse.status(), request.id()); + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + logger.debug(new ParameterizedMessage("failure in cache fill: [{}]", request.id()), e); + listener.onFailure(e); + } + }); + } catch (Exception e) { + logger.warn( + new ParameterizedMessage("cache fill failure: [{}]", CachedBlob.generateId(repository, name, path, offset)), + e + ); + listener.onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + logger.error(() -> new ParameterizedMessage("failed to create blob cache system 
index [{}]", index), e); + listener.onFailure(e); + } + }); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java new file mode 100644 index 0000000000000..acc0c7cbe9260 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java @@ -0,0 +1,189 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.blobstore.cache; + +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.time.Instant; +import java.util.Base64; +import java.util.Map; + +/** A range of bytes of a repository blob together with the repository, blob name, blob path, creation time and version it was stored with; convertible to and from the cache index document format. */ public class CachedBlob implements ToXContent { + + /** + * Sentinel {@link CachedBlob} indicating that the cache index could not be consulted: the cache service was not ready, the lookup failed, or it timed out. + */ + public static final CachedBlob CACHE_NOT_READY = new CachedBlob(null, null, null, "CACHE_NOT_READY", null, BytesArray.EMPTY, 0L, 0L); + + /** + * Sentinel {@link CachedBlob} indicating that the cache index definitely did not contain the requested data. 
+ */ + public static final CachedBlob CACHE_MISS = new CachedBlob(null, null, null, "CACHE_MISS", null, BytesArray.EMPTY, 0L, 0L); + + /** Value written to the "type" field of every cached blob document. */ private static final String TYPE = "blob"; + + private final Instant creationTime; + private final Version version; + private final String repository; + private final String name; + private final String path; + + private final BytesReference bytes; + private final long from; + private final long to; + + /** Creates a cached blob whose range starts at {@code offset} and ends at {@code offset} plus the content length. NOTE(review): a null {@code content} is tolerated here when computing the end of the range, but the delegate constructor's assertion and {@link #toXContent} dereference it - confirm whether null is a supported input. */ public CachedBlob( + Instant creationTime, + Version version, + String repository, + String name, + String path, + BytesReference content, + long offset + ) { + this(creationTime, version, repository, name, path, content, offset, offset + (content == null ? 0 : content.length())); + } + + /** Assigns all fields as given; asserts that {@code to} equals {@code from} plus the content length. */ private CachedBlob( + Instant creationTime, + Version version, + String repository, + String name, + String path, + BytesReference content, + long from, + long to + ) { + this.creationTime = creationTime; + this.version = version; + this.repository = repository; + this.name = name; + this.path = path; + this.bytes = content; + this.from = from; + this.to = to; + assert this.to == this.from + this.bytes.length(); + } + + /** Writes this blob as one object with the fields type, creation_time (epoch millis), version (the version id), repository, a "blob" object (name, path) and a "data" object (content bytes, length, from, to). */ @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field("type", TYPE); + builder.field("creation_time", creationTime.toEpochMilli()); + builder.field("version", version.id); + builder.field("repository", repository); + builder.startObject("blob"); + { + builder.field("name", name); + builder.field("path", path); + } + builder.endObject(); + builder.startObject("data"); + { + builder.field("content", BytesReference.toBytes(bytes)); + builder.field("length", bytes.length()); + builder.field("from", from); + builder.field("to", to); + } + builder.endObject(); + } + return builder.endObject(); + } + + /** Returns the id computed by {@code generateId} from this blob's repository, name, path and {@code from} offset. */ public String generatedId() { + return generateId(repository, name, path, from); + } + + /** Returns the offset at which this blob's bytes start. */ public long from() { + return from; + } + + public long 
to() { + return to; + } + + public int length() { + return bytes.length(); + } + + public BytesReference bytes() { + return bytes; + } + + /** Builds the document id for a cached range: repository, path, name and "@" followed by the offset, joined with "/". */ public static String generateId(String repository, String name, String path, long offset) { + return String.join("/", repository, path, name, "@" + offset); + } + + /** Rebuilds a {@link CachedBlob} from the source map of a cache index document. Throws {@link IllegalStateException} when a required field or object is missing, or when the decoded content length differs from the [data.length] field. */ @SuppressWarnings("unchecked") + public static CachedBlob fromSource(final Map source) { + final Long creationTimeEpochMillis = (Long) source.get("creation_time"); + if (creationTimeEpochMillis == null) { + throw new IllegalStateException("cached blob document does not have the [creation_time] field"); + } + /* Check the raw id before converting it: unboxing a missing value into Version.fromId would throw a NullPointerException instead. */ final Integer versionId = (Integer) source.get("version"); + if (versionId == null) { + throw new IllegalStateException("cached blob document does not have the [version] field"); + } + final String repository = (String) source.get("repository"); + if (repository == null) { + throw new IllegalStateException("cached blob document does not have the [repository] field"); + } + final Map blob = (Map) source.get("blob"); + if (blob == null || blob.isEmpty()) { + throw new IllegalStateException("cached blob document does not have the [blob] object"); + } + final String name = (String) blob.get("name"); + if (name == null) { + throw new IllegalStateException("cached blob document does not have the [blob.name] field"); + } + final String path = (String) blob.get("path"); + if (path == null) { + throw new IllegalStateException("cached blob document does not have the [blob.path] field"); + } + final Map data = (Map) source.get("data"); + if (data == null || data.isEmpty()) { + throw new IllegalStateException("cached blob document does not have the [data] object"); + } + final String encodedContent = (String) data.get("content"); + if (encodedContent == null) { + throw new IllegalStateException("cached blob document does not have the [data.content] field"); + } + final Integer length = (Integer) data.get("length"); + if (length == null) { + throw new 
IllegalStateException("cached blob document does not have the [data.length] field"); + } + final byte[] content = Base64.getDecoder().decode(encodedContent); + if (content.length != length) { + throw new IllegalStateException("cached blob document content length does not match [data.length] field"); + } + final Number from = (Number) data.get("from"); + if (from == null) { + throw new IllegalStateException("cached blob document does not have the [data.from] field"); + } + final Number to = (Number) data.get("to"); + if (to == null) { + throw new IllegalStateException("cached blob document does not have the [data.to] field"); + } + // TODO add exhaustive verifications (from/to/content.length, version supported, id == recomputed id etc) + return new CachedBlob( + Instant.ofEpochMilli(creationTimeEpochMillis), + Version.fromId(versionId), + repository, + name, + path, + new BytesArray(content), + from.longValue(), + to.longValue() + ); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 9b4ed2db5981e..45e57e40cd855 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -9,12 +9,10 @@ import org.apache.lucene.store.IOContext; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; -import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import java.io.IOException; -import java.io.InputStream; import java.util.Objects; import 
java.util.concurrent.atomic.AtomicBoolean; @@ -101,34 +99,6 @@ public final void close() throws IOException { public abstract void innerClose() throws IOException; - protected InputStream openInputStream(final long position, final long length) throws IOException { - assert assertCurrentThreadMayAccessBlobStore(); - if (fileInfo.numberOfParts() == 1L) { - assert position + length <= fileInfo.partBytes(0) : "cannot read [" - + position - + "-" - + (position + length) - + "] from [" - + fileInfo - + "]"; - return blobContainer.readBlob(fileInfo.partName(0L), position, length); - } else { - final long startPart = getPartNumberForPosition(position); - final long endPart = getPartNumberForPosition(position + length); - return new SlicedInputStream(endPart - startPart + 1L) { - @Override - protected InputStream openSlice(long slice) throws IOException { - final long currentPart = startPart + slice; - final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L; - final long endInPart = (currentPart == endPart) - ? 
getRelativePositionInPart(position + length) - : getLengthOfPart(currentPart); - return blobContainer.readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart); - } - }; - } - } - protected final boolean assertCurrentThreadMayAccessBlobStore() { final String threadName = Thread.currentThread().getName(); assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT + ']') @@ -149,29 +119,4 @@ protected final boolean assertCurrentThreadMayAccessBlobStore() { return true; } - private long getPartNumberForPosition(long position) { - ensureValidPosition(position); - final long part = position / fileInfo.partSize().getBytes(); - assert part <= fileInfo.numberOfParts() : "part number [" + part + "] exceeds number of parts: " + fileInfo.numberOfParts(); - assert part >= 0L : "part number [" + part + "] is negative"; - return part; - } - - private long getRelativePositionInPart(long position) { - ensureValidPosition(position); - final long pos = position % fileInfo.partSize().getBytes(); - assert pos < fileInfo.partBytes((int) getPartNumberForPosition(pos)) : "position in part [" + pos + "] exceeds part's length"; - assert pos >= 0L : "position in part [" + pos + "] is negative"; - return pos; - } - - private long getLengthOfPart(long part) { - return fileInfo.partBytes(Math.toIntExact(part)); - } - - private void ensureValidPosition(long position) { - if (position < 0L || position > fileInfo.length()) { - throw new IllegalArgumentException("Position [" + position + "] is invalid"); - } - } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java index 8cc4b0e4f9d8f..0f652d604ba5d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/IndexInputStats.java @@ 
-6,6 +6,7 @@ package org.elasticsearch.index.store; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.store.cache.CachedBlobContainerIndexInput; @@ -43,8 +44,12 @@ public class IndexInputStats { private final TimedCounter optimizedBytesRead = new TimedCounter(); private final Counter cachedBytesRead = new Counter(); + private final Counter indexCacheBytesRead = new Counter(); private final TimedCounter cachedBytesWritten = new TimedCounter(); + private final Counter blobStoreBytesRequested = new Counter(); + private final AtomicLong currentIndexCacheFills = new AtomicLong(); + public IndexInputStats(long fileLength, LongSupplier currentTimeNanos) { this(fileLength, SEEKING_THRESHOLD.getBytes(), currentTimeNanos); } @@ -74,6 +79,10 @@ public void addCachedBytesRead(int bytesRead) { cachedBytesRead.add(bytesRead); } + public void addIndexCacheBytesRead(int bytesRead) { + indexCacheBytesRead.add(bytesRead); + } + public void addCachedBytesWritten(long bytesWritten, long nanoseconds) { cachedBytesWritten.add(bytesWritten, nanoseconds); } @@ -112,6 +121,19 @@ public void incrementSeeks(long currentPosition, long newPosition) { } } + public void addBlobStoreBytesRequested(long bytesRequested) { + blobStoreBytesRequested.add(bytesRequested); + } + + public Releasable addIndexCacheFill() { + final long openValue = currentIndexCacheFills.incrementAndGet(); + assert openValue > 0 : openValue; + return () -> { + final long closeValue = currentIndexCacheFills.decrementAndGet(); + assert closeValue >= 0 : closeValue; + }; + } + public long getFileLength() { return fileLength; } @@ -160,15 +182,27 @@ public Counter getCachedBytesRead() { return cachedBytesRead; } + public Counter getIndexCacheBytesRead() { + return indexCacheBytesRead; + } + public TimedCounter getCachedBytesWritten() { return 
cachedBytesWritten; } + public Counter getBlobStoreBytesRequested() { + return blobStoreBytesRequested; + } + @SuppressForbidden(reason = "Handles Long.MIN_VALUE before using Math.abs()") public boolean isLargeSeek(long delta) { return delta != Long.MIN_VALUE && Math.abs(delta) > seekingThreshold; } + public long getCurrentIndexCacheFills() { + return currentIndexCacheFills.get(); + } + public static class Counter { private final LongAdder count = new LongAdder(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 11187951be353..c07b652a805f9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -21,11 +21,14 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.blobstore.cache.BlobStoreCacheService; +import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.support.FilterBlobContainer; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.settings.Settings; @@ -105,6 +108,9 @@ public class SearchableSnapshotDirectory extends BaseDirectory { private final Supplier blobContainerSupplier; private final Supplier snapshotSupplier; + private final BlobStoreCacheService 
blobStoreCacheService; + private final String blobStoreCachePath; + private final String repository; private final SnapshotId snapshotId; private final IndexId indexId; private final ShardId shardId; @@ -129,6 +135,8 @@ public class SearchableSnapshotDirectory extends BaseDirectory { public SearchableSnapshotDirectory( Supplier blobContainer, Supplier snapshot, + BlobStoreCacheService blobStoreCacheService, + String repository, SnapshotId snapshotId, IndexId indexId, ShardId shardId, @@ -142,6 +150,8 @@ public SearchableSnapshotDirectory( super(new SingleInstanceLockFactory()); this.snapshotSupplier = Objects.requireNonNull(snapshot); this.blobContainerSupplier = Objects.requireNonNull(blobContainer); + this.blobStoreCacheService = Objects.requireNonNull(blobStoreCacheService); + this.repository = Objects.requireNonNull(repository); this.snapshotId = Objects.requireNonNull(snapshotId); this.indexId = Objects.requireNonNull(indexId); this.shardId = Objects.requireNonNull(shardId); @@ -155,6 +165,7 @@ public SearchableSnapshotDirectory( this.prewarmCache = useCache ? 
SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.get(indexSettings) : false; this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings)); this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes(); + this.blobStoreCachePath = String.join("/", snapshotId.getUUID(), indexId.getId(), String.valueOf(shardId.id())); this.threadPool = threadPool; this.loaded = false; assert invariant(); @@ -163,6 +174,7 @@ public SearchableSnapshotDirectory( private synchronized boolean invariant() { assert loaded != (snapshot == null); assert loaded != (blobContainer == null); + assert loaded != (recoveryState == null); return true; } @@ -184,6 +196,7 @@ public boolean loadSnapshot(RecoveryState recoveryState) { assert recoveryState != null; assert recoveryState instanceof SearchableSnapshotRecoveryState; assert assertCurrentThreadMayLoadSnapshot(); + // noinspection ConstantConditions in case assertions are disabled if (recoveryState instanceof SearchableSnapshotRecoveryState == false) { throw new IllegalArgumentException("A SearchableSnapshotRecoveryState instance was expected"); } @@ -386,7 +399,7 @@ private boolean isExcludedFromCache(String name) { @Override public String toString() { - return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory; + return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory + " shard=" + shardId; } private void cleanExistingRegularShardFiles() { @@ -488,7 +501,8 @@ public static Directory create( IndexSettings indexSettings, ShardPath shardPath, LongSupplier currentTimeNanosSupplier, - ThreadPool threadPool + ThreadPool threadPool, + BlobStoreCacheService blobStoreCacheService ) throws IOException { if (SNAPSHOT_REPOSITORY_SETTING.exists(indexSettings.getSettings()) == false @@ -516,7 +530,8 @@ public static Directory create( ); } - final Repository repository = 
repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings())); + final String repositoryName = SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings()); + final Repository repository = repositories.repository(repositoryName); if (repository instanceof BlobStoreRepository == false) { throw new IllegalArgumentException("Repository [" + repository + "] is not searchable"); } @@ -546,6 +561,8 @@ public static Directory create( new SearchableSnapshotDirectory( lazyBlobContainer::getOrCompute, lazySnapshot::getOrCompute, + blobStoreCacheService, + repositoryName, snapshotId, indexId, shardPath.getShardId(), @@ -585,6 +602,17 @@ public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) { return null; } + public CachedBlob getCachedBlob(String name, long offset, int length) { + final CachedBlob cachedBlob = blobStoreCacheService.get(repository, name, blobStoreCachePath, offset); + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= offset; + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || offset + length <= cachedBlob.to(); + return cachedBlob; + } + + public void putCachedBlob(String name, long offset, BytesReference content, ActionListener listener) { + blobStoreCacheService.putAsync(repository, name, blobStoreCachePath, offset, content, listener); + } + /** * A {@link FilterBlobContainer} that uses {@link BlobStoreRepository#maybeRateLimitRestores(InputStream)} to limit the rate at which * blobs are read from the repository. 
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index c6e0557bb483d..e4bc6ac8bb613 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -270,7 +270,15 @@ interface RangeMissingHandler { void fillCacheRange(FileChannel channel, long from, long to, Consumer progressUpdater) throws IOException; } - CompletableFuture fetchAsync( + /** + * Populates any missing ranges within {@code rangeToWrite} using the {@link RangeMissingHandler}, and notifies the + * {@link RangeAvailableHandler} when {@code rangeToRead} is available to read from the file. If {@code rangeToRead} is already + * available then the {@link RangeAvailableHandler} is called synchronously by this method; if not then the given {@link Executor} + * processes the missing ranges and notifies the {@link RangeAvailableHandler}. + * + * @return a future which returns the result of the {@link RangeAvailableHandler} once it has completed. + */ + CompletableFuture populateAndRead( final Tuple rangeToWrite, final Tuple rangeToRead, final RangeAvailableHandler reader, @@ -331,6 +339,41 @@ public void onFailure(Exception e) { return future; } + /** + * Notifies the {@link RangeAvailableHandler} when {@code rangeToRead} is available to read from the file. If {@code rangeToRead} is + * already available then the {@link RangeAvailableHandler} is called synchronously by this method; if not, but it is pending, then the + * {@link RangeAvailableHandler} is notified when the pending ranges have completed. If it contains gaps that are not currently pending + * then no listeners are registered and this method returns {@code null}. 
+ * + * @return a future which returns the result of the {@link RangeAvailableHandler} once it has completed, or {@code null} if the + * target range is neither available nor pending. + */ + @Nullable + CompletableFuture readIfAvailableOrPending(final Tuple rangeToRead, final RangeAvailableHandler reader) { + final CompletableFuture future = new CompletableFuture<>(); + try { + ensureOpen(); + if (tracker.waitForRangeIfPending(rangeToRead, ActionListener.wrap(success -> { + final int read = reader.onRangeAvailable(channel); + assert read == rangeToRead.v2() - rangeToRead.v1() : "partial read [" + + read + + "] does not match the range to read [" + + rangeToRead.v2() + + '-' + + rangeToRead.v1() + + ']'; + future.complete(read); + }, future::completeExceptionally))) { + return future; + } else { + return null; + } + } catch (Exception e) { + future.completeExceptionally(e); + return future; + } + } + public Tuple getAbsentRangeWithin(long start, long end) { ensureOpen(); return tracker.getAbsentRangeWithin(start, end); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 4ee8a191fd5d7..dd457ca310960 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -9,15 +9,23 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import 
org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.blobstore.cache.BlobStoreCacheService; +import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; import org.elasticsearch.index.store.IndexInputStats; import org.elasticsearch.index.store.SearchableSnapshotDirectory; @@ -29,12 +37,15 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.Locale; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.IntStream; +import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray; + public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput { /** @@ -74,6 +85,7 @@ public CachedBlobContainerIndexInput( new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), rangeSize ); + assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); } @@ -136,53 +148,296 @@ protected void readInternal(ByteBuffer b) throws IOException { final long position = getFilePointer() + this.offset; final int length = b.remaining(); - int totalBytesRead = 0; - while (totalBytesRead < length) { - final long pos = position + 
totalBytesRead; - final int len = length - totalBytesRead; - int bytesRead = 0; - try { - final CacheFile cacheFile = getCacheFileSafe(); - try (Releasable ignored = cacheFile.fileLock()) { - final Tuple rangeToWrite = computeRange(pos); - final Tuple rangeToRead = Tuple.tuple(pos, Math.min(pos + len, rangeToWrite.v2())); - - bytesRead = cacheFile.fetchAsync(rangeToWrite, rangeToRead, (channel) -> { - final int read; - if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) { - final ByteBuffer duplicate = b.duplicate(); - duplicate.limit(duplicate.position() + Math.toIntExact(rangeToRead.v2() - rangeToRead.v1())); - read = readCacheFile(channel, pos, duplicate); - assert duplicate.position() <= b.limit(); - b.position(duplicate.position()); + // We can detect that we're going to read the last 16 bytes (that contains the footer checksum) of the file. Such reads are often + // executed when opening a Directory and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer. + if (length == CodecUtil.footerLength() && isClone == false && position == fileInfo.length() - length) { + if (readChecksumFromFileInfo(b)) { + logger.trace("read footer of file [{}] at position [{}], bypassing all caches", fileInfo.physicalName(), position); + return; + } + assert b.remaining() == length; + } + + logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this); + + try { + final CacheFile cacheFile = getCacheFileSafe(); + try (Releasable ignored = cacheFile.fileLock()) { + + // Can we serve the read directly from disk? If so, do so and don't worry about anything else. 
+ + final CompletableFuture waitingForRead = cacheFile.readIfAvailableOrPending( + Tuple.tuple(position, position + length), + channel -> { + final int read = readCacheFile(channel, position, b); + assert read == length : read + " vs " + length; + return read; + } + ); + + if (waitingForRead != null) { + final Integer read = waitingForRead.get(); + assert read == length; + readComplete(position, length); + return; + } + + // Requested data is not on disk, so try the cache index next. + + final Tuple indexCacheMiss; // null if not a miss + + // We try to use the cache index if: + // - the file is small enough to be fully cached + final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2; + // - we're reading the first N bytes of the file + final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); + + if (canBeFullyCached || isStartOfFile) { + final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length); + + if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { + // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested + // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of + // {start, end} where positions are relative to the whole file. 
+ + if (canBeFullyCached) { + // if the index input is smaller than twice the size of the blob cache, it will be fully indexed + indexCacheMiss = Tuple.tuple(0L, fileInfo.length()); } else { - read = readCacheFile(channel, pos, b); + // the index input is too large to fully cache, so just cache the initial range + indexCacheMiss = Tuple.tuple(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); } - return read; - }, this::writeCacheFile, directory.cacheFetchAsyncExecutor()).get(); + + // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. + // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. + } else { + logger.trace( + "reading [{}] bytes of file [{}] at position [{}] using cache index", + length, + fileInfo.physicalName(), + position + ); + stats.addIndexCacheBytesRead(cachedBlob.length()); + + final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(Math.toIntExact(position), length).iterator(); + BytesRef bytesRef; + while ((bytesRef = cachedBytesIterator.next()) != null) { + b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); + } + assert b.position() == length : "copied " + b.position() + " but expected " + length; + + try { + final Tuple cachedRange = Tuple.tuple(cachedBlob.from(), cachedBlob.to()); + cacheFile.populateAndRead( + cachedRange, + cachedRange, + channel -> cachedBlob.length(), + (channel, from, to, progressUpdater) -> { + final long startTimeNanos = stats.currentTimeNanos(); + final BytesRefIterator iterator = cachedBlob.bytes() + .slice(Math.toIntExact(from - cachedBlob.from()), Math.toIntExact(to - from)) + .iterator(); + long writePosition = from; + BytesRef current; + while ((current = iterator.next()) != null) { + final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length); + while (byteBuffer.remaining() > 0) { + writePosition += positionalWrite(channel, writePosition, 
byteBuffer); + progressUpdater.accept(writePosition); + } + } + assert writePosition == to : writePosition + " vs " + to; + final long endTimeNanos = stats.currentTimeNanos(); + stats.addCachedBytesWritten(to - from, endTimeNanos - startTimeNanos); + logger.trace("copied bytes [{}-{}] of file [{}] from cache index to disk", from, to, fileInfo); + }, + directory.cacheFetchAsyncExecutor() + ); + } catch (Exception e) { + logger.debug( + new ParameterizedMessage( + "failed to store bytes [{}-{}] of file [{}] obtained from index cache", + cachedBlob.from(), + cachedBlob.to(), + fileInfo + ), + e + ); + // oh well, no big deal, at least we can return them to the caller. + } + + readComplete(position, length); + + return; + } + } else { + // requested range is not eligible for caching + indexCacheMiss = null; } - } catch (final Exception e) { - if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) { - try { - // cache file was evicted during the range fetching, read bytes directly from source - bytesRead = readDirectly(pos, pos + len, b); - continue; - } catch (Exception inner) { - e.addSuppressed(inner); + + // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any + // miss in the cache index. + + final Tuple startRangeToWrite = computeRange(position); + final Tuple endRangeToWrite = computeRange(position + length - 1); + assert startRangeToWrite.v2() <= endRangeToWrite.v2() : startRangeToWrite + " vs " + endRangeToWrite; + final Tuple rangeToWrite = Tuple.tuple( + Math.min(startRangeToWrite.v1(), indexCacheMiss == null ? Long.MAX_VALUE : indexCacheMiss.v1()), + Math.max(endRangeToWrite.v2(), indexCacheMiss == null ? 
Long.MIN_VALUE : indexCacheMiss.v2()) + ); + + assert rangeToWrite.v1() <= position && position + length <= rangeToWrite.v2() : "[" + + position + + "-" + + (position + length) + + "] vs " + + rangeToWrite; + final Tuple rangeToRead = Tuple.tuple(position, position + length); + + final CompletableFuture populateCacheFuture = cacheFile.populateAndRead(rangeToWrite, rangeToRead, channel -> { + final int read; + if ((rangeToRead.v2() - rangeToRead.v1()) < b.remaining()) { + final ByteBuffer duplicate = b.duplicate(); + duplicate.limit(duplicate.position() + Math.toIntExact(rangeToRead.v2() - rangeToRead.v1())); + read = readCacheFile(channel, position, duplicate); + assert duplicate.position() <= b.limit(); + b.position(duplicate.position()); + } else { + read = readCacheFile(channel, position, b); + } + return read; + }, this::writeCacheFile, directory.cacheFetchAsyncExecutor()); + + if (indexCacheMiss != null) { + final Releasable onCacheFillComplete = stats.addIndexCacheFill(); + final CompletableFuture readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> { + final int indexCacheMissLength = Math.toIntExact(indexCacheMiss.v2() - indexCacheMiss.v1()); + + // We assume that we only cache small portions of blobs so that we do not need to: + // - use a BigArrays for allocation + // - use an intermediate copy buffer to read the file in sensibly-sized chunks + // - release the buffer once the indexing operation is complete + assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss; + + final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); + Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.v1(), byteBuffer); + // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats + byteBuffer.flip(); + final BytesReference content = BytesReference.fromByteBuffer(byteBuffer); + directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content, new 
ActionListener() { + @Override + public void onResponse(Void response) { + onCacheFillComplete.close(); + } + + @Override + public void onFailure(Exception e1) { + onCacheFillComplete.close(); + } + }); + return indexCacheMissLength; + }); + + if (readFuture == null) { + // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically + // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that + // case, simply move on. + onCacheFillComplete.close(); } } - throw new IOException("Fail to read data from cache", e); - } finally { - totalBytesRead += bytesRead; + final int bytesRead = populateCacheFuture.get(); + assert bytesRead == length : bytesRead + " vs " + length; } + } catch (final Exception e) { + // may have partially filled the buffer before the exception was thrown, so try and get the remainder directly. + final int alreadyRead = length - b.remaining(); + final int bytesRead = readDirectlyIfAlreadyClosed(position + alreadyRead, b, e); + assert alreadyRead + bytesRead == length : alreadyRead + " + " + bytesRead + " vs " + length; + + // In principle we could handle an index cache miss here too, ensuring that the direct read was large enough, but this is + // already a rare case caused by an overfull/undersized cache. 
} - assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]"; - stats.incrementBytesRead(lastReadPosition, position, totalBytesRead); - lastReadPosition = position + totalBytesRead; + + readComplete(position, length); + } + + private void readComplete(long position, int length) { + stats.incrementBytesRead(lastReadPosition, position, length); + lastReadPosition = position + length; lastSeekPosition = lastReadPosition; } + private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e) throws IOException { + if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) { + try { + // cache file was evicted during the range fetching, read bytes directly from blob container + final long length = b.remaining(); + final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))]; + logger.trace( + () -> new ParameterizedMessage( + "direct reading of range [{}-{}] for cache file [{}]", + position, + position + length, + cacheFileReference + ) + ); + + int bytesCopied = 0; + final long startTimeNanos = stats.currentTimeNanos(); + try (InputStream input = openInputStreamFromBlobStore(position, length)) { + long remaining = length; + while (remaining > 0) { + final int len = (remaining < copyBuffer.length) ? 
(int) remaining : copyBuffer.length; + int bytesRead = input.read(copyBuffer, 0, len); + if (bytesRead == -1) { + throw new EOFException( + String.format( + Locale.ROOT, + "unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s", + position, + position + length, + remaining, + cacheFileReference + ) + ); + } + b.put(copyBuffer, 0, bytesRead); + bytesCopied += bytesRead; + remaining -= bytesRead; + assert remaining == b.remaining() : remaining + " vs " + b.remaining(); + } + final long endTimeNanos = stats.currentTimeNanos(); + stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos); + } + return bytesCopied; + } catch (Exception inner) { + e.addSuppressed(inner); + } + } + throw new IOException("failed to read data from cache", e); + } + + private boolean readChecksumFromFileInfo(ByteBuffer b) throws IOException { + assert isClone == false; + byte[] footer; + try { + footer = checksumToBytesArray(fileInfo.checksum()); + } catch (NumberFormatException e) { + // tests disable this optimisation by passing an invalid checksum + footer = null; + } + if (footer == null) { + return false; + } + + b.put(footer); + assert b.remaining() == 0L; + return true; + + // TODO we should add this to DirectBlobContainerIndexInput too. + } + /** * Prefetches a complete part and writes it in cache. This method is used to prewarm the cache. 
*/ @@ -232,7 +487,7 @@ public void prefetchPart(final int part) throws IOException { final AtomicLong totalBytesWritten = new AtomicLong(); long remainingBytes = rangeEnd - rangeStart; final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStream(rangeStart, rangeLength)) { + try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) { while (remainingBytes > 0L) { assert totalBytesRead + remainingBytes == rangeLength; final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference); @@ -241,23 +496,33 @@ public void prefetchPart(final int part) throws IOException { final long readStart = rangeStart + totalBytesRead; final Tuple rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead); - cacheFile.fetchAsync(rangeToWrite, rangeToWrite, (channel) -> bytesRead, (channel, start, end, progressUpdater) -> { - final ByteBuffer byteBuffer = ByteBuffer.wrap( - copyBuffer, - Math.toIntExact(start - readStart), - Math.toIntExact(end - start) - ); - final int writtenBytes = positionalWrite(channel, start, byteBuffer); - logger.trace( - "prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written", - start, - end, - fileInfo.physicalName(), - writtenBytes - ); - totalBytesWritten.addAndGet(writtenBytes); - progressUpdater.accept(start + writtenBytes); - }, directory.cacheFetchAsyncExecutor()).get(); + // We do not actually read anything, but we want to wait for the write to complete before proceeding. 
+ // noinspection UnnecessaryLocalVariable + final Tuple rangeToRead = rangeToWrite; + + cacheFile.populateAndRead( + rangeToWrite, + rangeToRead, + (channel) -> bytesRead, + (channel, start, end, progressUpdater) -> { + final ByteBuffer byteBuffer = ByteBuffer.wrap( + copyBuffer, + Math.toIntExact(start - readStart), + Math.toIntExact(end - start) + ); + final int writtenBytes = positionalWrite(channel, start, byteBuffer); + logger.trace( + "prefetchPart: writing range [{}-{}] of file [{}], [{}] bytes written", + start, + end, + fileInfo.physicalName(), + writtenBytes + ); + totalBytesWritten.addAndGet(writtenBytes); + progressUpdater.accept(start + writtenBytes); + }, + directory.cacheFetchAsyncExecutor() + ).get(); totalBytesRead += bytesRead; remainingBytes -= bytesRead; } @@ -357,7 +622,7 @@ private void writeCacheFile(final FileChannel fc, final long start, final long e long bytesCopied = 0L; long remaining = end - start; final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStream(start, length)) { + try (InputStream input = openInputStreamFromBlobStore(start, length)) { while (remaining > 0L) { final int bytesRead = readSafe(input, copyBuffer, start, end, remaining, cacheFileReference); positionalWrite(fc, start + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead)); @@ -370,6 +635,86 @@ private void writeCacheFile(final FileChannel fc, final long start, final long e } } + /** + * Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range + * spans multiple blobs then this stream will request them in turn. + * + * @param position The start of the range of bytes to read, relative to the start of the corresponding Lucene file. 
+ * @param length The number of bytes to read + */ + private InputStream openInputStreamFromBlobStore(final long position, final long length) throws IOException { + assert assertCurrentThreadMayAccessBlobStore(); + if (fileInfo.numberOfParts() == 1L) { + assert position + length <= fileInfo.partBytes(0) : "cannot read [" + + position + + "-" + + (position + length) + + "] from [" + + fileInfo + + "]"; + stats.addBlobStoreBytesRequested(length); + return blobContainer.readBlob(fileInfo.partName(0L), position, length); + } else { + final long startPart = getPartNumberForPosition(position); + final long endPart = getPartNumberForPosition(position + length - 1); + + for (long currentPart = startPart; currentPart <= endPart; currentPart++) { + final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L; + final long endInPart = (currentPart == endPart) + ? getRelativePositionInPart(position + length - 1) + 1 + : getLengthOfPart(currentPart); + stats.addBlobStoreBytesRequested(endInPart - startInPart); + } + + return new SlicedInputStream(endPart - startPart + 1L) { + @Override + protected InputStream openSlice(long slice) throws IOException { + final long currentPart = startPart + slice; + final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L; + final long endInPart = (currentPart == endPart) + ? getRelativePositionInPart(position + length - 1) + 1 + : getLengthOfPart(currentPart); + return blobContainer.readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart); + } + }; + } + } + + /** + * Compute the part number that contains the byte at the given position in the corresponding Lucene file. 
+ */ + private long getPartNumberForPosition(long position) { + ensureValidPosition(position); + final long part = position / fileInfo.partSize().getBytes(); + assert part < fileInfo.numberOfParts() : "part number [" + part + "] exceeds number of parts: " + fileInfo.numberOfParts(); + assert part >= 0L : "part number [" + part + "] is negative"; + return part; + } + + /** + * Compute the position of the given byte relative to the start of its part; the result is checked against the length of that part. + * @param position the position of the required byte (within the corresponding Lucene file) + */ + private long getRelativePositionInPart(long position) { + ensureValidPosition(position); + final long pos = position % fileInfo.partSize().getBytes(); + assert pos < getLengthOfPart(getPartNumberForPosition(position)) : "position in part [" + pos + "] exceeds part's length"; + assert pos >= 0L : "position in part [" + pos + "] is negative"; + return pos; + } + + private long getLengthOfPart(long part) { + return fileInfo.partBytes(Math.toIntExact(part)); + } + + private void ensureValidPosition(long position) { + assert position >= 0L && position < fileInfo.length() : position + " vs " + fileInfo.length(); + // noinspection ConstantConditions in case assertions are disabled + if (position < 0L || position >= fileInfo.length()) { + throw new IllegalArgumentException("Position [" + position + "] is invalid for a file of length [" + fileInfo.length() + "]"); + } + } + @Override protected void seekInternal(long pos) throws IOException { if (pos > length()) { @@ -431,43 +776,11 @@ public String toString() { + getFilePointer() + ", rangeSize=" + getDefaultRangeSize() + + ", directory=" + + directory + '}'; } - private int readDirectly(long start, long end, ByteBuffer b) throws IOException { - final long length = end - start; - final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))]; - logger.trace(() -> new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, 
end, cacheFileReference)); - - int bytesCopied = 0; - final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStream(start, length)) { - long remaining = end - start; - while (remaining > 0) { - final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length; - int bytesRead = input.read(copyBuffer, 0, len); - if (bytesRead == -1) { - throw new EOFException( - String.format( - Locale.ROOT, - "unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s", - start, - end, - remaining, - cacheFileReference - ) - ); - } - b.put(copyBuffer, 0, bytesRead); - bytesCopied += bytesRead; - remaining -= bytesRead; - } - final long endTimeNanos = stats.currentTimeNanos(); - stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos); - } - return bytesCopied; - } - private static class CacheFileReference implements CacheFile.EvictionListener { private final long fileLength; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java index feeb55152e314..9e9217688b66a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java @@ -178,7 +178,7 @@ public List waitForRange(final Tuple range, final Tuple null), - Math.min(requiredRange.end, subRange != null ? subRange.v2() : Long.MAX_VALUE) + Math.min(requiredRange.end, subRange.v2()) ); break; default: @@ -187,10 +187,7 @@ public List waitForRange(final Tuple range, final Tuple r.completionListener.addListener( - groupedActionListener, - Math.min(r.end, subRange != null ? 
subRange.v2() : Long.MAX_VALUE) - ) + r -> r.completionListener.addListener(groupedActionListener, Math.min(r.end, subRange.v2())) ); } @@ -203,6 +200,90 @@ public List waitForRange(final Tuple range, final Tuple range, final ActionListener listener) { + final long start = range.v1(); + final long end = range.v2(); + if (end < start || start < 0L || length < end) { + throw new IllegalArgumentException("invalid range [start=" + start + ", end=" + end + ", length=" + length + "]"); + } + + synchronized (mutex) { + assert invariant(); + + final List pendingRanges = new ArrayList<>(); + + final Range targetRange = new Range(start, end, null); + final SortedSet earlierRanges = ranges.headSet(targetRange, false); // ranges with strictly earlier starts + if (earlierRanges.isEmpty() == false) { + final Range lastEarlierRange = earlierRanges.last(); + if (start < lastEarlierRange.end) { + if (lastEarlierRange.isPending()) { + pendingRanges.add(lastEarlierRange); + } + targetRange.start = Math.min(end, lastEarlierRange.end); + } + } + + while (targetRange.start < end) { + assert 0 <= targetRange.start : targetRange; + assert invariant(); + + final SortedSet existingRanges = ranges.tailSet(targetRange); + if (existingRanges.isEmpty()) { + return false; + } else { + final Range firstExistingRange = existingRanges.first(); + assert targetRange.start <= firstExistingRange.start : targetRange + " vs " + firstExistingRange; + + if (targetRange.start == firstExistingRange.start) { + if (firstExistingRange.isPending()) { + pendingRanges.add(firstExistingRange); + } + targetRange.start = Math.min(end, firstExistingRange.end); + } else { + return false; + } + } + } + assert targetRange.start == targetRange.end : targetRange; + assert targetRange.start == end : targetRange; + assert invariant(); + + switch (pendingRanges.size()) { + case 0: + break; + case 1: + final Range pendingRange = pendingRanges.get(0); + pendingRange.completionListener.addListener( + ActionListener.map(listener, 
progress -> null), + Math.min(pendingRange.end, end) + ); + return true; + default: + final GroupedActionListener groupedActionListener = new GroupedActionListener<>( + ActionListener.map(listener, progress -> null), + pendingRanges.size() + ); + pendingRanges.forEach(r -> r.completionListener.addListener(groupedActionListener, Math.min(r.end, end))); + return true; + } + } + + listener.onResponse(null); + return true; + } + /** * Returns a range that contains all bytes of the target range which are absent (possibly pending). The returned range may include * some ranges of present bytes. It tries to return the smallest possible range, but does so on a best-effort basis. This method does diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java index 76c9017ba84e1..bb72e354b2dfc 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java @@ -7,11 +7,9 @@ package org.elasticsearch.index.store.checksum; import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.store.ByteBuffersDataOutput; -import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; import org.elasticsearch.index.store.Store; import java.io.EOFException; @@ -131,14 +129,18 @@ private static void ensureReadOnceChecksumContext(IOContext context) { * @throws IOException if something goes wrong when creating the {@link ChecksumBlobContainerIndexInput} */ public static ChecksumBlobContainerIndexInput create(String 
name, long length, String checksum, IOContext context) throws IOException { - final ByteBuffersDataOutput out = new ByteBuffersDataOutput(); - try (IndexOutput output = new ByteBuffersIndexOutput(out, "tmp", name)) { - // reverse CodecUtil.writeFooter() - output.writeInt(CodecUtil.FOOTER_MAGIC); - output.writeInt(0); - output.writeLong(Long.parseLong(checksum, Character.MAX_RADIX)); - output.close(); - return new ChecksumBlobContainerIndexInput(name, length, out.toArrayCopy(), context); - } + return new ChecksumBlobContainerIndexInput(name, length, checksumToBytesArray(checksum), context); + } + + public static byte[] checksumToBytesArray(String checksum) throws IOException { + final byte[] result = new byte[CodecUtil.footerLength()]; + assert result.length >= Integer.BYTES + Integer.BYTES + Long.BYTES; // ensure that nobody changed the file format under us + final ByteArrayDataOutput output = new ByteArrayDataOutput(result); + // reverse CodecUtil.writeFooter() + output.writeInt(CodecUtil.FOOTER_MAGIC); + output.writeInt(0); + output.writeLong(Long.parseLong(checksum, Character.MAX_RADIX)); + assert output.getPosition() == result.length; + return result; } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index 18e779578a286..1386c67de671a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -338,6 +338,7 @@ public String toString() { private InputStream openBlobStream(int part, long pos, long length) throws IOException { assert assertCurrentThreadMayAccessBlobStore(); + stats.addBlobStoreBytesRequested(length); return 
blobContainer.readBlob(fileInfo.partName(part), pos, length); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index ff68ba71e34f9..aaa75deea2f8e 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -8,6 +8,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.blobstore.cache.BlobStoreCacheService; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -33,6 +34,7 @@ import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.translog.TranslogStats; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; @@ -41,6 +43,7 @@ import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; @@ -80,13 +83,14 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME; import static 
org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_RECOVERY_STATE_FACTORY_KEY; /** * Plugin for Searchable Snapshots feature */ -public class SearchableSnapshots extends Plugin implements IndexStorePlugin, EnginePlugin, ActionPlugin, ClusterPlugin { +public class SearchableSnapshots extends Plugin implements IndexStorePlugin, EnginePlugin, ActionPlugin, ClusterPlugin, SystemIndexPlugin { public static final Setting SNAPSHOT_REPOSITORY_SETTING = Setting.simpleString( "index.store.snapshot.repository_name", @@ -142,6 +146,7 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng ); private volatile Supplier repositoriesServiceSupplier; + private final SetOnce blobStoreCacheService = new SetOnce<>(); private final SetOnce cacheService = new SetOnce<>(); private final SetOnce threadPool = new SetOnce<>(); private final SetOnce failShardsListener = new SetOnce<>(); @@ -199,10 +204,17 @@ public Collection createComponents( this.cacheService.set(cacheService); this.repositoriesServiceSupplier = repositoriesServiceSupplier; this.threadPool.set(threadPool); + final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( + clusterService, + threadPool, + client, + SNAPSHOT_BLOB_CACHE_INDEX + ); + this.blobStoreCacheService.set(blobStoreCacheService); this.failShardsListener.set( new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService()) ); - return 
org.elasticsearch.common.collect.List.of(cacheService); + return org.elasticsearch.common.collect.List.of(cacheService, blobStoreCacheService); } else { this.repositoriesServiceSupplier = () -> { assert false : "searchable snapshots are disabled"; @@ -233,6 +245,13 @@ public void onIndexModule(IndexModule indexModule) { } } + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + return org.elasticsearch.common.collect.List.of( + new SystemIndexDescriptor(SNAPSHOT_BLOB_CACHE_INDEX, "Contains cached data of blob store repositories") + ); + } + @Override public Map getDirectoryFactories() { if (SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED) { @@ -243,7 +262,17 @@ public Map getDirectoryFactories() { assert cache != null; final ThreadPool threadPool = this.threadPool.get(); assert threadPool != null; - return SearchableSnapshotDirectory.create(repositories, cache, indexSettings, shardPath, System::nanoTime, threadPool); + final BlobStoreCacheService blobCache = blobStoreCacheService.get(); + assert blobCache != null; + return SearchableSnapshotDirectory.create( + repositories, + cache, + indexSettings, + shardPath, + System::nanoTime, + threadPool, + blobCache + ); }); } else { return org.elasticsearch.common.collect.Map.of(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java index a2e9ac0c1f478..8d0c3eac5fe02 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java @@ -108,9 +108,12 @@ private static CacheIndexInputStats toCacheIndexInputStats(final 
String fileName toCounter(inputStats.getContiguousReads()), toCounter(inputStats.getNonContiguousReads()), toCounter(inputStats.getCachedBytesRead()), + toCounter(inputStats.getIndexCacheBytesRead()), toTimedCounter(inputStats.getCachedBytesWritten()), toTimedCounter(inputStats.getDirectBytesRead()), - toTimedCounter(inputStats.getOptimizedBytesRead()) + toTimedCounter(inputStats.getOptimizedBytesRead()), + toCounter(inputStats.getBlobStoreBytesRequested()), + inputStats.getCurrentIndexCacheFills() ); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java new file mode 100644 index 0000000000000..a3e38fcd62207 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -0,0 +1,458 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.blobstore.cache; + +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats; +import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; +import 
org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction; +import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; + +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_SHARD_SNAPSHOT_FORMAT; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { + + @Override + protected Collection> nodePlugins() { + final List> plugins = new ArrayList<>(); + plugins.add(WaitForSnapshotBlobCacheShardsActivePlugin.class); + plugins.addAll(super.nodePlugins()); + return org.elasticsearch.common.collect.List.copyOf(plugins); + } + + @Override + protected int numberOfReplicas() { + return 0; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put( + CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), + randomLongBetween(new ByteSizeValue(4, ByteSizeUnit.KB).getBytes(), new ByteSizeValue(20, 
ByteSizeUnit.KB).getBytes()) + "b" + ) + .put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)) + .build(); + } + + public void testBlobStoreCache() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final List indexRequestBuilders = new ArrayList<>(); + for (int i = scaledRandomIntBetween(0, 10_000); i >= 0; i--) { + indexRequestBuilders.add( + client().prepareIndex(indexName, SINGLE_MAPPING_NAME).setSource("text", randomUnicodeOfLength(10), "num", i) + ); + } + indexRandom(true, false, true, indexRequestBuilders); + final long numberOfDocs = indexRequestBuilders.size(); + final NumShards numberOfShards = getNumShards(indexName); + + if (randomBoolean()) { + logger.info("--> force-merging index before snapshotting"); + final ForceMergeResponse forceMergeResponse = client().admin() + .indices() + .prepareForceMerge(indexName) + .setMaxNumSegments(1) + .get(); + assertThat(forceMergeResponse.getSuccessfulShards(), equalTo(numberOfShards.totalNumShards)); + assertThat(forceMergeResponse.getFailedShards(), equalTo(0)); + } + + final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final Path repositoryLocation = randomRepoPath(); + createFsRepository(repositoryName, repositoryLocation); + + final SnapshotId snapshot = createSnapshot(repositoryName, org.elasticsearch.common.collect.List.of(indexName)); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + // extract the list of blobs per shard from the snapshot directory on disk + final Map blobsInSnapshot = blobsInSnapshot(repositoryLocation, snapshot.getUUID()); + assertThat("Failed to load all shard snapshot metadata files", blobsInSnapshot.size(), equalTo(numberOfShards.numPrimaries)); + + expectThrows( + IndexNotFoundException.class, + ".snapshot-blob-cache system index should not be created yet", + () -> 
systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get() + ); + + logger.info("--> mount snapshot [{}] as an index for the first time", snapshot); + final String restoredIndex = mountSnapshot( + repositoryName, + snapshot.getName(), + indexName, + Settings.builder() + .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) + .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) + .build() + ); + ensureGreen(restoredIndex); + + // wait for all async cache fills to complete + assertBusy(() -> { + for (final SearchableSnapshotShardStats shardStats : client().execute( + SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest() + ).actionGet().getStats()) { + for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { + assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L)); + } + } + }); + + for (final SearchableSnapshotShardStats shardStats : client().execute( + SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest() + ).actionGet().getStats()) { + for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { + assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), greaterThan(0L)); + } + } + + logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + + final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).get().getHits().getTotalHits().value; + final long numberOfCacheWrites = systemClient().admin() + .indices() + .prepareStats(SNAPSHOT_BLOB_CACHE_INDEX) + .clear() + .setIndexing(true) + .get() + .getTotal().indexing.getTotal().getIndexCount(); + + logger.info("--> verifying documents in index [{}]", restoredIndex); + 
assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + assertHitCount( + client().prepareSearch(restoredIndex) + .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + numberOfDocs + ); + assertHitCount( + client().prepareSearch(restoredIndex) + .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + 0L + ); + + assertAcked(client().admin().indices().prepareDelete(restoredIndex)); + + logger.info("--> mount snapshot [{}] as an index for the second time", snapshot); + final String restoredAgainIndex = mountSnapshot( + repositoryName, + snapshot.getName(), + indexName, + Settings.builder() + .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) + .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) + .build() + ); + ensureGreen(restoredAgainIndex); + + logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); + for (final SearchableSnapshotShardStats shardStats : client().execute( + SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest() + ).actionGet().getStats()) { + for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { + final boolean mayReadMoreThanHeader + // we read the header of each file contained within the .cfs file, which could be anywhere + = indexInputStats.getFileName().endsWith(".cfs") + // we read a couple of longs at the end of the .fdt file (see https://issues.apache.org/jira/browse/LUCENE-9456) + // TODO revisit this when this issue is addressed in Lucene + || indexInputStats.getFileName().endsWith(".fdt"); + if (indexInputStats.getFileLength() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2 + || mayReadMoreThanHeader == false) { + assertThat(Strings.toString(indexInputStats), 
indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); + } + } + } + + logger.info("--> verifying documents in index [{}]", restoredAgainIndex); + assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + assertHitCount( + client().prepareSearch(restoredAgainIndex) + .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + numberOfDocs + ); + assertHitCount( + client().prepareSearch(restoredAgainIndex) + .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + 0L + ); + + logger.info("--> verifying cached documents (again) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + + logger.info("--> verifying that no extra cached blobs were indexed [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + refreshSystemIndex(); + assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); + assertThat( + systemClient().admin().indices().prepareStats(SNAPSHOT_BLOB_CACHE_INDEX).clear().setIndexing(true).get().getTotal().indexing + .getTotal() + .getIndexCount(), + equalTo(numberOfCacheWrites) + ); + + logger.info("--> restarting cluster"); + internalCluster().fullRestart(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + return Settings.builder() + .put(super.onNodeStopped(nodeName)) + .put(WaitForSnapshotBlobCacheShardsActivePlugin.ENABLED.getKey(), true) + .build(); + } + }); + ensureGreen(restoredAgainIndex); + + logger.info("--> verifying documents in index [{}]", restoredAgainIndex); + assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + assertHitCount( + client().prepareSearch(restoredAgainIndex) + 
.setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + numberOfDocs + ); + assertHitCount( + client().prepareSearch(restoredAgainIndex) + .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) + .setSize(0) + .setTrackTotalHits(true) + .get(), + 0L + ); + + logger.info("--> verifying cached documents (after restart) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + + logger.info("--> verifying that no cached blobs were indexed in system index [{}] after restart", SNAPSHOT_BLOB_CACHE_INDEX); + assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); + assertThat( + systemClient().admin().indices().prepareStats(SNAPSHOT_BLOB_CACHE_INDEX).clear().setIndexing(true).get().getTotal().indexing + .getTotal() + .getIndexCount(), + equalTo(0L) + ); + + // TODO also test when the index is frozen + // TODO also test when prewarming is enabled + } + + /** + * @return a {@link Client} that can be used to query the blob store cache system index + */ + private Client systemClient() { + return new OriginSettingClient(client(), ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN); + } + + private void refreshSystemIndex() { + try { + final RefreshResponse refreshResponse = systemClient().admin().indices().prepareRefresh(SNAPSHOT_BLOB_CACHE_INDEX).get(); + assertThat(refreshResponse.getSuccessfulShards(), greaterThan(0)); + assertThat(refreshResponse.getFailedShards(), equalTo(0)); + } catch (IndexNotFoundException indexNotFoundException) { + throw new AssertionError("unexpected", indexNotFoundException); + } + } + + /** + * Reads a repository location on disk and extracts the list of blobs for each shard + */ + private Map blobsInSnapshot(Path repositoryLocation, String snapshotId) throws IOException { + final Map blobsPerShard = new HashMap<>(); + 
Files.walkFileTree(repositoryLocation.resolve("indices"), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + final String fileName = file.getFileName().toString(); + if (fileName.equals("snap-" + snapshotId + ".dat")) { + blobsPerShard.put( + String.join( + "/", + snapshotId, + file.getParent().getParent().getFileName().toString(), + file.getParent().getFileName().toString() + ), + INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), Streams.readFully(Files.newInputStream(file))) + ); + } + return FileVisitResult.CONTINUE; + } + }); + return org.elasticsearch.common.collect.Map.copyOf(blobsPerShard); + } + + private void assertCachedBlobsInSystemIndex(final String repositoryName, final Map blobsInSnapshot) + throws Exception { + assertBusy(() -> { + refreshSystemIndex(); + + long numberOfCachedBlobs = 0L; + for (Map.Entry blob : blobsInSnapshot.entrySet()) { + for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : blob.getValue().indexFiles()) { + if (fileInfo.name().startsWith("__") == false) { + continue; + } + + final String path = String.join("/", repositoryName, blob.getKey(), fileInfo.physicalName()); + if (fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2) { + // file has been fully cached + final GetResponse getResponse = systemClient().prepareGet( + SNAPSHOT_BLOB_CACHE_INDEX, + SINGLE_MAPPING_NAME, + path + "/@0" + ).get(); + assertThat("not cached: [" + path + "/@0] for blob [" + fileInfo + "]", getResponse.isExists(), is(true)); + final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); + assertThat(cachedBlob.from(), equalTo(0L)); + assertThat(cachedBlob.to(), equalTo(fileInfo.length())); + assertThat((long) cachedBlob.length(), equalTo(fileInfo.length())); + numberOfCachedBlobs += 1; + + } else { + // first region of file has been cached + GetResponse getResponse = 
systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, SINGLE_MAPPING_NAME, path + "/@0") + .get(); + assertThat( + "not cached: [" + path + "/@0] for first region of blob [" + fileInfo + "]", + getResponse.isExists(), + is(true) + ); + + CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); + assertThat(cachedBlob.from(), equalTo(0L)); + assertThat(cachedBlob.to(), equalTo((long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE)); + assertThat(cachedBlob.length(), equalTo(BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE)); + numberOfCachedBlobs += 1; + } + } + } + + refreshSystemIndex(); + assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); + }); + } + + /** + * This plugin declares an {@link AllocationDecider} that forces searchable snapshot shards to be allocated after + * the primary shards of the snapshot blob cache index are started. This way we can ensure that searchable snapshot + * shards can use the snapshot blob cache index after the cluster restarted. 
+ */ + public static class WaitForSnapshotBlobCacheShardsActivePlugin extends Plugin implements ClusterPlugin { + + public static Setting ENABLED = Setting.boolSetting( + "wait_for_snapshot_blob_cache_shards_active.enabled", + false, + Setting.Property.NodeScope + ); + + @Override + public List> getSettings() { + return org.elasticsearch.common.collect.List.of(ENABLED); + } + + @Override + public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + if (ENABLED.get(settings) == false) { + return org.elasticsearch.common.collect.List.of(); + } + final String name = "wait_for_snapshot_blob_cache_shards_active"; + return org.elasticsearch.common.collect.List.of(new AllocationDecider() { + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return canAllocate(shardRouting, allocation); + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { + final IndexMetadata indexMetadata = allocation.metadata().index(shardRouting.index()); + if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexMetadata.getSettings()) == false) { + return allocation.decision(Decision.YES, name, "index is not a searchable snapshot shard - can allocate"); + } + if (allocation.metadata().hasIndex(SNAPSHOT_BLOB_CACHE_INDEX) == false) { + return allocation.decision(Decision.YES, name, SNAPSHOT_BLOB_CACHE_INDEX + " is not created yet"); + } + if (allocation.routingTable().hasIndex(SNAPSHOT_BLOB_CACHE_INDEX) == false) { + return allocation.decision(Decision.THROTTLE, name, SNAPSHOT_BLOB_CACHE_INDEX + " is not active yet"); + } + final IndexRoutingTable indexRoutingTable = allocation.routingTable().index(SNAPSHOT_BLOB_CACHE_INDEX); + if (indexRoutingTable.allPrimaryShardsActive() == false) { + return allocation.decision(Decision.THROTTLE, name, SNAPSHOT_BLOB_CACHE_INDEX + " is not active yet"); + } + return allocation.decision(Decision.YES, 
name, "primary shard for this replica is already active"); + } + }); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index d696ef53681a9..6410428cc0f80 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -26,6 +26,8 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.store.cache.TestUtils; +import org.elasticsearch.index.store.cache.TestUtils.NoopBlobStoreCacheService; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState; import org.elasticsearch.repositories.IndexId; @@ -126,20 +128,18 @@ public void testCachedBytesReadsAndWrites() { assertBusy(() -> { assertThat(inputStats.getCachedBytesWritten(), notNullValue()); assertThat(inputStats.getCachedBytesWritten().total(), equalTo(length)); - assertThat(inputStats.getCachedBytesWritten().count(), equalTo(cachedBytesWriteCount)); + final long actualWriteCount = inputStats.getCachedBytesWritten().count(); + assertThat(actualWriteCount, lessThanOrEqualTo(cachedBytesWriteCount)); assertThat(inputStats.getCachedBytesWritten().min(), greaterThan(0L)); - assertThat( - inputStats.getCachedBytesWritten().max(), - (length < rangeSize.getBytes()) ? 
equalTo(length) : equalTo(rangeSize.getBytes()) - ); + assertThat(inputStats.getCachedBytesWritten().max(), lessThanOrEqualTo(length)); assertThat( inputStats.getCachedBytesWritten().totalNanoseconds(), allOf( // each read takes at least FAKE_CLOCK_ADVANCE_NANOS time - greaterThanOrEqualTo(FAKE_CLOCK_ADVANCE_NANOS * cachedBytesWriteCount), + greaterThanOrEqualTo(FAKE_CLOCK_ADVANCE_NANOS * actualWriteCount), // worst case: we start all reads before finishing any of them - lessThanOrEqualTo(FAKE_CLOCK_ADVANCE_NANOS * cachedBytesWriteCount * cachedBytesWriteCount) + lessThanOrEqualTo(FAKE_CLOCK_ADVANCE_NANOS * actualWriteCount * actualWriteCount) ) ); }); @@ -148,10 +148,7 @@ public void testCachedBytesReadsAndWrites() { assertThat(inputStats.getCachedBytesRead().total(), greaterThanOrEqualTo(length)); assertThat(inputStats.getCachedBytesRead().count(), greaterThan(0L)); assertThat(inputStats.getCachedBytesRead().min(), greaterThan(0L)); - assertThat( - inputStats.getCachedBytesRead().max(), - (length < rangeSize.getBytes()) ? lessThanOrEqualTo(length) : lessThanOrEqualTo(rangeSize.getBytes()) - ); + assertThat(inputStats.getCachedBytesRead().max(), lessThanOrEqualTo(length)); assertCounter(inputStats.getDirectBytesRead(), 0L, 0L, 0L, 0L); assertThat(inputStats.getDirectBytesRead().totalNanoseconds(), equalTo(0L)); @@ -323,7 +320,7 @@ public void testReadBytesContiguously() { final IndexInputStats inputStats = cacheDirectory.getStats(fileName); // account for the CacheBufferedIndexInput internal buffer - final long bufferSize = (long) BufferedIndexInput.bufferSize(ioContext); + final long bufferSize = BufferedIndexInput.bufferSize(ioContext); final long remaining = input.length() % bufferSize; final long expectedTotal = input.length(); final long expectedCount = input.length() / bufferSize + (remaining > 0L ? 
1L : 0L); @@ -615,6 +612,7 @@ private static void executeTestCase( throw new UncheckedIOException(e); } final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId); + final DiscoveryNode discoveryNode = new DiscoveryNode("_id", buildNewFakeTransportAddress(), Version.CURRENT); final Path cacheDir = createTempDir(); try ( @@ -622,6 +620,8 @@ private static void executeTestCase( SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( () -> blobContainer, () -> snapshot, + new NoopBlobStoreCacheService(), + "_repo", snapshotId, indexId, shardId, diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index e8886df30f3d9..16ea65a0d4af2 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -597,6 +597,8 @@ protected void assertSnapshotOrGenericThread() { SearchableSnapshotDirectory snapshotDirectory = new SearchableSnapshotDirectory( () -> blobContainer, () -> snapshot, + new TestUtils.NoopBlobStoreCacheService(), + "_repo", snapshotId, indexId, shardId, @@ -692,6 +694,8 @@ public void testClearCache() throws Exception { SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( () -> blobContainer, () -> snapshot, + new TestUtils.NoopBlobStoreCacheService(), + "_repo", snapshotId, indexId, shardId, @@ -762,7 +766,7 @@ public void testRequiresAdditionalSettings() { final IndexSettings indexSettings = new IndexSettings(IndexMetadata.builder("test").settings(settings).build(), Settings.EMPTY); expectThrows( IllegalArgumentException.class, - () -> SearchableSnapshotDirectory.create(null, null, indexSettings, null, null, null) + () -> 
SearchableSnapshotDirectory.create(null, null, indexSettings, null, null, null, null) ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java index 3bf551831698a..395ec19372e8b 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java @@ -8,9 +8,9 @@ import org.apache.lucene.store.IndexInput; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.support.FilterBlobContainer; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; @@ -22,6 +22,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.index.store.cache.TestUtils.NoopBlobStoreCacheService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState; import org.elasticsearch.repositories.IndexId; @@ -51,6 +52,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import 
static org.hamcrest.Matchers.notNullValue; public class CachedBlobContainerIndexInputTests extends ESIndexInputTestCase { @@ -66,6 +68,7 @@ public void testRandomReads() throws IOException { for (int i = 0; i < 5; i++) { final String fileName = randomAlphaOfLength(10); + final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); final String blobName = randomUnicodeOfLength(10); @@ -104,6 +107,8 @@ public void testRandomReads() throws IOException { SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( () -> blobContainer, () -> snapshot, + new NoopBlobStoreCacheService(), + "_repo", snapshotId, indexId, shardId, @@ -118,13 +123,6 @@ public void testRandomReads() throws IOException { threadPool ) ) { - ShardRouting shardRouting = TestShardRouting.newShardRouting( - randomAlphaOfLength(10), - 0, - randomAlphaOfLength(10), - true, - ShardRoutingState.INITIALIZING - ); RecoveryState recoveryState = createRecoveryState(); final boolean loaded = directory.loadSnapshot(recoveryState); assertThat("Failed to load snapshot", loaded, is(true)); @@ -142,9 +140,9 @@ public void testRandomReads() throws IOException { if (blobContainer instanceof CountingBlobContainer) { long numberOfRanges = TestUtils.numberOfRanges(input.length, cacheService.getRangeSize()); assertThat( - "Expected " + numberOfRanges + " ranges fetched from the source", + "Expected at most " + numberOfRanges + " ranges fetched from the source", ((CountingBlobContainer) blobContainer).totalOpens.sum(), - equalTo(numberOfRanges) + lessThanOrEqualTo(numberOfRanges) ); assertThat( "All bytes should have been read from source", @@ -195,6 +193,8 @@ public void testThrowsEOFException() throws IOException { SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory( () -> blobContainer, () -> snapshot, + new NoopBlobStoreCacheService(), + "_repo", snapshotId, indexId, shardId, @@ -270,7 +270,7 @@ private static class 
CountingBlobContainer extends FilterBlobContainer { @Override public InputStream readBlob(String blobName, long position, long length) throws IOException { - return new CountingInputStream(this, super.readBlob(blobName, position, length), length, rangeSize); + return new CountingInputStream(this, super.readBlob(blobName, position, length)); } @Override @@ -292,19 +292,15 @@ public InputStream readBlob(String name) { private static class CountingInputStream extends FilterInputStream { private final CountingBlobContainer container; - private final int rangeSize; - private final long length; private long bytesRead = 0L; private long position = 0L; private long start = Long.MAX_VALUE; private long end = Long.MIN_VALUE; - CountingInputStream(CountingBlobContainer container, InputStream input, long length, int rangeSize) { + CountingInputStream(CountingBlobContainer container, InputStream input) { super(input); this.container = Objects.requireNonNull(container); - this.rangeSize = rangeSize; - this.length = length; this.container.totalOpens.increment(); } @@ -346,30 +342,6 @@ public int read(byte[] b, int offset, int len) throws IOException { @Override public void close() throws IOException { in.close(); - if (start % rangeSize != 0) { - throw new AssertionError("Read operation should start at the beginning of a range"); - } - if (end % rangeSize != 0) { - if (end != length) { - throw new AssertionError("Read operation should finish at the end of a range or the end of the file"); - } - } - if (length <= rangeSize) { - if (bytesRead != length) { - throw new AssertionError("All [" + length + "] bytes should have been read, no more no less but got:" + bytesRead); - } - } else { - if (bytesRead != rangeSize) { - if (end != length) { - throw new AssertionError("Expecting [" + rangeSize + "] bytes to be read but got:" + bytesRead); - - } - final long remaining = length % rangeSize; - if (bytesRead != remaining) { - throw new AssertionError("Expecting [" + remaining + "] bytes 
to be read but got:" + bytesRead); - } - } - } this.container.totalBytes.add(bytesRead); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java index 808613fb52a19..01e539f0abce6 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java @@ -220,17 +220,21 @@ public void testCallsListenerWhenRangeIsAvailable() { if (pending == false) { final AtomicBoolean wasNotified = new AtomicBoolean(); - final List gaps = sparseFileTracker.waitForRange( - range, - subRange, - ActionListener.wrap(ignored -> assertTrue(wasNotified.compareAndSet(false, true)), e -> { throw new AssertionError(e); }) + final ActionListener listener = ActionListener.wrap( + ignored -> assertTrue(wasNotified.compareAndSet(false, true)), + e -> { throw new AssertionError(e); } ); + final List gaps = sparseFileTracker.waitForRange(range, subRange, listener); assertTrue( "All bytes of the sub range " + subRange + " are available, listener must be executed immediately", wasNotified.get() ); + wasNotified.set(false); + assertTrue(sparseFileTracker.waitForRangeIfPending(subRange, listener)); + assertTrue(wasNotified.get()); + for (final SparseFileTracker.Gap gap : gaps) { assertThat(gap.start(), greaterThanOrEqualTo(range.v1())); assertThat(gap.end(), lessThanOrEqualTo(range.v2())); @@ -238,13 +242,19 @@ public void testCallsListenerWhenRangeIsAvailable() { for (long i = gap.start(); i < gap.end(); i++) { assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); fileContents[Math.toIntExact(i)] = AVAILABLE; - assertTrue(wasNotified.get()); gap.onProgress(i + 1L); } gap.onCompletion(); } } else { + final AtomicBoolean 
waitIfPendingWasNotified = new AtomicBoolean(); + final ActionListener waitIfPendingListener = ActionListener.wrap( + ignored -> assertTrue(waitIfPendingWasNotified.compareAndSet(false, true)), + e -> { throw new AssertionError(e); } + ); + assertFalse(sparseFileTracker.waitForRangeIfPending(subRange, waitIfPendingListener)); + final AtomicBoolean wasNotified = new AtomicBoolean(); final AtomicBoolean expectNotification = new AtomicBoolean(); final List gaps = sparseFileTracker.waitForRange(range, subRange, ActionListener.wrap(ignored -> { @@ -254,6 +264,9 @@ public void testCallsListenerWhenRangeIsAvailable() { assertFalse("Listener should not have been executed yet", wasNotified.get()); + assertTrue(sparseFileTracker.waitForRangeIfPending(subRange, waitIfPendingListener)); + assertFalse(waitIfPendingWasNotified.get()); + long triggeringProgress = -1L; for (long i = subRange.v1(); i < subRange.v2(); i++) { if (fileContents[Math.toIntExact(i)] == UNAVAILABLE) { @@ -278,7 +291,7 @@ public void testCallsListenerWhenRangeIsAvailable() { + "] is reached, but it was triggered after progress got updated to [" + i + ']', - wasNotified.get(), + wasNotified.get() && waitIfPendingWasNotified.get(), equalTo(triggeringProgress < i) ); @@ -290,7 +303,7 @@ public void testCallsListenerWhenRangeIsAvailable() { + "] is reached, but it was triggered after progress got updated to [" + i + ']', - wasNotified.get(), + wasNotified.get() && waitIfPendingWasNotified.get(), equalTo(triggeringProgress < i + 1L) ); } @@ -305,8 +318,10 @@ public void testCallsListenerWhenRangeIsAvailable() { wasNotified.get(), equalTo(triggeringProgress < gap.end()) ); + assertThat(waitIfPendingWasNotified.get(), equalTo(triggeringProgress < gap.end())); } assertTrue(wasNotified.get()); + assertTrue(waitIfPendingWasNotified.get()); } final AtomicBoolean wasNotified = new AtomicBoolean(); @@ -430,34 +445,44 @@ private static void waitForRandomRange( final AtomicBoolean listenerCalled = new AtomicBoolean(); 
listenerCalledConsumer.accept(listenerCalled); - final boolean useSubRange = randomBoolean(); + final boolean fillInGaps = randomBoolean(); + final boolean useSubRange = fillInGaps && randomBoolean(); final long subRangeStart = useSubRange ? randomLongBetween(rangeStart, rangeEnd) : rangeStart; final long subRangeEnd = useSubRange ? randomLongBetween(subRangeStart, rangeEnd) : rangeEnd; - final List gaps = sparseFileTracker.waitForRange( - Tuple.tuple(rangeStart, rangeEnd), - Tuple.tuple(subRangeStart, subRangeEnd), - new ActionListener() { - @Override - public void onResponse(Void aVoid) { - for (long i = subRangeStart; i < subRangeEnd; i++) { - assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE)); - } - assertTrue(listenerCalled.compareAndSet(false, true)); + final ActionListener actionListener = new ActionListener() { + @Override + public void onResponse(Void aVoid) { + for (long i = subRangeStart; i < subRangeEnd; i++) { + assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE)); } + assertTrue(listenerCalled.compareAndSet(false, true)); + } - @Override - public void onFailure(Exception e) { - assertTrue(listenerCalled.compareAndSet(false, true)); - } + @Override + public void onFailure(Exception e) { + assertTrue(listenerCalled.compareAndSet(false, true)); } - ); + }; - for (final SparseFileTracker.Gap gap : gaps) { - for (long i = gap.start(); i < gap.end(); i++) { - assertThat(Long.toString(i), fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); + if (randomBoolean()) { + final List gaps = sparseFileTracker.waitForRange( + Tuple.tuple(rangeStart, rangeEnd), + Tuple.tuple(subRangeStart, subRangeEnd), + actionListener + ); + + for (final SparseFileTracker.Gap gap : gaps) { + for (long i = gap.start(); i < gap.end(); i++) { + assertThat(Long.toString(i), fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); + } + gapConsumer.accept(gap); + } + } else { + final boolean listenerRegistered = 
sparseFileTracker.waitForRangeIfPending(Tuple.tuple(rangeStart, rangeEnd), actionListener); + if (listenerRegistered == false) { + assertTrue(listenerCalled.compareAndSet(false, true)); } - gapConsumer.accept(gap); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index ea8a58b0e7e8c..746e0f6956b0b 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -5,10 +5,15 @@ */ package org.elasticsearch.index.store.cache; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.blobstore.cache.BlobStoreCacheService; +import org.elasticsearch.blobstore.cache.CachedBlob; +import org.elasticsearch.client.Client; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -29,6 +34,8 @@ import static java.util.Arrays.asList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public final class TestUtils { private TestUtils() {} @@ -210,4 +217,34 @@ private UnsupportedOperationException unsupportedException() { return new UnsupportedOperationException("This operation is not supported"); } } + + private static Client mockClient() { + final Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + return client; + 
} + + public static class NoopBlobStoreCacheService extends BlobStoreCacheService { + + public NoopBlobStoreCacheService() { + super(null, null, mockClient(), null); + } + + @Override + protected void getAsync(String repository, String name, String path, long offset, ActionListener listener) { + listener.onResponse(CachedBlob.CACHE_NOT_READY); + } + + @Override + public void putAsync( + String repository, + String name, + String path, + long offset, + BytesReference content, + ActionListener listener + ) { + listener.onResponse(null); + } + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java index 368a8209a407c..bebeac728be0b 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java @@ -24,24 +24,35 @@ */ package org.elasticsearch.xpack.searchablesnapshots; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESIntegTestCase; import 
org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Locale; import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; import static org.elasticsearch.license.LicenseService.SELF_GENERATED_LICENSE_TYPE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public abstract class BaseSearchableSnapshotsIntegTestCase extends ESIntegTestCase { @Override @@ -94,6 +105,58 @@ protected Settings transportClientSettings() { return builder.build(); } + protected void createFsRepository(String repositoryName, Path location) { + createRepository( + repositoryName, + FsRepository.TYPE, + Settings.builder().put(FsRepository.LOCATION_SETTING.getKey(), location).build(), + true + ); + } + + protected void createRepository(String repositoryName, String repositoryType, Settings repositorySettings, boolean verify) { + assertAcked( + client().admin() + .cluster() + .preparePutRepository(repositoryName) + .setType(repositoryType) + .setSettings(repositorySettings) + .setVerify(verify) + ); + } + + protected SnapshotId createSnapshot(String repositoryName, List indices) { + final CreateSnapshotResponse response = client().admin() + .cluster() + .prepareCreateSnapshot(repositoryName, randomAlphaOfLength(10).toLowerCase(Locale.ROOT)) + .setIndices(indices.toArray(Strings.EMPTY_ARRAY)) + .setWaitForCompletion(true) + .get(); + + final SnapshotInfo snapshotInfo = response.getSnapshotInfo(); + assertThat(snapshotInfo.successfulShards(), 
greaterThan(0)); + assertThat(snapshotInfo.failedShards(), equalTo(0)); + return snapshotInfo.snapshotId(); + } + + protected String mountSnapshot(String repositoryName, String snapshotName, String indexName, Settings indexSettings) throws Exception { + final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest( + restoredIndexName, + repositoryName, + snapshotName, + indexName, + Settings.builder().put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString()).put(indexSettings).build(), + Strings.EMPTY_ARRAY, + true + ); + + final RestoreSnapshotResponse restoreResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).get(); + assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(getNumShards(restoredIndexName).numPrimaries)); + assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0)); + return restoredIndexName; + } + protected void createRepo(String fsRepoName) { final Path repo = randomRepoPath(); assertAcked( diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRecoveryStateIntegrationTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRecoveryStateIntegrationTests.java index a95103488a1df..cd4e5e3ce68c0 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRecoveryStateIntegrationTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRecoveryStateIntegrationTests.java @@ -122,6 +122,8 @@ public void testRecoveryStateRecoveredBytesMatchPhysicalCacheState() throws Exce assertThat("Physical cache size doesn't match with recovery state data", physicalCacheSize, 
equalTo(recoveredBytes)); assertThat("Expected to recover 100% of files", recoveryState.getIndex().recoveredBytesPercent(), equalTo(100.0f)); + + assertAcked(client().admin().indices().prepareDelete(restoredIndexName)); } @SuppressForbidden(reason = "Uses FileSystem APIs") diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index a5baf5898fd5a..c6fadc94983f7 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.shrink.ResizeType; @@ -425,6 +424,8 @@ public void testCanMountSnapshotTakenWhileConcurrentlyIndexing() throws Exceptio final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get(); assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); ensureGreen(restoredIndexName); + + assertAcked(client().admin().indices().prepareDelete(restoredIndexName)); } public void testMaxRestoreBytesPerSecIsUsed() throws Exception { @@ -507,6 +508,8 @@ public void testMaxRestoreBytesPerSecIsUsed() throws Exception { ); } } + + 
assertAcked(client().admin().indices().prepareDelete(restoredIndexName)); } private Map getMaxShardSizeByNodeInBytes(String indexName) { @@ -707,11 +710,11 @@ private void assertRecoveryStats(String indexName, boolean preWarmEnabled) { for (List recoveryStates : recoveryResponse.shardRecoveryStates().values()) { for (RecoveryState recoveryState : recoveryStates) { - ByteSizeValue cacheSize = getCacheSizeForShard(recoveryState.getShardId()); + ByteSizeValue cacheSize = getCacheSizeForNode(recoveryState.getTargetNode().getName()); boolean unboundedCache = cacheSize.equals(new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)); RecoveryState.Index index = recoveryState.getIndex(); assertThat( - Strings.toString(recoveryState), + Strings.toString(recoveryState, true, true), index.recoveredFileCount(), preWarmEnabled && unboundedCache ? equalTo(index.totalRecoverFiles()) : greaterThanOrEqualTo(0) ); @@ -784,18 +787,22 @@ private void assertSearchableSnapshotStats(String indexName, boolean cacheEnable if (cacheEnabled == false || nonCachedExtensions.contains(IndexFileNames.getExtension(fileName))) { assertThat( "Expected at least 1 optimized or direct read for " + fileName + " of shard " + shardRouting, - Math.max(indexInputStats.getOptimizedBytesRead().getCount(), indexInputStats.getDirectBytesRead().getCount()), + max(indexInputStats.getOptimizedBytesRead().getCount(), indexInputStats.getDirectBytesRead().getCount()), greaterThan(0L) ); assertThat( "Expected no cache read or write for " + fileName + " of shard " + shardRouting, - Math.max(indexInputStats.getCachedBytesRead().getCount(), indexInputStats.getCachedBytesWritten().getCount()), + max(indexInputStats.getCachedBytesRead().getCount(), indexInputStats.getCachedBytesWritten().getCount()), equalTo(0L) ); } else if (nodeIdsWithLargeEnoughCache.contains(stats.getShardRouting().currentNodeId())) { assertThat( "Expected at least 1 cache read or write for " + fileName + " of shard " + shardRouting, - 
Math.max(indexInputStats.getCachedBytesRead().getCount(), indexInputStats.getCachedBytesWritten().getCount()), + max( + indexInputStats.getCachedBytesRead().getCount(), + indexInputStats.getCachedBytesWritten().getCount(), + indexInputStats.getIndexCacheBytesRead().getCount() + ), greaterThan(0L) ); assertThat( @@ -811,15 +818,12 @@ private void assertSearchableSnapshotStats(String indexName, boolean cacheEnable } else { assertThat( "Expected at least 1 read or write of any kind for " + fileName + " of shard " + shardRouting, - Math.max( - Math.max( - indexInputStats.getCachedBytesRead().getCount(), - indexInputStats.getCachedBytesWritten().getCount() - ), - Math.max( - indexInputStats.getOptimizedBytesRead().getCount(), - indexInputStats.getDirectBytesRead().getCount() - ) + max( + indexInputStats.getCachedBytesRead().getCount(), + indexInputStats.getCachedBytesWritten().getCount(), + indexInputStats.getOptimizedBytesRead().getCount(), + indexInputStats.getDirectBytesRead().getCount(), + indexInputStats.getIndexCacheBytesRead().getCount() ), greaterThan(0L) ); @@ -829,13 +833,11 @@ private void assertSearchableSnapshotStats(String indexName, boolean cacheEnable } } - private ByteSizeValue getCacheSizeForShard(ShardId shardId) { - ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setRoutingTable(true).setNodes(true).get(); - ClusterState clusterStateResponseState = clusterStateResponse.getState(); - String nodeId = clusterStateResponseState.getRoutingTable().shardRoutingTable(shardId).primaryShard().currentNodeId(); - DiscoveryNode discoveryNode = clusterStateResponseState.nodes().get(nodeId); + private static long max(long... 
values) { + return Arrays.stream(values).max().orElseThrow(() -> new AssertionError("no values")); + } - final Settings nodeSettings = internalCluster().getInstance(Environment.class, discoveryNode.getName()).settings(); - return CacheService.SNAPSHOT_CACHE_SIZE_SETTING.get(nodeSettings); + private ByteSizeValue getCacheSizeForNode(String nodeName) { + return CacheService.SNAPSHOT_CACHE_SIZE_SETTING.get(internalCluster().getInstance(Environment.class, nodeName).settings()); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/action/SearchableSnapshotsStatsResponseTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/action/SearchableSnapshotsStatsResponseTests.java index fd04ce4b7f226..ad57c607d1722 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/action/SearchableSnapshotsStatsResponseTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/action/SearchableSnapshotsStatsResponseTests.java @@ -107,9 +107,12 @@ private static SearchableSnapshotShardStats.CacheIndexInputStats randomCacheInde randomCounter(), randomCounter(), randomCounter(), + randomCounter(), + randomTimedCounter(), randomTimedCounter(), randomTimedCounter(), - randomTimedCounter() + randomCounter(), + randomNonNegativeLong() ); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index d791e6611605d..4874967ed8d3f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -24,6 +24,7 @@ import static 
org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.IDP_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.STACK_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.DEPRECATION_ORIGIN; @@ -121,6 +122,7 @@ public static void switchUserBasedOnActionOriginAndExecute(ThreadContext threadC case IDP_ORIGIN: case INGEST_ORIGIN: case STACK_ORIGIN: + case SEARCHABLE_SNAPSHOTS_ORIGIN: case TASKS_ORIGIN: // TODO use a more limited user for tasks securityContext.executeAsUser(XPackUser.INSTANCE, consumer, Version.CURRENT); break;