
Introduce index based snapshot blob cache for Searchable Snapshots #60522

Merged
70 commits merged into elastic:master on Aug 26, 2020

Conversation

@tlrx (Member) commented Jul 31, 2020

This pull request is a draft that introduces an index-based cache for files that are used by searchable snapshot directories.

This draft is based on previous work that implemented the cache at the BlobContainer level, whereas this pull request implements it at the IndexInput level.

Implementing the cache at a higher level brings advantages such as:

  • caching a region of the file that spans multiple blob parts is possible
  • knowing whether the directory is recovering is straightforward (by passing the RecoveryState to the directory)
  • being able to serve footer checksums directly from memory (for non-slice index inputs)
  • being able to detect whether the read operation is executed from a slice/clone or a regular index input, and to compute the right offset positions to hit the index-based cache

In this pull request, only the first 4 KB of each blob are cached, and footer checksums are served directly from memory when we can. Every time data is read from a CachedBlobContainerIndexInput, it first decides whether it should read the data from disk or from the blob cache index. The decision is based on the shard's RecoveryState: if the recovery is not finished yet, it prefers to use the blob cache index. It then checks whether the read occurs within the first 4 KB and whether the file is small enough to be fully cached. If either condition is true, it tries to retrieve the first bytes from the index cache by computing an id for the cached entry based on the repository name, the file name, a path (containing the snapshot id, index id and shard id) and an offset (here, 0). In case of a hit, the read operation can be fully served from the cached blob.

In case of a miss, it computes the regions of the file we would have liked to retrieve from the index cache; these regions are basically a list of positions within the file that we want to index. This list of regions is then passed to the CacheFile writing method, which copies over the bytes of each region and indexes them if they happen to be read while writing a cache range to disk. Because we're interested in caching the beginning and the end of every file, this is very likely to happen.
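To make the decision above concrete, here is a minimal sketch; it assumes the 4 KB cached-region size described above, and the class, method and constant names (BlobCacheReadDecision, shouldTryIndexCache, cachedBlobId, CACHED_REGION_SIZE) are hypothetical rather than the actual implementation:

// Sketch only: illustrates the read-path decision described above.
final class BlobCacheReadDecision {

    private static final long CACHED_REGION_SIZE = 4096; // first 4 KB of each blob

    /** True if this read should first try the blob cache index rather than the disk cache / blob store. */
    static boolean shouldTryIndexCache(boolean recoveryFinished, long position, int length, long fileLength) {
        if (recoveryFinished) {
            return false; // once recovery is done, prefer the node-local disk cache
        }
        // the read falls entirely within the cached 4 KB prefix of the file
        final boolean isStartOfFile = position + length <= CACHED_REGION_SIZE;
        // the file is small enough to have been cached in its entirety
        final boolean canBeFullyCached = fileLength <= CACHED_REGION_SIZE * 2;
        return isStartOfFile || canBeFullyCached;
    }

    /** Id of the cached entry: repository name, path (snapshot/index/shard ids), file name and offset. */
    static String cachedBlobId(String repository, String path, String fileName, long offset) {
        return repository + "/" + path + "/" + fileName + "/@" + offset;
    }
}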

This pull request adds a new SearchableSnapshotsBlobStoreCacheIntegTests test that successively mounts a snapshot multiple times and then verifies the documents indexed in the blob cache index and the blobs read from the repository. The goal of this test is to confirm that mounting a snapshot multiple times can be done without downloading more data from the repositories. It also tests the scenario of restarting a node (which should not download more data from the repositories but should rely on the blob cache index too). Testing freezing and the can_match phase during search queries would also be interesting.

As of today, the files of a searchable snapshot shard are accessed first during restore (when Lucene prunes unreferenced files), then when a new history UUID is bootstrapped, and finally when the Directory is opened. It would be great if the Lucene pruning and the bootstrapping could also benefit from the blob cache. If that's not possible, we can bypass Lucene.pruneUnreferencedFiles() and store.bootstrapNewHistory() for searchable snapshot shards.

Also, this pull request does not handle compound files very well, but I think it should be possible to adapt the behavior to cache the header/footer of slices as well.

@DaveCTurner (Contributor):

The failure of SearchableSnapshotsBlobStoreCacheIntegTests was related to our trying to cache the whole file if it's less than twice the usual cache blob size, because in that case we don't actually read enough of the file to put it in the cache anyway. I'm backing out this optimisation for now; it's not so obviously useful (we need to do two reads from the index cache anyway).

With this commit we fill in any blob index cache misses from the cache
file, which may in turn be populated from the blob store.

It also renames/comments some methods and moves some methods down the
class hierarchy to their usage site.
@DaveCTurner (Contributor):

@ywelsch this is worth a first-pass review. I'm still hunting down at least one rare test failure in which the stats aren't as expected, and I think I'm also occasionally hitting #60813. I haven't really added to the tests that Tanguy wrote yet but I'm fairly happy with the implementation.

@DaveCTurner (Contributor) left a comment:

For reviewers, the heart of this change is now CachedBlobContainerIndexInput#readInternal so you may like to start there.

I added a few other comments/pointers inline.

@@ -101,34 +102,6 @@ public final void close() throws IOException {

public abstract void innerClose() throws IOException;

protected InputStream openInputStream(final long position, final long length) throws IOException {

Pushed down to the cached index input only; the other implementation doesn't use this.

@@ -149,29 +122,4 @@ protected final boolean assertCurrentThreadMayAccessBlobStore() {
return true;
}

private long getPartNumberForPosition(long position) {

Pushed these methods down to the cached index input only; the other implementation doesn't use them.

@@ -136,53 +146,221 @@ protected void readInternal(ByteBuffer b) throws IOException {
final long position = getFilePointer() + this.offset;
final int length = b.remaining();

int totalBytesRead = 0;
while (totalBytesRead < length) {

In fact we (almost) never looped here: the inner read methods always satisfied the complete read. The only case where we did loop is if the read spanned two 32MB-aligned ranges, but in that case we may as well read both ranges in one go anyway, so that's what we do now.
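For illustration, here is a small sketch of computing the single aligned range that covers a read, so that a read crossing a 32 MB boundary is fetched in one go; the class and method names are made up for this example, and the constant simply mirrors the range size mentioned above:

// Sketch: compute the 32 MB-aligned range covering [position, position + length).
// A read that crosses an alignment boundary is then fetched as one contiguous range.
final class RangeAlignment {

    private static final long RANGE_SIZE = 32L * 1024 * 1024; // 32 MB

    /** Returns {rangeStart, rangeEnd}; assumes length > 0. */
    static long[] alignedRange(long position, long length) {
        final long start = (position / RANGE_SIZE) * RANGE_SIZE;
        final long end = ((position + length - 1) / RANGE_SIZE + 1) * RANGE_SIZE;
        return new long[] { start, end }; // covers two aligned ranges if the read crosses a boundary
    }
}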

* @param position The start of the range of bytes to read, relative to the start of the corresponding Lucene file.
* @param length The number of bytes to read
*/
private InputStream openInputStreamFromBlobStore(final long position, final long length) throws IOException {

This (and subsequent methods) moved here from the base class.

+ '}';
}

private int readDirectly(long start, long end, ByteBuffer b) throws IOException {

Inlined this into another method.

@ywelsch ywelsch marked this pull request as ready for review August 10, 2020 07:50
@elasticmachine (Collaborator):

Pinging @elastic/es-distributed (:Distributed/Snapshot/Restore)

@elasticmachine added the Team:Distributed (Obsolete) label on Aug 10, 2020
@ywelsch (Contributor) left a comment:

Thanks @tlrx @DaveCTurner, looking very good already. I've left some minor questions / comments.

// - we're reading the first N bytes of the file
final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_SIZE);
// - the file is small enough to be fully cached in the blob cache
final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_SIZE * 2;

Small preference for defining canBeFullyCached before isStartOfFile.

Also, I wonder whether the logic to cache up to 2 * DEFAULT_SIZE is truly needed (perhaps canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_SIZE; would work just as well in practice, and give us a simpler upper bound on the size of cached blobs?). Alternatively, we could extend this in the future and make it file-type- or content-based instead.

Contributor:

Ok, reordered defs in 974c095.

I don't have a great way to decide on the heuristics for what should go in the cache, so I'm deferring to @tlrx's better judgement here. There's scope for all sorts of cleverness here, such as caching the header of slices within a .cfs file.

Note that there's an assertion (on the put path) that cached blobs are no larger than the size of the copy buffer.

@DaveCTurner (Contributor):

OK, I think that addresses everything except the question of deadlocks, and it has passed an overnight soak test cleanly. @tlrx would you take another look too? (I can't request a review from you since you opened this PR.)

@tlrx (Member Author) left a comment:

LGTM - thanks for the hard work @DaveCTurner. I've left comments but nothing to worry about. Using stats in integration tests is a nice way to verify how the system index cache is used, and waitForRangeIfPending() is a neat way to juggle between the disk cache, the index cache, and the disk cache again.

edit: I'll update #61517 once this is merged

@@ -263,6 +302,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("contiguous_bytes_read", getContiguousReads());
builder.field("non_contiguous_bytes_read", getNonContiguousReads());
builder.field("cached_bytes_read", getCachedBytesRead());
builder.field("index_cache_bytes_read", getIndexCacheBytesRead());
@tlrx (Member Author):

Can we update the stats.yml tests with the new fields?

Contributor:

++ see 2003ec0

public CachedBlob get(String repository, String name, String path, long offset) {
final PlainActionFuture<CachedBlob> future = PlainActionFuture.newFuture();
getAsync(repository, name, path, offset, future);
return future.actionGet();
@tlrx (Member Author):

> A possible alternative idea is to bypass the cache index for indices on which the GET would run on the SYSTEM_READ threadpool.

That's an interesting idea and I did not know about this new thread pool. I think this would work, but I'm struggling to find a good reason to allow system indices to be mounted as searchable snapshots, whereas I can see reasons to prevent them from being slowish and read-only. I think #61517 makes sense today (with the comments made there) and this is something we can revisit later.

> What do you think about 5s?

Anything below 10s looks good to me.

return;
}

client.index(request, new ActionListener<>() {
@tlrx (Member Author):

I think we should catch any exception here and call the listener appropriately (ActionListener.wrap() would do this for us).

Contributor:

++ see 62fe090.

Comment on lines +327 to +337
directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content, new ActionListener<>() {
@Override
public void onResponse(Void response) {
onCacheFillComplete.close();
}

@Override
public void onFailure(Exception e1) {
onCacheFillComplete.close();
}
});
@tlrx (Member Author):

Suggested change
directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.v1(), content, ActionListener.wrap(onCacheFillComplete::close));

Contributor:

ActionListener#wrap is trappy: if onResponse throws then it calls onFailure, which is effectively a double notification. It doesn't really matter here, perhaps, but I'd rather not add another usage that needs thought when it comes time to remove it.
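To illustrate the double-notification concern with a simplified, made-up listener interface (not the Elasticsearch ActionListener itself): a wrap-style helper that reroutes exceptions thrown by the completion handler into onFailure ends up invoking that handler twice for a single event.

// Simplified illustration only; not the Elasticsearch source.
interface Listener<R> {
    void onResponse(R response);
    void onFailure(Exception e);
}

final class WrapExample {
    static <R> Listener<R> wrap(Runnable onCompletion) {
        return new Listener<R>() {
            @Override
            public void onResponse(R response) {
                try {
                    onCompletion.run();   // first notification
                } catch (Exception e) {
                    onFailure(e);         // same handler notified a second time for the same event
                }
            }

            @Override
            public void onFailure(Exception e) {
                onCompletion.run();
            }
        };
    }
}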


} finally {
totalBytesRead += bytesRead;
final int bytesRead = populateCacheFuture.get();
@tlrx (Member Author):

Would it be possible to check that readFuture is effectively done when indexCacheMiss is not null?

Contributor:

It is possible but I don't think it's needed. It gets completed by whoever filled in the last piece of the range needed for the cache index, so it is still protected from eviction, but it's completed before the call to directory.putCachedBlob returns. If it's completed exceptionally then we don't really care.

new SearchableSnapshotsStatsRequest()
).actionGet().getStats()) {
for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) {
assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L));
@tlrx (Member Author):

👍


logger.info("--> verifying documents in index [{}]", restoredAgainIndex);
assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
@tlrx (Member Author):

nit: duplicate

@fcofdez (Contributor) left a comment:

LGTM, thanks for addressing all the comments/questions 👍

@DaveCTurner (Contributor) left a comment:

Giving this a LGTM on @tlrx's behalf since he opened it...

@DaveCTurner DaveCTurner merged commit a20ff51 into elastic:master Aug 26, 2020
DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Aug 26, 2020
If a searchable snapshot shard fails (e.g. its node leaves the cluster)
we want to be able to start it up again on a different node as quickly
as possible to avoid unnecessarily blocking or failing searches. It
isn't feasible to fully restore such shards in an acceptably short time.
In particular we would like to be able to deal with the `can_match`
phase of a search ASAP so that we can skip unnecessary waiting on shards
that may still be warming up but which are not required for the search.

This commit solves this problem by introducing a system index that holds
much of the data required to start a shard. Today(*) this means it holds
the contents of every file with size <8kB, and the first 4kB of every
other file in the shard. This system index acts as a second-level cache,
behind the first-level node-local disk cache but in front of the blob
store itself. Reading chunks from the index is slower than reading them
directly from disk, but faster than reading them from the blob store,
and is also replicated and accessible to all nodes in the cluster.

(*) the exact heuristics for what we should put into the system index
are still under investigation and may change in future.

This second-level cache is populated when we attempt to read a chunk
which is missing from both levels of cache and must therefore be read
from the blob store.

We also introduce `SearchableSnapshotsBlobStoreCacheIntegTests` which
verify that we do not hit the blob store more than necessary when
starting up a shard that we've seen before, whether due to a node
restart or because a snapshot was mounted multiple times.

Co-authored-by: David Turner <david.turner@elastic.co>
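As a hedged sketch of the read order the commit message above describes, using purely illustrative interfaces (not the actual Elasticsearch classes): try the node-local disk cache first, then the blob cache system index, and only then fall back to the blob store.

// Illustrative only: the interfaces and class below are not the actual Elasticsearch code.
interface ChunkSource {
    /** Returns the requested bytes, or null if this source does not have them. */
    byte[] readChunk(String fileName, long offset, int length) throws java.io.IOException;
}

final class TwoLevelCachedReader {
    private final ChunkSource diskCache;      // first level: node-local disk cache
    private final ChunkSource blobCacheIndex; // second level: replicated system index
    private final ChunkSource blobStore;      // source of truth: the snapshot repository

    TwoLevelCachedReader(ChunkSource diskCache, ChunkSource blobCacheIndex, ChunkSource blobStore) {
        this.diskCache = diskCache;
        this.blobCacheIndex = blobCacheIndex;
        this.blobStore = blobStore;
    }

    byte[] readChunk(String fileName, long offset, int length) throws java.io.IOException {
        byte[] chunk = diskCache.readChunk(fileName, offset, length);
        if (chunk == null) {
            chunk = blobCacheIndex.readChunk(fileName, offset, length);
        }
        if (chunk == null) {
            // missing from both cache levels: read from the blob store
            // (the real change also populates the system index at this point so that
            // future shard starts can avoid hitting the blob store again)
            chunk = blobStore.readChunk(fileName, offset, length);
        }
        return chunk;
    }
}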
DaveCTurner added a commit that referenced this pull request Aug 27, 2020
Backport of #60522

Co-authored-by: Tanguy Leroux <tlrx.dev@gmail.com>
DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Aug 27, 2020
Adjusts versions for wire format and re-enables BWC tests
DaveCTurner added a commit that referenced this pull request Aug 27, 2020
Adjusts versions for wire format and re-enables BWC tests
tlrx added a commit that referenced this pull request Aug 31, 2020
System indices can be snapshotted and are therefore potential candidates
to be mounted as searchable snapshot indices. As of today nothing
prevents a snapshot from being mounted under an index name starting with .,
and this can lead to conflicting situations because searchable snapshot
indices are read-only while Elasticsearch expects some system indices
to be writable. Moreover, searchable snapshot indices will soon use an
internal system index (#60522) to speed up recoveries, and we should
prevent that system index from itself being a searchable snapshot index
(which would lead to a deadlock situation during recovery).

This commit introduces a change to prevent snapshots from being mounted
as a system index.
tlrx added a commit that referenced this pull request Sep 1, 2020
Labels
:Distributed Coordination/Snapshot/Restore, >enhancement, Team:Distributed (Obsolete), v7.10.0, v8.0.0-alpha1
7 participants