Skip to content

Commit

Permalink
Use blob store cache for Lucene compound files (elastic#69861)
Browse files Browse the repository at this point in the history
The blob store cache is used to cache a variable length of the
begining of Lucene files in the .snapshot-blob-cache system
index. This is useful to speed up Lucene directory opening
during shard recovery and to limit the number of bytes
downloaded from the blob store when a searchable snapshot
shard must be rebuilt.

This commit adds support for compound files segment (.cfs)
when they are partially cached (ie, Storage.SHARED_CACHE)
so that the files they are composed of can also be cached in
the blob store cache index.

Co-Authored-By: Yannick Welsch <yannick@welsch.lu>
  • Loading branch information
tlrx and ywelsch committed Mar 4, 2021
1 parent 02cc01d commit fb0542b
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IO
case 3:
// Read using slice
len = randomIntBetween(1, length - readPos);
IndexInput slice = indexInput.slice("slice (" + readPos + ", " + len + ") of " + indexInput, readPos, len);
IndexInput slice = indexInput.slice(randomAlphaOfLength(10) + randomFileExtension(), readPos, len);
temp = randomReadAndSlice(slice, len);
// assert that position in the original input didn't change
assertEquals(readPos, indexInput.getFilePointer());
Expand Down Expand Up @@ -121,7 +121,7 @@ protected void doRun() throws Exception {
clone = indexInput.clone();
} else {
final int sliceEnd = between(readEnd, length);
clone = indexInput.slice("concurrent slice (0, " + sliceEnd + ") of " + indexInput, 0L, sliceEnd);
clone = indexInput.slice("slice" + randomAlphaOfLength(10) + randomFileExtension(), 0L, sliceEnd);
}
startLatch.countDown();
startLatch.await();
Expand Down Expand Up @@ -178,4 +178,34 @@ public void onRejection(Exception e) {
return output;
}

protected static String randomFileExtension() {
return randomFrom(
".cfe",
".cfs",
".dii",
".dim",
".doc",
".dvd",
".dvm",
".fdt",
".fdx",
".fdm",
".fnm",
".kdd",
".kdi",
".kdm",
".liv",
".nvd",
".nvm",
".pay",
".pos",
".tim",
".tip",
".tmd",
".tvd",
".tvx",
".vec",
".vem"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
Expand All @@ -22,7 +21,6 @@
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -33,10 +31,8 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.test.InternalTestCluster;
Expand All @@ -49,32 +45,26 @@
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest;
import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_SHARD_SNAPSHOT_FORMAT;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.DATA_TIERS_CACHE_INDEX_PREFERENCE;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {

Expand Down Expand Up @@ -117,6 +107,11 @@ protected int numberOfReplicas() {
return 0;
}

@Override
protected int numberOfShards() {
return 1;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(cacheSettings).build();
Expand Down Expand Up @@ -156,17 +151,13 @@ public void testBlobStoreCache() throws Exception {
final SnapshotId snapshot = createSnapshot(repositoryName, "test-snapshot", Collections.singletonList(indexName)).snapshotId();
assertAcked(client().admin().indices().prepareDelete(indexName));

// extract the list of blobs per shard from the snapshot directory on disk
final Map<String, BlobStoreIndexShardSnapshot> blobsInSnapshot = blobsInSnapshot(repositoryLocation, snapshot.getUUID());
assertThat("Failed to load all shard snapshot metadata files", blobsInSnapshot.size(), equalTo(numberOfShards.numPrimaries));

expectThrows(
IndexNotFoundException.class,
".snapshot-blob-cache system index should not be created yet",
() -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get()
);

Storage storage = randomFrom(Storage.values());
final Storage storage = randomFrom(Storage.values());
logger.info(
"--> mount snapshot [{}] as an index for the first time [storage={}, max length={}]",
snapshot,
Expand Down Expand Up @@ -211,7 +202,8 @@ public void testBlobStoreCache() throws Exception {

logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX);
if (numberOfDocs > 0) {
assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot);
ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX);
refreshSystemIndex();

logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX);
assertThat(
Expand Down Expand Up @@ -244,7 +236,14 @@ public void testBlobStoreCache() throws Exception {
assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
assertAcked(client().admin().indices().prepareDelete(restoredIndex));

storage = randomFrom(Storage.values());
assertBusy(() -> {
refreshSystemIndex();
assertThat(
systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value,
greaterThan(0L)
);
});

logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage);
final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
mountSnapshot(
Expand All @@ -261,22 +260,19 @@ public void testBlobStoreCache() throws Exception {
);
ensureGreen(restoredAgainIndex);

logger.info("--> verifying cached documents (after second mount) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX);
if (numberOfDocs > 0) {
assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot);
}

logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex);
for (final SearchableSnapshotShardStats shardStats : client().execute(
SearchableSnapshotsStatsAction.INSTANCE,
new SearchableSnapshotsStatsRequest()
).actionGet().getStats()) {
for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) {
// we read the header of each file contained within the .cfs file, which could be anywhere
final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs");
if (mayReadMoreThanHeader == false) {
assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L));
}
assertThat(
Strings.toString(indexInputStats),
indexInputStats.getBlobStoreBytesRequested().getCount(),
storage == Storage.SHARED_CACHE ? equalTo(0L)
: indexInputStats.getFileExt().equals("cfs") ? greaterThanOrEqualTo(0L)
: equalTo(0L)
);
}
}

Expand Down Expand Up @@ -314,22 +310,19 @@ public Settings onNodeStopped(String nodeName) throws Exception {
});
ensureGreen(restoredAgainIndex);

logger.info("--> verifying cached documents (after restart) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX);
if (numberOfDocs > 0) {
assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot);
}

logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex);
for (final SearchableSnapshotShardStats shardStats : client().execute(
SearchableSnapshotsStatsAction.INSTANCE,
new SearchableSnapshotsStatsRequest()
).actionGet().getStats()) {
for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) {
// we read the header of each file contained within the .cfs file, which could be anywhere
final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs");
if (mayReadMoreThanHeader == false) {
assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L));
}
assertThat(
Strings.toString(indexInputStats),
indexInputStats.getBlobStoreBytesRequested().getCount(),
storage == Storage.SHARED_CACHE ? equalTo(0L)
: indexInputStats.getFileExt().equals("cfs") ? greaterThanOrEqualTo(0L)
: equalTo(0L)
);
}
}

Expand Down Expand Up @@ -373,61 +366,6 @@ private void refreshSystemIndex() {
}
}

/**
* Reads a repository location on disk and extracts the list of blobs for each shards
*/
private Map<String, BlobStoreIndexShardSnapshot> blobsInSnapshot(Path repositoryLocation, String snapshotId) throws IOException {
final Map<String, BlobStoreIndexShardSnapshot> blobsPerShard = new HashMap<>();
forEachFileRecursively(repositoryLocation.resolve("indices"), ((file, basicFileAttributes) -> {
final String fileName = file.getFileName().toString();
if (fileName.equals(BlobStoreRepository.SNAPSHOT_FORMAT.blobName(snapshotId))) {
blobsPerShard.put(
String.join(
"/",
snapshotId,
file.getParent().getParent().getFileName().toString(),
file.getParent().getFileName().toString()
),
INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), Streams.readFully(Files.newInputStream(file)))
);
}
}));
return Collections.unmodifiableMap(blobsPerShard);
}

private void assertCachedBlobsInSystemIndex(final String repositoryName, final Map<String, BlobStoreIndexShardSnapshot> blobsInSnapshot)
throws Exception {
final BlobStoreCacheService blobCacheService = internalCluster().getDataNodeInstance(BlobStoreCacheService.class);
assertBusy(() -> {
refreshSystemIndex();

long numberOfCachedBlobs = 0L;
for (Map.Entry<String, BlobStoreIndexShardSnapshot> blob : blobsInSnapshot.entrySet()) {
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : blob.getValue().indexFiles()) {
if (fileInfo.name().startsWith("__") == false) {
continue;
}

final String fileName = fileInfo.physicalName();
final long length = fileInfo.length();
final ByteRange expectedByteRange = blobCacheService.computeBlobCacheByteRange(fileName, length, blobCacheMaxLength);
final String path = String.join("/", repositoryName, blob.getKey(), fileName, "@" + expectedByteRange.start());

final GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, SINGLE_MAPPING_NAME, path).get();
assertThat("Expected cached blob [" + path + "] for blob [" + fileInfo + "]", getResponse.isExists(), is(true));
final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap());
assertThat(cachedBlob.from(), equalTo(expectedByteRange.start()));
assertThat(cachedBlob.to(), equalTo(expectedByteRange.end()));
assertThat((long) cachedBlob.length(), equalTo(expectedByteRange.length()));
numberOfCachedBlobs += 1;
}
}

refreshSystemIndex();
assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs);
});
}

/**
* This plugin declares an {@link AllocationDecider} that forces searchable snapshot shards to be allocated after
* the primary shards of the snapshot blob cache index are started. This way we can ensure that searchable snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput {

protected final Logger logger;
protected final String name;
protected final SearchableSnapshotDirectory directory;
protected final BlobContainer blobContainer;
protected final FileInfo fileInfo;
Expand All @@ -38,16 +39,18 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu
protected final long offset;
protected final long length;

/** Range of bytes that should be cached in the blob cache for the current index input **/
protected final ByteRange blobCacheByteRange;
/**
* Range of bytes that should be cached in the blob cache for the current index input's header.
*/
protected final ByteRange headerBlobCacheByteRange;

// the following are only mutable so they can be adjusted after cloning/slicing
protected volatile boolean isClone;
private AtomicBoolean closed;

public BaseSearchableSnapshotIndexInput(
Logger logger,
String resourceDesc,
String name,
SearchableSnapshotDirectory directory,
FileInfo fileInfo,
IOContext context,
Expand All @@ -56,15 +59,16 @@ public BaseSearchableSnapshotIndexInput(
long length,
ByteRange blobCacheByteRange
) {
super(resourceDesc, context);
super(name, context);
this.name = Objects.requireNonNull(name);
this.logger = Objects.requireNonNull(logger);
this.directory = Objects.requireNonNull(directory);
this.blobContainer = Objects.requireNonNull(directory.blobContainer());
this.fileInfo = Objects.requireNonNull(fileInfo);
this.context = Objects.requireNonNull(context);
assert fileInfo.metadata().hashEqualsContents() == false
: "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')';
this.blobCacheByteRange = Objects.requireNonNull(blobCacheByteRange);
this.headerBlobCacheByteRange = Objects.requireNonNull(blobCacheByteRange);
this.stats = Objects.requireNonNull(stats);
this.offset = offset;
this.length = length;
Expand All @@ -77,6 +81,13 @@ public final long length() {
return length;
}

protected long getAbsolutePosition() {
final long position = getFilePointer() + this.offset;
assert position >= 0L : "absolute position is negative: " + position;
assert position <= fileInfo.length() : position + " vs " + fileInfo.length();
return position;
}

@Override
protected final void readInternal(ByteBuffer b) throws IOException {
assert assertCurrentThreadIsNotCacheFetchAsync();
Expand Down Expand Up @@ -107,7 +118,7 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException {
if (remaining > CodecUtil.footerLength()) {
return false;
}
final long position = getFilePointer() + this.offset;
final long position = getAbsolutePosition();
final long checksumPosition = fileInfo.length() - CodecUtil.footerLength();
if (position < checksumPosition) {
return false;
Expand All @@ -132,6 +143,13 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException {
return success;
}

protected ByteRange maybeReadFromBlobCache(long position, int length) {
if (headerBlobCacheByteRange.contains(position, position + length)) {
return headerBlobCacheByteRange;
}
return ByteRange.EMPTY;
}

/**
* Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range
* spans multiple blobs then this stream will request them in turn.
Expand Down
Loading

0 comments on commit fb0542b

Please sign in to comment.