diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/apache/lucene/codecs/lucene50/CompoundReaderUtils.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/apache/lucene/codecs/lucene50/CompoundReaderUtils.java new file mode 100644 index 0000000000000..e1fa7cfaf1666 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/apache/lucene/codecs/lucene50/CompoundReaderUtils.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.apache.lucene.codecs.lucene50; + +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.CompoundDirectory; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.elasticsearch.common.collect.Tuple; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class CompoundReaderUtils { + + private CompoundReaderUtils() {} + + public static Map<String, Map<String, Tuple<Long, Long>>> extractCompoundFiles(Directory directory) throws IOException { + final Map<String, Map<String, Tuple<Long, Long>>> compoundFiles = new HashMap<>(); + final SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(directory); + for (SegmentCommitInfo segmentCommitInfo : segmentInfos) { + final SegmentInfo segmentInfo = segmentCommitInfo.info; + if (segmentInfo.getUseCompoundFile()) { + final Codec codec = segmentInfo.getCodec(); + try (CompoundDirectory compoundDir = codec.compoundFormat().getCompoundReader(directory, segmentInfo, IOContext.DEFAULT)) { + String className = compoundDir.getClass().getName(); + switch (className) { + case "org.apache.lucene.codecs.lucene50.Lucene50CompoundReader": + compoundFiles.put(segmentInfo.name, readEntries(directory, segmentCommitInfo.info)); + break; + default: + assert false : "please implement readEntries() for this format of compound files: " + className; + throw new IllegalStateException("This format of compound files is not supported: " + className); + } + } + } + } + return Collections.unmodifiableMap(compoundFiles); + } + + private static Map<String, Tuple<Long, Long>> readEntries(Directory directory, SegmentInfo segmentInfo) throws IOException { + final String entriesFileName = IndexFileNames.segmentFileName(segmentInfo.name, "", Lucene50CompoundFormat.ENTRIES_EXTENSION); + try (ChecksumIndexInput entriesStream = directory.openChecksumInput(entriesFileName, IOContext.READONCE)) { + Map<String, Tuple<Long, Long>> mapping = new HashMap<>(); + Throwable throwable = null; + try { + CodecUtil.checkIndexHeader( + entriesStream, + Lucene50CompoundFormat.ENTRY_CODEC, + Lucene50CompoundFormat.VERSION_START, + Lucene50CompoundFormat.VERSION_CURRENT, + segmentInfo.getId(), + "" + ); + + final int numEntries = entriesStream.readVInt(); + final Set<String> seen = new HashSet<>(numEntries); + for (int i = 0; i < numEntries; i++) { + final String id = entriesStream.readString(); + if (seen.add(id) == false) { + throw new CorruptIndexException("Duplicate cfs entry id=" + id + 
" in CFS ", entriesStream); + } + long offset = entriesStream.readLong(); + long length = entriesStream.readLong(); + mapping.put(id, Tuple.tuple(offset, length)); + } + assert mapping.size() == numEntries; + } catch (Throwable exception) { + trowable = exception; + } finally { + CodecUtil.checkFooter(entriesStream, trowable); + } + return Collections.unmodifiableMap(mapping); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java index 31f17f5730bbf..9b7125a94158c 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -7,6 +7,10 @@ package org.elasticsearch.blobstore.cache; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.lucene50.CompoundReaderUtils; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.store.Directory; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.get.GetResponse; @@ -21,7 +25,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -29,8 +33,11 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; @@ -38,29 +45,33 @@ import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats; import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import 
org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; -import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_SHARD_SNAPSHOT_FORMAT; +import static org.elasticsearch.blobstore.cache.BlobStoreCacheService.computeHeaderByteRange; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.DATA_TIERS_CACHE_INDEX_PREFERENCE; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -111,6 +122,35 @@ public void testBlobStoreCache() throws Exception { assertThat(forceMergeResponse.getSuccessfulShards(), equalTo(numberOfShards.totalNumShards)); assertThat(forceMergeResponse.getFailedShards(), equalTo(0)); } + flushAndRefresh(indexName); + + // Per-shard list of Lucene files with their respective lengths + final Map<ShardId, Map<String, Long>> expectedLuceneFiles = new HashMap<>(); + // Per-shard list of Lucene compound files with their respective offsets and lengths in the .cfs + final Map<ShardId, Map<String, Map<String, Tuple<Long, Long>>>> expectedLuceneCompoundFiles = new HashMap<>(); + + for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) { + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + final ShardId shardId = indexShard.shardId(); + if (indexName.equals(shardId.getIndexName())) { + final Directory directory = indexShard.store().directory(); + + // load regular Lucene files + for (String file : directory.listAll()) { + String extension = IndexFileNames.getExtension(file); + if (extension != null && extension.equals("si") == false && extension.equals("lock") == false) { + expectedLuceneFiles.computeIfAbsent(shardId, s -> new HashMap<>()).put(file, directory.fileLength(file)); + } + } + + // load Lucene compound files + expectedLuceneCompoundFiles.put(shardId, CompoundReaderUtils.extractCompoundFiles(directory)); + } + } + } + } + assertThat("Failed to load Lucene files", expectedLuceneFiles.size(), equalTo(numberOfShards.numPrimaries)); final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final Path repositoryLocation = randomRepoPath(); @@ -119,9 +159,9 @@ public void testBlobStoreCache() throws Exception { final SnapshotId snapshot = createSnapshot(repositoryName, "test-snapshot", List.of(indexName)).snapshotId(); assertAcked(client().admin().indices().prepareDelete(indexName)); - // extract the list of blobs per shard from the snapshot directory on disk - final Map<String, BlobStoreIndexShardSnapshot> blobsInSnapshot = blobsInSnapshot(repositoryLocation, snapshot.getUUID()); - assertThat("Failed to load all shard snapshot metadata files", blobsInSnapshot.size(), equalTo(numberOfShards.numPrimaries)); + // extract the path of each shard from the snapshot directory on disk + final Map<Integer, String> shardsPathsInSnapshot = shardsPathsInSnapshot(repositoryLocation, snapshot.getUUID()); + 
assertThat("Failed to load all shard snapshot paths", shardsPathsInSnapshot.size(), equalTo(numberOfShards.numPrimaries)); expectThrows( IndexNotFoundException.class, @@ -129,15 +169,20 @@ public void testBlobStoreCache() throws Exception { () -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get() ); - logger.info("--> mount snapshot [{}] as an index for the first time", snapshot); - final String restoredIndex = mountSnapshot( + boolean cacheEnabled = true; // always enable cache the first time to populate the SNAPSHOT_BLOB_CACHE_INDEX + Storage storage = Storage.FULL_COPY; + logger.info("--> mount snapshot [{}] as an index for the first time [cache={}, storage={}]", snapshot, cacheEnabled, storage); + final String restoredIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + mountSnapshot( repositoryName, snapshot.getName(), indexName, + restoredIndex, Settings.builder() - .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) - .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) - .build() + .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), cacheEnabled) + .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), randomBoolean()) + .build(), + storage ); ensureGreen(restoredIndex); @@ -151,7 +196,7 @@ public void testBlobStoreCache() throws Exception { assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L)); } } - }); + }, 30L, TimeUnit.SECONDS); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, @@ -163,7 +208,7 @@ public void testBlobStoreCache() throws Exception { } logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + assertCachedBlobsInSystemIndex(repositoryName, shardsPathsInSnapshot, expectedLuceneFiles, expectedLuceneCompoundFiles); logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX); assertThat( @@ -205,33 +250,36 @@ public void testBlobStoreCache() throws Exception { assertAcked(client().admin().indices().prepareDelete(restoredIndex)); - logger.info("--> mount snapshot [{}] as an index for the second time", snapshot); - final String restoredAgainIndex = mountSnapshot( + cacheEnabled = randomBoolean(); + storage = cacheEnabled ? randomFrom(Storage.values()) : Storage.FULL_COPY; + logger.info("--> mount snapshot [{}] as an index for the second time [cache={}, storage={}]", snapshot, cacheEnabled, storage); + final String restoredAgainIndex = randomBoolean() ? 
indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + mountSnapshot( repositoryName, snapshot.getName(), indexName, + restoredAgainIndex, Settings.builder() - .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) + .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), cacheEnabled) .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) - .build() + .build(), + storage ); ensureGreen(restoredAgainIndex); - logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); + logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, new SearchableSnapshotsStatsRequest() ).actionGet().getStats()) { for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { - // we read the header of each file contained within the .cfs file, which could be anywhere - final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs"); - if (indexInputStats.getTotalSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2 - || mayReadMoreThanHeader == false) { - assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); - } + assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); } } + logger.info("--> verifying cached documents (before search) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + assertCachedBlobsInSystemIndex(repositoryName, shardsPathsInSnapshot, expectedLuceneFiles, expectedLuceneCompoundFiles); + logger.info("--> verifying documents in index [{}]", restoredAgainIndex); assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); assertHitCount( @@ -251,9 +299,6 @@ public void testBlobStoreCache() throws Exception { 0L ); - logger.info("--> verifying cached documents (again) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); - logger.info("--> verifying that no extra cached blobs were indexed [{}]", SNAPSHOT_BLOB_CACHE_INDEX); refreshSystemIndex(); assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); @@ -276,6 +321,28 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); ensureGreen(restoredAgainIndex); + logger.info("--> verifying cached documents (after restart) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + assertCachedBlobsInSystemIndex(repositoryName, shardsPathsInSnapshot, expectedLuceneFiles, expectedLuceneCompoundFiles); + + logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex); + for (final SearchableSnapshotShardStats shardStats : client().execute( + SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest() + ).actionGet().getStats()) { + for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { + assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); + } + } + + logger.info("--> verifying that no cached blobs were indexed in system index [{}] after restart", SNAPSHOT_BLOB_CACHE_INDEX); + 
assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); + assertThat( + systemClient().admin().indices().prepareStats(SNAPSHOT_BLOB_CACHE_INDEX).clear().setIndexing(true).get().getTotal().indexing + .getTotal() + .getIndexCount(), + equalTo(0L) + ); + logger.info("--> verifying documents in index [{}]", restoredAgainIndex); assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); assertHitCount( @@ -295,18 +362,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { 0L ); - logger.info("--> verifying cached documents (after restart) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); - - logger.info("--> verifying that no cached blobs were indexed in system index [{}] after restart", SNAPSHOT_BLOB_CACHE_INDEX); - assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); - assertThat( - systemClient().admin().indices().prepareStats(SNAPSHOT_BLOB_CACHE_INDEX).clear().setIndexing(true).get().getTotal().indexing - .getTotal() - .getIndexCount(), - equalTo(0L) - ); - // TODO also test when the index is frozen // TODO also test when prewarming is enabled } @@ -329,71 +384,85 @@ private void refreshSystemIndex() { } /** - * Reads a repository location on disk and extracts the list of blobs for each shards + * Reads a repository location on disk and extracts the path of each shard */ - private Map<String, BlobStoreIndexShardSnapshot> blobsInSnapshot(Path repositoryLocation, String snapshotId) throws IOException { - final Map<String, BlobStoreIndexShardSnapshot> blobsPerShard = new HashMap<>(); + private Map<Integer, String> shardsPathsInSnapshot(Path repositoryLocation, String snapshotId) throws IOException { + final Map<Integer, String> shards = new HashMap<>(); forEachFileRecursively(repositoryLocation.resolve("indices"), ((file, basicFileAttributes) -> { final String fileName = file.getFileName().toString(); if (fileName.equals(BlobStoreRepository.SNAPSHOT_FORMAT.blobName(snapshotId))) { - blobsPerShard.put( + final Integer shardId = Integer.parseInt(file.getParent().getFileName().toString()); + shards.put( + shardId, String.join( "/", snapshotId, file.getParent().getParent().getFileName().toString(), file.getParent().getFileName().toString() - ), - INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), Streams.readFully(Files.newInputStream(file))) + ) ); } })); - return Map.copyOf(blobsPerShard); + return Collections.unmodifiableMap(shards); } - private void assertCachedBlobsInSystemIndex(final String repositoryName, final Map<String, BlobStoreIndexShardSnapshot> blobsInSnapshot) - throws Exception { - assertBusy(() -> { - refreshSystemIndex(); + private void assertCachedBlobsInSystemIndex( + final String repositoryName, + final Map<Integer, String> shardsPathsInSnapshot, + final Map<ShardId, Map<String, Long>> expectedLuceneFiles, + final Map<ShardId, Map<String, Map<String, Tuple<Long, Long>>>> expectedLuceneCompoundFiles + ) throws Exception { + + final Map<String, ByteRange> expectedCachedBlobs = new HashMap<>(); + + for (ShardId shardId : expectedLuceneFiles.keySet()) { + final String shardPath = shardsPathsInSnapshot.get(shardId.getId()); + for (Map.Entry<String, Long> luceneFile : expectedLuceneFiles.get(shardId).entrySet()) { + final ByteRange header = computeHeaderByteRange(luceneFile.getKey(), luceneFile.getValue()); + expectedCachedBlobs.put(String.join("/", repositoryName, shardPath, luceneFile.getKey(), "@" + header.start()), header); + // footer of regular Lucene files is never cached in blob store cache, + // it is extracted from the shard snapshot metadata instead. 
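+ // (at read time the footer is served from the checksum stored in the snapshot metadata, see maybeReadChecksumFromFileInfo)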
+ } + } - long numberOfCachedBlobs = 0L; - for (Map.Entry<String, BlobStoreIndexShardSnapshot> blob : blobsInSnapshot.entrySet()) { - for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : blob.getValue().indexFiles()) { - if (fileInfo.name().startsWith("__") == false) { - continue; - } + for (ShardId shardId : expectedLuceneCompoundFiles.keySet()) { + final String shardPath = shardsPathsInSnapshot.get(shardId.getId()); + final Map<String, Map<String, Tuple<Long, Long>>> luceneCompoundFiles = expectedLuceneCompoundFiles.get(shardId); + + for (Map.Entry<String, Map<String, Tuple<Long, Long>>> luceneCompoundFile : luceneCompoundFiles.entrySet()) { + final String segmentName = luceneCompoundFile.getKey(); + final String cfsFileName = segmentName + ".cfs"; + for (Map.Entry<String, Tuple<Long, Long>> innerFile : luceneCompoundFile.getValue().entrySet()) { + final long offset = innerFile.getValue().v1(); + final long length = innerFile.getValue().v2(); + + final ByteRange header = computeHeaderByteRange(innerFile.getKey(), length).withOffset(offset); + expectedCachedBlobs.put(String.join("/", repositoryName, shardPath, cfsFileName, "@" + header.start()), header); - final String path = String.join("/", repositoryName, blob.getKey(), fileInfo.physicalName()); - if (fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2) { - // file has been fully cached - final GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, path + "/@0").get(); - assertThat("not cached: [" + path + "/@0] for blob [" + fileInfo + "]", getResponse.isExists(), is(true)); - final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); - assertThat(cachedBlob.from(), equalTo(0L)); - assertThat(cachedBlob.to(), equalTo(fileInfo.length())); - assertThat((long) cachedBlob.length(), equalTo(fileInfo.length())); - numberOfCachedBlobs += 1; - - } else { - // first region of file has been cached - GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, path + "/@0").get(); - assertThat( - "not cached: [" + path + "/@0] for first region of blob [" + fileInfo + "]", - getResponse.isExists(), - is(true) - ); - - CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); - assertThat(cachedBlob.from(), equalTo(0L)); - assertThat(cachedBlob.to(), equalTo((long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE)); - assertThat(cachedBlob.length(), equalTo(BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE)); - numberOfCachedBlobs += 1; + if (header.length() < length) { + final ByteRange footer = ByteRange.of(length - CodecUtil.footerLength(), length).withOffset(offset); + expectedCachedBlobs.put(String.join("/", repositoryName, shardPath, cfsFileName, "@" + footer.start()), footer); } } } + } + assertBusy(() -> { refreshSystemIndex(); - assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); + assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), expectedCachedBlobs.size()); }); + + for (Map.Entry<String, ByteRange> expectedCachedBlob : expectedCachedBlobs.entrySet()) { + final String cachedBlobId = expectedCachedBlob.getKey(); + + final GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, cachedBlobId).get(); + assertThat("Cached blob not found: " + cachedBlobId, getResponse.isExists(), is(true)); + + final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); + assertThat(cachedBlob.from(), equalTo(expectedCachedBlob.getValue().start())); + assertThat(cachedBlob.to(), equalTo(expectedCachedBlob.getValue().end())); + assertThat(cachedBlob.length(), 
equalTo(toIntBytes(expectedCachedBlob.getValue().length()))); + } } /** diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java index 8bab23714d365..d8a259ac0fe2e 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java @@ -26,6 +26,7 @@ import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService; @@ -116,6 +117,17 @@ protected void mountSnapshot( String indexName, String restoredIndexName, Settings restoredIndexSettings + ) throws Exception { + mountSnapshot(repositoryName, snapshotName, indexName, restoredIndexName, restoredIndexSettings, Storage.FULL_COPY); + } + + protected void mountSnapshot( + String repositoryName, + String snapshotName, + String indexName, + String restoredIndexName, + Settings restoredIndexSettings, + final Storage storage ) throws Exception { final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest( restoredIndexName, @@ -128,7 +140,7 @@ protected void mountSnapshot( .build(), Strings.EMPTY_ARRAY, true, - MountSearchableSnapshotRequest.Storage.FULL_COPY + storage ); final RestoreSnapshotResponse restoreResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).get(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java index 28db874c72a52..d23b6469e4f23 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.IndexFileNames; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -24,14 +25,16 @@ import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; +import 
org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.time.Instant; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -41,8 +44,6 @@ public class BlobStoreCacheService { private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class); - public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(4); - private final ThreadPool threadPool; private final Client client; private final String index; @@ -155,4 +156,70 @@ public void onFailure(Exception e) { listener.onFailure(e); } } + + private static Set<String> METADATA_FILES_EXTENSIONS = Set.of( + "cfe", // compound file's entry table + "dvm", // doc values metadata file + "fdm", // stored fields metadata file + "fnm", // field names metadata file + "kdm", // Lucene 8.6 point format metadata file + "nvm", // norms metadata file + "tmd", // Lucene 8.6 terms metadata file + "tvm", // terms vectors metadata file + "vem" // Lucene 9.0 indexed vectors metadata + ); + + private static Set<String> NON_METADATA_FILES_EXTENSIONS = Set.of( + "cfs", + "dii", + "dim", + "doc", + "dvd", + "fdt", + "fdx", + "kdd", + "kdi", + "liv", + "nvd", + "pay", + "pos", + "tim", + "tip", + "tvd", + "tvx", + "vec" + ); + + /** + * Computes the {@link ByteRange} corresponding to the header of a Lucene file. This range can vary depending on the type of the file, + * which is indicated by the file's extension. The returned byte range can never be larger than the file's length but it can be smaller. + * + * @param fileName the name of the file + * @param fileLength the length of the file + * @return the {@link ByteRange} of the file's header + */ + public static ByteRange computeHeaderByteRange(String fileName, long fileLength) { + assert Sets.intersection(METADATA_FILES_EXTENSIONS, NON_METADATA_FILES_EXTENSIONS).isEmpty(); + final String fileExtension = IndexFileNames.getExtension(fileName); + if (METADATA_FILES_EXTENSIONS.contains(fileExtension)) { + return upTo64kb(fileLength); + } else { + if (NON_METADATA_FILES_EXTENSIONS.contains(fileExtension) == false) { + // TODO maybe log less? 
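+ // unknown extensions fall through to the conservative 1kb header range computed by upTo1kb below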
+ logger.warn("Blob store cache failed to detect Lucene file extension [{}], using default cache file size", fileExtension); + } + return upTo1kb(fileLength); + } + } + + private static ByteRange upTo64kb(long fileLength) { + if (fileLength > 65536L) { + return upTo1kb(fileLength); + } + return ByteRange.of(0L, fileLength); + } + + private static ByteRange upTo1kb(long fileLength) { + return ByteRange.of(0L, Math.min(fileLength, 1024L)); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java index 699fddbac023b..f83feee2943ee 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java @@ -121,6 +121,10 @@ public BytesReference bytes() { return bytes; } + public Version version() { + return version; + } + public static String generateId(String repository, String name, String path, long offset) { return String.join("/", repository, path, name, "@" + offset); } @@ -187,4 +191,27 @@ public static CachedBlob fromSource(final Map<String, Object> source) { to.longValue() ); } + + @Override + public String toString() { + return "CachedBlob [" + + "creationTime=" + + creationTime + + ", version=" + + version + + ", repository='" + + repository + + '\'' + + ", name='" + + name + + '\'' + + ", path='" + + path + + '\'' + + ", from=" + + from + + ", to=" + + to + + ']'; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 539cd873293c2..5d42ad770aa34 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -7,33 +7,46 @@ package org.elasticsearch.index.store; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.Version; +import org.elasticsearch.blobstore.cache.CachedBlob; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; +import static org.elasticsearch.blobstore.cache.BlobStoreCacheService.computeHeaderByteRange; import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray; +import static 
org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { protected final Logger logger; + protected final SearchableSnapshotDirectory directory; protected final BlobContainer blobContainer; protected final FileInfo fileInfo; protected final IOContext context; protected final IndexInputStats stats; protected final long offset; protected final long length; + protected final List<ByteRange> blobCacheByteRanges; // the following are only mutable so they can be adjusted after cloning/slicing protected volatile boolean isClone; @@ -42,16 +55,18 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu public BaseSearchableSnapshotIndexInput( Logger logger, String resourceDesc, - BlobContainer blobContainer, + SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats, long offset, - long length + long length, + List<ByteRange> blobCacheByteRanges ) { super(resourceDesc, context); this.logger = Objects.requireNonNull(logger); - this.blobContainer = Objects.requireNonNull(blobContainer); + this.directory = Objects.requireNonNull(directory); + this.blobContainer = Objects.requireNonNull(directory.blobContainer()); this.fileInfo = Objects.requireNonNull(fileInfo); this.context = Objects.requireNonNull(context); assert fileInfo.metadata().hashEqualsContents() == false @@ -61,6 +76,7 @@ public BaseSearchableSnapshotIndexInput( this.length = length; this.closed = new AtomicBoolean(false); this.isClone = false; + this.blobCacheByteRanges = Objects.requireNonNull(blobCacheByteRanges); } @Override @@ -68,9 +84,17 @@ public final long length() { return length; } + protected long getAbsolutePosition() { + final long position = getFilePointer() + this.offset; + assert position >= 0L : "absolute position is negative: " + position; + assert position <= fileInfo.length() : position + " vs " + fileInfo.length(); + return position; + } + @Override protected final void readInternal(ByteBuffer b) throws IOException { assert assertCurrentThreadIsNotCacheFetchAsync(); + final int remaining = b.remaining(); // We can detect that we're going to read the last 16 bytes (that contains the footer checksum) of the file. Such reads are often // executed when opening a Directory and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer. 
@@ -78,13 +102,35 @@ protected final void readInternal(ByteBuffer b) throws IOException { logger.trace("read footer of file [{}], bypassing all caches", fileInfo.physicalName()); assert b.remaining() == 0L : b.remaining(); return; + + // We may be able to serve this read using the blob store cache + } else if (maybeReadFromBlobCache(b)) { + logger.trace( + () -> new ParameterizedMessage( + "read [{}] bytes of file [{}] at position [{}] using blob cache index", + remaining, + fileInfo.physicalName(), + getAbsolutePosition() + ) + ); + assert b.remaining() == 0L : b.remaining(); + return; } + assert b.remaining() == remaining; doReadInternal(b); } protected abstract void doReadInternal(ByteBuffer b) throws IOException; + /** + * Called after a read operation completes + * + * @param position the position where the read operation started + * @param length the number of bytes read + */ + protected abstract void onReadComplete(long position, int length); + /** * Detects read operations that are executed on the last 16 bytes of the index input which is where Lucene stores the footer checksum * of Lucene files. If such a read is detected this method tries to complete the read operation by reading the checksum from the @@ -98,7 +144,7 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException { if (remaining != CodecUtil.footerLength()) { return false; } - final long position = getFilePointer() + this.offset; + final long position = getAbsolutePosition(); if (position != fileInfo.length() - CodecUtil.footerLength()) { return false; } @@ -108,6 +154,7 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException { boolean success = false; try { b.put(checksumToBytesArray(fileInfo.checksum())); + onReadComplete(position, remaining); success = true; } catch (NumberFormatException e) { // tests disable this optimisation by passing an invalid checksum } @@ -117,6 +164,86 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException { return success; } + /** + * Detects read operations that are executed on a portion of the file that is likely to be present in the blob store cache. + * + * @return true if the read operation has been fully completed using the blob store cache. 
+ */ + private boolean maybeReadFromBlobCache(ByteBuffer b) throws IOException { + if (tryReadFromBlobCache()) { + assert blobCacheByteRanges.size() > 0; + final long position = getAbsolutePosition(); + final int length = b.remaining(); + + final ByteRange range = getBlobCacheByteRange(position, length); + if (range != null) { + final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), range.start(), length); + if (cachedBlob != CachedBlob.CACHE_MISS && cachedBlob != CachedBlob.CACHE_NOT_READY) { + if (cachedBlob.from() <= position && length <= cachedBlob.length()) { + final BytesRefIterator cachedBytesIterator = cachedBlob.bytes() + .slice(toIntBytes(position - cachedBlob.from()), length) + .iterator(); + BytesRef bytesRef; + while ((bytesRef = cachedBytesIterator.next()) != null) { + b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); + stats.addIndexCacheBytesRead(bytesRef.length); + } + assert b.position() == length : "copied " + b.position() + " but expected " + length; + onReadComplete(position, length); + return true; + } else { + assert cachedBlob.version().before(Version.CURRENT) : cachedBlob; + return false; + } + } + } + } + return false; + } + + /** + * Indicates whether reading data from the blob store cache index should be attempted. This is only the case while the shard is + * recovering and the file has byte ranges that are eligible for caching. + */ + protected boolean tryReadFromBlobCache() { + return blobCacheByteRanges.isEmpty() == false && directory.isRecoveryFinalized() == false; + } + + @Nullable + protected ByteRange getBlobCacheByteRange(long position, int length) { + assert tryReadFromBlobCache(); + for (ByteRange blobCacheByteRange : blobCacheByteRanges) { + if (blobCacheByteRange.contains(position, position + length)) { + return blobCacheByteRange; + } + } + return null; + } + + protected boolean isCompoundFile() { + return IndexFileNames.matchesExtension(fileInfo.physicalName(), "cfs"); + } + + protected static List<ByteRange> blobCacheByteRanges(String fileName, long fileLength) { + // footer is not cached in blob store cache but extracted from shard snapshot metadata + return List.of(computeHeaderByteRange(fileName, fileLength)); + } + + protected List<ByteRange> getBlobCacheByteRangesForSlice(String sliceName, long sliceOffset, long sliceLength) { + if (isCompoundFile() && isClone == false) { + // slices created from a .cfs index input can have their header/footer in the blob store cache + final ByteRange headerByteRange = computeHeaderByteRange(sliceName, sliceLength).withOffset(this.offset + sliceOffset); + if (headerByteRange.length() == sliceLength) { + return List.of(headerByteRange); + } else { + final ByteRange footerByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength) + .withOffset(this.offset + sliceOffset); + return List.of(headerByteRange, footerByteRange); + } + } + return List.of(); + } + /** * Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range * spans multiple blobs then this stream will request them in turn. 
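Note: a rough sketch (not part of the patch) of the byte ranges the two helpers above produce, assuming the ByteRange.of/withOffset semantics used in this change and a made-up inner file of 4096 bytes stored at offset 1_000_000 within a segment's .cfs:

    // "doc" is not a metadata extension, so only the first 1kb is cacheable;
    // expressed in .cfs coordinates this is [1_000_000, 1_001_024)
    ByteRange header = computeHeaderByteRange("_0.doc", 4096L).withOffset(1_000_000L);
    // the last 16 bytes (CodecUtil.footerLength()) of the inner file: [1_004_080, 1_004_096)
    ByteRange footer = ByteRange.of(4096L - CodecUtil.footerLength(), 4096L).withOffset(1_000_000L);

getBlobCacheByteRangesForSlice returns both ranges when the .cfs input is sliced, so later reads of an inner file's header and footer can be answered from the blob store cache index instead of the blob store.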
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 50bf2b9495f6c..4fb915794a0c8 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -419,14 +419,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I ); } } else { - return new DirectBlobContainerIndexInput( - blobContainer(), - fileInfo, - context, - inputStats, - getUncachedChunkSize(), - bufferSize(context) - ); + return new DirectBlobContainerIndexInput(this, fileInfo, context, inputStats, getUncachedChunkSize(), bufferSize(context)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 9db0503598dd8..28ac46a8f2ad5 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -13,11 +13,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.blobstore.cache.BlobStoreCacheService; -import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; @@ -35,6 +31,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.List; import java.util.Locale; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; @@ -56,7 +53,6 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private static final Logger logger = LogManager.getLogger(CachedBlobContainerIndexInput.class); private static final int COPY_BUFFER_SIZE = ByteSizeUnit.KB.toIntBytes(8); - private final SearchableSnapshotDirectory directory; private final CacheFileReference cacheFileReference; private final int defaultRangeSize; private final int recoveryRangeSize; @@ -84,9 +80,9 @@ public CachedBlobContainerIndexInput( fileInfo.length(), new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), rangeSize, - recoveryRangeSize + recoveryRangeSize, + blobCacheByteRanges(fileInfo.physicalName(), fileInfo.length()) ); - assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); } @@ -100,10 +96,10 @@ private CachedBlobContainerIndexInput( long length, CacheFileReference cacheFileReference, int rangeSize, - int recoveryRangeSize + int recoveryRangeSize, + List<ByteRange> blobCacheByteRanges ) { - super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); - this.directory = directory; + super(logger, resourceDesc, directory, fileInfo, 
context, stats, offset, length, blobCacheByteRanges); this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; @@ -141,112 +137,30 @@ protected void doReadInternal(ByteBuffer b) throws IOException { try { final CacheFile cacheFile = cacheFileReference.get(); - // Can we serve the read directly from disk? If so, do so and don't worry about anything else. - - final Future<Integer> waitingForRead = cacheFile.readIfAvailableOrPending(ByteRange.of(position, position + length), chan -> { - final int read = readCacheFile(chan, position, b); - assert read == length : read + " vs " + length; - return read; - }); - - if (waitingForRead != null) { - final Integer read = waitingForRead.get(); - assert read == length; - readComplete(position, length); - return; - } - - // Requested data is not on disk, so try the cache index next. - + // Are we trying to read data that should be present in the blob cache index? final ByteRange indexCacheMiss; // null if not a miss + if (tryReadFromBlobCache()) { + indexCacheMiss = getBlobCacheByteRange(position, length); + } else { + // requested range is not eligible for caching + indexCacheMiss = null; - // We try to use the cache index if: - // - the file is small enough to be fully cached - final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2; - // - we're reading the first N bytes of the file - final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - - if (canBeFullyCached || isStartOfFile) { - final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length); - - if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { - // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested - // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of - // {start, end} where positions are relative to the whole file. - - if (canBeFullyCached) { - // if the index input is smaller than twice the size of the blob cache, it will be fully indexed - indexCacheMiss = ByteRange.of(0L, fileInfo.length()); - } else { - // the index input is too large to fully cache, so just cache the initial range - indexCacheMiss = ByteRange.of(0L, BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - } - - // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. - // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. - } else { - logger.trace( - "reading [{}] bytes of file [{}] at position [{}] using cache index", - length, - fileInfo.physicalName(), - position - ); - stats.addIndexCacheBytesRead(cachedBlob.length()); - - final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(toIntBytes(position), length).iterator(); - BytesRef bytesRef; - while ((bytesRef = cachedBytesIterator.next()) != null) { - b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); + // Can we serve the read directly from disk? If so, do so and don't worry about anything else. 
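+ // readIfAvailableOrPending returns null when the range is neither cached on disk nor pending a fetch, in which case we fall through to the blob store read below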
+ final Future<Integer> waitingForRead = cacheFile.readIfAvailableOrPending( + ByteRange.of(position, position + length), + chan -> { + final int read = readCacheFile(chan, position, b); + assert read == length : read + " vs " + length; + return read; } - assert b.position() == length : "copied " + b.position() + " but expected " + length; - - try { - final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to()); - cacheFile.populateAndRead( - cachedRange, - cachedRange, - channel -> cachedBlob.length(), - (channel, from, to, progressUpdater) -> { - final long startTimeNanos = stats.currentTimeNanos(); - final BytesRefIterator iterator = cachedBlob.bytes() - .slice(toIntBytes(from - cachedBlob.from()), toIntBytes(to - from)) - .iterator(); - long writePosition = from; - BytesRef current; - while ((current = iterator.next()) != null) { - final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length); - while (byteBuffer.remaining() > 0) { - writePosition += positionalWrite(channel, writePosition, byteBuffer); - progressUpdater.accept(writePosition); - } - } - assert writePosition == to : writePosition + " vs " + to; - final long endTimeNanos = stats.currentTimeNanos(); - stats.addCachedBytesWritten(to - from, endTimeNanos - startTimeNanos); - logger.trace("copied bytes [{}-{}] of file [{}] from cache index to disk", from, to, fileInfo); - }, - directory.cacheFetchAsyncExecutor() - ); - } catch (Exception e) { - logger.debug( - new ParameterizedMessage( - "failed to store bytes [{}-{}] of file [{}] obtained from index cache", - cachedBlob.from(), - cachedBlob.to(), - fileInfo - ), - e - ); - // oh well, no big deal, at least we can return them to the caller. - } - - readComplete(position, length); + ); + if (waitingForRead != null) { + final Integer read = waitingForRead.get(); + assert read == length; + onReadComplete(position, length); return; } - } else { - // requested range is not eligible for caching - indexCacheMiss = null; } // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any @@ -274,11 +188,12 @@ protected void doReadInternal(ByteBuffer b) throws IOException { final Future<Integer> readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> { final int indexCacheMissLength = toIntBytes(indexCacheMiss.length()); + // Revisit this: // We assume that we only cache small portions of blobs so that we do not need to: // - use a BigArrays for allocation // - use an intermediate copy buffer to read the file in sensibly-sized chunks // - release the buffer once the indexing operation is complete - assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss; + // assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss; final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.start(), byteBuffer); @@ -319,10 +234,11 @@ public void onFailure(Exception e1) { // already a rare case caused by an overfull/undersized cache. 
} - readComplete(position, length); + onReadComplete(position, length); } - private void readComplete(long position, int length) { + @Override + protected void onReadComplete(long position, int length) { stats.incrementBytesRead(lastReadPosition, position, length); lastReadPosition = position + length; lastSeekPosition = lastReadPosition; @@ -600,7 +516,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) { length, cacheFileReference, defaultRangeSize, - recoveryRangeSize + recoveryRangeSize, + getBlobCacheByteRangesForSlice(sliceDescription, offset, length) ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index fe5ca6156a5c5..faeacad79319f 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -13,12 +13,8 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; -import org.elasticsearch.blobstore.cache.BlobStoreCacheService; -import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -34,6 +30,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.List; import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -48,7 +45,6 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput { private static final Logger logger = LogManager.getLogger(FrozenIndexInput.class); private static final int COPY_BUFFER_SIZE = ByteSizeUnit.KB.toIntBytes(8); - private final SearchableSnapshotDirectory directory; private final FrozenCacheFile frozenCacheFile; private final int defaultRangeSize; private final int recoveryRangeSize; @@ -76,9 +72,9 @@ public FrozenIndexInput( fileInfo.length(), directory.getFrozenCacheFile(fileInfo.physicalName(), fileInfo.length()), rangeSize, - recoveryRangeSize + recoveryRangeSize, + blobCacheByteRanges(fileInfo.physicalName(), fileInfo.length()) ); - assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); } @@ -92,10 +88,10 @@ private FrozenIndexInput( long length, FrozenCacheFile frozenCacheFile, int rangeSize, - int recoveryRangeSize + int recoveryRangeSize, + List<ByteRange> blobCacheByteRanges ) { - super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); - this.directory = directory; + super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRanges); this.frozenCacheFile = frozenCacheFile; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; @@ -124,7 +120,7 @@ private ByteRange computeRange(long position) { @Override protected void doReadInternal(ByteBuffer b) throws IOException { ensureContext(ctx -> ctx != 
CACHE_WARMING_CONTEXT); - final long position = getFilePointer() + this.offset; + final long position = getAbsolutePosition(); final int length = b.remaining(); final ReentrantReadWriteLock luceneByteBufLock = new ReentrantReadWriteLock(); @@ -144,132 +140,44 @@ protected void doReadInternal(ByteBuffer b) throws IOException { logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this); try { - // Can we serve the read directly from disk? If so, do so and don't worry about anything else. - - final StepListener<Integer> waitingForRead = frozenCacheFile.readIfAvailableOrPending( - ByteRange.of(position, position + length), - (channel, pos, relativePos, len) -> { - final int read = readCacheFile(channel, pos, relativePos, len, b, position, true, luceneByteBufLock, stopAsyncReads); - assert read <= length : read + " vs " + length; - return read; - } - ); - - if (waitingForRead != null) { - final Integer read = waitingForRead.asFuture().get(); - assert read == length; - assert luceneByteBufLock.getReadHoldCount() == 0; - preventAsyncBufferChanges.run(); - b.position(read); // mark all bytes as accounted for - readComplete(position, length); - return; - } - - // Requested data is not on disk, so try the cache index next. - + // Are we trying to read data that should be present in the blob cache index? final ByteRange indexCacheMiss; // null if not a miss + if (tryReadFromBlobCache()) { + indexCacheMiss = getBlobCacheByteRange(position, length); + } else { + // requested range is not eligible for caching + indexCacheMiss = null; - // We try to use the cache index if: - // - the file is small enough to be fully cached - final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2; - // - we're reading the first N bytes of the file - final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - - if (canBeFullyCached || isStartOfFile) { - final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length); - - if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { - // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested - // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of - // {start, end} where positions are relative to the whole file. - - if (canBeFullyCached) { - // if the index input is smaller than twice the size of the blob cache, it will be fully indexed - indexCacheMiss = ByteRange.of(0L, fileInfo.length()); - } else { - // the index input is too large to fully cache, so just cache the initial range - indexCacheMiss = ByteRange.of(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - } - - // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. - // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. 
-                } else {
-                    logger.trace(
-                        "reading [{}] bytes of file [{}] at position [{}] using cache index",
-                        length,
-                        fileInfo.physicalName(),
-                        position
-                    );
-                    stats.addIndexCacheBytesRead(cachedBlob.length());
-
-                    preventAsyncBufferChanges.run();
-
-                    final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(toIntBytes(position), length).iterator();
-                    int copiedBytes = 0;
-                    BytesRef bytesRef;
-                    while ((bytesRef = cachedBytesIterator.next()) != null) {
-                        b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length);
-                        copiedBytes += bytesRef.length;
-                    }
-                    assert copiedBytes == length : "copied " + copiedBytes + " but expected " + length;
-
-                    try {
-                        final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to());
-                        frozenCacheFile.populateAndRead(
-                            cachedRange,
-                            cachedRange,
-                            (channel, channelPos, relativePos, len) -> Math.toIntExact(len),
-                            (channel, channelPos, relativePos, len, progressUpdater) -> {
-                                assert len <= cachedBlob.to() - cachedBlob.from();
-                                final long startTimeNanos = stats.currentTimeNanos();
-                                final BytesRefIterator iterator = cachedBlob.bytes()
-                                    .slice(toIntBytes(relativePos), toIntBytes(len))
-                                    .iterator();
-                                long writePosition = channelPos;
-                                long bytesCopied = 0L;
-                                BytesRef current;
-                                while ((current = iterator.next()) != null) {
-                                    final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length);
-                                    while (byteBuffer.remaining() > 0) {
-                                        final long bytesWritten = positionalWrite(channel, writePosition, byteBuffer);
-                                        bytesCopied += bytesWritten;
-                                        writePosition += bytesWritten;
-                                        progressUpdater.accept(bytesCopied);
-                                    }
-                                }
-                                long channelTo = channelPos + len;
-                                assert writePosition == channelTo : writePosition + " vs " + channelTo;
-                                final long endTimeNanos = stats.currentTimeNanos();
-                                stats.addCachedBytesWritten(len, endTimeNanos - startTimeNanos);
-                                logger.trace(
-                                    "copied bytes [{}-{}] of file [{}] from cache index to disk",
-                                    relativePos,
-                                    relativePos + len,
-                                    fileInfo
-                                );
-                            },
-                            directory.cacheFetchAsyncExecutor()
+                // Can we serve the read directly from disk? If so, do so and don't worry about anything else.
+
+                final StepListener<Integer> waitingForRead = frozenCacheFile.readIfAvailableOrPending(
+                    ByteRange.of(position, position + length),
+                    (channel, pos, relativePos, len) -> {
+                        final int read = readCacheFile(
+                            channel,
+                            pos,
+                            relativePos,
+                            len,
+                            b,
+                            position,
+                            true,
+                            luceneByteBufLock,
+                            stopAsyncReads
                         );
-                    } catch (Exception e) {
-                        logger.debug(
-                            new ParameterizedMessage(
-                                "failed to store bytes [{}-{}] of file [{}] obtained from index cache",
-                                cachedBlob.from(),
-                                cachedBlob.to(),
-                                fileInfo
-                            ),
-                            e
-                        );
-                        // oh well, no big deal, at least we can return them to the caller.
+                        assert read <= length : read + " vs " + length;
+                        return read;
                     }
+                );
 
-                    readComplete(position, length);
-
+                if (waitingForRead != null) {
+                    final Integer read = waitingForRead.asFuture().get();
+                    assert read == length;
+                    assert luceneByteBufLock.getReadHoldCount() == 0;
+                    preventAsyncBufferChanges.run();
+                    b.position(read); // mark all bytes as accounted for
+                    onReadComplete(position, length);
                     return;
                 }
-            } else {
-                // requested range is not eligible for caching
-                indexCacheMiss = null;
             }
 
             // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any
@@ -316,11 +224,12 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
             if (indexCacheMiss != null) {
                 final Releasable onCacheFillComplete = stats.addIndexCacheFill();
                 final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());
+                // Revisit this:
                 // We assume that we only cache small portions of blobs so that we do not need to:
                 // - use a BigArrays for allocation
                 // - use an intermediate copy buffer to read the file in sensibly-sized chunks
                 // - release the buffer once the indexing operation is complete
-                assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss;
+                // assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss;
 
                 final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
@@ -395,10 +304,11 @@ public void onFailure(Exception e1) {
                 // already a rare case caused by an overfull/undersized cache.
             }
 
-            readComplete(position, length);
+            onReadComplete(position, length);
         }
 
-    private void readComplete(long position, int length) {
+    @Override
+    protected void onReadComplete(long position, int length) {
         stats.incrementBytesRead(lastReadPosition, position, length);
         lastReadPosition = position + length;
         lastSeekPosition = lastReadPosition;
@@ -642,7 +552,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) {
                 length,
                 frozenCacheFile,
                 defaultRangeSize,
-                recoveryRangeSize
+                recoveryRangeSize,
+                getBlobCacheByteRangesForSlice(sliceDescription, offset, length)
             );
             slice.isClone = true;
             return slice;
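The restructured read path is easier to review with its control flow spelled out: the direct-from-disk attempt now happens only when the read is not eligible for the blob cache index, and the eligible branch merely records which range to fill. The sketch below is illustrative pseudocode condensed from the hunks above, not the actual method; readIfAvailableOrPendingSketch and fetchFromBlobStoreSketch are hypothetical stand-ins for the frozenCacheFile interactions, and the unchanged code between the hunks (not shown in this diff) performs the actual blob store fetch.

// Simplified shape of FrozenIndexInput#doReadInternal after this change (sketch only):
private void doReadInternalSketch(ByteBuffer b) throws Exception {
    final long position = getAbsolutePosition();
    final int length = b.remaining();

    final ByteRange indexCacheMiss; // blob cache index range to fill later, or null
    if (tryReadFromBlobCache()) {
        // Eligible: remember the miss so the blob cache index can be populated.
        indexCacheMiss = getBlobCacheByteRange(position, length);
    } else {
        indexCacheMiss = null;
        // Not eligible: serve straight from the local frozen cache when the bytes
        // are already on disk or are already being fetched.
        final Integer read = readIfAvailableOrPendingSketch(position, length, b);
        if (read != null) {
            onReadComplete(position, length);
            return;
        }
    }

    // Fall back to the blob store, populating the local cache and, when
    // indexCacheMiss is non-null, the blob cache index as well.
    fetchFromBlobStoreSketch(position, length, indexCacheMiss, b);
    onReadComplete(position, length);
}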
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java
index b083aa7304e7a..359a31657849d 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java
@@ -18,6 +18,8 @@
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
 import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput;
 import org.elasticsearch.index.store.IndexInputStats;
+import org.elasticsearch.index.store.SearchableSnapshotDirectory;
+import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;
 
 import java.io.Closeable;
 import java.io.EOFException;
@@ -25,6 +27,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.LongAdder;
@@ -67,7 +70,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
     private static final int COPY_BUFFER_SIZE = 8192;
 
     public DirectBlobContainerIndexInput(
-        BlobContainer blobContainer,
+        SearchableSnapshotDirectory directory,
         FileInfo fileInfo,
         IOContext context,
         IndexInputStats stats,
@@ -76,7 +79,7 @@ public DirectBlobContainerIndexInput(
     ) {
         this(
             "DirectBlobContainerIndexInput(" + fileInfo.physicalName() + ")",
-            blobContainer,
+            directory,
             fileInfo,
             context,
             stats,
@@ -84,14 +87,15 @@ public DirectBlobContainerIndexInput(
             0L,
             fileInfo.length(),
             sequentialReadSize,
-            bufferSize
+            bufferSize,
+            blobCacheByteRanges(fileInfo.physicalName(), fileInfo.length())
         );
         stats.incrementOpenCount();
     }
 
     private DirectBlobContainerIndexInput(
         String resourceDesc,
-        BlobContainer blobContainer,
+        SearchableSnapshotDirectory directory,
         FileInfo fileInfo,
         IOContext context,
         IndexInputStats stats,
@@ -99,9 +103,10 @@ private DirectBlobContainerIndexInput(
         long offset,
         long length,
         long sequentialReadSize,
-        int bufferSize
+        int bufferSize,
+        List<ByteRange> blobCacheByteRanges
     ) {
-        super(logger, resourceDesc, blobContainer, fileInfo, context, stats, offset, length);
+        super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRanges);
         this.position = position;
         assert sequentialReadSize >= 0;
         this.sequentialReadSize = sequentialReadSize;
@@ -189,6 +194,9 @@ private int readOptimized(int part, long pos, ByteBuffer b, int length) throws I
         return read;
     }
 
+    @Override
+    protected void onReadComplete(long position, int length) {}
+
     private void closeStreamForSequentialReads() throws IOException {
         try {
             IOUtils.close(streamForSequentialReads);
@@ -271,7 +279,7 @@ protected void seekInternal(long pos) throws IOException {
     public DirectBlobContainerIndexInput clone() {
         final DirectBlobContainerIndexInput clone = new DirectBlobContainerIndexInput(
             "clone(" + this + ")",
-            blobContainer,
+            directory,
             fileInfo,
             context,
             stats,
@@ -281,7 +289,8 @@ public DirectBlobContainerIndexInput clone() {
             // Clones might not be closed when they are no longer needed, but we must always close streamForSequentialReads. The simple
             // solution: do not optimize sequential reads on clones.
             NO_SEQUENTIAL_READ_OPTIMIZATION,
-            getBufferSize()
+            getBufferSize(),
+            blobCacheByteRanges
         );
         clone.isClone = true;
         return clone;
@@ -292,7 +301,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw
         if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) {
             final DirectBlobContainerIndexInput slice = new DirectBlobContainerIndexInput(
                 getFullSliceDescription(sliceDescription),
-                blobContainer,
+                directory,
                 fileInfo,
                 context,
                 stats,
@@ -302,7 +311,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw
                 // Slices might not be closed when they are no longer needed, but we must always close streamForSequentialReads. The simple
                 // solution: do not optimize sequential reads on slices.
                 NO_SEQUENTIAL_READ_OPTIMIZATION,
-                getBufferSize()
+                getBufferSize(),
+                getBlobCacheByteRangesForSlice(sliceDescription, offset, length)
             );
             slice.isClone = true;
            slice.seek(0L);
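Side by side, the two onReadComplete implementations make the purpose of the new hook clear: the cached inputs use it to advance their sequential-read bookkeeping, while the direct input keeps no such state. Condensed from the hunks above and from the FrozenIndexInput hunks earlier in this patch:

// FrozenIndexInput / CachedBlobContainerIndexInput: track sequential-read stats.
@Override
protected void onReadComplete(long position, int length) {
    stats.incrementBytesRead(lastReadPosition, position, length);
    lastReadPosition = position + length;
    lastSeekPosition = lastReadPosition;
}

// DirectBlobContainerIndexInput: no positional bookkeeping to update.
@Override
protected void onReadComplete(long position, int length) {}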
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java
index 96e1725b3a0c6..3198638147ba3 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java
@@ -65,6 +65,14 @@ public boolean isSubRangeOf(ByteRange range) {
         return start >= range.start() && end <= range.end();
     }
 
+    public boolean contains(long start, long end) {
+        return start() <= start && end <= end();
+    }
+
+    public ByteRange withOffset(long offset) {
+        return ByteRange.of(offset + start(), offset + end());
+    }
+
     @Override
     public int hashCode() {
         return 31 * Long.hashCode(start) + Long.hashCode(end);
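The two new ByteRange helpers are small but load-bearing: contains(start, end) checks that an absolute read [start, end) falls entirely within the range (both arguments are positions, not a position and a length), and withOffset(offset) shifts both bounds, e.g. to re-base a slice-relative range onto absolute file coordinates. A quick standalone illustration of the intended semantics (run with -ea so the asserts fire):

import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;

public class ByteRangeExample {
    public static void main(String[] args) {
        final ByteRange cached = ByteRange.of(0L, 1024L);

        // contains(start, end) is inclusive of the range's own bounds:
        assert cached.contains(0L, 1024L);            // the whole range
        assert cached.contains(100L, 200L);           // a strict sub-range
        assert cached.contains(100L, 2000L) == false; // overflows the end

        // withOffset(offset) shifts both bounds by the same amount:
        final ByteRange rebased = ByteRange.of(10L, 20L).withOffset(100L);
        assert rebased.start() == 110L && rebased.end() == 120L;
    }
}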
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java
index 256f4cd13660e..70eaab22e28fb 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java
@@ -605,8 +605,7 @@ private void executeTestCase(
     ) throws Exception {
 
         final byte[] fileContent = randomByteArrayOfLength(randomIntBetween(10, MAX_FILE_LENGTH));
-        final String fileExtension = randomAlphaOfLength(3);
-        final String fileName = randomAlphaOfLength(10) + '.' + fileExtension;
+        final String fileName = randomAlphaOfLength(5) + randomFileExtension();
         final SnapshotId snapshotId = new SnapshotId("_name", "_uuid");
         final IndexId indexId = new IndexId("_name", "_uuid");
         final ShardId shardId = new ShardId("_name", "_uuid", 0);
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java
index 3235eaab87537..cfa618dd97553 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java
@@ -659,8 +659,7 @@ public void testClearCache() throws Exception {
             final Path shardSnapshotDir = createTempDir();
 
             for (int i = 0; i < nbRandomFiles; i++) {
-                final String fileName = "file_" + randomAlphaOfLength(10);
-
+                final String fileName = randomAlphaOfLength(5) + randomFileExtension();
                 final Tuple<String, byte[]> bytes = randomChecksumBytes(randomIntBetween(1, 100_000));
                 final byte[] input = bytes.v2();
                 final String checksum = bytes.v1();
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java
index d4d5621d646ed..456d72952afb4 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java
@@ -63,7 +63,7 @@ public void testRandomReads() throws Exception {
         ShardId shardId = new ShardId("_name", "_uuid", 0);
 
         for (int i = 0; i < 5; i++) {
-            final String fileName = randomAlphaOfLength(10);
+            final String fileName = randomAlphaOfLength(5) + randomFileExtension();
             final Tuple<String, byte[]> bytes = randomChecksumBytes(randomIntBetween(1, 100_000));
             final byte[] input = bytes.v2();
@@ -178,7 +178,7 @@ public void testThrowsEOFException() throws Exception {
         IndexId indexId = new IndexId("_name", "_uuid");
         ShardId shardId = new ShardId("_name", "_uuid", 0);
 
-        final String fileName = randomAlphaOfLength(10);
+        final String fileName = randomAlphaOfLength(5) + randomFileExtension();
         final Tuple<String, byte[]> bytes = randomChecksumBytes(randomIntBetween(1, 1000));
         final byte[] input = bytes.v2();
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java
index 9976857b3767f..1641b34ba85ee 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java
@@ -34,7 +34,6 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
-import java.util.Locale;
 
 import static org.hamcrest.Matchers.instanceOf;
 import static org.mockito.Mockito.mock;
@@ -44,7 +43,7 @@ public class FrozenIndexInputTests extends AbstractSearchableSnapshotsTestCase {
     private static final ShardId SHARD_ID = new ShardId(new Index("_index_name", "_index_id"), 0);
 
     public void testRandomReads() throws IOException {
-        final String fileName = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT);
+        final String fileName = randomAlphaOfLength(5) + randomFileExtension();
         final Tuple<String, byte[]> bytes = randomChecksumBytes(randomIntBetween(1, 100_000));
         final byte[] fileData = bytes.v2();
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java
index e165d44bcb70d..f2ab16fd65929 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java
@@ -8,6 +8,7 @@
 import org.apache.lucene.store.BufferedIndexInput;
 import org.apache.lucene.util.Version;
+import org.elasticsearch.blobstore.cache.CachedBlob;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lucene.store.ESIndexInputTestCase;
@@ -15,6 +16,7 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
 import org.elasticsearch.index.store.IndexInputStats;
+import org.elasticsearch.index.store.SearchableSnapshotDirectory;
 import org.elasticsearch.index.store.StoreFileMetadata;
 
 import java.io.ByteArrayInputStream;
@@ -25,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomChecksumBytes;
+import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomFileExtension;
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
@@ -59,9 +62,10 @@ private DirectBlobContainerIndexInput createIndexInput(
         String checksum,
         Runnable onReadBlob
     ) throws IOException {
+        final String fileName = randomAlphaOfLength(5) + randomFileExtension();
         final FileInfo fileInfo = new FileInfo(
             randomAlphaOfLength(5),
-            new StoreFileMetadata("test", input.length, checksum, Version.LATEST),
+            new StoreFileMetadata(fileName, input.length, checksum, Version.LATEST),
             partSize == input.length
                 ? randomFrom(
                     new ByteSizeValue(partSize, ByteSizeUnit.BYTES),
@@ -116,8 +120,13 @@ public int read(byte[] b, int off, int len) throws IOException {
                 };
             }
         });
+
+        final SearchableSnapshotDirectory directory = mock(SearchableSnapshotDirectory.class);
+        when(directory.getCachedBlob(anyString(), anyLong(), anyInt())).thenReturn(CachedBlob.CACHE_NOT_READY);
+        when(directory.blobContainer()).thenReturn(blobContainer);
+
         final DirectBlobContainerIndexInput indexInput = new DirectBlobContainerIndexInput(
-            blobContainer,
+            directory,
             fileInfo,
             newIOContext(random()),
             new IndexInputStats(1, 0L, () -> 0L),
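Since the constructor now takes the whole SearchableSnapshotDirectory instead of a bare BlobContainer, the test must stub the cached-blob lookup as well; answering CachedBlob.CACHE_NOT_READY sends every read down the pre-existing blob-container path, keeping the test's behaviour unchanged. A hypothetical variation, if covering the sibling sentinel is also desired (CACHE_MISS is expected to fall through the same way, since both sentinels were handled identically by the code this patch removes):

// Hypothetical tweak to the stub above, not part of this patch:
final SearchableSnapshotDirectory directory = mock(SearchableSnapshotDirectory.class);
when(directory.getCachedBlob(anyString(), anyLong(), anyInt()))
    .thenReturn(randomBoolean() ? CachedBlob.CACHE_NOT_READY : CachedBlob.CACHE_MISS);
when(directory.blobContainer()).thenReturn(blobContainer);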
diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java
index 4cd8537a78d64..a0070f5c7c162 100644
--- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java
+++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java
@@ -333,4 +333,35 @@ public static Tuple<String, byte[]> randomChecksumBytes(byte[] bytes) throws IOE
         }
         return Tuple.tuple(checksum, out.toArrayCopy());
     }
+
+    public static String randomFileExtension() {
+        return randomFrom(
+            ".cfe",
+            ".cfs",
+            ".dii",
+            ".dim",
+            ".doc",
+            ".dvd",
+            ".dvm",
+            ".fdt",
+            ".fdx",
+            ".fdm",
+            ".fnm",
+            ".kdd",
+            ".kdi",
+            ".kdm",
+            ".liv",
+            ".nvd",
+            ".nvm",
+            ".pay",
+            ".pos",
+            ".tim",
+            ".tip",
+            ".tmd",
+            ".tvd",
+            ".tvx",
+            ".vec",
+            ".vem"
+        );
+    }
 }
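randomFileExtension() replaces the fully random file names the tests used previously: now that the byte ranges eligible for the blob cache are derived from the physical file name, a test file presumably needs to look like a real Lucene segment file to exercise those code paths. For reviewers, the extensions map onto Lucene file formats roughly as follows:

// Rough map of the extensions above to Lucene file formats (reviewer aid, not code in this patch):
//   .cfs / .cfe         compound file data / entries
//   .fdt / .fdx / .fdm  stored fields data / index / metadata
//   .tim / .tip / .tmd  terms dictionary / index / metadata
//   .doc / .pos / .pay  postings: doc ids / positions / payloads
//   .dvd / .dvm         doc values data / metadata
//   .kdd / .kdi / .kdm  points data / index / metadata (.dim / .dii in older codecs)
//   .nvd / .nvm         norms data / metadata
//   .fnm                field infos
//   .liv                live documents
//   .tvd / .tvx         term vectors data / index
final String fileName = randomAlphaOfLength(5) + randomFileExtension(); // e.g. "abcde.tim"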