Allow searchable snapshot cache service to periodically fsync cache files #64696

Merged: 30 commits into master from periodic-fsync, Nov 23, 2020. Changes from 8 commits.
Commits (30)
012eee2: Periodically fsync searchable snapshots cache files (tlrx, Nov 5, 2020)
499f12b: Merge branch 'master' into periodic-fsync (tlrx, Nov 9, 2020)
31299c1: Fix spotless (tlrx, Nov 9, 2020)
bf0e660: Merge branch 'master' into periodic-fsync (tlrx, Nov 12, 2020)
9085f81: Merge branch 'master' into periodic-fsync (tlrx, Nov 13, 2020)
fb10c6f: use registering mechanism to sync cache files (tlrx, Nov 16, 2020)
f5d9559: Merge branch 'master' into periodic-fsync (tlrx, Nov 16, 2020)
684e01e: Use a queue (tlrx, Nov 16, 2020)
6b238d6: revert changes in Cache (tlrx, Nov 17, 2020)
eabe562: do not remove from queue on eviction (tlrx, Nov 17, 2020)
af47aaa: noop fsyncListener (tlrx, Nov 17, 2020)
437c8ea: limit iterations (tlrx, Nov 17, 2020)
c20bf75: assert IOE (tlrx, Nov 17, 2020)
365812d: min 1s (tlrx, Nov 17, 2020)
8fe920b: scaledRandomIntBetween(1, 120) (tlrx, Nov 17, 2020)
74f728f: Use runnable (tlrx, Nov 17, 2020)
5091a5d: mutualize FSyncTrackingFileSystemProvider (tlrx, Nov 17, 2020)
f69dde9: check evictions (tlrx, Nov 17, 2020)
395845d: use atomic long to count cache files (tlrx, Nov 17, 2020)
082b76d: [] + synchronized assert (tlrx, Nov 18, 2020)
ea34570: private (tlrx, Nov 20, 2020)
54d051d: provider.tearDown() (tlrx, Nov 20, 2020)
e8ad9be: randomPopulateAndReads (tlrx, Nov 20, 2020)
6066924: set interval before start (tlrx, Nov 20, 2020)
e9721c4: deleteIfExists (tlrx, Nov 20, 2020)
bd1ce13: lock & waitfor termination (tlrx, Nov 20, 2020)
46adb3c: Merge branch 'master' into periodic-fsync (tlrx, Nov 20, 2020)
04a4d77: missing close (tlrx, Nov 20, 2020)
db24043: Merge branch 'master' into periodic-fsync (tlrx, Nov 23, 2020)
10216ea: nits (tlrx, Nov 23, 2020)
30 changes: 30 additions & 0 deletions server/src/main/java/org/elasticsearch/common/cache/Cache.java
@@ -650,6 +650,36 @@ public void remove() {
};
}

/**
 * An LRU sequencing of the entries in the cache. This sequence is not protected from mutations
 * to the cache (except for {@link Iterator#remove()}). The result of iteration under any other
 * mutation is undefined.
 *
 * @return an LRU-ordered {@link Iterable} over the entries in the cache
 */

Contributor: I think this is true. In fact, it looks like the lru-chain would be unsafely published, with no happens-before relationship to the reader. I think the iteration of the entries done in this PR is thus unsafe; at least it might skip some entries. I think the same applies to iterating over keys(), which we do in a few places. Together with other comments in this PR, this makes me think that perhaps it would be easier to let CacheFiles that need fsync register with the CacheService (or a registry in between).

public Iterable<Tuple<K,V>> entries() {
Contributor: This looks unused now?

Member: Just a drive-by comment @tlrx: this is still unused and can probably go away, just in case you missed it :) I'll take a proper look at this PR in general now.

Member Author: Thanks! I'll remove it. I was about to ping you once Henning approved the direction :)

Member Author: I removed the unused entries() method in 6b238d6.

return () -> new Iterator<>() {

private final CacheIterator iterator = new CacheIterator(head);

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public Tuple<K,V> next() {
final Cache.Entry<K, V> next = iterator.next();
return Tuple.tuple(next.key, next.value);
}

@Override
public void remove() {
iterator.remove();
}
};
}

private class CacheIterator implements Iterator<Entry<K, V>> {
private Entry<K, V> current;
private Entry<K, V> next;

@@ -77,6 +77,13 @@ protected void closeInternal() {
**/
private final AtomicBoolean needsFsync = new AtomicBoolean();

/**
* A consumer that accepts the current {@link CacheFile} instance every time the {@link #needsFsync} flag is toggled to {@code true},
* which indicates that the cache file has been updated.
* See {@link #markAsNeedsFSync()} method.
*/
private final Consumer<CacheFile> needsFsyncListener;

/**
* A reference counted holder for the current channel to the physical file backing this cache file instance.
* By guarding access to the file channel by ref-counting and giving the channel its own life-cycle we remove all need for
@@ -117,10 +124,11 @@ protected void closeInternal() {
@Nullable
private volatile FileChannelReference channelRef;

public CacheFile(String description, long length, Path file) {
public CacheFile(String description, long length, Path file, @Nullable Consumer<CacheFile> fsyncListener) {
this.tracker = new SparseFileTracker(file.toString(), length);
this.description = Objects.requireNonNull(description);
this.file = Objects.requireNonNull(file);
this.needsFsyncListener = fsyncListener != null ? fsyncListener : cacheFile -> {};
Member: Looks like we're only ever passing null here in tests; maybe it would be cleaner to just pass the noop consumer in tests than to have this conditional?

Member Author: Done in af47aaa.

assert invariant();
}

@@ -320,7 +328,7 @@ protected void doRun() throws Exception {
reference.decRef();
}
gap.onCompletion();
needsFsync.set(true);
markAsNeedsFSync();
}

@Override
@@ -420,6 +428,15 @@ boolean needsFsync() {
return needsFsync.get();
}

/**
* Marks the current cache file as "fsync needed" and notifies the corresponding listener.
*/
private void markAsNeedsFSync() {
if (needsFsync.getAndSet(true) == false) {
needsFsyncListener.accept(this);
}
}
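The listener-plus-queue design adopted in commits fb10c6f and 684e01e hinges on the false-to-true transition above: the listener fires only when the flag flips, so a file is enqueued at most once per dirty period. A minimal standalone sketch of that pattern; the names CacheFileSketch and FsyncRegistry are hypothetical, not the classes from this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class FsyncRegistryDemo {

    static final class CacheFileSketch {
        private final AtomicBoolean needsFsync = new AtomicBoolean();
        private final Consumer<CacheFileSketch> listener;
        final String name;

        CacheFileSketch(String name, Consumer<CacheFileSketch> listener) {
            this.name = name;
            this.listener = listener;
        }

        // Notify the registry only on the false -> true transition, so a file
        // written many times between periodic syncs is enqueued exactly once.
        void markAsNeedsFsync() {
            if (needsFsync.getAndSet(true) == false) {
                listener.accept(this);
            }
        }

        // Clear the flag; returns true if an fsync was actually pending.
        boolean clearNeedsFsync() {
            return needsFsync.getAndSet(false);
        }
    }

    static final class FsyncRegistry {
        private final Queue<CacheFileSketch> queue = new ConcurrentLinkedQueue<>();

        Consumer<CacheFileSketch> listener() {
            return queue::add;
        }

        // Drain the queue and "sync" each file that still needs it.
        List<String> syncAll() {
            List<String> synced = new ArrayList<>();
            CacheFileSketch file;
            while ((file = queue.poll()) != null) {
                if (file.clearNeedsFsync()) {
                    synced.add(file.name); // a real implementation would fsync here
                }
            }
            return synced;
        }
    }

    public static void main(String[] args) {
        FsyncRegistry registry = new FsyncRegistry();
        CacheFileSketch file = new CacheFileSketch("shard-0", registry.listener());
        file.markAsNeedsFsync();
        file.markAsNeedsFsync(); // second mark does not enqueue again
        System.out.println(registry.syncAll()); // prints [shard-0]
    }
}
```

Because the notification happens only on the transition, clearNeedsFsync() is what re-arms the listener for the next write.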

/**
* Ensure that all ranges of data written to the cache file are written to the storage device that contains it. This method performs
* synchronization only if data has been written to the cache since the last time the method was called. If calling this method
@@ -444,12 +461,12 @@ public SortedSet<Tuple<Long, Long>> fsync() throws IOException {
assert completedRanges != null;
assert completedRanges.isEmpty() == false;

IOUtils.fsync(file, false, false); // TODO don't forget to fsync parent directory
IOUtils.fsync(file, false, false);
success = true;
return completedRanges;
} finally {
if (success == false) {
needsFsync.set(true);
markAsNeedsFSync();
Contributor: This will add it back on the queue, so we continually try to fsync a bad file. I saw the comments Armin made in other places about this; we can tackle it in the same follow-up.

Member Author: Let's do that 👍
}
}
}
Contributor: If the compareAndSet above does not succeed, should we then perhaps fail when running tests (using an assert)?

Member Author: I think we can, but the existing tests assume that fsync can be executed at any time, even when fsync is not needed. Those tests should be adapted as well, maybe in a follow-up?

Contributor: 👍
@@ -25,19 +25,19 @@ public CacheKey(SnapshotId snapshotId, IndexId indexId, ShardId shardId, String
this.fileName = Objects.requireNonNull(fileName);
}

SnapshotId getSnapshotId() {
public SnapshotId getSnapshotId() {
return snapshotId;
}

IndexId getIndexId() {
public IndexId getIndexId() {
return indexId;
}

ShardId getShardId() {
public ShardId getShardId() {
return shardId;
}

String getFileName() {
public String getFileName() {
return fileName;
}

@@ -11,6 +11,7 @@
import org.elasticsearch.blobstore.cache.BlobStoreCacheService;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
@@ -67,6 +68,7 @@
import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -182,7 +184,9 @@ public List<Setting<?>> getSettings() {
SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING,
SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING,
CacheService.SNAPSHOT_CACHE_SIZE_SETTING,
CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING
CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING,
CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING,
CacheService.SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING
);
}
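Assuming SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING works the way its name and the "limit iterations" commit (437c8ea) suggest, each periodic run (scheduled per SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING) syncs at most a fixed number of files and leaves the rest for the next run. A deterministic sketch of that per-run cap; the method name and structure are illustrative, not the PR's:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BoundedSyncDemo {

    // Sync at most maxFilesPerRun files from the pending queue; returns how
    // many were handled this run.
    static int syncBatch(Queue<String> pending, int maxFilesPerRun) {
        int synced = 0;
        // Limit iterations so one run cannot monopolize the thread when the
        // queue is large; leftovers are handled by the next periodic run.
        while (synced < maxFilesPerRun) {
            String file = pending.poll();
            if (file == null) {
                break;
            }
            synced++; // a real implementation would fsync the file here
        }
        return synced;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>();
        for (int i = 0; i < 25; i++) {
            pending.add("file-" + i);
        }
        System.out.println(syncBatch(pending, 10)); // prints 10
        System.out.println(pending.size());         // prints 15
    }
}
```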

@@ -200,19 +204,29 @@ public Collection<Object> createComponents(
final IndexNameExpressionResolver resolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier
) {
final CacheService cacheService = new CacheService(new NodeEnvironmentCacheCleaner(nodeEnvironment), settings);
this.cacheService.set(cacheService);
final List<Object> components = new ArrayList<>();
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
this.threadPool.set(threadPool);
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
clusterService,
threadPool,
client,
SNAPSHOT_BLOB_CACHE_INDEX
);
this.blobStoreCacheService.set(blobStoreCacheService);
this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService()));
return List.of(cacheService, blobStoreCacheService);
if (DiscoveryNode.isDataNode(settings)) {
final CacheService cacheService = new CacheService(
settings,
clusterService,
threadPool,
new NodeEnvironmentCacheCleaner(nodeEnvironment)
);
this.cacheService.set(cacheService);
components.add(cacheService);
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
clusterService,
threadPool,
client,
SNAPSHOT_BLOB_CACHE_INDEX
);
this.blobStoreCacheService.set(blobStoreCacheService);
components.add(blobStoreCacheService);
}
return Collections.unmodifiableList(components);
}

@Override