Use blob store cache for Lucene compound files #69861

Merged · 8 commits · Mar 4, 2021
@@ -72,7 +72,7 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IOException
case 3:
// Read using slice
len = randomIntBetween(1, length - readPos);
- IndexInput slice = indexInput.slice("slice (" + readPos + ", " + len + ") of " + indexInput, readPos, len);
+ IndexInput slice = indexInput.slice(randomAlphaOfLength(10) + randomFileExtension(), readPos, len);
temp = randomReadAndSlice(slice, len);
// assert that position in the original input didn't change
assertEquals(readPos, indexInput.getFilePointer());
@@ -121,7 +121,7 @@ protected void doRun() throws Exception {
clone = indexInput.clone();
} else {
final int sliceEnd = between(readEnd, length);
- clone = indexInput.slice("concurrent slice (0, " + sliceEnd + ") of " + indexInput, 0L, sliceEnd);
+ clone = indexInput.slice("slice" + randomAlphaOfLength(10) + randomFileExtension(), 0L, sliceEnd);
}
startLatch.countDown();
startLatch.await();
@@ -178,4 +178,34 @@ public void onRejection(Exception e) {
return output;
}

+ protected static String randomFileExtension() {
+ return randomFrom(
+ ".cfe",
+ ".cfs",
+ ".dii",
+ ".dim",
+ ".doc",
+ ".dvd",
+ ".dvm",
+ ".fdt",
+ ".fdx",
+ ".fdm",
+ ".fnm",
+ ".kdd",
+ ".kdi",
+ ".kdm",
+ ".liv",
+ ".nvd",
+ ".nvm",
+ ".pay",
+ ".pos",
+ ".tim",
+ ".tip",
+ ".tmd",
+ ".tvd",
+ ".tvx",
+ ".vec",
+ ".vem"
+ );
+ }
}
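These are real Lucene index file extensions (.cfs/.cfe are the compound file segments this PR targets). Using realistic extensions in slice names matters because the blob store cache appears to size the cached header region per file type (see the computeBlobCacheByteRange call in the test code removed below). A minimal sketch of that kind of extension-driven decision, where the method name and both thresholds are assumptions rather than the actual logic:

// Illustrative sketch only: decide how many leading bytes of a Lucene file are
// worth caching in the blob store cache, keyed on the file extension.
// headerCacheLength and the 4096-byte default are hypothetical, not the PR's code.
static long headerCacheLength(String fileName, long fileLength, long maxAllowedLength) {
    final String extension = fileName.substring(fileName.lastIndexOf('.') + 1);
    if ("cfs".equals(extension)) {
        // a compound file embeds other files whose headers can sit anywhere,
        // so a larger prefix (up to the configured maximum) is worth caching
        return Math.min(fileLength, maxAllowedLength);
    }
    return Math.min(fileLength, 4096L); // otherwise a small fixed header prefix
}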
@@ -9,7 +9,6 @@

import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
- import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
@@ -22,7 +21,6 @@
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Strings;
- import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -33,10 +31,8 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.IndexingStats;
- import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
- import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.test.InternalTestCluster;
@@ -49,30 +45,24 @@
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest;
- import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService;
import org.junit.AfterClass;
import org.junit.BeforeClass;

- import java.io.IOException;
- import java.nio.file.Files;
- import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
- import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

- import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_SHARD_SNAPSHOT_FORMAT;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.DATA_TIERS_CACHE_INDEX_PREFERENCE;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
- import static org.hamcrest.Matchers.is;
+ import static org.hamcrest.Matchers.greaterThanOrEqualTo;

public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@@ -115,6 +105,11 @@ protected int numberOfReplicas() {
return 0;
}

+ @Override
+ protected int numberOfShards() {
+ return 1;
+ }

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(cacheSettings).build();
@@ -154,17 +149,13 @@ public void testBlobStoreCache() throws Exception {
final SnapshotId snapshot = createSnapshot(repositoryName, "test-snapshot", List.of(indexName)).snapshotId();
assertAcked(client().admin().indices().prepareDelete(indexName));

- // extract the list of blobs per shard from the snapshot directory on disk
- final Map<String, BlobStoreIndexShardSnapshot> blobsInSnapshot = blobsInSnapshot(repositoryLocation, snapshot.getUUID());
- assertThat("Failed to load all shard snapshot metadata files", blobsInSnapshot.size(), equalTo(numberOfShards.numPrimaries));

expectThrows(
IndexNotFoundException.class,
".snapshot-blob-cache system index should not be created yet",
() -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get()
);

- Storage storage = randomFrom(Storage.values());
+ final Storage storage = randomFrom(Storage.values());
logger.info(
"--> mount snapshot [{}] as an index for the first time [storage={}, max length={}]",
snapshot,
@@ -209,7 +200,8 @@ public void testBlobStoreCache() throws Exception {

logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX);
if (numberOfDocs > 0) {
- assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot);
+ ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX);
+ refreshSystemIndex();

logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX);
assertThat(
@@ -242,7 +234,14 @@ public void testBlobStoreCache() throws Exception {
assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
assertAcked(client().admin().indices().prepareDelete(restoredIndex));

- storage = randomFrom(Storage.values());
+ assertBusy(() -> {
+ refreshSystemIndex();
+ assertThat(
+ systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value,
+ greaterThan(0L)
+ );
+ });

logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage);
final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
mountSnapshot(
@@ -259,22 +258,19 @@ public void testBlobStoreCache() throws Exception {
);
ensureGreen(restoredAgainIndex);

logger.info("--> verifying cached documents (after second mount) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX);
if (numberOfDocs > 0) {
assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot);
}

logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex);
for (final SearchableSnapshotShardStats shardStats : client().execute(
SearchableSnapshotsStatsAction.INSTANCE,
new SearchableSnapshotsStatsRequest()
).actionGet().getStats()) {
for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) {
- // we read the header of each file contained within the .cfs file, which could be anywhere
- final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs");
- if (mayReadMoreThanHeader == false) {
- assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L));
- }
+ assertThat(
+ Strings.toString(indexInputStats),
+ indexInputStats.getBlobStoreBytesRequested().getCount(),
+ storage == Storage.SHARED_CACHE ? equalTo(0L)
+ : indexInputStats.getFileExt().equals("cfs") ? greaterThanOrEqualTo(0L)
+ : equalTo(0L)
+ );
}
}

@@ -312,22 +308,19 @@ public Settings onNodeStopped(String nodeName) throws Exception {
});
ensureGreen(restoredAgainIndex);

logger.info("--> verifying cached documents (after restart) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX);
if (numberOfDocs > 0) {
assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot);
}

logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex);
for (final SearchableSnapshotShardStats shardStats : client().execute(
SearchableSnapshotsStatsAction.INSTANCE,
new SearchableSnapshotsStatsRequest()
).actionGet().getStats()) {
for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) {
- // we read the header of each file contained within the .cfs file, which could be anywhere
- final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs");
- if (mayReadMoreThanHeader == false) {
- assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L));
- }
+ assertThat(
+ Strings.toString(indexInputStats),
+ indexInputStats.getBlobStoreBytesRequested().getCount(),
+ storage == Storage.SHARED_CACHE ? equalTo(0L)
+ : indexInputStats.getFileExt().equals("cfs") ? greaterThanOrEqualTo(0L)
+ : equalTo(0L)
+ );
}
}

@@ -371,61 +364,6 @@ private void refreshSystemIndex() {
}
}

- /**
- * Reads a repository location on disk and extracts the list of blobs for each shard
- */
- private Map<String, BlobStoreIndexShardSnapshot> blobsInSnapshot(Path repositoryLocation, String snapshotId) throws IOException {
- final Map<String, BlobStoreIndexShardSnapshot> blobsPerShard = new HashMap<>();
- forEachFileRecursively(repositoryLocation.resolve("indices"), ((file, basicFileAttributes) -> {
- final String fileName = file.getFileName().toString();
- if (fileName.equals(BlobStoreRepository.SNAPSHOT_FORMAT.blobName(snapshotId))) {
- blobsPerShard.put(
- String.join(
- "/",
- snapshotId,
- file.getParent().getParent().getFileName().toString(),
- file.getParent().getFileName().toString()
- ),
- INDEX_SHARD_SNAPSHOT_FORMAT.deserialize(fileName, xContentRegistry(), Streams.readFully(Files.newInputStream(file)))
- );
- }
- }));
- return Map.copyOf(blobsPerShard);
- }

- private void assertCachedBlobsInSystemIndex(final String repositoryName, final Map<String, BlobStoreIndexShardSnapshot> blobsInSnapshot)
Member Author commented: In a follow-up I'll add tests to verify the exact cached documents for CFS and non-CFS Lucene files, but for this integration test I think it is sufficient to verify that no bytes were downloaded after the second mount.
- throws Exception {
- final BlobStoreCacheService blobCacheService = internalCluster().getDataNodeInstance(BlobStoreCacheService.class);
- assertBusy(() -> {
- refreshSystemIndex();

- long numberOfCachedBlobs = 0L;
- for (Map.Entry<String, BlobStoreIndexShardSnapshot> blob : blobsInSnapshot.entrySet()) {
- for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : blob.getValue().indexFiles()) {
- if (fileInfo.name().startsWith("__") == false) {
- continue;
- }

- final String fileName = fileInfo.physicalName();
- final long length = fileInfo.length();
- final ByteRange expectedByteRange = blobCacheService.computeBlobCacheByteRange(fileName, length, blobCacheMaxLength);
- final String path = String.join("/", repositoryName, blob.getKey(), fileName, "@" + expectedByteRange.start());

- final GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, path).get();
- assertThat("Expected cached blob [" + path + "] for blob [" + fileInfo + "]", getResponse.isExists(), is(true));
- final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap());
- assertThat(cachedBlob.from(), equalTo(expectedByteRange.start()));
- assertThat(cachedBlob.to(), equalTo(expectedByteRange.end()));
- assertThat((long) cachedBlob.length(), equalTo(expectedByteRange.length()));
- numberOfCachedBlobs += 1;
- }
- }

- refreshSystemIndex();
- assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs);
- });
- }

/**
* This plugin declares an {@link AllocationDecider} that forces searchable snapshot shards to be allocated after
* the primary shards of the snapshot blob cache index are started. This way we can ensure that searchable snapshot
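For readers unfamiliar with the pattern, a minimal sketch of the kind of decider this javadoc describes could look like the following; the class name, the store-type check, and the readiness test are illustrative assumptions, not the test's actual code:

import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;

// Illustrative sketch only: hold searchable snapshot shards back until the
// primaries of the snapshot blob cache index are active.
public class BlobCacheFirstAllocationDecider extends AllocationDecider {

    private static final String SNAPSHOT_BLOB_CACHE_INDEX = ".snapshot-blob-cache";

    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
        if (isSearchableSnapshotShard(shardRouting, allocation) == false) {
            return Decision.YES; // the blob cache index itself allocates freely
        }
        // delay searchable snapshot shards until every blob cache primary is active
        final boolean cacheReady = allocation.routingTable().hasIndex(SNAPSHOT_BLOB_CACHE_INDEX)
            && allocation.routingTable().index(SNAPSHOT_BLOB_CACHE_INDEX).allPrimaryShardsActive();
        return cacheReady ? Decision.YES : Decision.NO;
    }

    // Assumed helper: searchable snapshot indices use the "snapshot" store type.
    private static boolean isSearchableSnapshotShard(ShardRouting shard, RoutingAllocation allocation) {
        return "snapshot".equals(allocation.metadata().index(shard.index()).getSettings().get("index.store.type"));
    }
}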
@@ -30,6 +30,7 @@
public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput {

protected final Logger logger;
+ protected final String name;
protected final SearchableSnapshotDirectory directory;
protected final BlobContainer blobContainer;
protected final FileInfo fileInfo;
@@ -38,16 +39,18 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput {
protected final long offset;
protected final long length;

- /** Range of bytes that should be cached in the blob cache for the current index input **/
- protected final ByteRange blobCacheByteRange;
+ /**
+ * Range of bytes that should be cached in the blob cache for the current index input's header.
+ */
+ protected final ByteRange headerBlobCacheByteRange;

// the following are only mutable so they can be adjusted after cloning/slicing
protected volatile boolean isClone;
private AtomicBoolean closed;

public BaseSearchableSnapshotIndexInput(
Logger logger,
- String resourceDesc,
+ String name,
SearchableSnapshotDirectory directory,
FileInfo fileInfo,
IOContext context,
@@ -56,15 +59,16 @@ public BaseSearchableSnapshotIndexInput(
long length,
ByteRange blobCacheByteRange
) {
- super(resourceDesc, context);
+ super(name, context);
+ this.name = Objects.requireNonNull(name);
this.logger = Objects.requireNonNull(logger);
this.directory = Objects.requireNonNull(directory);
this.blobContainer = Objects.requireNonNull(directory.blobContainer());
this.fileInfo = Objects.requireNonNull(fileInfo);
this.context = Objects.requireNonNull(context);
assert fileInfo.metadata().hashEqualsContents() == false
: "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')';
- this.blobCacheByteRange = Objects.requireNonNull(blobCacheByteRange);
+ this.headerBlobCacheByteRange = Objects.requireNonNull(blobCacheByteRange);
this.stats = Objects.requireNonNull(stats);
this.offset = offset;
this.length = length;
Expand All @@ -77,6 +81,13 @@ public final long length() {
return length;
}

+ protected long getAbsolutePosition() {
+ final long position = getFilePointer() + this.offset;
+ assert position >= 0L : "absolute position is negative: " + position;
+ assert position <= fileInfo.length() : position + " vs " + fileInfo.length();
+ return position;
+ }

@Override
protected final void readInternal(ByteBuffer b) throws IOException {
assert assertCurrentThreadIsNotCacheFetchAsync();
@@ -107,7 +118,7 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException {
if (remaining > CodecUtil.footerLength()) {
return false;
}
- final long position = getFilePointer() + this.offset;
+ final long position = getAbsolutePosition();
final long checksumPosition = fileInfo.length() - CodecUtil.footerLength();
if (position < checksumPosition) {
return false;
@@ -132,6 +143,13 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException {
return success;
}

+ protected ByteRange maybeReadFromBlobCache(long position, int length) {
+ if (headerBlobCacheByteRange.contains(position, position + length)) {
+ return headerBlobCacheByteRange;
+ }
+ return ByteRange.EMPTY;
+ }
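In other words, a read is served from the blob cache only when it falls entirely within the cached header range. A hypothetical caller might branch like this (the variable names and the surrounding read loop are assumed, not part of this diff):

// Hypothetical caller sketch: consult the cached header range first,
// falling back to a direct blob store read otherwise.
final ByteRange cacheRange = maybeReadFromBlobCache(position, len);
if (cacheRange != ByteRange.EMPTY) { // EMPTY is the sentinel returned above
    // serve [position, position + len) from the cached header bytes
} else {
    // stream the bytes directly from the blob store
}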

/**
* Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range
* spans multiple blobs then this stream will request them in turn.
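A minimal sketch of how such a multi-part read can be sequenced, assuming the file is split into fixed-size blob parts; the BlobReader interface and the part naming scheme are assumptions for illustration, not the actual blob container API:

import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative sketch only: stream [position, position + length) from a file stored
// as fixed-size blob parts, requesting each overlapping part in turn.
interface BlobReader {
    InputStream readBlob(String partName, long offset, long length) throws IOException; // assumed primitive
}

static InputStream openRange(BlobReader reader, long position, long length, long partSize) throws IOException {
    final List<InputStream> streams = new ArrayList<>();
    long remaining = length;
    long pos = position;
    while (remaining > 0L) {
        final long part = pos / partSize;          // blob part holding the current offset
        final long offsetInPart = pos % partSize;  // offset within that part
        final long bytesFromPart = Math.min(remaining, partSize - offsetInPart);
        streams.add(reader.readBlob("__file.part" + part, offsetInPart, bytesFromPart));
        pos += bytesFromPart;
        remaining -= bytesFromPart;
    }
    // concatenate the per-part streams so the caller sees one contiguous range
    return new SequenceInputStream(Collections.enumeration(streams));
}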