diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java index 31f17f5730bbf..db7742ba273bb 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -28,23 +29,31 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats; import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; +import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService; +import org.junit.AfterClass; +import org.junit.BeforeClass; import java.io.IOException; import java.nio.file.Files; @@ -67,6 +76,35 @@ public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { + private static Settings cacheSettings = null; + private static ByteSizeValue blobCacheMaxLength = null; + + @BeforeClass + public static void setUpCacheSettings() { + blobCacheMaxLength = new ByteSizeValue(randomLongBetween(64L, 128L), ByteSizeUnit.KB); + + final Settings.Builder builder = Settings.builder(); + // Cold (full copy) cache should be unlimited to not cause evictions + builder.put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)); + // Align ranges to match the blob cache max length + builder.put(CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), blobCacheMaxLength); + builder.put(CacheService.SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.getKey(), blobCacheMaxLength); + + // Frozen (shared cache) cache should be large enough to not cause direct reads + builder.put(SnapshotsService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofMb(128)); + // Align ranges to match the blob cache max length + builder.put(SnapshotsService.SNAPSHOT_CACHE_REGION_SIZE_SETTING.getKey(), blobCacheMaxLength); + builder.put(SnapshotsService.SHARED_CACHE_RANGE_SIZE_SETTING.getKey(), blobCacheMaxLength); + builder.put(FrozenCacheService.FROZEN_CACHE_RECOVERY_RANGE_SIZE_SETTING.getKey(), blobCacheMaxLength); + cacheSettings = builder.build(); + } + + @AfterClass + public static void tearDownCacheSettings() { + blobCacheMaxLength = null; + cacheSettings = null; + } + @Override protected Collection> nodePlugins() { return CollectionUtils.appendToCopy(super.nodePlugins(), WaitForSnapshotBlobCacheShardsActivePlugin.class); @@ -79,28 +117,25 @@ protected int numberOfReplicas() { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put( - CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), - randomLongBetween(new ByteSizeValue(4, ByteSizeUnit.KB).getBytes(), new ByteSizeValue(20, ByteSizeUnit.KB).getBytes()) + "b" - ) - .put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)) - .build(); + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(cacheSettings).build(); } public void testBlobStoreCache() throws Exception { final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createIndex(indexName); - final List indexRequestBuilders = new ArrayList<>(); - for (int i = scaledRandomIntBetween(0, 10_000); i >= 0; i--) { - indexRequestBuilders.add(client().prepareIndex(indexName).setSource("text", randomUnicodeOfLength(10), "num", i)); - } - indexRandom(true, false, true, indexRequestBuilders); - final long numberOfDocs = indexRequestBuilders.size(); final NumShards numberOfShards = getNumShards(indexName); + final int numberOfDocs = scaledRandomIntBetween(0, 20_000); + if (numberOfDocs > 0) { + final List indexRequestBuilders = new ArrayList<>(); + for (int i = numberOfDocs; i > 0; i--) { + XContentBuilder builder = XContentFactory.smileBuilder(); + builder.startObject().field("text", randomRealisticUnicodeOfCodepointLengthBetween(5, 50)).field("num", i).endObject(); + indexRequestBuilders.add(client().prepareIndex(indexName).setSource(builder)); + } + indexRandom(true, true, true, indexRequestBuilders); + } if (randomBoolean()) { logger.info("--> force-merging index before snapshotting"); final ForceMergeResponse forceMergeResponse = client().admin() @@ -129,15 +164,25 @@ public void testBlobStoreCache() throws Exception { () -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get() ); - logger.info("--> mount snapshot [{}] as an index for the first time", snapshot); - final String restoredIndex = mountSnapshot( + Storage storage = randomFrom(Storage.values()); + logger.info( + "--> mount snapshot [{}] as an index for the first time [storage={}, max length={}]", + snapshot, + storage, + blobCacheMaxLength.getStringRep() + ); + final String restoredIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + mountSnapshot( repositoryName, snapshot.getName(), indexName, + restoredIndex, Settings.builder() .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) - .build() + .put(SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, blobCacheMaxLength) + .build(), + storage ); ensureGreen(restoredIndex); @@ -163,60 +208,62 @@ public void testBlobStoreCache() throws Exception { } logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); - - logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX); - assertThat( - systemClient().admin() - .indices() - .prepareGetSettings(SNAPSHOT_BLOB_CACHE_INDEX) - .get() - .getSetting(SNAPSHOT_BLOB_CACHE_INDEX, DataTierAllocationDecider.INDEX_ROUTING_PREFER), - equalTo(DATA_TIERS_CACHE_INDEX_PREFERENCE) - ); + if (numberOfDocs > 0) { + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + + logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX); + assertThat( + systemClient().admin() + .indices() + .prepareGetSettings(SNAPSHOT_BLOB_CACHE_INDEX) + .get() + .getSetting(SNAPSHOT_BLOB_CACHE_INDEX, DataTierAllocationDecider.INDEX_ROUTING_PREFER), + equalTo(DATA_TIERS_CACHE_INDEX_PREFERENCE) + ); + } - final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).get().getHits().getTotalHits().value; - final long numberOfCacheWrites = systemClient().admin() + final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .get() + .getHits() + .getTotalHits().value; + IndexingStats indexingStats = systemClient().admin() .indices() .prepareStats(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) .clear() .setIndexing(true) .get() - .getTotal().indexing.getTotal().getIndexCount(); + .getTotal() + .getIndexing(); + final long numberOfCacheWrites = indexingStats != null ? indexingStats.getTotal().getIndexCount() : 0L; - logger.info("--> verifying documents in index [{}]", restoredIndex); + logger.info("--> verifying number of documents in index [{}]", restoredIndex); assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - assertHitCount( - client().prepareSearch(restoredIndex) - .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - numberOfDocs - ); - assertHitCount( - client().prepareSearch(restoredIndex) - .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - 0L - ); - assertAcked(client().admin().indices().prepareDelete(restoredIndex)); - logger.info("--> mount snapshot [{}] as an index for the second time", snapshot); - final String restoredAgainIndex = mountSnapshot( + storage = randomFrom(Storage.values()); + logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage); + final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + mountSnapshot( repositoryName, snapshot.getName(), indexName, + restoredAgainIndex, Settings.builder() .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) - .build() + .put(SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, blobCacheMaxLength) + .build(), + storage ); ensureGreen(restoredAgainIndex); + logger.info("--> verifying cached documents (after second mount) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + if (numberOfDocs > 0) { + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + } + logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, @@ -225,44 +272,33 @@ public void testBlobStoreCache() throws Exception { for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { // we read the header of each file contained within the .cfs file, which could be anywhere final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs"); - if (indexInputStats.getTotalSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2 - || mayReadMoreThanHeader == false) { + if (mayReadMoreThanHeader == false) { assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); } } } - logger.info("--> verifying documents in index [{}]", restoredAgainIndex); + logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex); assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - assertHitCount( - client().prepareSearch(restoredAgainIndex) - .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - numberOfDocs - ); - assertHitCount( - client().prepareSearch(restoredAgainIndex) - .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - 0L - ); - - logger.info("--> verifying cached documents (again) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); logger.info("--> verifying that no extra cached blobs were indexed [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - refreshSystemIndex(); - assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); - assertThat( - systemClient().admin().indices().prepareStats(SNAPSHOT_BLOB_CACHE_INDEX).clear().setIndexing(true).get().getTotal().indexing - .getTotal() - .getIndexCount(), - equalTo(numberOfCacheWrites) + if (numberOfDocs > 0) { + refreshSystemIndex(); + } + assertHitCount( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setSize(0).get(), + numberOfCachedBlobs ); + indexingStats = systemClient().admin() + .indices() + .prepareStats(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .clear() + .setIndexing(true) + .get() + .getTotal() + .getIndexing(); + assertThat(indexingStats != null ? indexingStats.getTotal().getIndexCount() : 0L, equalTo(numberOfCacheWrites)); logger.info("--> restarting cluster"); internalCluster().fullRestart(new InternalTestCluster.RestartCallback() { @@ -276,36 +312,43 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); ensureGreen(restoredAgainIndex); - logger.info("--> verifying documents in index [{}]", restoredAgainIndex); - assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - assertHitCount( - client().prepareSearch(restoredAgainIndex) - .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - numberOfDocs - ); - assertHitCount( - client().prepareSearch(restoredAgainIndex) - .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - 0L - ); - logger.info("--> verifying cached documents (after restart) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + if (numberOfDocs > 0) { + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + } + + logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex); + for (final SearchableSnapshotShardStats shardStats : client().execute( + SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest() + ).actionGet().getStats()) { + for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { + // we read the header of each file contained within the .cfs file, which could be anywhere + final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs"); + if (mayReadMoreThanHeader == false) { + assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); + } + } + } logger.info("--> verifying that no cached blobs were indexed in system index [{}] after restart", SNAPSHOT_BLOB_CACHE_INDEX); - assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); - assertThat( - systemClient().admin().indices().prepareStats(SNAPSHOT_BLOB_CACHE_INDEX).clear().setIndexing(true).get().getTotal().indexing - .getTotal() - .getIndexCount(), - equalTo(0L) + assertHitCount( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setSize(0).get(), + numberOfCachedBlobs ); + indexingStats = systemClient().admin() + .indices() + .prepareStats(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .clear() + .setIndexing(true) + .get() + .getTotal() + .getIndexing(); + assertThat(indexingStats != null ? indexingStats.getTotal().getIndexCount() : 0L, equalTo(0L)); + + logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex); + assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); // TODO also test when the index is frozen // TODO also test when prewarming is enabled @@ -352,6 +395,7 @@ private Map blobsInSnapshot(Path repository private void assertCachedBlobsInSystemIndex(final String repositoryName, final Map blobsInSnapshot) throws Exception { + final BlobStoreCacheService blobCacheService = internalCluster().getDataNodeInstance(BlobStoreCacheService.class); assertBusy(() -> { refreshSystemIndex(); @@ -362,32 +406,18 @@ private void assertCachedBlobsInSystemIndex(final String repositoryName, final M continue; } - final String path = String.join("/", repositoryName, blob.getKey(), fileInfo.physicalName()); - if (fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2) { - // file has been fully cached - final GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, path + "/@0").get(); - assertThat("not cached: [" + path + "/@0] for blob [" + fileInfo + "]", getResponse.isExists(), is(true)); - final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); - assertThat(cachedBlob.from(), equalTo(0L)); - assertThat(cachedBlob.to(), equalTo(fileInfo.length())); - assertThat((long) cachedBlob.length(), equalTo(fileInfo.length())); - numberOfCachedBlobs += 1; - - } else { - // first region of file has been cached - GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, path + "/@0").get(); - assertThat( - "not cached: [" + path + "/@0] for first region of blob [" + fileInfo + "]", - getResponse.isExists(), - is(true) - ); - - CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); - assertThat(cachedBlob.from(), equalTo(0L)); - assertThat(cachedBlob.to(), equalTo((long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE)); - assertThat(cachedBlob.length(), equalTo(BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE)); - numberOfCachedBlobs += 1; - } + final String fileName = fileInfo.physicalName(); + final long length = fileInfo.length(); + final ByteRange expectedByteRange = blobCacheService.computeBlobCacheByteRange(fileName, length, blobCacheMaxLength); + final String path = String.join("/", repositoryName, blob.getKey(), fileName, "@" + expectedByteRange.start()); + + final GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, path).get(); + assertThat("Expected cached blob [" + path + "] for blob [" + fileInfo + "]", getResponse.isExists(), is(true)); + final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); + assertThat(cachedBlob.from(), equalTo(expectedByteRange.start())); + assertThat(cachedBlob.to(), equalTo(expectedByteRange.end())); + assertThat((long) cachedBlob.length(), equalTo(expectedByteRange.length())); + numberOfCachedBlobs += 1; } } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java index 25a1e430a8eb0..bee7a394896ca 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java @@ -26,6 +26,7 @@ import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService; @@ -116,6 +117,17 @@ protected void mountSnapshot( String indexName, String restoredIndexName, Settings restoredIndexSettings + ) throws Exception { + mountSnapshot(repositoryName, snapshotName, indexName, restoredIndexName, restoredIndexSettings, Storage.FULL_COPY); + } + + protected void mountSnapshot( + String repositoryName, + String snapshotName, + String indexName, + String restoredIndexName, + Settings restoredIndexSettings, + final Storage storage ) throws Exception { final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest( restoredIndexName, @@ -128,7 +140,7 @@ protected void mountSnapshot( .build(), Strings.EMPTY_ARRAY, true, - MountSearchableSnapshotRequest.Storage.FULL_COPY + storage ); final RestoreSnapshotResponse restoreResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).get(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java index 28db874c72a52..179e2ad39bac7 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.IndexFileNames; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -23,15 +24,24 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.cache.Cache; +import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.time.Instant; +import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -41,14 +51,24 @@ public class BlobStoreCacheService { private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class); - public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(4); + /** + * Before 8.0.0 blobs were cached using a 4KB or 8KB maximum length. + */ + private static final Version OLD_CACHED_BLOB_SIZE_VERSION = Version.V_8_0_0; // TODO adjust after backport + public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(1); + private static final Cache LOG_EXCEEDING_FILES_CACHE = CacheBuilder.builder() + .setExpireAfterAccess(TimeValue.timeValueMinutes(60L)) + .build(); + + private final ClusterService clusterService; private final ThreadPool threadPool; private final Client client; private final String index; - public BlobStoreCacheService(ThreadPool threadPool, Client client, String index) { + public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) { this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN); + this.clusterService = clusterService; this.threadPool = threadPool; this.index = index; } @@ -155,4 +175,112 @@ public void onFailure(Exception e) { listener.onFailure(e); } } + + private static final Set METADATA_FILES_EXTENSIONS; + private static final Set OTHER_FILES_EXTENSIONS; + static { + // List of Lucene file extensions that are considered as "metadata" and should therefore be fully cached in the blob store cache. + // Those files are usually fully read by Lucene when it opens a Directory. + METADATA_FILES_EXTENSIONS = Set.of( + "cfe", // compound file's entry table + "dvm", // doc values metadata file + "fdm", // stored fields metadata file + "fnm", // field names metadata file + "kdm", // Lucene 8.6 point format metadata file + "nvm", // norms metadata file + "tmd", // Lucene 8.6 terms metadata file + "tvm", // terms vectors metadata file + "vem" // Lucene 9.0 indexed vectors metadata + ); + + // List of extensions for which Lucene usually only reads the first 1024 byte and checks a header checksum when opening a Directory. + OTHER_FILES_EXTENSIONS = Set.of( + "cfs", + "dii", + "dim", + "doc", + "dvd", + "fdt", + "fdx", + "kdd", + "kdi", + "liv", + "nvd", + "pay", + "pos", + "tim", + "tip", + "tvd", + "tvx", + "vec" + ); + assert Sets.intersection(METADATA_FILES_EXTENSIONS, OTHER_FILES_EXTENSIONS).isEmpty(); + } + + /** + * Computes the {@link ByteRange} corresponding to the header of a Lucene file. This range can vary depending of the type of the file + * which is indicated by the file's extension. The returned byte range can never be larger than the file's length but it can be smaller. + * + * For files that are declared as metadata files in {@link #METADATA_FILES_EXTENSIONS}, the header can be as large as the specified + * maximum metadata length parameter {@code maxMetadataLength}. Non-metadata files have a fixed length header of maximum 1KB. + * + * @param fileName the name of the file + * @param fileLength the length of the file + * @param maxMetadataLength the maximum accepted length for metadata files + * + * @return the header {@link ByteRange} + */ + public ByteRange computeBlobCacheByteRange(String fileName, long fileLength, ByteSizeValue maxMetadataLength) { + final String fileExtension = IndexFileNames.getExtension(fileName); + assert fileExtension == null || METADATA_FILES_EXTENSIONS.contains(fileExtension) || OTHER_FILES_EXTENSIONS.contains(fileExtension) + : "unknown Lucene file extension [" + fileExtension + "] - should it be considered a metadata file?"; + + if (useLegacyCachedBlobSizes()) { + if (fileLength <= ByteSizeUnit.KB.toBytes(8L)) { + return ByteRange.of(0L, fileLength); + } else { + return ByteRange.of(0L, ByteSizeUnit.KB.toBytes(4L)); + } + } + + if (METADATA_FILES_EXTENSIONS.contains(fileExtension)) { + final long maxAllowedLengthInBytes = maxMetadataLength.getBytes(); + if (fileLength > maxAllowedLengthInBytes) { + logExceedingFile(fileExtension, fileLength, maxMetadataLength); + } + return ByteRange.of(0L, Math.min(fileLength, maxAllowedLengthInBytes)); + } + return ByteRange.of(0L, Math.min(fileLength, DEFAULT_CACHED_BLOB_SIZE)); + } + + protected boolean useLegacyCachedBlobSizes() { + final Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion(); + return minNodeVersion.before(OLD_CACHED_BLOB_SIZE_VERSION); + } + + private static void logExceedingFile(String extension, long length, ByteSizeValue maxAllowedLength) { + if (logger.isWarnEnabled()) { + try { + // Use of a cache to prevent too many log traces per hour + LOG_EXCEEDING_FILES_CACHE.computeIfAbsent(extension, key -> { + logger.warn( + "file with extension [{}] is larger ([{}]) than the max. length allowed [{}] to cache metadata files in blob cache", + extension, + length, + maxAllowedLength + ); + return key; + }); + } catch (ExecutionException e) { + logger.warn( + () -> new ParameterizedMessage( + "Failed to log information about exceeding file type [{}] with length [{}]", + extension, + length + ), + e + ); + } + } + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java index 699fddbac023b..323e60a9e91a6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java @@ -187,4 +187,27 @@ public static CachedBlob fromSource(final Map source) { to.longValue() ); } + + @Override + public String toString() { + return "CachedBlob [" + + "creationTime=" + + creationTime + + ", version=" + + version + + ", repository='" + + repository + + '\'' + + ", name='" + + name + + '\'' + + ", path='" + + path + + '\'' + + ", from=" + + from + + ", to=" + + to + + ']'; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index d426dd1daf32b..2564aa2a5e4da 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -15,6 +15,7 @@ import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.io.IOException; import java.io.InputStream; @@ -29,6 +30,7 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { protected final Logger logger; + protected final SearchableSnapshotDirectory directory; protected final BlobContainer blobContainer; protected final FileInfo fileInfo; protected final IOContext context; @@ -36,6 +38,9 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu protected final long offset; protected final long length; + /** Range of bytes that should be cached in the blob cache for the current index input **/ + protected final ByteRange blobCacheByteRange; + // the following are only mutable so they can be adjusted after cloning/slicing protected volatile boolean isClone; private AtomicBoolean closed; @@ -43,20 +48,23 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu public BaseSearchableSnapshotIndexInput( Logger logger, String resourceDesc, - BlobContainer blobContainer, + SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats, long offset, - long length + long length, + ByteRange blobCacheByteRange ) { super(resourceDesc, context); this.logger = Objects.requireNonNull(logger); - this.blobContainer = Objects.requireNonNull(blobContainer); + this.directory = Objects.requireNonNull(directory); + this.blobContainer = Objects.requireNonNull(directory.blobContainer()); this.fileInfo = Objects.requireNonNull(fileInfo); this.context = Objects.requireNonNull(context); assert fileInfo.metadata().hashEqualsContents() == false : "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')'; + this.blobCacheByteRange = Objects.requireNonNull(blobCacheByteRange); this.stats = Objects.requireNonNull(stats); this.offset = offset; this.length = length; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 50bf2b9495f6c..912d87c4f0279 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.LazyInitializable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -59,6 +60,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService.FrozenCacheFile; @@ -86,6 +88,7 @@ import static org.apache.lucene.store.BufferedIndexInput.bufferSize; import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; @@ -136,6 +139,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory { private final AtomicBoolean closed; private final boolean partial; private final FrozenCacheService frozenCacheService; + private final ByteSizeValue blobStoreCacheMaxLength; // volatile fields are updated once under `this` lock, all together, iff loaded is not true. private volatile BlobStoreIndexShardSnapshot snapshot; @@ -179,6 +183,7 @@ public SearchableSnapshotDirectory( this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings)); this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes(); this.blobStoreCachePath = String.join("/", snapshotId.getUUID(), indexId.getId(), String.valueOf(shardId.id())); + this.blobStoreCacheMaxLength = SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING.get(indexSettings); this.threadPool = threadPool; this.loaded = false; this.frozenCacheService = frozenCacheService; @@ -419,14 +424,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I ); } } else { - return new DirectBlobContainerIndexInput( - blobContainer(), - fileInfo, - context, - inputStats, - getUncachedChunkSize(), - bufferSize(context) - ); + return new DirectBlobContainerIndexInput(this, fileInfo, context, inputStats, getUncachedChunkSize(), bufferSize(context)); } } @@ -679,10 +677,19 @@ public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) { return null; } - public CachedBlob getCachedBlob(String name, long offset, int length) { - final CachedBlob cachedBlob = blobStoreCacheService.get(repository, name, blobStoreCachePath, offset); - assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= offset; - assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || offset + length <= cachedBlob.to(); + public ByteRange getBlobCacheByteRange(String fileName, long fileLength) { + return blobStoreCacheService.computeBlobCacheByteRange(fileName, fileLength, blobStoreCacheMaxLength); + } + + public CachedBlob getCachedBlob(String name, ByteRange range) { + final CachedBlob cachedBlob = blobStoreCacheService.get(repository, name, blobStoreCachePath, range.start()); + if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { + return cachedBlob; + } else if (cachedBlob.from() != range.start() || cachedBlob.to() != range.end()) { + // expected range in cache might differ with the returned cached blob; this can happen if the range to put in cache is changed + // between versions or through the index setting. In this case we assume it is a cache miss to force the blob to be cached again + return CachedBlob.CACHE_MISS; + } return cachedBlob; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 9db0503598dd8..f7eac478a8af0 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -56,7 +56,6 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private static final Logger logger = LogManager.getLogger(CachedBlobContainerIndexInput.class); private static final int COPY_BUFFER_SIZE = ByteSizeUnit.KB.toIntBytes(8); - private final SearchableSnapshotDirectory directory; private final CacheFileReference cacheFileReference; private final int defaultRangeSize; private final int recoveryRangeSize; @@ -84,7 +83,8 @@ public CachedBlobContainerIndexInput( fileInfo.length(), new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), rangeSize, - recoveryRangeSize + recoveryRangeSize, + directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()) ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); @@ -100,10 +100,10 @@ private CachedBlobContainerIndexInput( long length, CacheFileReference cacheFileReference, int rangeSize, - int recoveryRangeSize + int recoveryRangeSize, + ByteRange blobCacheByteRange ) { - super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); - this.directory = directory; + super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange); this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; @@ -157,30 +157,18 @@ protected void doReadInternal(ByteBuffer b) throws IOException { } // Requested data is not on disk, so try the cache index next. - final ByteRange indexCacheMiss; // null if not a miss - // We try to use the cache index if: - // - the file is small enough to be fully cached - final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2; - // - we're reading the first N bytes of the file - final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - - if (canBeFullyCached || isStartOfFile) { - final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length); + if (blobCacheByteRange.contains(position, position + length)) { + final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange); + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position; + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length(); if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of // {start, end} where positions are relative to the whole file. - - if (canBeFullyCached) { - // if the index input is smaller than twice the size of the blob cache, it will be fully indexed - indexCacheMiss = ByteRange.of(0L, fileInfo.length()); - } else { - // the index input is too large to fully cache, so just cache the initial range - indexCacheMiss = ByteRange.of(0L, BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - } + indexCacheMiss = blobCacheByteRange; // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. @@ -192,15 +180,15 @@ protected void doReadInternal(ByteBuffer b) throws IOException { position ); stats.addIndexCacheBytesRead(cachedBlob.length()); - - final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(toIntBytes(position), length).iterator(); - BytesRef bytesRef; - while ((bytesRef = cachedBytesIterator.next()) != null) { - b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); - } - assert b.position() == length : "copied " + b.position() + " but expected " + length; - try { + final int sliceOffset = toIntBytes(position - cachedBlob.from()); + final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(sliceOffset, length).iterator(); + BytesRef bytesRef; + while ((bytesRef = cachedBytesIterator.next()) != null) { + b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); + } + assert b.position() == length : "copied " + b.position() + " but expected " + length; + final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to()); cacheFile.populateAndRead( cachedRange, @@ -278,7 +266,6 @@ protected void doReadInternal(ByteBuffer b) throws IOException { // - use a BigArrays for allocation // - use an intermediate copy buffer to read the file in sensibly-sized chunks // - release the buffer once the indexing operation is complete - assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss; final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.start(), byteBuffer); @@ -600,7 +587,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) { length, cacheFileReference, defaultRangeSize, - recoveryRangeSize + recoveryRangeSize, + ByteRange.EMPTY // TODO implement blob cache for slices when it makes sense (like CFs) ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index fe5ca6156a5c5..0d3138534aba1 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -48,7 +48,6 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput { private static final Logger logger = LogManager.getLogger(FrozenIndexInput.class); private static final int COPY_BUFFER_SIZE = ByteSizeUnit.KB.toIntBytes(8); - private final SearchableSnapshotDirectory directory; private final FrozenCacheFile frozenCacheFile; private final int defaultRangeSize; private final int recoveryRangeSize; @@ -76,7 +75,8 @@ public FrozenIndexInput( fileInfo.length(), directory.getFrozenCacheFile(fileInfo.physicalName(), fileInfo.length()), rangeSize, - recoveryRangeSize + recoveryRangeSize, + directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()) ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); @@ -92,10 +92,10 @@ private FrozenIndexInput( long length, FrozenCacheFile frozenCacheFile, int rangeSize, - int recoveryRangeSize + int recoveryRangeSize, + ByteRange blobCacheByteRange ) { - super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); - this.directory = directory; + super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange); this.frozenCacheFile = frozenCacheFile; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; @@ -166,30 +166,18 @@ protected void doReadInternal(ByteBuffer b) throws IOException { } // Requested data is not on disk, so try the cache index next. - final ByteRange indexCacheMiss; // null if not a miss - // We try to use the cache index if: - // - the file is small enough to be fully cached - final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2; - // - we're reading the first N bytes of the file - final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - - if (canBeFullyCached || isStartOfFile) { - final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length); + if (blobCacheByteRange.contains(position, position + length)) { + final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange); + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position; + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length(); if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of // {start, end} where positions are relative to the whole file. - - if (canBeFullyCached) { - // if the index input is smaller than twice the size of the blob cache, it will be fully indexed - indexCacheMiss = ByteRange.of(0L, fileInfo.length()); - } else { - // the index input is too large to fully cache, so just cache the initial range - indexCacheMiss = ByteRange.of(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - } + indexCacheMiss = blobCacheByteRange; // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. @@ -203,17 +191,17 @@ protected void doReadInternal(ByteBuffer b) throws IOException { stats.addIndexCacheBytesRead(cachedBlob.length()); preventAsyncBufferChanges.run(); - - final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(toIntBytes(position), length).iterator(); - int copiedBytes = 0; - BytesRef bytesRef; - while ((bytesRef = cachedBytesIterator.next()) != null) { - b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); - copiedBytes += bytesRef.length; - } - assert copiedBytes == length : "copied " + copiedBytes + " but expected " + length; - try { + final int sliceOffset = toIntBytes(position - cachedBlob.from()); + final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(sliceOffset, length).iterator(); + int copiedBytes = 0; + BytesRef bytesRef; + while ((bytesRef = cachedBytesIterator.next()) != null) { + b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); + copiedBytes += bytesRef.length; + } + assert copiedBytes == length : "copied " + copiedBytes + " but expected " + length; + final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to()); frozenCacheFile.populateAndRead( cachedRange, @@ -316,12 +304,11 @@ protected void doReadInternal(ByteBuffer b) throws IOException { if (indexCacheMiss != null) { final Releasable onCacheFillComplete = stats.addIndexCacheFill(); final int indexCacheMissLength = toIntBytes(indexCacheMiss.length()); + // We assume that we only cache small portions of blobs so that we do not need to: // - use a BigArrays for allocation // - use an intermediate copy buffer to read the file in sensibly-sized chunks // - release the buffer once the indexing operation is complete - assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss; - final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); final StepListener readListener = frozenCacheFile.readIfAvailableOrPending( @@ -642,7 +629,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) { length, frozenCacheFile, defaultRangeSize, - recoveryRangeSize + recoveryRangeSize, + ByteRange.EMPTY // TODO implement blob cache for slices when it makes sense (like CFs) ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index b083aa7304e7a..d732c6f036bf0 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -18,6 +18,8 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; import org.elasticsearch.index.store.IndexInputStats; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.io.Closeable; import java.io.EOFException; @@ -67,7 +69,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private static final int COPY_BUFFER_SIZE = 8192; public DirectBlobContainerIndexInput( - BlobContainer blobContainer, + SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats, @@ -76,7 +78,7 @@ public DirectBlobContainerIndexInput( ) { this( "DirectBlobContainerIndexInput(" + fileInfo.physicalName() + ")", - blobContainer, + directory, fileInfo, context, stats, @@ -91,7 +93,7 @@ public DirectBlobContainerIndexInput( private DirectBlobContainerIndexInput( String resourceDesc, - BlobContainer blobContainer, + SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats, @@ -101,7 +103,7 @@ private DirectBlobContainerIndexInput( long sequentialReadSize, int bufferSize ) { - super(logger, resourceDesc, blobContainer, fileInfo, context, stats, offset, length); + super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, ByteRange.EMPTY); // TODO should use blob cache this.position = position; assert sequentialReadSize >= 0; this.sequentialReadSize = sequentialReadSize; @@ -271,7 +273,7 @@ protected void seekInternal(long pos) throws IOException { public DirectBlobContainerIndexInput clone() { final DirectBlobContainerIndexInput clone = new DirectBlobContainerIndexInput( "clone(" + this + ")", - blobContainer, + directory, fileInfo, context, stats, @@ -292,7 +294,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) { final DirectBlobContainerIndexInput slice = new DirectBlobContainerIndexInput( getFullSliceDescription(sliceDescription), - blobContainer, + directory, fileInfo, context, stats, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index c7669020e9b04..b0d548c5e39f6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.searchablesnapshots; +import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; @@ -92,6 +93,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.function.Function; @@ -185,6 +187,31 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng Setting.Property.PrivateIndex, Setting.Property.NotCopyableOnResize ); + public static final String SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH = "index.store.snapshot.blob_cache.metadata_files.max_length"; + public static final Setting SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING = new Setting<>( + new Setting.SimpleKey(SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH), + s -> new ByteSizeValue(64L, ByteSizeUnit.KB).getStringRep(), + s -> Setting.parseByteSize( + s, + new ByteSizeValue(1L, ByteSizeUnit.KB), + new ByteSizeValue(Long.MAX_VALUE), + SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH + ), + value -> { + if (value.getBytes() % BufferedIndexInput.BUFFER_SIZE != 0L) { + final String message = String.format( + Locale.ROOT, + "failed to parse value [%s] for setting [%s], must be a multiple of [%s] bytes", + value.getStringRep(), + SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, + BufferedIndexInput.BUFFER_SIZE + ); + throw new IllegalArgumentException(message); + } + }, + Setting.Property.IndexScope, + Setting.Property.NotCopyableOnResize + ); /** * Prefer to allocate to the cold tier, then the frozen tier, then the warm tier, then the hot tier @@ -262,6 +289,7 @@ public List> getSettings() { SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING, SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING, SNAPSHOT_PARTIAL_SETTING, + SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING, CacheService.SNAPSHOT_CACHE_SIZE_SETTING, CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING, CacheService.SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING, @@ -303,7 +331,12 @@ public Collection createComponents( final FrozenCacheService frozenCacheService = new FrozenCacheService(environment, threadPool); this.frozenCacheService.set(frozenCacheService); components.add(cacheService); - final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX); + final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( + clusterService, + threadPool, + client, + SNAPSHOT_BLOB_CACHE_INDEX + ); this.blobStoreCacheService.set(blobStoreCacheService); components.add(blobStoreCacheService); } else { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java index 96e1725b3a0c6..165634f5e0978 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java @@ -65,6 +65,10 @@ public boolean isSubRangeOf(ByteRange range) { return start >= range.start() && end <= range.end(); } + public boolean contains(long start, long end) { + return start() <= start && end <= end(); + } + @Override public int hashCode() { return 31 * Long.hashCode(start) + Long.hashCode(end); @@ -84,7 +88,7 @@ public boolean equals(Object obj) { @Override public String toString() { - return "ByteRange{" + start + "}{" + end + "}"; + return "ByteRange [" + start + "-" + end + ']'; } @Override diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index 256f4cd13660e..ddf0da3a29afc 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -78,7 +78,7 @@ public void testOpenCount() throws Exception { IndexInputStats inputStats = directory.getStats(fileName); assertThat(inputStats, (i == 0L) ? nullValue() : notNullValue()); - final IndexInput input = directory.openInput(fileName, newIOContext(random())); + final IndexInput input = directory.openInput(fileName, randomIOContext()); inputStats = directory.getStats(fileName); assertThat(inputStats.getOpened().longValue(), equalTo(i + 1L)); input.close(); @@ -93,7 +93,7 @@ public void testCloseCount() throws Exception { executeTestCase((fileName, fileContent, directory) -> { try { for (long i = 0L; i < randomLongBetween(1L, 20L); i++) { - final IndexInput input = directory.openInput(fileName, newIOContext(random())); + final IndexInput input = directory.openInput(fileName, randomIOContext()); IndexInputStats inputStats = directory.getStats(fileName); assertThat(inputStats, notNullValue()); @@ -114,7 +114,7 @@ public void testCachedBytesReadsAndWrites() throws Exception { final ByteSizeValue cacheSize = new ByteSizeValue(10, ByteSizeUnit.MB); executeTestCaseWithCache(cacheSize, rangeSize, (fileName, fileContent, directory) -> { - try (IndexInput input = directory.openInput(fileName, newIOContext(random()))) { + try (IndexInput input = directory.openInput(fileName, randomIOContext())) { final long length = input.length(); final IndexInputStats inputStats = directory.getStats(fileName); @@ -166,7 +166,7 @@ public void testCachedBytesReadsAndWrites() throws Exception { public void testCachedBytesReadsAndWritesNoCache() throws Exception { final ByteSizeValue uncachedChunkSize = new ByteSizeValue(randomIntBetween(512, MAX_FILE_LENGTH), ByteSizeUnit.BYTES); executeTestCaseWithoutCache(uncachedChunkSize, (fileName, fileContent, directory) -> { - try (IndexInput input = directory.openInput(fileName, newIOContext(random()))) { + try (IndexInput input = directory.openInput(fileName, randomIOContext())) { final long length = input.length(); final IndexInputStats inputStats = directory.getStats(fileName); @@ -192,7 +192,7 @@ public void testDirectBytesReadsWithCache() throws Exception { executeTestCaseWithCache(ByteSizeValue.ZERO, randomCacheRangeSize(), (fileName, fileContent, directory) -> { assertThat(directory.getStats(fileName), nullValue()); - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try { IndexInput input = directory.openInput(fileName, ioContext); if (randomBoolean()) { @@ -245,7 +245,7 @@ public void testDirectBytesReadsWithoutCache() throws Exception { executeTestCaseWithoutCache(uncachedChunkSize, (fileName, fileContent, directory) -> { assertThat(directory.getStats(fileName), nullValue()); - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try (IndexInput original = directory.openInput(fileName, ioContext)) { final IndexInput input = original.clone(); // always clone to only execute direct reads final IndexInputStats inputStats = directory.getStats(fileName); @@ -280,7 +280,7 @@ public void testOptimizedBytesReads() throws Exception { // use a large uncached chunk size that allows to read the file in a single operation final ByteSizeValue uncachedChunkSize = new ByteSizeValue(1, ByteSizeUnit.GB); executeTestCaseWithoutCache(uncachedChunkSize, (fileName, fileContent, directory) -> { - final IOContext context = newIOContext(random()); + final IOContext context = randomIOContext(); try (IndexInput input = directory.openInput(fileName, context)) { final IndexInputStats inputStats = directory.getStats(fileName); assertThat(inputStats, notNullValue()); @@ -315,7 +315,7 @@ public void testOptimizedBytesReads() throws Exception { public void testReadBytesContiguously() throws Exception { executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try (IndexInput input = cacheDirectory.openInput(fileName, ioContext)) { final IndexInputStats inputStats = cacheDirectory.getStats(fileName); @@ -366,7 +366,7 @@ public void testReadBytesContiguously() throws Exception { public void testReadBytesNonContiguously() throws Exception { executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try (IndexInput input = cacheDirectory.openInput(fileName, ioContext)) { final IndexInputStats inputStats = cacheDirectory.getStats(fileName); @@ -417,7 +417,7 @@ public void testReadBytesNonContiguously() throws Exception { public void testForwardSeeks() throws Exception { executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try (IndexInput indexInput = cacheDirectory.openInput(fileName, ioContext)) { IndexInput input = indexInput; if (randomBoolean()) { @@ -475,7 +475,7 @@ public void testForwardSeeks() throws Exception { public void testBackwardSeeks() throws Exception { executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try (IndexInput indexInput = cacheDirectory.openInput(fileName, ioContext)) { IndexInput input = indexInput; if (randomBoolean()) { @@ -605,8 +605,7 @@ private void executeTestCase( ) throws Exception { final byte[] fileContent = randomByteArrayOfLength(randomIntBetween(10, MAX_FILE_LENGTH)); - final String fileExtension = randomAlphaOfLength(3); - final String fileName = randomAlphaOfLength(10) + '.' + fileExtension; + final String fileName = randomAlphaOfLength(10) + randomFileExtension(); final SnapshotId snapshotId = new SnapshotId("_name", "_uuid"); final IndexId indexId = new IndexId("_name", "_uuid"); final ShardId shardId = new ShardId("_name", "_uuid", 0); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 3235eaab87537..ba8983491b847 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -632,7 +632,7 @@ protected void assertSnapshotOrGenericThread() { private void testIndexInputs(final CheckedBiConsumer consumer) throws Exception { testDirectories((directory, snapshotDirectory) -> { for (String fileName : randomSubsetOf(Arrays.asList(snapshotDirectory.listAll()))) { - final IOContext context = newIOContext(random()); + final IOContext context = randomIOContext(); try (IndexInput indexInput = directory.openInput(fileName, context)) { final List closeables = new ArrayList<>(); try { @@ -659,8 +659,7 @@ public void testClearCache() throws Exception { final Path shardSnapshotDir = createTempDir(); for (int i = 0; i < nbRandomFiles; i++) { - final String fileName = "file_" + randomAlphaOfLength(10); - + final String fileName = randomAlphaOfLength(5) + randomFileExtension(); final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 100_000)); final byte[] input = bytes.v2(); final String checksum = bytes.v1(); @@ -726,7 +725,7 @@ public void testClearCache() throws Exception { final BlobStoreIndexShardSnapshot.FileInfo fileInfo = randomFrom(randomFiles); final int fileLength = toIntBytes(fileInfo.length()); - try (IndexInput input = directory.openInput(fileInfo.physicalName(), newIOContext(random()))) { + try (IndexInput input = directory.openInput(fileInfo.physicalName(), randomIOContext())) { assertThat(input.length(), equalTo((long) fileLength)); final int start = between(0, fileLength - 1); final int end = between(start + 1, fileLength); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java index d4d5621d646ed..3f342c5fd9dc3 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java @@ -63,7 +63,7 @@ public void testRandomReads() throws Exception { ShardId shardId = new ShardId("_name", "_uuid", 0); for (int i = 0; i < 5; i++) { - final String fileName = randomAlphaOfLength(10); + final String fileName = randomAlphaOfLength(5) + randomFileExtension(); final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 100_000)); final byte[] input = bytes.v2(); @@ -129,7 +129,7 @@ public void testRandomReads() throws Exception { assertThat("Snapshot should be loaded", directory.snapshot(), notNullValue()); assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue()); - try (IndexInput indexInput = directory.openInput(fileName, newIOContext(random()))) { + try (IndexInput indexInput = directory.openInput(fileName, randomIOContext())) { assertThat(indexInput, instanceOf(CachedBlobContainerIndexInput.class)); assertEquals(input.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); @@ -178,7 +178,7 @@ public void testThrowsEOFException() throws Exception { IndexId indexId = new IndexId("_name", "_uuid"); ShardId shardId = new ShardId("_name", "_uuid", 0); - final String fileName = randomAlphaOfLength(10); + final String fileName = randomAlphaOfLength(5) + randomFileExtension(); final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 1000)); final byte[] input = bytes.v2(); @@ -231,7 +231,7 @@ public void testThrowsEOFException() throws Exception { assertThat("Snapshot should be loaded", searchableSnapshotDirectory.snapshot(), notNullValue()); assertThat("BlobContainer should be loaded", searchableSnapshotDirectory.blobContainer(), notNullValue()); - try (IndexInput indexInput = searchableSnapshotDirectory.openInput(fileName, newIOContext(random()))) { + try (IndexInput indexInput = searchableSnapshotDirectory.openInput(fileName, randomIOContext())) { assertThat(indexInput, instanceOf(CachedBlobContainerIndexInput.class)); final byte[] buffer = new byte[input.length + 1]; final IOException exception = expectThrows(IOException.class, () -> indexInput.readBytes(buffer, 0, buffer.length)); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java index 9976857b3767f..9f349d934edef 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java @@ -34,7 +34,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; -import java.util.Locale; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; @@ -44,7 +43,7 @@ public class FrozenIndexInputTests extends AbstractSearchableSnapshotsTestCase { private static final ShardId SHARD_ID = new ShardId(new Index("_index_name", "_index_id"), 0); public void testRandomReads() throws IOException { - final String fileName = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT); + final String fileName = randomAlphaOfLength(5) + randomFileExtension(); final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 100_000)); final byte[] fileData = bytes.v2(); @@ -104,7 +103,7 @@ public void testRandomReads() throws IOException { directory.loadSnapshot(createRecoveryState(true), ActionListener.wrap(() -> {})); // TODO does not test using the recovery range size - final IndexInput indexInput = directory.openInput(fileName, newIOContext(random())); + final IndexInput indexInput = directory.openInput(fileName, randomIOContext()); assertThat(indexInput, instanceOf(FrozenIndexInput.class)); assertEquals(fileData.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index 4b4be80cfbb20..00850269f2ce8 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -306,7 +306,12 @@ private UnsupportedOperationException unsupportedException() { public static class NoopBlobStoreCacheService extends BlobStoreCacheService { public NoopBlobStoreCacheService() { - super(null, mock(Client.class), null); + super(null, null, mock(Client.class), null); + } + + @Override + protected boolean useLegacyCachedBlobSizes() { + return false; } @Override @@ -332,7 +337,12 @@ public static class SimpleBlobStoreCacheService extends BlobStoreCacheService { private final ConcurrentHashMap blobs = new ConcurrentHashMap<>(); public SimpleBlobStoreCacheService() { - super(null, mock(Client.class), null); + super(null, null, mock(Client.class), null); + } + + @Override + protected boolean useLegacyCachedBlobSizes() { + return false; } @Override diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java index e165d44bcb70d..7820ddf499ee8 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java @@ -8,6 +8,7 @@ import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.util.Version; +import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; @@ -15,7 +16,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.store.IndexInputStats; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.io.ByteArrayInputStream; import java.io.EOFException; @@ -25,6 +28,8 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomChecksumBytes; +import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomFileExtension; +import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomIOContext; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -33,6 +38,7 @@ import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; @@ -59,9 +65,10 @@ private DirectBlobContainerIndexInput createIndexInput( String checksum, Runnable onReadBlob ) throws IOException { + final String fileName = randomAlphaOfLength(5) + randomFileExtension(); final FileInfo fileInfo = new FileInfo( randomAlphaOfLength(5), - new StoreFileMetadata("test", input.length, checksum, Version.LATEST), + new StoreFileMetadata(fileName, input.length, checksum, Version.LATEST), partSize == input.length ? randomFrom( new ByteSizeValue(partSize, ByteSizeUnit.BYTES), @@ -116,10 +123,15 @@ public int read(byte[] b, int off, int len) throws IOException { }; } }); + + final SearchableSnapshotDirectory directory = mock(SearchableSnapshotDirectory.class); + when(directory.getCachedBlob(anyString(), any(ByteRange.class))).thenReturn(CachedBlob.CACHE_NOT_READY); + when(directory.blobContainer()).thenReturn(blobContainer); + final DirectBlobContainerIndexInput indexInput = new DirectBlobContainerIndexInput( - blobContainer, + directory, fileInfo, - newIOContext(random()), + randomIOContext(), new IndexInputStats(1, 0L, () -> 0L), minimumReadSize, randomBoolean() ? BufferedIndexInput.BUFFER_SIZE : between(BufferedIndexInput.MIN_BUFFER_SIZE, BufferedIndexInput.BUFFER_SIZE) diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index 4cd8537a78d64..b5b498bf7455a 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -11,6 +11,7 @@ import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexInput; import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.elasticsearch.Version; @@ -333,4 +334,47 @@ public static Tuple randomChecksumBytes(byte[] bytes) throws IOE } return Tuple.tuple(checksum, out.toArrayCopy()); } + + public static String randomFileExtension() { + return randomFrom( + ".cfe", + ".cfs", + ".dii", + ".dim", + ".doc", + ".dvd", + ".dvm", + ".fdt", + ".fdx", + ".fdm", + ".fnm", + ".kdd", + ".kdi", + ".kdm", + ".liv", + ".nvd", + ".nvm", + ".pay", + ".pos", + ".tim", + ".tip", + ".tmd", + ".tvd", + ".tvx", + ".vec", + ".vem" + ); + } + + /** + * @return a random {@link IOContext} that corresponds to a default, read or read_once usage. + * + * It's important that the context returned by this method is not a "merge" once as {@link org.apache.lucene.store.BufferedIndexInput} + * uses a different buffer size for them. + */ + public static IOContext randomIOContext() { + final IOContext ioContext = randomFrom(IOContext.DEFAULT, IOContext.READ, IOContext.READONCE); + assert ioContext.context != IOContext.Context.MERGE; + return ioContext; + } } diff --git a/x-pack/qa/rolling-upgrade/build.gradle b/x-pack/qa/rolling-upgrade/build.gradle index 32aa9bef68ede..08c5ee0327b3a 100644 --- a/x-pack/qa/rolling-upgrade/build.gradle +++ b/x-pack/qa/rolling-upgrade/build.gradle @@ -33,6 +33,7 @@ tasks.register("copyTestNodeKeyMaterial", Copy) { for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { String baseName = "v${bwcVersion}" + String repositoryPath = "${buildDir}/cluster/shared/repo/${baseName}" testClusters { "${baseName}" { @@ -41,7 +42,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { numberOfNodes = 3 setting 'repositories.url.allowed_urls', 'http://snapshot.test*' - setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" + setting 'path.repo', repositoryPath setting 'xpack.license.self_generated.type', 'trial' setting 'xpack.security.enabled', 'true' setting 'xpack.security.transport.ssl.enabled', 'true' @@ -93,6 +94,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { dependsOn "copyTestNodeKeyMaterial" systemProperty 'tests.rest.suite', 'old_cluster' systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '') + systemProperty 'tests.path.repo', repositoryPath nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") } @@ -109,6 +111,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { systemProperty 'tests.rest.suite', 'mixed_cluster' systemProperty 'tests.first_round', 'true' systemProperty 'tests.upgrade_from_version', oldVersion + systemProperty 'tests.path.repo', repositoryPath // We only need to run these tests once so we may as well do it when we're two thirds upgraded systemProperty 'tests.rest.blacklist', [ 'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade', @@ -134,6 +137,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { systemProperty 'tests.rest.suite', 'mixed_cluster' systemProperty 'tests.first_round', 'false' systemProperty 'tests.upgrade_from_version', oldVersion + systemProperty 'tests.path.repo', repositoryPath } tasks.register("${baseName}#upgradedClusterTest", StandaloneRestIntegTestTask) { @@ -146,6 +150,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.rest.suite', 'upgraded_cluster' systemProperty 'tests.upgrade_from_version', oldVersion + systemProperty 'tests.path.repo', repositoryPath } tasks.register(bwcTaskName(bwcVersion)) { diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java index e40d9a549df76..75e303c1cb61c 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java @@ -41,6 +41,11 @@ protected boolean preserveReposUponCompletion() { return true; } + @Override + protected boolean preserveSnapshotsUponCompletion() { + return true; + } + @Override protected boolean preserveTemplatesUponCompletion() { return true; diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java new file mode 100644 index 0000000000000..bf5dfefc98319 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java @@ -0,0 +1,312 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.upgrades; + +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.searchable_snapshots.MountSnapshotRequest.Storage; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.rest.RestStatus; +import org.hamcrest.Matcher; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class SearchableSnapshotsRollingUpgradeIT extends AbstractUpgradeTestCase { + + public void testMountFullCopyAndRecoversCorrectly() throws Exception { + final Storage storage = Storage.FULL_COPY; + assumeVersion(Version.V_7_10_0, storage); + + executeMountAndRecoversCorrectlyTestCase(storage, 6789L); + } + + public void testMountPartialCopyAndRecoversCorrectly() throws Exception { + final Storage storage = Storage.SHARED_CACHE; + assumeVersion(Version.V_7_12_0, Storage.SHARED_CACHE); + + executeMountAndRecoversCorrectlyTestCase(storage, 5678L); + } + + /** + * Test that a snapshot mounted as a searchable snapshot index in the previous version recovers correctly during rolling upgrade + */ + private void executeMountAndRecoversCorrectlyTestCase(Storage storage, long numberOfDocs) throws Exception { + final String suffix = storage.storageName().toLowerCase(Locale.ROOT); + final String index = "mounted_index_" + suffix; + + if (CLUSTER_TYPE.equals(ClusterType.OLD)) { + final String repository = "repository_" + suffix; + final String snapshot = "snapshot_" + suffix; + + registerRepository(repository, FsRepository.TYPE, true, + Settings.builder().put("location", System.getProperty("tests.path.repo") + '/' + repository).build()); + + final String originalIndex = "logs_" + suffix; + createIndex(originalIndex, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(3, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build()); + indexDocs(originalIndex, numberOfDocs); + createSnapshot(repository, snapshot, originalIndex); + deleteIndex(originalIndex); + + logger.info("mounting snapshot [repository={}, snapshot={}, index={}] as index [{}] with storage [{}] on version [{}]", + repository, snapshot, originalIndex, index, storage, UPGRADE_FROM_VERSION); + mountSnapshot(repository, snapshot, originalIndex, index, storage, Settings.EMPTY); + } + + ensureGreen(index); + assertHitCount(index, equalTo(numberOfDocs)); + } + + public void testBlobStoreCacheWithFullCopyInMixedVersions() throws Exception { + final Storage storage = Storage.FULL_COPY; + assumeVersion(Version.V_7_10_0, storage); + + executeBlobCacheCreationTestCase(storage, 9876L); + } + + public void testBlobStoreCacheWithPartialCopyInMixedVersions() throws Exception { + final Storage storage = Storage.SHARED_CACHE; + assumeVersion(Version.V_7_12_0, Storage.SHARED_CACHE); + + executeBlobCacheCreationTestCase(storage, 8765L); + } + + /** + * Test the behavior of the blob store cache in mixed versions cluster. The idea is to mount a new snapshot as an index on a node with + * version X so that this node generates cached blobs documents in the blob cache system index, and then mount the snapshot again on + * a different node with version Y so that this other node is likely to use the previously generated cached blobs documents. + */ + private void executeBlobCacheCreationTestCase(Storage storage, long numberOfDocs) throws Exception { + final String suffix = "blob_cache_" + storage.storageName().toLowerCase(Locale.ROOT); + final String repository = "repository_" + suffix; + + if (CLUSTER_TYPE.equals(ClusterType.OLD)) { + registerRepository(repository, FsRepository.TYPE, true, + Settings.builder().put("location", System.getProperty("tests.path.repo") + '/' + repository).build()); + + } else if (CLUSTER_TYPE.equals(ClusterType.MIXED)) { + final int numberOfNodes = 3; + waitForNodes(numberOfNodes); + + final Map nodesIdsAndVersions = nodesVersions(); + assertThat("Cluster should have 3 nodes", nodesIdsAndVersions.size(), equalTo(numberOfNodes)); + + final Version minVersion = nodesIdsAndVersions.values().stream().min(Version::compareTo).get(); + final Version maxVersion = nodesIdsAndVersions.values().stream().max(Version::compareTo).get(); + + final String nodeIdWithMinVersion = randomFrom(nodesIdsAndVersions.entrySet().stream() + .filter(node -> minVersion.equals(node.getValue())).map(Map.Entry::getKey) + .collect(Collectors.toSet())); + + final String nodeIdWithMaxVersion = randomValueOtherThan(nodeIdWithMinVersion, + () -> randomFrom(nodesIdsAndVersions.entrySet().stream() + .filter(node -> maxVersion.equals(node.getValue())).map(Map.Entry::getKey) + .collect(Collectors.toSet()))); + + // The snapshot is mounted on the node with the min. version in order to force the node to populate the blob store cache index. + // Then the snapshot is mounted again on a different node with a higher version in order to verify that the docs in the cache + // index can be used. + + final String firstIndex = "first_index_" + suffix; + createIndex(firstIndex, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(3, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build()); + indexDocs(firstIndex, numberOfDocs); + + final String firstSnapshot = "first_snapshot_" + suffix; + createSnapshot(repository, firstSnapshot, firstIndex); + deleteIndex(firstIndex); + + String index = "first_mount_" + suffix; + logger.info("mounting snapshot as index [{}] with storage [{}] on node [{}] with min. version [{}]", + index, storage, nodeIdWithMinVersion, minVersion); + mountSnapshot(repository, firstSnapshot, firstIndex, index, storage, + Settings.builder() + // we want a specific node version to create docs in the blob cache index + .put("index.routing.allocation.include._id", nodeIdWithMinVersion) + // prevent interferences with blob cache when full_copy is used + .put("index.store.snapshot.cache.prewarm.enabled", false) + .build()); + ensureGreen(index); + assertHitCount(index, equalTo(numberOfDocs)); + deleteIndex(index); + + index = "second_mount_" + suffix; + logger.info("mounting the same snapshot of index [{}] with storage [{}], this time on node [{}] with higher version [{}]", + index, storage, nodeIdWithMaxVersion, maxVersion); + mountSnapshot(repository, firstSnapshot, firstIndex, index, storage, + Settings.builder() + // we want a specific node version to use the cached blobs created by the nodeIdWithMinVersion + .put("index.routing.allocation.include._id", nodeIdWithMaxVersion) + .put("index.routing.allocation.exclude._id", nodeIdWithMinVersion) + // prevent interferences with blob cache when full_copy is used + .put("index.store.snapshot.cache.prewarm.enabled", false) + .build()); + ensureGreen(index); + assertHitCount(index, equalTo(numberOfDocs)); + deleteIndex(index); + + deleteSnapshot(repository, firstSnapshot); + + // Now the same thing but this time the docs in blob cache index are created from the upgraded version and mounted in a second + // time on the node with the minimum version. + + final String secondIndex = "second_index_" + suffix; + createIndex(secondIndex, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(3, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build()); + indexDocs(secondIndex, numberOfDocs * 2L); + + final String secondSnapshot = "second_snapshot_" + suffix; + createSnapshot(repository, secondSnapshot, secondIndex); + deleteIndex(secondIndex); + + index = "first_mount_" + suffix; + logger.info("mounting snapshot as index [{}] with storage [{}] on node [{}] with max. version [{}]", + index, storage, nodeIdWithMaxVersion, maxVersion); + mountSnapshot(repository, secondSnapshot, secondIndex, index, storage, + Settings.builder() + // we want a specific node version to create docs in the blob cache index + .put("index.routing.allocation.include._id", nodeIdWithMaxVersion) + // prevent interferences with blob cache when full_copy is used + .put("index.store.snapshot.cache.prewarm.enabled", false) + .build()); + ensureGreen(index); + assertHitCount(index, equalTo(numberOfDocs * 2L)); + deleteIndex(index); + + index = "second_mount_" + suffix; + logger.info("mounting the same snapshot of index [{}] with storage [{}], this time on node [{}] with lower version [{}]", + index, storage, nodeIdWithMinVersion, minVersion); + mountSnapshot(repository, secondSnapshot, secondIndex, index, storage, + Settings.builder() + // we want a specific node version to use the cached blobs created by the nodeIdWithMinVersion + .put("index.routing.allocation.include._id", nodeIdWithMinVersion) + .put("index.routing.allocation.exclude._id", nodeIdWithMaxVersion) + // prevent interferences with blob cache when full_copy is used + .put("index.store.snapshot.cache.prewarm.enabled", false) + .build()); + ensureGreen(index); + assertHitCount(index, equalTo(numberOfDocs * 2L)); + deleteIndex(index); + + deleteSnapshot(repository, secondSnapshot); + + } else if (CLUSTER_TYPE.equals(ClusterType.UPGRADED)) { + deleteRepository(repository); + } + } + + private static void assumeVersion(Version minSupportedVersion, Storage storageType) { + assumeTrue("Searchable snapshots with storage type [" + storageType + "] is supported since version [" + minSupportedVersion + ']', + UPGRADE_FROM_VERSION.onOrAfter(minSupportedVersion)); + } + + private static void indexDocs(String indexName, long numberOfDocs) throws IOException { + final StringBuilder builder = new StringBuilder(); + for (long i = 0L; i < numberOfDocs; i++) { + builder.append("{\"create\":{\"_index\":\"").append(indexName).append("\"}}\n"); + builder.append("{\"value\":").append(i).append("}\n"); + } + final Request bulk = new Request(HttpPost.METHOD_NAME, "/_bulk"); + bulk.addParameter("refresh", "true"); + bulk.setJsonEntity(builder.toString()); + final Response response = client().performRequest(bulk); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + assertFalse((Boolean) XContentMapValues.extractValue("errors", responseAsMap(response))); + } + + private static void createSnapshot(String repositoryName, String snapshotName, String indexName) throws IOException { + final Request request = new Request(HttpPut.METHOD_NAME, "/_snapshot/" + repositoryName + '/' + snapshotName); + request.addParameter("wait_for_completion", "true"); + request.setJsonEntity("{ \"indices\" : \"" + indexName + "\", \"include_global_state\": false}"); + final Response response = client().performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + } + + private static void waitForNodes(int numberOfNodes) throws IOException { + final Request request = new Request(HttpGet.METHOD_NAME, "/_cluster/health"); + request.addParameter("wait_for_nodes", String.valueOf(numberOfNodes)); + final Response response = client().performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + } + + @SuppressWarnings("unchecked") + private static Map nodesVersions() throws IOException { + final Response response = client().performRequest(new Request(HttpGet.METHOD_NAME, "_nodes/_all")); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + final Map nodes = (Map) extractValue(responseAsMap(response), "nodes"); + assertNotNull("Nodes info is null", nodes); + final Map nodesVersions = new HashMap<>(nodes.size()); + for (Map.Entry node : nodes.entrySet()) { + nodesVersions.put(node.getKey(), Version.fromString((String) extractValue((Map) node.getValue(), "version"))); + } + return nodesVersions; + } + + private static void deleteSnapshot(String repositoryName, String snapshotName) throws IOException { + final Request request = new Request(HttpDelete.METHOD_NAME, "/_snapshot/" + repositoryName + '/' + snapshotName); + final Response response = client().performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + } + + private static void mountSnapshot( + String repositoryName, + String snapshotName, + String indexName, + String renamedIndex, + Storage storage, + Settings indexSettings + ) throws IOException { + final Request request = new Request(HttpPost.METHOD_NAME, "/_snapshot/" + repositoryName + '/' + snapshotName + "/_mount"); + request.addParameter("storage", storage.storageName()); + request.addParameter("wait_for_completion", "true"); + request.setJsonEntity("{" + + " \"index\": \"" + indexName + "\"," + + " \"renamed_index\": \"" + renamedIndex + "\"," + + " \"index_settings\": " + Strings.toString(indexSettings) + + "}"); + final Response response = client().performRequest(request); + assertThat( + "Failed to mount snapshot [" + snapshotName + "] from repository [" + repositoryName + "]: " + response, + response.getStatusLine().getStatusCode(), + equalTo(RestStatus.OK.getStatus()) + ); + } + + private static void assertHitCount(String indexName, Matcher countMatcher) throws IOException { + final Response response = client().performRequest(new Request(HttpGet.METHOD_NAME, "/" + indexName + "/_count")); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + final Map responseAsMap = responseAsMap(response); + final Number responseCount = (Number) extractValue("count", responseAsMap); + assertThat(responseAsMap + "", responseCount, notNullValue()); + assertThat(((Number) extractValue("count", responseAsMap)).longValue(), countMatcher); + assertThat(((Number) extractValue("_shards.failed", responseAsMap)).intValue(), equalTo(0)); + } +} diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java index 085935923fb7b..0333d0b3f6930 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java @@ -89,6 +89,16 @@ protected boolean preserveDataStreamsUponCompletion() { return true; } + @Override + protected boolean preserveReposUponCompletion() { + return true; + } + + @Override + protected boolean preserveSnapshotsUponCompletion() { + return true; + } + public UpgradeClusterClientYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) { super(testCandidate); }