From 5e7547d353ea2f41470faeb14024bb260b4750f8 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 3 Mar 2021 12:50:22 +0100 Subject: [PATCH 1/6] Use blob store cache for Lucene compound files --- .../lucene/store/ESIndexInputTestCase.java | 34 ++++++- ...ableSnapshotsBlobStoreCacheIntegTests.java | 95 +------------------ .../BaseSearchableSnapshotIndexInput.java | 95 ++++++++++++++++++- .../cache/CachedBlobContainerIndexInput.java | 67 ++++++++----- .../index/store/cache/FrozenIndexInput.java | 85 +++++++++++------ .../searchablesnapshots/cache/ByteRange.java | 4 + .../DirectBlobContainerIndexInputTests.java | 1 - .../AbstractSearchableSnapshotsTestCase.java | 31 ------ 8 files changed, 232 insertions(+), 180 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java index ad5d6456134bc..fe6d15a61bec7 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java @@ -72,7 +72,7 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IO case 3: // Read using slice len = randomIntBetween(1, length - readPos); - IndexInput slice = indexInput.slice("slice (" + readPos + ", " + len + ") of " + indexInput, readPos, len); + IndexInput slice = indexInput.slice("slice" + randomFileExtension(), readPos, len); temp = randomReadAndSlice(slice, len); // assert that position in the original input didn't change assertEquals(readPos, indexInput.getFilePointer()); @@ -121,7 +121,7 @@ protected void doRun() throws Exception { clone = indexInput.clone(); } else { final int sliceEnd = between(readEnd, length); - clone = indexInput.slice("concurrent slice (0, " + sliceEnd + ") of " + indexInput, 0L, sliceEnd); + clone = indexInput.slice("concurrent slice" + randomFileExtension(), 0L, sliceEnd); } startLatch.countDown(); startLatch.await(); @@ -178,4 +178,34 @@ public void onRejection(Exception e) { return output; } + protected static String randomFileExtension() { + return randomFrom( + ".cfe", + ".cfs", + ".dii", + ".dim", + ".doc", + ".dvd", + ".dvm", + ".fdt", + ".fdx", + ".fdm", + ".fnm", + ".kdd", + ".kdi", + ".kdm", + ".liv", + ".nvd", + ".nvm", + ".pay", + ".pos", + ".tim", + ".tip", + ".tmd", + ".tvd", + ".tvx", + ".vec", + ".vem" + ); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java index db7742ba273bb..3ab8438cc5296 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; @@ -22,7 +21,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -33,10 +31,8 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.IndexingStats; -import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.test.InternalTestCluster; @@ -49,30 +45,23 @@ import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; -import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService; import org.junit.AfterClass; import org.junit.BeforeClass; -import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; -import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_SHARD_SNAPSHOT_FORMAT; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.DATA_TIERS_CACHE_INDEX_PREFERENCE; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.is; public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -154,10 +143,6 @@ public void testBlobStoreCache() throws Exception { final SnapshotId snapshot = createSnapshot(repositoryName, "test-snapshot", List.of(indexName)).snapshotId(); assertAcked(client().admin().indices().prepareDelete(indexName)); - // extract the list of blobs per shard from the snapshot directory on disk - final Map blobsInSnapshot = blobsInSnapshot(repositoryLocation, snapshot.getUUID()); - assertThat("Failed to load all shard snapshot metadata files", blobsInSnapshot.size(), equalTo(numberOfShards.numPrimaries)); - expectThrows( IndexNotFoundException.class, ".snapshot-blob-cache system index should not be created yet", @@ -209,7 +194,8 @@ public void testBlobStoreCache() throws Exception { logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); if (numberOfDocs > 0) { - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); + refreshSystemIndex(); logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX); assertThat( @@ -259,22 +245,13 @@ public void testBlobStoreCache() throws Exception { ); ensureGreen(restoredAgainIndex); - logger.info("--> verifying cached documents (after second mount) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - if (numberOfDocs > 0) { - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); - } - logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, new SearchableSnapshotsStatsRequest() ).actionGet().getStats()) { for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { - // we read the header of each file contained within the .cfs file, which could be anywhere - final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs"); - if (mayReadMoreThanHeader == false) { - assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); - } + assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); } } @@ -312,22 +289,13 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); ensureGreen(restoredAgainIndex); - logger.info("--> verifying cached documents (after restart) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - if (numberOfDocs > 0) { - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); - } - logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, new SearchableSnapshotsStatsRequest() ).actionGet().getStats()) { for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { - // we read the header of each file contained within the .cfs file, which could be anywhere - final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs"); - if (mayReadMoreThanHeader == false) { - assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); - } + assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); } } @@ -371,61 +339,6 @@ private void refreshSystemIndex() { } } - /** - * Reads a repository location on disk and extracts the list of blobs for each shards - */ - private Map blobsInSnapshot(Path repositoryLocation, String snapshotId) throws IOException { - final Map blobsPerShard = new HashMap<>(); - forEachFileRecursively(repositoryLocation.resolve("indices"), ((file, basicFileAttributes) -> { - final String fileName = file.getFileName().toString(); - if (fileName.equals(BlobStoreRepository.SNAPSHOT_FORMAT.blobName(snapshotId))) { - blobsPerShard.put( - String.join( - "/", - snapshotId, - file.getParent().getParent().getFileName().toString(), - file.getParent().getFileName().toString() - ), - INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), Streams.readFully(Files.newInputStream(file))) - ); - } - })); - return Map.copyOf(blobsPerShard); - } - - private void assertCachedBlobsInSystemIndex(final String repositoryName, final Map blobsInSnapshot) - throws Exception { - final BlobStoreCacheService blobCacheService = internalCluster().getDataNodeInstance(BlobStoreCacheService.class); - assertBusy(() -> { - refreshSystemIndex(); - - long numberOfCachedBlobs = 0L; - for (Map.Entry blob : blobsInSnapshot.entrySet()) { - for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : blob.getValue().indexFiles()) { - if (fileInfo.name().startsWith("__") == false) { - continue; - } - - final String fileName = fileInfo.physicalName(); - final long length = fileInfo.length(); - final ByteRange expectedByteRange = blobCacheService.computeBlobCacheByteRange(fileName, length, blobCacheMaxLength); - final String path = String.join("/", repositoryName, blob.getKey(), fileName, "@" + expectedByteRange.start()); - - final GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, path).get(); - assertThat("Expected cached blob [" + path + "] for blob [" + fileInfo + "]", getResponse.isExists(), is(true)); - final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); - assertThat(cachedBlob.from(), equalTo(expectedByteRange.start())); - assertThat(cachedBlob.to(), equalTo(expectedByteRange.end())); - assertThat((long) cachedBlob.length(), equalTo(expectedByteRange.length())); - numberOfCachedBlobs += 1; - } - } - - refreshSystemIndex(); - assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); - }); - } - /** * This plugin declares an {@link AllocationDecider} that forces searchable snapshot shards to be allocated after * the primary shards of the snapshot blob cache index are started. This way we can ensure that searchable snapshot diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 2564aa2a5e4da..3643f94f56db1 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -8,9 +8,11 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.threadpool.ThreadPool; @@ -38,8 +40,18 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu protected final long offset; protected final long length; - /** Range of bytes that should be cached in the blob cache for the current index input **/ - protected final ByteRange blobCacheByteRange; + /** + * Range of bytes that should be cached in the blob cache for the current index input's header. + */ + protected final ByteRange headerBlobCacheByteRange; + + /** + * Range of bytes that should be cached in the blob cache for the current index input's footer. This footer byte range should only be + * required for slices of CFS files; regular files already have their footers extracted from the {@link FileInfo} (see method + * {@link #maybeReadChecksumFromFileInfo}). + */ + protected final ByteRange footerBlobCacheByteRange; + protected final boolean isCompoundFile; // the following are only mutable so they can be adjusted after cloning/slicing protected volatile boolean isClone; @@ -55,6 +67,21 @@ public BaseSearchableSnapshotIndexInput( long offset, long length, ByteRange blobCacheByteRange + ) { + this(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange, ByteRange.EMPTY); + } + + protected BaseSearchableSnapshotIndexInput( + Logger logger, + String resourceDesc, + SearchableSnapshotDirectory directory, + FileInfo fileInfo, + IOContext context, + IndexInputStats stats, + long offset, + long length, + ByteRange headerBlobCacheByteRange, + ByteRange footerBlobCacheByteRange ) { super(resourceDesc, context); this.logger = Objects.requireNonNull(logger); @@ -64,7 +91,9 @@ public BaseSearchableSnapshotIndexInput( this.context = Objects.requireNonNull(context); assert fileInfo.metadata().hashEqualsContents() == false : "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')'; - this.blobCacheByteRange = Objects.requireNonNull(blobCacheByteRange); + this.headerBlobCacheByteRange = Objects.requireNonNull(headerBlobCacheByteRange); + this.footerBlobCacheByteRange = Objects.requireNonNull(footerBlobCacheByteRange); + this.isCompoundFile = IndexFileNames.matchesExtension(fileInfo.physicalName(), "cfs"); this.stats = Objects.requireNonNull(stats); this.offset = offset; this.length = length; @@ -77,6 +106,13 @@ public final long length() { return length; } + protected long getAbsolutePosition() { + final long position = getFilePointer() + this.offset; + assert position >= 0L : "absolute position is negative: " + position; + assert position <= fileInfo.length() : position + " vs " + fileInfo.length(); + return position; + } + @Override protected final void readInternal(ByteBuffer b) throws IOException { assert assertCurrentThreadIsNotCacheFetchAsync(); @@ -107,7 +143,7 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException { if (remaining > CodecUtil.footerLength()) { return false; } - final long position = getFilePointer() + this.offset; + final long position = getAbsolutePosition(); final long checksumPosition = fileInfo.length() - CodecUtil.footerLength(); if (position < checksumPosition) { return false; @@ -132,6 +168,57 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException { return success; } + protected ByteRange maybeReadFromBlobCache(long position, int length) { + final long end = position + length; + if (headerBlobCacheByteRange.contains(position, end)) { + return headerBlobCacheByteRange; + } else if (footerBlobCacheByteRange.contains(position, end)) { + return footerBlobCacheByteRange; + } + return ByteRange.EMPTY; + } + + /** + * Computes the appropriate {@link ByteRange}s to put in blob store cache for the header and footer of slices created from CFS files. + * + * @param sliceName the name of the slice. Luckily Lucene passes the exact file name when slicing a CFS file. + * @param sliceOffset the offset of the slice in the CFS file + * @param sliceLength the length of the slice in the CFS file + * + * @return a {@link Tuple} of {@link ByteRange} whose first element is the header byte range and second element the footer byte range. + */ + protected Tuple getBlobCacheByteRangesForSlice(String sliceName, long sliceOffset, long sliceLength) { + if (this.isCompoundFile && this.isClone == false) { + // slices created from .cfs index input can have header/footer in the blob store cache + final ByteRange headerByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength).withOffset(sliceOffset); + if (headerByteRange.length() == sliceLength) { + return Tuple.tuple(headerByteRange, ByteRange.EMPTY); + } else { + final ByteRange footerByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength).withOffset(sliceOffset); + return Tuple.tuple(headerByteRange, footerByteRange); + } + } else { + return Tuple.tuple(this.headerBlobCacheByteRange, this.footerBlobCacheByteRange); + } + } + + /** + * This method detects read operations during shard recovery that operate on the header or the footer of a slice IndexInput created + * from a Lucene CFS file. In such case we want to bypass the disk based cache to force the creation of the blob document in blob cache. + */ + protected boolean isReadFromCompoundFileDuringRecovery(long position, int length) { + if (isCompoundFile == false || directory.isRecoveryFinalized() || isClone == false) { + return false; + } + final ByteRange blobCacheByteRange = maybeReadFromBlobCache(position, length); + if (blobCacheByteRange == ByteRange.EMPTY) { + // we are reading from a slice created from a CFS file, but the read does not fall into a range of bytes that should be cached + // in the blob store cache, so we don't need to force an update of the cached blob. + return false; + } + return true; + } + /** * Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range * spans multiple blobs then this stream will request them in turn. diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index f7eac478a8af0..35a9f59870920 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -84,7 +84,8 @@ public CachedBlobContainerIndexInput( new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), rangeSize, recoveryRangeSize, - directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()) + directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()), + ByteRange.EMPTY ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); @@ -101,9 +102,21 @@ private CachedBlobContainerIndexInput( CacheFileReference cacheFileReference, int rangeSize, int recoveryRangeSize, - ByteRange blobCacheByteRange + ByteRange headerBlobCacheByteRange, + ByteRange footerBlobCacheByteRange ) { - super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange); + super( + logger, + resourceDesc, + directory, + fileInfo, + context, + stats, + offset, + length, + headerBlobCacheByteRange, + footerBlobCacheByteRange + ); this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; @@ -134,7 +147,7 @@ private ByteRange computeRange(long position) { @Override protected void doReadInternal(ByteBuffer b) throws IOException { ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT); - final long position = getFilePointer() + this.offset; + final long position = getAbsolutePosition(); final int length = b.remaining(); logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this); @@ -142,24 +155,29 @@ protected void doReadInternal(ByteBuffer b) throws IOException { final CacheFile cacheFile = cacheFileReference.get(); // Can we serve the read directly from disk? If so, do so and don't worry about anything else. + if (isReadFromCompoundFileDuringRecovery(position, length) == false) { + final Future waitingForRead = cacheFile.readIfAvailableOrPending( + ByteRange.of(position, position + length), + chan -> { + final int read = readCacheFile(chan, position, b); + assert read == length : read + " vs " + length; + return read; + } + ); - final Future waitingForRead = cacheFile.readIfAvailableOrPending(ByteRange.of(position, position + length), chan -> { - final int read = readCacheFile(chan, position, b); - assert read == length : read + " vs " + length; - return read; - }); - - if (waitingForRead != null) { - final Integer read = waitingForRead.get(); - assert read == length; - readComplete(position, length); - return; + if (waitingForRead != null) { + final Integer read = waitingForRead.get(); + assert read == length; + readComplete(position, length); + return; + } } // Requested data is not on disk, so try the cache index next. final ByteRange indexCacheMiss; // null if not a miss - if (blobCacheByteRange.contains(position, position + length)) { + final ByteRange blobCacheByteRange = maybeReadFromBlobCache(position, length); + if (blobCacheByteRange != ByteRange.EMPTY) { final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange); assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position; assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length(); @@ -562,33 +580,36 @@ public CachedBlobContainerIndexInput clone() { } @Override - public IndexInput slice(String sliceDescription, long offset, long length) { - if (offset < 0 || length < 0 || offset + length > length()) { + public IndexInput slice(String sliceDescription, long sliceOffset, long sliceLength) { + if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > length()) { throw new IllegalArgumentException( "slice() " + sliceDescription + " out of bounds: offset=" - + offset + + sliceOffset + ",length=" - + length + + sliceLength + ",fileLength=" + length() + ": " + this ); } + final long offset = this.offset + sliceOffset; + final Tuple blobCacheByteRanges = getBlobCacheByteRangesForSlice(sliceDescription, offset, sliceLength); final CachedBlobContainerIndexInput slice = new CachedBlobContainerIndexInput( getFullSliceDescription(sliceDescription), directory, fileInfo, context, stats, - this.offset + offset, - length, + offset, + sliceLength, cacheFileReference, defaultRangeSize, recoveryRangeSize, - ByteRange.EMPTY // TODO implement blob cache for slices when it makes sense (like CFs) + blobCacheByteRanges.v1(), + blobCacheByteRanges.v2() ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index 0d3138534aba1..dac2db97db407 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -20,6 +20,7 @@ import org.elasticsearch.blobstore.cache.BlobStoreCacheService; import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; @@ -76,7 +77,8 @@ public FrozenIndexInput( directory.getFrozenCacheFile(fileInfo.physicalName(), fileInfo.length()), rangeSize, recoveryRangeSize, - directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()) + directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()), + ByteRange.EMPTY ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); @@ -93,9 +95,21 @@ private FrozenIndexInput( FrozenCacheFile frozenCacheFile, int rangeSize, int recoveryRangeSize, - ByteRange blobCacheByteRange + ByteRange headerBlobCacheByteRange, + ByteRange footerBlobCacheByteRange ) { - super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange); + super( + logger, + resourceDesc, + directory, + fileInfo, + context, + stats, + offset, + length, + headerBlobCacheByteRange, + footerBlobCacheByteRange + ); this.frozenCacheFile = frozenCacheFile; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; @@ -124,7 +138,7 @@ private ByteRange computeRange(long position) { @Override protected void doReadInternal(ByteBuffer b) throws IOException { ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT); - final long position = getFilePointer() + this.offset; + final long position = getAbsolutePosition(); final int length = b.remaining(); final ReentrantReadWriteLock luceneByteBufLock = new ReentrantReadWriteLock(); @@ -145,30 +159,42 @@ protected void doReadInternal(ByteBuffer b) throws IOException { try { // Can we serve the read directly from disk? If so, do so and don't worry about anything else. + if (isReadFromCompoundFileDuringRecovery(position, length) == false) { + final StepListener waitingForRead = frozenCacheFile.readIfAvailableOrPending( + ByteRange.of(position, position + length), + (channel, pos, relativePos, len) -> { + final int read = readCacheFile( + channel, + pos, + relativePos, + len, + b, + position, + true, + luceneByteBufLock, + stopAsyncReads + ); + assert read <= length : read + " vs " + length; + return read; + } + ); - final StepListener waitingForRead = frozenCacheFile.readIfAvailableOrPending( - ByteRange.of(position, position + length), - (channel, pos, relativePos, len) -> { - final int read = readCacheFile(channel, pos, relativePos, len, b, position, true, luceneByteBufLock, stopAsyncReads); - assert read <= length : read + " vs " + length; - return read; + if (waitingForRead != null) { + final Integer read = waitingForRead.asFuture().get(); + assert read == length; + assert luceneByteBufLock.getReadHoldCount() == 0; + preventAsyncBufferChanges.run(); + b.position(read); // mark all bytes as accounted for + readComplete(position, length); + return; } - ); - - if (waitingForRead != null) { - final Integer read = waitingForRead.asFuture().get(); - assert read == length; - assert luceneByteBufLock.getReadHoldCount() == 0; - preventAsyncBufferChanges.run(); - b.position(read); // mark all bytes as accounted for - readComplete(position, length); - return; } // Requested data is not on disk, so try the cache index next. final ByteRange indexCacheMiss; // null if not a miss - if (blobCacheByteRange.contains(position, position + length)) { + final ByteRange blobCacheByteRange = maybeReadFromBlobCache(position, length); + if (blobCacheByteRange != ByteRange.EMPTY) { final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange); assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position; assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length(); @@ -604,33 +630,36 @@ public FrozenIndexInput clone() { } @Override - public IndexInput slice(String sliceDescription, long offset, long length) { - if (offset < 0 || length < 0 || offset + length > length()) { + public IndexInput slice(String sliceDescription, long sliceOffset, long sliceLength) { + if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > length()) { throw new IllegalArgumentException( "slice() " + sliceDescription + " out of bounds: offset=" - + offset + + sliceOffset + ",length=" - + length + + sliceLength + ",fileLength=" + length() + ": " + this ); } + final long offset = this.offset + sliceOffset; + final Tuple blobCacheByteRanges = getBlobCacheByteRangesForSlice(sliceDescription, offset, sliceLength); final FrozenIndexInput slice = new FrozenIndexInput( getFullSliceDescription(sliceDescription), directory, fileInfo, context, stats, - this.offset + offset, - length, + offset, + sliceLength, frozenCacheFile, defaultRangeSize, recoveryRangeSize, - ByteRange.EMPTY // TODO implement blob cache for slices when it makes sense (like CFs) + blobCacheByteRanges.v1(), + blobCacheByteRanges.v2() ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java index 165634f5e0978..2f051740394ce 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java @@ -69,6 +69,10 @@ public boolean contains(long start, long end) { return start() <= start && end <= end(); } + public ByteRange withOffset(long offset) { + return ByteRange.of(Math.addExact(start(), offset), Math.addExact(end(), offset)); + } + @Override public int hashCode() { return 31 * Long.hashCode(start) + Long.hashCode(end); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java index 7820ddf499ee8..f24137c7e09f0 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomChecksumBytes; -import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomFileExtension; import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomIOContext; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.Matchers.allOf; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index b5b498bf7455a..a24b5bc6b8a5f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -335,37 +335,6 @@ public static Tuple randomChecksumBytes(byte[] bytes) throws IOE return Tuple.tuple(checksum, out.toArrayCopy()); } - public static String randomFileExtension() { - return randomFrom( - ".cfe", - ".cfs", - ".dii", - ".dim", - ".doc", - ".dvd", - ".dvm", - ".fdt", - ".fdx", - ".fdm", - ".fnm", - ".kdd", - ".kdi", - ".kdm", - ".liv", - ".nvd", - ".nvm", - ".pay", - ".pos", - ".tim", - ".tip", - ".tmd", - ".tvd", - ".tvx", - ".vec", - ".vem" - ); - } - /** * @return a random {@link IOContext} that corresponds to a default, read or read_once usage. * From ab38952e5674490fed0fec8636eacd1f6e7f4f8d Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 4 Mar 2021 11:43:39 +0100 Subject: [PATCH 2/6] current state of work - not working --- .../lucene/store/ESIndexInputTestCase.java | 6 +- ...ableSnapshotsBlobStoreCacheIntegTests.java | 18 ++- .../BaseSearchableSnapshotIndexInput.java | 56 ++----- .../store/SearchableSnapshotDirectory.java | 4 +- .../cache/CachedBlobContainerIndexInput.java | 52 +++--- .../index/store/cache/FrozenIndexInput.java | 152 +++++++++++------- .../direct/DirectBlobContainerIndexInput.java | 13 +- .../searchablesnapshots/cache/ByteRange.java | 4 - .../cache/FrozenCacheService.java | 4 + .../DirectBlobContainerIndexInputTests.java | 1 + 10 files changed, 161 insertions(+), 149 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java index fe6d15a61bec7..d03df1d2e6682 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java @@ -50,7 +50,7 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IO int readPos = (int) indexInput.getFilePointer(); byte[] output = new byte[length]; while (readPos < length) { - switch (randomIntBetween(0, 5)) { + switch (randomIntBetween(0, 3)) { case 0: // Read by one byte at a time output[readPos++] = indexInput.readByte(); @@ -72,7 +72,7 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IO case 3: // Read using slice len = randomIntBetween(1, length - readPos); - IndexInput slice = indexInput.slice("slice" + randomFileExtension(), readPos, len); + IndexInput slice = indexInput.slice(randomAlphaOfLength(10) + randomFileExtension(), readPos, len); temp = randomReadAndSlice(slice, len); // assert that position in the original input didn't change assertEquals(readPos, indexInput.getFilePointer()); @@ -121,7 +121,7 @@ protected void doRun() throws Exception { clone = indexInput.clone(); } else { final int sliceEnd = between(readEnd, length); - clone = indexInput.slice("concurrent slice" + randomFileExtension(), 0L, sliceEnd); + clone = indexInput.slice("slice" + randomAlphaOfLength(10) + randomFileExtension(), 0L, sliceEnd); } startLatch.countDown(); startLatch.await(); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java index 3ab8438cc5296..5e23b2dea1b2f 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; @@ -104,11 +105,17 @@ protected int numberOfReplicas() { return 0; } + @Override + protected int numberOfShards() { + return 1; + } + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(cacheSettings).build(); } + @TestLogging(reason = "code", value = "org.elasticsearch.blobstore.cache:TRACE") public void testBlobStoreCache() throws Exception { final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createIndex(indexName); @@ -149,7 +156,7 @@ public void testBlobStoreCache() throws Exception { () -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get() ); - Storage storage = randomFrom(Storage.values()); + final Storage storage = Storage.SHARED_CACHE; logger.info( "--> mount snapshot [{}] as an index for the first time [storage={}, max length={}]", snapshot, @@ -228,7 +235,14 @@ public void testBlobStoreCache() throws Exception { assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); assertAcked(client().admin().indices().prepareDelete(restoredIndex)); - storage = randomFrom(Storage.values()); + assertBusy(() -> { + refreshSystemIndex(); + assertThat( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value, + greaterThan(0L) + ); + }); + logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage); final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); mountSnapshot( diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index 3643f94f56db1..d4d49adfd5dd0 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -32,6 +32,7 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { protected final Logger logger; + protected final String name; protected final SearchableSnapshotDirectory directory; protected final BlobContainer blobContainer; protected final FileInfo fileInfo; @@ -59,7 +60,7 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu public BaseSearchableSnapshotIndexInput( Logger logger, - String resourceDesc, + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -68,12 +69,12 @@ public BaseSearchableSnapshotIndexInput( long length, ByteRange blobCacheByteRange ) { - this(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange, ByteRange.EMPTY); + this(logger, name, directory, fileInfo, context, stats, offset, length, blobCacheByteRange, ByteRange.EMPTY, false); } protected BaseSearchableSnapshotIndexInput( Logger logger, - String resourceDesc, + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -81,9 +82,11 @@ protected BaseSearchableSnapshotIndexInput( long offset, long length, ByteRange headerBlobCacheByteRange, - ByteRange footerBlobCacheByteRange + ByteRange footerBlobCacheByteRange, + boolean isCompoundFile ) { - super(resourceDesc, context); + super(name, context); + this.name = Objects.requireNonNull(name); this.logger = Objects.requireNonNull(logger); this.directory = Objects.requireNonNull(directory); this.blobContainer = Objects.requireNonNull(directory.blobContainer()); @@ -93,11 +96,11 @@ protected BaseSearchableSnapshotIndexInput( : "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')'; this.headerBlobCacheByteRange = Objects.requireNonNull(headerBlobCacheByteRange); this.footerBlobCacheByteRange = Objects.requireNonNull(footerBlobCacheByteRange); - this.isCompoundFile = IndexFileNames.matchesExtension(fileInfo.physicalName(), "cfs"); this.stats = Objects.requireNonNull(stats); this.offset = offset; this.length = length; this.closed = new AtomicBoolean(false); + this.isCompoundFile = isCompoundFile; this.isClone = false; } @@ -178,47 +181,6 @@ protected ByteRange maybeReadFromBlobCache(long position, int length) { return ByteRange.EMPTY; } - /** - * Computes the appropriate {@link ByteRange}s to put in blob store cache for the header and footer of slices created from CFS files. - * - * @param sliceName the name of the slice. Luckily Lucene passes the exact file name when slicing a CFS file. - * @param sliceOffset the offset of the slice in the CFS file - * @param sliceLength the length of the slice in the CFS file - * - * @return a {@link Tuple} of {@link ByteRange} whose first element is the header byte range and second element the footer byte range. - */ - protected Tuple getBlobCacheByteRangesForSlice(String sliceName, long sliceOffset, long sliceLength) { - if (this.isCompoundFile && this.isClone == false) { - // slices created from .cfs index input can have header/footer in the blob store cache - final ByteRange headerByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength).withOffset(sliceOffset); - if (headerByteRange.length() == sliceLength) { - return Tuple.tuple(headerByteRange, ByteRange.EMPTY); - } else { - final ByteRange footerByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength).withOffset(sliceOffset); - return Tuple.tuple(headerByteRange, footerByteRange); - } - } else { - return Tuple.tuple(this.headerBlobCacheByteRange, this.footerBlobCacheByteRange); - } - } - - /** - * This method detects read operations during shard recovery that operate on the header or the footer of a slice IndexInput created - * from a Lucene CFS file. In such case we want to bypass the disk based cache to force the creation of the blob document in blob cache. - */ - protected boolean isReadFromCompoundFileDuringRecovery(long position, int length) { - if (isCompoundFile == false || directory.isRecoveryFinalized() || isClone == false) { - return false; - } - final ByteRange blobCacheByteRange = maybeReadFromBlobCache(position, length); - if (blobCacheByteRange == ByteRange.EMPTY) { - // we are reading from a slice created from a CFS file, but the read does not fall into a range of bytes that should be cached - // in the blob store cache, so we don't need to force an update of the cached blob. - return false; - } - return true; - } - /** * Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range * spans multiple blobs then this stream will request them in turn. diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 912d87c4f0279..4e985eaf8db3b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -406,6 +406,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I if (useCache && isExcludedFromCache(name) == false) { if (partial) { return new FrozenIndexInput( + name, this, fileInfo, context, @@ -415,6 +416,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I ); } else { return new CachedBlobContainerIndexInput( + name, this, fileInfo, context, @@ -424,7 +426,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I ); } } else { - return new DirectBlobContainerIndexInput(this, fileInfo, context, inputStats, getUncachedChunkSize(), bufferSize(context)); + return new DirectBlobContainerIndexInput(name, this, fileInfo, context, inputStats, getUncachedChunkSize(), bufferSize(context)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 35a9f59870920..756bf2ddccbb4 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -66,6 +66,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private long lastSeekPosition; public CachedBlobContainerIndexInput( + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -74,7 +75,7 @@ public CachedBlobContainerIndexInput( int recoveryRangeSize ) { this( - "CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")", + name, directory, fileInfo, context, @@ -84,7 +85,7 @@ public CachedBlobContainerIndexInput( new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), rangeSize, recoveryRangeSize, - directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()), + directory.getBlobCacheByteRange(name, fileInfo.length()), ByteRange.EMPTY ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth @@ -92,7 +93,7 @@ public CachedBlobContainerIndexInput( } private CachedBlobContainerIndexInput( - String resourceDesc, + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -107,7 +108,7 @@ private CachedBlobContainerIndexInput( ) { super( logger, - resourceDesc, + name, directory, fileInfo, context, @@ -115,7 +116,8 @@ private CachedBlobContainerIndexInput( offset, length, headerBlobCacheByteRange, - footerBlobCacheByteRange + footerBlobCacheByteRange, + false ); this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; @@ -155,22 +157,18 @@ protected void doReadInternal(ByteBuffer b) throws IOException { final CacheFile cacheFile = cacheFileReference.get(); // Can we serve the read directly from disk? If so, do so and don't worry about anything else. - if (isReadFromCompoundFileDuringRecovery(position, length) == false) { - final Future waitingForRead = cacheFile.readIfAvailableOrPending( - ByteRange.of(position, position + length), - chan -> { - final int read = readCacheFile(chan, position, b); - assert read == length : read + " vs " + length; - return read; - } - ); - if (waitingForRead != null) { - final Integer read = waitingForRead.get(); - assert read == length; - readComplete(position, length); - return; - } + final Future waitingForRead = cacheFile.readIfAvailableOrPending(ByteRange.of(position, position + length), chan -> { + final int read = readCacheFile(chan, position, b); + assert read == length : read + " vs " + length; + return read; + }); + + if (waitingForRead != null) { + final Integer read = waitingForRead.get(); + assert read == length; + readComplete(position, length); + return; } // Requested data is not on disk, so try the cache index next. @@ -580,11 +578,11 @@ public CachedBlobContainerIndexInput clone() { } @Override - public IndexInput slice(String sliceDescription, long sliceOffset, long sliceLength) { + public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) { if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > length()) { throw new IllegalArgumentException( "slice() " - + sliceDescription + + sliceName + " out of bounds: offset=" + sliceOffset + ",length=" @@ -595,21 +593,19 @@ public IndexInput slice(String sliceDescription, long sliceOffset, long sliceLen + this ); } - final long offset = this.offset + sliceOffset; - final Tuple blobCacheByteRanges = getBlobCacheByteRangesForSlice(sliceDescription, offset, sliceLength); final CachedBlobContainerIndexInput slice = new CachedBlobContainerIndexInput( - getFullSliceDescription(sliceDescription), + sliceName, directory, fileInfo, context, stats, - offset, + this.offset + sliceOffset, sliceLength, cacheFileReference, defaultRangeSize, recoveryRangeSize, - blobCacheByteRanges.v1(), - blobCacheByteRanges.v2() + headerBlobCacheByteRange, + footerBlobCacheByteRange ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index dac2db97db407..e26ffe09bf57a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -10,6 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; @@ -20,7 +22,6 @@ import org.elasticsearch.blobstore.cache.BlobStoreCacheService; import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; @@ -35,6 +36,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -59,6 +62,7 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput { private long lastSeekPosition; public FrozenIndexInput( + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -67,25 +71,26 @@ public FrozenIndexInput( int recoveryRangeSize ) { this( - "FrozenIndexInput(" + fileInfo.physicalName() + ")", + name, directory, fileInfo, context, stats, 0L, fileInfo.length(), - directory.getFrozenCacheFile(fileInfo.physicalName(), fileInfo.length()), + directory.getFrozenCacheFile(name, fileInfo.length()), rangeSize, recoveryRangeSize, - directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()), - ByteRange.EMPTY + directory.getBlobCacheByteRange(name, fileInfo.length()), + ByteRange.EMPTY, + false ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); } private FrozenIndexInput( - String resourceDesc, + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -96,11 +101,12 @@ private FrozenIndexInput( int rangeSize, int recoveryRangeSize, ByteRange headerBlobCacheByteRange, - ByteRange footerBlobCacheByteRange + ByteRange footerBlobCacheByteRange, + boolean isCompoundFile ) { super( logger, - resourceDesc, + name, directory, fileInfo, context, @@ -108,7 +114,8 @@ private FrozenIndexInput( offset, length, headerBlobCacheByteRange, - footerBlobCacheByteRange + footerBlobCacheByteRange, + isCompoundFile ); this.frozenCacheFile = frozenCacheFile; this.lastReadPosition = this.offset; @@ -128,17 +135,23 @@ private long getDefaultRangeSize() { : fileInfo.partSize().getBytes(); } - private ByteRange computeRange(long position) { + private ByteRange computeRange(long position, long length) { final long rangeSize = getDefaultRangeSize(); long start = (position / rangeSize) * rangeSize; - long end = Math.min(start + rangeSize, fileInfo.length()); + long end = Math.min(start + rangeSize, length); return ByteRange.of(start, end); } @Override protected void doReadInternal(ByteBuffer b) throws IOException { ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT); - final long position = getAbsolutePosition(); + final long position; + if (isCompoundFile) { + position = getFilePointer(); + assert isClone; + } else { + position = getAbsolutePosition(); + } final int length = b.remaining(); final ReentrantReadWriteLock luceneByteBufLock = new ReentrantReadWriteLock(); @@ -155,39 +168,29 @@ protected void doReadInternal(ByteBuffer b) throws IOException { } }; + final String fileName = frozenCacheFile.getCacheKey().getFileName(); logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this); try { // Can we serve the read directly from disk? If so, do so and don't worry about anything else. - if (isReadFromCompoundFileDuringRecovery(position, length) == false) { - final StepListener waitingForRead = frozenCacheFile.readIfAvailableOrPending( - ByteRange.of(position, position + length), - (channel, pos, relativePos, len) -> { - final int read = readCacheFile( - channel, - pos, - relativePos, - len, - b, - position, - true, - luceneByteBufLock, - stopAsyncReads - ); - assert read <= length : read + " vs " + length; - return read; - } - ); - if (waitingForRead != null) { - final Integer read = waitingForRead.asFuture().get(); - assert read == length; - assert luceneByteBufLock.getReadHoldCount() == 0; - preventAsyncBufferChanges.run(); - b.position(read); // mark all bytes as accounted for - readComplete(position, length); - return; + final StepListener waitingForRead = frozenCacheFile.readIfAvailableOrPending( + ByteRange.of(position, position + length), + (channel, pos, relativePos, len) -> { + final int read = readCacheFile(channel, pos, relativePos, len, b, position, true, luceneByteBufLock, stopAsyncReads); + assert read <= length : read + " vs " + length; + return read; } + ); + + if (waitingForRead != null) { + final Integer read = waitingForRead.asFuture().get(); + assert read == length; + assert luceneByteBufLock.getReadHoldCount() == 0; + preventAsyncBufferChanges.run(); + b.position(read); // mark all bytes as accounted for + readComplete(position, length); + return; } // Requested data is not on disk, so try the cache index next. @@ -195,7 +198,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { final ByteRange blobCacheByteRange = maybeReadFromBlobCache(position, length); if (blobCacheByteRange != ByteRange.EMPTY) { - final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange); + final CachedBlob cachedBlob = directory.getCachedBlob(fileName, blobCacheByteRange); assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position; assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length(); @@ -211,7 +214,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { logger.trace( "reading [{}] bytes of file [{}] at position [{}] using cache index", length, - fileInfo.physicalName(), + fileName, position ); stats.addIndexCacheBytesRead(cachedBlob.length()); @@ -259,7 +262,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { "copied bytes [{}-{}] of file [{}] from cache index to disk", relativePos, relativePos + len, - fileInfo + fileName ); }, directory.cacheFetchAsyncExecutor() @@ -270,7 +273,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { "failed to store bytes [{}-{}] of file [{}] obtained from index cache", cachedBlob.from(), cachedBlob.to(), - fileInfo + fileName ), e ); @@ -289,8 +292,9 @@ protected void doReadInternal(ByteBuffer b) throws IOException { // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any // miss in the cache index. - final ByteRange startRangeToWrite = computeRange(position); - final ByteRange endRangeToWrite = computeRange(position + length - 1); + final long fileLength = frozenCacheFile.getLength(); + final ByteRange startRangeToWrite = computeRange(position, fileLength); + final ByteRange endRangeToWrite = computeRange(position + length - 1, fileLength); assert startRangeToWrite.end() <= endRangeToWrite.end() : startRangeToWrite + " vs " + endRangeToWrite; final ByteRange rangeToWrite = startRangeToWrite.minEnvelope(endRangeToWrite).minEnvelope(indexCacheMiss); @@ -328,6 +332,10 @@ protected void doReadInternal(ByteBuffer b) throws IOException { ); if (indexCacheMiss != null) { + + // TODO also index footer for compound files if rangeToWrite encapsulates footerBlobCacheByteRange, + // otherwise the blob cache won't be populate for the footer and only the header. + final Releasable onCacheFillComplete = stats.addIndexCacheFill(); final int indexCacheMissLength = toIntBytes(indexCacheMiss.length()); @@ -375,7 +383,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { byteBuffer.position(read); // mark all bytes as accounted for byteBuffer.flip(); final BytesReference content = BytesReference.fromByteBuffer(byteBuffer); - directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.start(), content, new ActionListener<>() { + directory.putCachedBlob(fileName, indexCacheMiss.start(), content, new ActionListener<>() { @Override public void onResponse(Void response) { onCacheFillComplete.close(); @@ -417,8 +425,10 @@ private void readComplete(long position, int length) { lastSeekPosition = lastReadPosition; } - private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e) throws IOException { + private int readDirectlyIfAlreadyClosed(long pos, ByteBuffer b, Exception e) throws IOException { if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) { + // we need to compute a the global position for compound files + final long position = pos + (isCompoundFile ? this.offset : 0L); try { // cache file was evicted during the range fetching, read bytes directly from blob container final long length = b.remaining(); @@ -598,7 +608,8 @@ private void writeCacheFile( long bytesCopied = 0L; long remaining = length; final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos, length)) { + // we need to compute the global position for compound files + try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos + (isCompoundFile ? this.offset : 0L), length)) { while (remaining > 0L) { final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile); positionalWrite(fc, fileChannelPos + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead)); @@ -619,7 +630,7 @@ protected void seekInternal(long pos) throws IOException { } else if (pos < 0L) { throw new IOException("Seeking to negative position [" + pos + "] for " + toString()); } - final long position = pos + this.offset; + final long position = isCompoundFile ? pos : pos + this.offset; stats.incrementSeeks(lastSeekPosition, position); lastSeekPosition = position; } @@ -630,11 +641,11 @@ public FrozenIndexInput clone() { } @Override - public IndexInput slice(String sliceDescription, long sliceOffset, long sliceLength) { + public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) { if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > length()) { throw new IllegalArgumentException( "slice() " - + sliceDescription + + sliceName + " out of bounds: offset=" + sliceOffset + ",length=" @@ -645,21 +656,46 @@ public IndexInput slice(String sliceDescription, long sliceOffset, long sliceLen + this ); } - final long offset = this.offset + sliceOffset; - final Tuple blobCacheByteRanges = getBlobCacheByteRangesForSlice(sliceDescription, offset, sliceLength); + + // Are we creating a slice from a CFS file? + boolean sliceCompoundFile = IndexFileNames.matchesExtension(name, "cfs") + && IndexFileNames.getExtension(sliceName) != null + && isCompoundFile == false + && isClone == false; + + final FrozenCacheFile sliceFrozenCacheFile; + final ByteRange sliceHeaderByteRange; + final ByteRange sliceFooterByteRange; + + if (sliceCompoundFile) { + sliceFrozenCacheFile = directory.getFrozenCacheFile(sliceName, sliceLength); + sliceHeaderByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength); + if (sliceHeaderByteRange.length() < sliceLength) { + sliceFooterByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength); + } else { + sliceFooterByteRange = ByteRange.EMPTY; + } + } else { + sliceFrozenCacheFile = this.frozenCacheFile; + sliceCompoundFile = this.isCompoundFile; // fix this + sliceHeaderByteRange = ByteRange.EMPTY; + sliceFooterByteRange = ByteRange.EMPTY; + } + final FrozenIndexInput slice = new FrozenIndexInput( - getFullSliceDescription(sliceDescription), + sliceName, directory, fileInfo, context, stats, - offset, + this.offset + sliceOffset, sliceLength, - frozenCacheFile, + sliceFrozenCacheFile, defaultRangeSize, recoveryRangeSize, - blobCacheByteRanges.v1(), - blobCacheByteRanges.v2() + sliceHeaderByteRange, + sliceFooterByteRange, + sliceCompoundFile ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index d732c6f036bf0..ff6c7875ae829 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -69,6 +69,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private static final int COPY_BUFFER_SIZE = 8192; public DirectBlobContainerIndexInput( + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -77,7 +78,7 @@ public DirectBlobContainerIndexInput( int bufferSize ) { this( - "DirectBlobContainerIndexInput(" + fileInfo.physicalName() + ")", + name, directory, fileInfo, context, @@ -92,7 +93,7 @@ public DirectBlobContainerIndexInput( } private DirectBlobContainerIndexInput( - String resourceDesc, + String name, SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, @@ -103,7 +104,7 @@ private DirectBlobContainerIndexInput( long sequentialReadSize, int bufferSize ) { - super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, ByteRange.EMPTY); // TODO should use blob cache + super(logger, name, directory, fileInfo, context, stats, offset, length, ByteRange.EMPTY); // TODO should use blob cache this.position = position; assert sequentialReadSize >= 0; this.sequentialReadSize = sequentialReadSize; @@ -290,10 +291,10 @@ public DirectBlobContainerIndexInput clone() { } @Override - public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + public IndexInput slice(String sliceName, long offset, long length) throws IOException { if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) { final DirectBlobContainerIndexInput slice = new DirectBlobContainerIndexInput( - getFullSliceDescription(sliceDescription), + sliceName, directory, fileInfo, context, @@ -312,7 +313,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw } else { throw new IllegalArgumentException( "slice() " - + sliceDescription + + sliceName + " out of bounds: offset=" + offset + ",length=" diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java index 2f051740394ce..165634f5e0978 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java @@ -69,10 +69,6 @@ public boolean contains(long start, long end) { return start() <= start && end <= end(); } - public ByteRange withOffset(long offset) { - return ByteRange.of(Math.addExact(start(), offset), Math.addExact(end(), offset)); - } - @Override public int hashCode() { return 31 * Long.hashCode(start) + Long.hashCode(end); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java index b26e2157cff1b..adf464a87dc68 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java @@ -707,6 +707,10 @@ public long getLength() { return length; } + public CacheKey getCacheKey() { + return cacheKey; + } + public StepListener populateAndRead( final ByteRange rangeToWrite, final ByteRange rangeToRead, diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java index f24137c7e09f0..f44a345b76bfb 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java @@ -128,6 +128,7 @@ public int read(byte[] b, int off, int len) throws IOException { when(directory.blobContainer()).thenReturn(blobContainer); final DirectBlobContainerIndexInput indexInput = new DirectBlobContainerIndexInput( + fileName, directory, fileInfo, randomIOContext(), From 7018845ae832a8085c587cf30d8c033251bc71a2 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 4 Mar 2021 13:49:09 +0100 Subject: [PATCH 3/6] fix battle --- .../index/store/cache/FrozenIndexInput.java | 79 +++++++++---------- 1 file changed, 36 insertions(+), 43 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index e26ffe09bf57a..3efa26fc109be 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -36,8 +36,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -61,6 +59,8 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput { // last seek position is kept around in order to detect forward/backward seeks for stats private long lastSeekPosition; + private long physicalOffset; + public FrozenIndexInput( String name, SearchableSnapshotDirectory directory, @@ -130,28 +130,20 @@ public void doClose() { } private long getDefaultRangeSize() { - return (context != CACHE_WARMING_CONTEXT) - ? (directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize) - : fileInfo.partSize().getBytes(); + return directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize; } - private ByteRange computeRange(long position, long length) { + private ByteRange computeRange(long position) { final long rangeSize = getDefaultRangeSize(); long start = (position / rangeSize) * rangeSize; - long end = Math.min(start + rangeSize, length); + long end = Math.min(start + rangeSize, frozenCacheFile.getLength()); return ByteRange.of(start, end); } @Override protected void doReadInternal(ByteBuffer b) throws IOException { ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT); - final long position; - if (isCompoundFile) { - position = getFilePointer(); - assert isClone; - } else { - position = getAbsolutePosition(); - } + final long position = getFilePointer() + this.offset; final int length = b.remaining(); final ReentrantReadWriteLock luceneByteBufLock = new ReentrantReadWriteLock(); @@ -292,9 +284,8 @@ protected void doReadInternal(ByteBuffer b) throws IOException { // Requested data is also not in the cache index, so we must visit the blob store to satisfy both the target range and any // miss in the cache index. - final long fileLength = frozenCacheFile.getLength(); - final ByteRange startRangeToWrite = computeRange(position, fileLength); - final ByteRange endRangeToWrite = computeRange(position + length - 1, fileLength); + final ByteRange startRangeToWrite = computeRange(position); + final ByteRange endRangeToWrite = computeRange(position + length - 1); assert startRangeToWrite.end() <= endRangeToWrite.end() : startRangeToWrite + " vs " + endRangeToWrite; final ByteRange rangeToWrite = startRangeToWrite.minEnvelope(endRangeToWrite).minEnvelope(indexCacheMiss); @@ -425,10 +416,8 @@ private void readComplete(long position, int length) { lastSeekPosition = lastReadPosition; } - private int readDirectlyIfAlreadyClosed(long pos, ByteBuffer b, Exception e) throws IOException { + private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e) throws IOException { if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) { - // we need to compute a the global position for compound files - final long position = pos + (isCompoundFile ? this.offset : 0L); try { // cache file was evicted during the range fetching, read bytes directly from blob container final long length = b.remaining(); @@ -444,7 +433,7 @@ private int readDirectlyIfAlreadyClosed(long pos, ByteBuffer b, Exception e) thr int bytesCopied = 0; final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStreamFromBlobStore(position, length)) { + try (InputStream input = openInputStreamFromBlobStore(position + this.physicalOffset, length)) { long remaining = length; while (remaining > 0) { final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length; @@ -608,8 +597,7 @@ private void writeCacheFile( long bytesCopied = 0L; long remaining = length; final long startTimeNanos = stats.currentTimeNanos(); - // we need to compute the global position for compound files - try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos + (isCompoundFile ? this.offset : 0L), length)) { + try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos + this.physicalOffset, length)) { while (remaining > 0L) { final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile); positionalWrite(fc, fileChannelPos + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead)); @@ -630,7 +618,7 @@ protected void seekInternal(long pos) throws IOException { } else if (pos < 0L) { throw new IOException("Seeking to negative position [" + pos + "] for " + toString()); } - final long position = isCompoundFile ? pos : pos + this.offset; + final long position = pos + this.offset; stats.incrementSeeks(lastSeekPosition, position); lastSeekPosition = position; } @@ -658,28 +646,32 @@ public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) { } // Are we creating a slice from a CFS file? - boolean sliceCompoundFile = IndexFileNames.matchesExtension(name, "cfs") + final boolean sliceCompoundFile = IndexFileNames.matchesExtension(name, "cfs") && IndexFileNames.getExtension(sliceName) != null - && isCompoundFile == false - && isClone == false; + && isCompoundFile == false; - final FrozenCacheFile sliceFrozenCacheFile; - final ByteRange sliceHeaderByteRange; - final ByteRange sliceFooterByteRange; + final long offset; + final long physicalOffset; + final FrozenCacheFile frozenCacheFile; + final ByteRange headerBlobCacheByteRange; + final ByteRange footerBlobCacheByteRange; if (sliceCompoundFile) { - sliceFrozenCacheFile = directory.getFrozenCacheFile(sliceName, sliceLength); - sliceHeaderByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength); - if (sliceHeaderByteRange.length() < sliceLength) { - sliceFooterByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength); + offset = 0L; + physicalOffset = this.physicalOffset + this.offset + sliceOffset; + frozenCacheFile = directory.getFrozenCacheFile(sliceName, sliceLength); + headerBlobCacheByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength); + if (headerBlobCacheByteRange.length() < sliceLength) { + footerBlobCacheByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength); } else { - sliceFooterByteRange = ByteRange.EMPTY; + footerBlobCacheByteRange = ByteRange.EMPTY; } } else { - sliceFrozenCacheFile = this.frozenCacheFile; - sliceCompoundFile = this.isCompoundFile; // fix this - sliceHeaderByteRange = ByteRange.EMPTY; - sliceFooterByteRange = ByteRange.EMPTY; + offset = this.offset + sliceOffset; + physicalOffset = this.physicalOffset; + frozenCacheFile = this.frozenCacheFile; + headerBlobCacheByteRange = ByteRange.EMPTY; + footerBlobCacheByteRange = ByteRange.EMPTY; } final FrozenIndexInput slice = new FrozenIndexInput( @@ -688,16 +680,17 @@ public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) { fileInfo, context, stats, - this.offset + sliceOffset, + offset, sliceLength, - sliceFrozenCacheFile, + frozenCacheFile, defaultRangeSize, recoveryRangeSize, - sliceHeaderByteRange, - sliceFooterByteRange, + headerBlobCacheByteRange, + footerBlobCacheByteRange, sliceCompoundFile ); slice.isClone = true; + slice.physicalOffset = physicalOffset; return slice; } From 83973170b2119e7a1d9c5774707e068a43f00049 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 4 Mar 2021 15:45:51 +0100 Subject: [PATCH 4/6] apply fix --- .../lucene/store/ESIndexInputTestCase.java | 2 +- ...ableSnapshotsBlobStoreCacheIntegTests.java | 94 +++++++++++---- .../BaseSearchableSnapshotIndexInput.java | 35 +----- .../store/SearchableSnapshotDirectory.java | 10 +- .../cache/CachedBlobContainerIndexInput.java | 23 +--- .../index/store/cache/FrozenIndexInput.java | 112 +++++++++--------- .../direct/DirectBlobContainerIndexInput.java | 24 +--- .../cache/FrozenCacheService.java | 2 +- 8 files changed, 148 insertions(+), 154 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java index d03df1d2e6682..73d7f22730d7d 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java @@ -50,7 +50,7 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IO int readPos = (int) indexInput.getFilePointer(); byte[] output = new byte[length]; while (readPos < length) { - switch (randomIntBetween(0, 3)) { + switch (randomIntBetween(0, 5)) { case 0: // Read by one byte at a time output[readPos++] = indexInput.readByte(); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java index 5e23b2dea1b2f..55276f284098a 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; @@ -115,7 +114,6 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(cacheSettings).build(); } - @TestLogging(reason = "code", value = "org.elasticsearch.blobstore.cache:TRACE") public void testBlobStoreCache() throws Exception { final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createIndex(indexName); @@ -156,6 +154,7 @@ public void testBlobStoreCache() throws Exception { () -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get() ); + // TODO randomize this with FULL_COPY too when cold tier also handle blob cache for footers final Storage storage = Storage.SHARED_CACHE; logger.info( "--> mount snapshot [{}] as an index for the first time [storage={}, max length={}]", @@ -215,6 +214,62 @@ public void testBlobStoreCache() throws Exception { ); } + logger.info("--> verifying number of documents in index [{}]", restoredIndex); + assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + assertAcked(client().admin().indices().prepareDelete(restoredIndex)); + + assertBusy(() -> { + refreshSystemIndex(); + assertThat( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value, + greaterThan(0L) + ); + }); + + logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage); + final String restoredIndexSecondTime = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + mountSnapshot( + repositoryName, + snapshot.getName(), + indexName, + restoredIndexSecondTime, + Settings.builder() + .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) + .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) + .put(SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, blobCacheMaxLength) + .build(), + storage + ); + ensureGreen(restoredIndexSecondTime); + + // wait for all async cache fills to complete + assertBusy(() -> { + for (final SearchableSnapshotShardStats shardStats : client().execute( + SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest() + ).actionGet().getStats()) { + for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { + assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L)); + } + } + }); + + logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + if (numberOfDocs > 0) { + ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); + refreshSystemIndex(); + + logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX); + assertThat( + systemClient().admin() + .indices() + .prepareGetSettings(SNAPSHOT_BLOB_CACHE_INDEX) + .get() + .getSetting(SNAPSHOT_BLOB_CACHE_INDEX, DataTierAllocationDecider.INDEX_ROUTING_PREFER), + equalTo(DATA_TIERS_CACHE_INDEX_PREFERENCE) + ); + } + final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) .get() @@ -231,25 +286,15 @@ public void testBlobStoreCache() throws Exception { .getIndexing(); final long numberOfCacheWrites = indexingStats != null ? indexingStats.getTotal().getIndexCount() : 0L; - logger.info("--> verifying number of documents in index [{}]", restoredIndex); - assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - assertAcked(client().admin().indices().prepareDelete(restoredIndex)); - - assertBusy(() -> { - refreshSystemIndex(); - assertThat( - systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value, - greaterThan(0L) - ); - }); + assertAcked(client().admin().indices().prepareDelete(restoredIndexSecondTime)); - logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage); - final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + logger.info("--> mount snapshot [{}] as an index for the third time [storage={}]", snapshot, storage); + final String restoredIndexThirdTime = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); mountSnapshot( repositoryName, snapshot.getName(), indexName, - restoredAgainIndex, + restoredIndexThirdTime, Settings.builder() .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) @@ -257,9 +302,9 @@ public void testBlobStoreCache() throws Exception { .build(), storage ); - ensureGreen(restoredAgainIndex); + ensureGreen(restoredIndexThirdTime); - logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); + logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredIndexThirdTime); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, new SearchableSnapshotsStatsRequest() @@ -269,13 +314,14 @@ public void testBlobStoreCache() throws Exception { } } - logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex); - assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + logger.info("--> verifying number of documents in index [{}]", restoredIndexThirdTime); + assertHitCount(client().prepareSearch(restoredIndexThirdTime).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); logger.info("--> verifying that no extra cached blobs were indexed [{}]", SNAPSHOT_BLOB_CACHE_INDEX); if (numberOfDocs > 0) { refreshSystemIndex(); } + assertHitCount( systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setSize(0).get(), numberOfCachedBlobs @@ -301,9 +347,9 @@ public Settings onNodeStopped(String nodeName) throws Exception { .build(); } }); - ensureGreen(restoredAgainIndex); + ensureGreen(restoredIndexThirdTime); - logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex); + logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredIndexThirdTime); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, new SearchableSnapshotsStatsRequest() @@ -329,8 +375,8 @@ public Settings onNodeStopped(String nodeName) throws Exception { .getIndexing(); assertThat(indexingStats != null ? indexingStats.getTotal().getIndexCount() : 0L, equalTo(0L)); - logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex); - assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + logger.info("--> verifying number of documents in index [{}]", restoredIndexThirdTime); + assertHitCount(client().prepareSearch(restoredIndexThirdTime).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); // TODO also test when the index is frozen // TODO also test when prewarming is enabled diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index d4d49adfd5dd0..46d7ef60adc60 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -8,11 +8,9 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.IOContext; import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.threadpool.ThreadPool; @@ -46,14 +44,6 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu */ protected final ByteRange headerBlobCacheByteRange; - /** - * Range of bytes that should be cached in the blob cache for the current index input's footer. This footer byte range should only be - * required for slices of CFS files; regular files already have their footers extracted from the {@link FileInfo} (see method - * {@link #maybeReadChecksumFromFileInfo}). - */ - protected final ByteRange footerBlobCacheByteRange; - protected final boolean isCompoundFile; - // the following are only mutable so they can be adjusted after cloning/slicing protected volatile boolean isClone; private AtomicBoolean closed; @@ -68,22 +58,6 @@ public BaseSearchableSnapshotIndexInput( long offset, long length, ByteRange blobCacheByteRange - ) { - this(logger, name, directory, fileInfo, context, stats, offset, length, blobCacheByteRange, ByteRange.EMPTY, false); - } - - protected BaseSearchableSnapshotIndexInput( - Logger logger, - String name, - SearchableSnapshotDirectory directory, - FileInfo fileInfo, - IOContext context, - IndexInputStats stats, - long offset, - long length, - ByteRange headerBlobCacheByteRange, - ByteRange footerBlobCacheByteRange, - boolean isCompoundFile ) { super(name, context); this.name = Objects.requireNonNull(name); @@ -94,13 +68,11 @@ protected BaseSearchableSnapshotIndexInput( this.context = Objects.requireNonNull(context); assert fileInfo.metadata().hashEqualsContents() == false : "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')'; - this.headerBlobCacheByteRange = Objects.requireNonNull(headerBlobCacheByteRange); - this.footerBlobCacheByteRange = Objects.requireNonNull(footerBlobCacheByteRange); + this.headerBlobCacheByteRange = Objects.requireNonNull(blobCacheByteRange); this.stats = Objects.requireNonNull(stats); this.offset = offset; this.length = length; this.closed = new AtomicBoolean(false); - this.isCompoundFile = isCompoundFile; this.isClone = false; } @@ -172,11 +144,8 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException { } protected ByteRange maybeReadFromBlobCache(long position, int length) { - final long end = position + length; - if (headerBlobCacheByteRange.contains(position, end)) { + if (headerBlobCacheByteRange.contains(position, position + length)) { return headerBlobCacheByteRange; - } else if (footerBlobCacheByteRange.contains(position, end)) { - return footerBlobCacheByteRange; } return ByteRange.EMPTY; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 4e985eaf8db3b..3cb7c192b5fce 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -426,7 +426,15 @@ public IndexInput openInput(final String name, final IOContext context) throws I ); } } else { - return new DirectBlobContainerIndexInput(name, this, fileInfo, context, inputStats, getUncachedChunkSize(), bufferSize(context)); + return new DirectBlobContainerIndexInput( + name, + this, + fileInfo, + context, + inputStats, + getUncachedChunkSize(), + bufferSize(context) + ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 756bf2ddccbb4..8a59010884a3d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -85,8 +85,7 @@ public CachedBlobContainerIndexInput( new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), rangeSize, recoveryRangeSize, - directory.getBlobCacheByteRange(name, fileInfo.length()), - ByteRange.EMPTY + directory.getBlobCacheByteRange(name, fileInfo.length()) ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); @@ -103,22 +102,9 @@ private CachedBlobContainerIndexInput( CacheFileReference cacheFileReference, int rangeSize, int recoveryRangeSize, - ByteRange headerBlobCacheByteRange, - ByteRange footerBlobCacheByteRange + ByteRange headerBlobCacheByteRange ) { - super( - logger, - name, - directory, - fileInfo, - context, - stats, - offset, - length, - headerBlobCacheByteRange, - footerBlobCacheByteRange, - false - ); + super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange); this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; @@ -604,8 +590,7 @@ public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) { cacheFileReference, defaultRangeSize, recoveryRangeSize, - headerBlobCacheByteRange, - footerBlobCacheByteRange + headerBlobCacheByteRange ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index 3efa26fc109be..fdc4ff2678a38 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -37,6 +37,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Locale; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; @@ -54,13 +55,24 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput { private final int defaultRangeSize; private final int recoveryRangeSize; + /** + * If > 0, represents a logical file within a compound (CFS) file or is a slice thereof represents the offset of the logical + * compound file within the physical CFS file + */ + private final long compoundFileOffset; + + /** + * Range of bytes that should be cached in the blob cache for the current index input's footer. This footer byte range should only be + * required for slices of CFS files; regular files already have their footers extracted from the {@link FileInfo} (see method + * {@link BaseSearchableSnapshotIndexInput#maybeReadChecksumFromFileInfo}). + */ + private final ByteRange footerBlobCacheByteRange; + // last read position is kept around in order to detect (non)contiguous reads for stats private long lastReadPosition; // last seek position is kept around in order to detect forward/backward seeks for stats private long lastSeekPosition; - private long physicalOffset; - public FrozenIndexInput( String name, SearchableSnapshotDirectory directory, @@ -77,13 +89,13 @@ public FrozenIndexInput( context, stats, 0L, + 0L, fileInfo.length(), directory.getFrozenCacheFile(name, fileInfo.length()), rangeSize, recoveryRangeSize, directory.getBlobCacheByteRange(name, fileInfo.length()), - ByteRange.EMPTY, - false + ByteRange.EMPTY ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); @@ -96,32 +108,23 @@ private FrozenIndexInput( IOContext context, IndexInputStats stats, long offset, + long compoundFileOffset, long length, FrozenCacheFile frozenCacheFile, int rangeSize, int recoveryRangeSize, ByteRange headerBlobCacheByteRange, - ByteRange footerBlobCacheByteRange, - boolean isCompoundFile + ByteRange footerBlobCacheByteRange ) { - super( - logger, - name, - directory, - fileInfo, - context, - stats, - offset, - length, - headerBlobCacheByteRange, - footerBlobCacheByteRange, - isCompoundFile - ); + super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange); this.frozenCacheFile = frozenCacheFile; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; this.defaultRangeSize = rangeSize; this.recoveryRangeSize = recoveryRangeSize; + this.footerBlobCacheByteRange = Objects.requireNonNull(footerBlobCacheByteRange); + this.compoundFileOffset = compoundFileOffset; + assert offset >= compoundFileOffset; } @Override @@ -143,7 +146,7 @@ private ByteRange computeRange(long position) { @Override protected void doReadInternal(ByteBuffer b) throws IOException { ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT); - final long position = getFilePointer() + this.offset; + final long position = getAbsolutePosition() - compoundFileOffset; final int length = b.remaining(); final ReentrantReadWriteLock luceneByteBufLock = new ReentrantReadWriteLock(); @@ -203,12 +206,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. } else { - logger.trace( - "reading [{}] bytes of file [{}] at position [{}] using cache index", - length, - fileName, - position - ); + logger.trace("reading [{}] bytes of file [{}] at position [{}] using cache index", length, fileName, position); stats.addIndexCacheBytesRead(cachedBlob.length()); preventAsyncBufferChanges.run(); @@ -433,7 +431,7 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e int bytesCopied = 0; final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStreamFromBlobStore(position + this.physicalOffset, length)) { + try (InputStream input = openInputStreamFromBlobStore(position + compoundFileOffset, length)) { long remaining = length; while (remaining > 0) { final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length; @@ -466,6 +464,17 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e throw new IOException("failed to read data from cache", e); } + @Override + protected ByteRange maybeReadFromBlobCache(long position, int length) { + final long end = position + length; + if (headerBlobCacheByteRange.contains(position, end)) { + return headerBlobCacheByteRange; + } else if (footerBlobCacheByteRange.contains(position, end)) { + return footerBlobCacheByteRange; + } + return ByteRange.EMPTY; + } + private static int positionalWrite(SharedBytes.IO fc, long start, ByteBuffer byteBuffer) throws IOException { assert assertCurrentThreadMayWriteCacheFile(); return fc.write(byteBuffer, start); @@ -597,7 +606,7 @@ private void writeCacheFile( long bytesCopied = 0L; long remaining = length; final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos + this.physicalOffset, length)) { + try (InputStream input = openInputStreamFromBlobStore(logicalPos + relativePos + compoundFileOffset, length)) { while (remaining > 0L) { final int bytesRead = readSafe(input, copyBuffer, relativePos, end, remaining, frozenCacheFile); positionalWrite(fc, fileChannelPos + bytesCopied, ByteBuffer.wrap(copyBuffer, 0, bytesRead)); @@ -618,7 +627,7 @@ protected void seekInternal(long pos) throws IOException { } else if (pos < 0L) { throw new IOException("Seeking to negative position [" + pos + "] for " + toString()); } - final long position = pos + this.offset; + final long position = pos + this.offset - compoundFileOffset; stats.incrementSeeks(lastSeekPosition, position); lastSeekPosition = position; } @@ -648,30 +657,28 @@ public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) { // Are we creating a slice from a CFS file? final boolean sliceCompoundFile = IndexFileNames.matchesExtension(name, "cfs") && IndexFileNames.getExtension(sliceName) != null - && isCompoundFile == false; + && compoundFileOffset == 0L // not already a compound file + && isClone == false; // tests aggressively clone and slice - final long offset; - final long physicalOffset; - final FrozenCacheFile frozenCacheFile; - final ByteRange headerBlobCacheByteRange; - final ByteRange footerBlobCacheByteRange; + final FrozenCacheFile sliceFrozenCacheFile; + final ByteRange sliceHeaderByteRange; + final ByteRange sliceFooterByteRange; + final long sliceCompoundFileOffset; if (sliceCompoundFile) { - offset = 0L; - physicalOffset = this.physicalOffset + this.offset + sliceOffset; - frozenCacheFile = directory.getFrozenCacheFile(sliceName, sliceLength); - headerBlobCacheByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength); - if (headerBlobCacheByteRange.length() < sliceLength) { - footerBlobCacheByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength); + sliceCompoundFileOffset = this.offset + sliceOffset; + sliceFrozenCacheFile = directory.getFrozenCacheFile(sliceName, sliceLength); + sliceHeaderByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength); + if (sliceHeaderByteRange.length() < sliceLength) { + sliceFooterByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength); } else { - footerBlobCacheByteRange = ByteRange.EMPTY; + sliceFooterByteRange = ByteRange.EMPTY; } } else { - offset = this.offset + sliceOffset; - physicalOffset = this.physicalOffset; - frozenCacheFile = this.frozenCacheFile; - headerBlobCacheByteRange = ByteRange.EMPTY; - footerBlobCacheByteRange = ByteRange.EMPTY; + sliceCompoundFileOffset = this.compoundFileOffset; + sliceFrozenCacheFile = this.frozenCacheFile; + sliceHeaderByteRange = ByteRange.EMPTY; + sliceFooterByteRange = ByteRange.EMPTY; } final FrozenIndexInput slice = new FrozenIndexInput( @@ -680,17 +687,16 @@ public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) { fileInfo, context, stats, - offset, + this.offset + sliceOffset, + sliceCompoundFileOffset, sliceLength, - frozenCacheFile, + sliceFrozenCacheFile, defaultRangeSize, recoveryRangeSize, - headerBlobCacheByteRange, - footerBlobCacheByteRange, - sliceCompoundFile + sliceHeaderByteRange, + sliceFooterByteRange ); slice.isClone = true; - slice.physicalOffset = physicalOffset; return slice; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index ff6c7875ae829..48deca1bc29df 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -77,18 +77,7 @@ public DirectBlobContainerIndexInput( long sequentialReadSize, int bufferSize ) { - this( - name, - directory, - fileInfo, - context, - stats, - 0L, - 0L, - fileInfo.length(), - sequentialReadSize, - bufferSize - ); + this(name, directory, fileInfo, context, stats, 0L, 0L, fileInfo.length(), sequentialReadSize, bufferSize); stats.incrementOpenCount(); } @@ -312,16 +301,7 @@ public IndexInput slice(String sliceName, long offset, long length) throws IOExc return slice; } else { throw new IllegalArgumentException( - "slice() " - + sliceName - + " out of bounds: offset=" - + offset - + ",length=" - + length - + ",fileLength=" - + length() - + ": " - + this + "slice() " + sliceName + " out of bounds: offset=" + offset + ",length=" + length + ",fileLength=" + length() + ": " + this ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java index adf464a87dc68..dbe14487663c9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/FrozenCacheService.java @@ -197,7 +197,7 @@ private ByteRange mapSubRangeToRegion(ByteRange range, int region) { private long getRegionSize(long fileLength, int region) { assert fileLength > 0; final int maxRegion = getEndingRegion(fileLength); - assert region >= 0 && region <= maxRegion; + assert region >= 0 && region <= maxRegion : region + " - " + maxRegion; final long effectiveRegionSize; if (region == maxRegion && (region + 1) * regionSize != fileLength) { assert getRegionRelativePosition(fileLength) != 0L; From 77696aa78c74faac74d0a8b3d9c8f4367428da47 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Mar 2021 17:52:15 +0100 Subject: [PATCH 5/6] avoid remounting 3rd time --- ...ableSnapshotsBlobStoreCacheIntegTests.java | 91 +++--------- .../index/store/cache/FrozenIndexInput.java | 130 +++++++++--------- 2 files changed, 90 insertions(+), 131 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java index 55276f284098a..f398b74c73f3f 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -214,62 +214,6 @@ public void testBlobStoreCache() throws Exception { ); } - logger.info("--> verifying number of documents in index [{}]", restoredIndex); - assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - assertAcked(client().admin().indices().prepareDelete(restoredIndex)); - - assertBusy(() -> { - refreshSystemIndex(); - assertThat( - systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value, - greaterThan(0L) - ); - }); - - logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage); - final String restoredIndexSecondTime = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - mountSnapshot( - repositoryName, - snapshot.getName(), - indexName, - restoredIndexSecondTime, - Settings.builder() - .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) - .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) - .put(SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, blobCacheMaxLength) - .build(), - storage - ); - ensureGreen(restoredIndexSecondTime); - - // wait for all async cache fills to complete - assertBusy(() -> { - for (final SearchableSnapshotShardStats shardStats : client().execute( - SearchableSnapshotsStatsAction.INSTANCE, - new SearchableSnapshotsStatsRequest() - ).actionGet().getStats()) { - for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { - assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L)); - } - } - }); - - logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - if (numberOfDocs > 0) { - ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); - refreshSystemIndex(); - - logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX); - assertThat( - systemClient().admin() - .indices() - .prepareGetSettings(SNAPSHOT_BLOB_CACHE_INDEX) - .get() - .getSetting(SNAPSHOT_BLOB_CACHE_INDEX, DataTierAllocationDecider.INDEX_ROUTING_PREFER), - equalTo(DATA_TIERS_CACHE_INDEX_PREFERENCE) - ); - } - final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) .get() @@ -286,15 +230,25 @@ public void testBlobStoreCache() throws Exception { .getIndexing(); final long numberOfCacheWrites = indexingStats != null ? indexingStats.getTotal().getIndexCount() : 0L; - assertAcked(client().admin().indices().prepareDelete(restoredIndexSecondTime)); + logger.info("--> verifying number of documents in index [{}]", restoredIndex); + assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + assertAcked(client().admin().indices().prepareDelete(restoredIndex)); + + assertBusy(() -> { + refreshSystemIndex(); + assertThat( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value, + greaterThan(0L) + ); + }); - logger.info("--> mount snapshot [{}] as an index for the third time [storage={}]", snapshot, storage); - final String restoredIndexThirdTime = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage); + final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); mountSnapshot( repositoryName, snapshot.getName(), indexName, - restoredIndexThirdTime, + restoredAgainIndex, Settings.builder() .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) @@ -302,9 +256,9 @@ public void testBlobStoreCache() throws Exception { .build(), storage ); - ensureGreen(restoredIndexThirdTime); + ensureGreen(restoredAgainIndex); - logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredIndexThirdTime); + logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, new SearchableSnapshotsStatsRequest() @@ -314,14 +268,13 @@ public void testBlobStoreCache() throws Exception { } } - logger.info("--> verifying number of documents in index [{}]", restoredIndexThirdTime); - assertHitCount(client().prepareSearch(restoredIndexThirdTime).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex); + assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); logger.info("--> verifying that no extra cached blobs were indexed [{}]", SNAPSHOT_BLOB_CACHE_INDEX); if (numberOfDocs > 0) { refreshSystemIndex(); } - assertHitCount( systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setSize(0).get(), numberOfCachedBlobs @@ -347,9 +300,9 @@ public Settings onNodeStopped(String nodeName) throws Exception { .build(); } }); - ensureGreen(restoredIndexThirdTime); + ensureGreen(restoredAgainIndex); - logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredIndexThirdTime); + logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, new SearchableSnapshotsStatsRequest() @@ -375,8 +328,8 @@ public Settings onNodeStopped(String nodeName) throws Exception { .getIndexing(); assertThat(indexingStats != null ? indexingStats.getTotal().getIndexCount() : 0L, equalTo(0L)); - logger.info("--> verifying number of documents in index [{}]", restoredIndexThirdTime); - assertHitCount(client().prepareSearch(restoredIndexThirdTime).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); + logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex); + assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); // TODO also test when the index is frozen // TODO also test when prewarming is enabled diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index fdc4ff2678a38..61e83265660fa 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -322,68 +322,11 @@ protected void doReadInternal(ByteBuffer b) throws IOException { if (indexCacheMiss != null) { - // TODO also index footer for compound files if rangeToWrite encapsulates footerBlobCacheByteRange, - // otherwise the blob cache won't be populate for the footer and only the header. - - final Releasable onCacheFillComplete = stats.addIndexCacheFill(); - final int indexCacheMissLength = toIntBytes(indexCacheMiss.length()); - - // We assume that we only cache small portions of blobs so that we do not need to: - // - use a BigArrays for allocation - // - use an intermediate copy buffer to read the file in sensibly-sized chunks - // - release the buffer once the indexing operation is complete - final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); - - final StepListener readListener = frozenCacheFile.readIfAvailableOrPending( - indexCacheMiss, - (channel, channelPos, relativePos, len) -> { - assert len <= indexCacheMissLength; - - if (len == 0) { - return 0; - } - - // create slice that is positioned to read the given values - final ByteBuffer dup = byteBuffer.duplicate(); - final int newPosition = dup.position() + Math.toIntExact(relativePos); - assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit(); - assert newPosition + len <= byteBuffer.limit(); - dup.position(newPosition); - dup.limit(newPosition + Math.toIntExact(len)); - - final int read = channel.read(dup, channelPos); - if (read < 0) { - throw new EOFException("read past EOF. pos [" + relativePos + "] length: [" + len + "]"); - } - // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats - assert read == len; - return read; - } - ); - - if (readListener == null) { - // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically - // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that - // case, simply move on. - onCacheFillComplete.close(); - } else { - readListener.whenComplete(read -> { - assert read == indexCacheMissLength; - byteBuffer.position(read); // mark all bytes as accounted for - byteBuffer.flip(); - final BytesReference content = BytesReference.fromByteBuffer(byteBuffer); - directory.putCachedBlob(fileName, indexCacheMiss.start(), content, new ActionListener<>() { - @Override - public void onResponse(Void response) { - onCacheFillComplete.close(); - } - - @Override - public void onFailure(Exception e1) { - onCacheFillComplete.close(); - } - }); - }, e -> onCacheFillComplete.close()); + fillIndexCache(fileName, indexCacheMiss); + if (compoundFileOffset > 0 + && indexCacheMiss.equals(headerBlobCacheByteRange) + && footerBlobCacheByteRange != ByteRange.EMPTY) { + fillIndexCache(fileName, footerBlobCacheByteRange); } } @@ -408,6 +351,69 @@ public void onFailure(Exception e1) { readComplete(position, length); } + private void fillIndexCache(String fileName, ByteRange indexCacheMiss) { + final Releasable onCacheFillComplete = stats.addIndexCacheFill(); + final int indexCacheMissLength = toIntBytes(indexCacheMiss.length()); + + // We assume that we only cache small portions of blobs so that we do not need to: + // - use a BigArrays for allocation + // - use an intermediate copy buffer to read the file in sensibly-sized chunks + // - release the buffer once the indexing operation is complete + final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); + + final StepListener readListener = frozenCacheFile.readIfAvailableOrPending( + indexCacheMiss, + (channel, channelPos, relativePos, len) -> { + assert len <= indexCacheMissLength; + + if (len == 0) { + return 0; + } + + // create slice that is positioned to read the given values + final ByteBuffer dup = byteBuffer.duplicate(); + final int newPosition = dup.position() + Math.toIntExact(relativePos); + assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit(); + assert newPosition + len <= byteBuffer.limit(); + dup.position(newPosition); + dup.limit(newPosition + Math.toIntExact(len)); + + final int read = channel.read(dup, channelPos); + if (read < 0) { + throw new EOFException("read past EOF. pos [" + relativePos + "] length: [" + len + "]"); + } + // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats + assert read == len; + return read; + } + ); + + if (readListener == null) { + // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically + // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that + // case, simply move on. + onCacheFillComplete.close(); + } else { + readListener.whenComplete(read -> { + assert read == indexCacheMissLength; + byteBuffer.position(read); // mark all bytes as accounted for + byteBuffer.flip(); + final BytesReference content = BytesReference.fromByteBuffer(byteBuffer); + directory.putCachedBlob(fileName, indexCacheMiss.start(), content, new ActionListener<>() { + @Override + public void onResponse(Void response) { + onCacheFillComplete.close(); + } + + @Override + public void onFailure(Exception e1) { + onCacheFillComplete.close(); + } + }); + }, e -> onCacheFillComplete.close()); + } + } + private void readComplete(long position, int length) { stats.incrementBytesRead(lastReadPosition, position, length); lastReadPosition = position + length; From 77828c94f6d8cf68c5b16f50dec94ff3c800aec9 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 4 Mar 2021 18:17:37 +0100 Subject: [PATCH 6/6] nits --- ...ableSnapshotsBlobStoreCacheIntegTests.java | 20 +++++++++++++++---- .../index/store/cache/FrozenIndexInput.java | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java index f398b74c73f3f..91235413e15fb 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -62,6 +62,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -154,8 +155,7 @@ public void testBlobStoreCache() throws Exception { () -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get() ); - // TODO randomize this with FULL_COPY too when cold tier also handle blob cache for footers - final Storage storage = Storage.SHARED_CACHE; + final Storage storage = randomFrom(Storage.values()); logger.info( "--> mount snapshot [{}] as an index for the first time [storage={}, max length={}]", snapshot, @@ -264,7 +264,13 @@ public void testBlobStoreCache() throws Exception { new SearchableSnapshotsStatsRequest() ).actionGet().getStats()) { for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { - assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); + assertThat( + Strings.toString(indexInputStats), + indexInputStats.getBlobStoreBytesRequested().getCount(), + storage == Storage.SHARED_CACHE ? equalTo(0L) + : indexInputStats.getFileExt().equals("cfs") ? greaterThanOrEqualTo(0L) + : equalTo(0L) + ); } } @@ -308,7 +314,13 @@ public Settings onNodeStopped(String nodeName) throws Exception { new SearchableSnapshotsStatsRequest() ).actionGet().getStats()) { for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { - assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); + assertThat( + Strings.toString(indexInputStats), + indexInputStats.getBlobStoreBytesRequested().getCount(), + storage == Storage.SHARED_CACHE ? equalTo(0L) + : indexInputStats.getFileExt().equals("cfs") ? greaterThanOrEqualTo(0L) + : equalTo(0L) + ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index 61e83265660fa..0551202a0f513 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -323,7 +323,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException { if (indexCacheMiss != null) { fillIndexCache(fileName, indexCacheMiss); - if (compoundFileOffset > 0 + if (compoundFileOffset > 0L && indexCacheMiss.equals(headerBlobCacheByteRange) && footerBlobCacheByteRange != ByteRange.EMPTY) { fillIndexCache(fileName, footerBlobCacheByteRange);