Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow searchable snapshot cache service to periodically fsync cache files #64696

Merged
merged 30 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
012eee2
Periodically fsync searchable snapshots cache files
tlrx Nov 5, 2020
499f12b
Merge branch 'master' into periodic-fsync
tlrx Nov 9, 2020
31299c1
Fix spotless
tlrx Nov 9, 2020
bf0e660
Merge branch 'master' into periodic-fsync
tlrx Nov 12, 2020
9085f81
Merge branch 'master' into periodic-fsync
tlrx Nov 13, 2020
fb10c6f
use registering mechanism to sync cache files
tlrx Nov 16, 2020
f5d9559
Merge branch 'master' into periodic-fsync
tlrx Nov 16, 2020
684e01e
Use a queue
tlrx Nov 16, 2020
6b238d6
revert changes in Cache
tlrx Nov 17, 2020
eabe562
do not remove from queue on eviction
tlrx Nov 17, 2020
af47aaa
noop fsyncListener
tlrx Nov 17, 2020
437c8ea
limit iterations
tlrx Nov 17, 2020
c20bf75
assert IOE
tlrx Nov 17, 2020
365812d
min 1s
tlrx Nov 17, 2020
8fe920b
scaledRandomIntBetween(1, 120)
tlrx Nov 17, 2020
74f728f
Use runnable
tlrx Nov 17, 2020
5091a5d
mutualize FSyncTrackingFileSystemProvider
tlrx Nov 17, 2020
f69dde9
check evictions
tlrx Nov 17, 2020
395845d
use atomic long to count cache files
tlrx Nov 17, 2020
082b76d
[] + synchronized assert
tlrx Nov 18, 2020
ea34570
private
tlrx Nov 20, 2020
54d051d
provider.tearDown()
tlrx Nov 20, 2020
e8ad9be
randomPopulateAndReads
tlrx Nov 20, 2020
6066924
set interval before start
tlrx Nov 20, 2020
e9721c4
deleteIfExists
tlrx Nov 20, 2020
bd1ce13
lock & waitfor termination
tlrx Nov 20, 2020
46adb3c
Merge branch 'master' into periodic-fsync
tlrx Nov 20, 2020
04a4d77
missing close
tlrx Nov 20, 2020
db24043
Merge branch 'master' into periodic-fsync
tlrx Nov 23, 2020
10216ea
nits
tlrx Nov 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ protected void closeInternal() {
**/
private final AtomicBoolean needsFsync = new AtomicBoolean();

/**
* A runnable that is executed every time the {@link #needsFsync} flag is toggled to {@code true}, which indicates that the cache file
* has been updated. See {@link #markAsNeedsFSync()} method.
*/
private final Runnable needsFsyncRunnable;

/**
* A reference counted holder for the current channel to the physical file backing this cache file instance.
* By guarding access to the file channel by ref-counting and giving the channel its own life-cycle we remove all need for
Expand Down Expand Up @@ -117,10 +123,11 @@ protected void closeInternal() {
@Nullable
private volatile FileChannelReference channelRef;

public CacheFile(String description, long length, Path file) {
public CacheFile(String description, long length, Path file, Runnable onNeedFSync) {
this.tracker = new SparseFileTracker(file.toString(), length);
this.description = Objects.requireNonNull(description);
this.file = Objects.requireNonNull(file);
this.needsFsyncRunnable = Objects.requireNonNull(onNeedFSync);
assert invariant();
}

Expand Down Expand Up @@ -320,7 +327,7 @@ protected void doRun() throws Exception {
reference.decRef();
}
gap.onCompletion();
needsFsync.set(true);
markAsNeedsFSync();
}

@Override
Expand Down Expand Up @@ -420,6 +427,15 @@ boolean needsFsync() {
return needsFsync.get();
}

/**
* Marks the current cache file as "fsync needed" and notifies the corresponding listener.
*/
private void markAsNeedsFSync() {
if (needsFsync.getAndSet(true) == false) {
needsFsyncRunnable.run();
}
}

/**
* Ensure that all ranges of data written to the cache file are written to the storage device that contains it. This method performs
* synchronization only if data has been written to the cache since the last time the method was called. If calling this method
Expand All @@ -444,12 +460,12 @@ public SortedSet<Tuple<Long, Long>> fsync() throws IOException {
assert completedRanges != null;
assert completedRanges.isEmpty() == false;

IOUtils.fsync(file, false, false); // TODO don't forget to fsync parent directory
IOUtils.fsync(file, false, false);
success = true;
return completedRanges;
} finally {
if (success == false) {
needsFsync.set(true);
markAsNeedsFSync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will add it back on the queue, so we continually try to fsync a bad file. I saw the comments Armin made in other places about this, we can tackle this in the same follow-up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do that 👍

}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the compareAndSet above does not succeed, should we then perhaps fail when running tests (using an assert)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can but the existing tests assume that fsync can be executed at any time even when fsync is not needed. Those tests should be adapted as well, maybe in a follow up?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ public CacheKey(SnapshotId snapshotId, IndexId indexId, ShardId shardId, String
this.fileName = Objects.requireNonNull(fileName);
}

SnapshotId getSnapshotId() {
public SnapshotId getSnapshotId() {
return snapshotId;
}

IndexId getIndexId() {
public IndexId getIndexId() {
return indexId;
}

ShardId getShardId() {
public ShardId getShardId() {
return shardId;
}

String getFileName() {
public String getFileName() {
return fileName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.blobstore.cache.BlobStoreCacheService;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -182,7 +184,10 @@ public List<Setting<?>> getSettings() {
SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING,
SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING,
CacheService.SNAPSHOT_CACHE_SIZE_SETTING,
CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING
CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING,
CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING,
CacheService.SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING,
CacheService.SNAPSHOT_CACHE_SYNC_SHUTDOWN_TIMEOUT
);
}

Expand All @@ -200,19 +205,29 @@ public Collection<Object> createComponents(
final IndexNameExpressionResolver resolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier
) {
final CacheService cacheService = new CacheService(new NodeEnvironmentCacheCleaner(nodeEnvironment), settings);
this.cacheService.set(cacheService);
final List<Object> components = new ArrayList<>();
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
this.threadPool.set(threadPool);
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
clusterService,
threadPool,
client,
SNAPSHOT_BLOB_CACHE_INDEX
);
this.blobStoreCacheService.set(blobStoreCacheService);
this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService()));
return List.of(cacheService, blobStoreCacheService);
if (DiscoveryNode.isDataNode(settings)) {
final CacheService cacheService = new CacheService(
settings,
clusterService,
threadPool,
new NodeEnvironmentCacheCleaner(nodeEnvironment)
);
this.cacheService.set(cacheService);
components.add(cacheService);
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
clusterService,
threadPool,
client,
SNAPSHOT_BLOB_CACHE_INDEX
);
this.blobStoreCacheService.set(blobStoreCacheService);
components.add(blobStoreCacheService);
}
return Collections.unmodifiableList(components);
}

@Override
Expand Down
Loading