Allow searchable snapshot cache service to periodically fsync cache files #64696
Conversation
Pinging @elastic/es-distributed (:Distributed/Snapshot/Restore)
@@ -335,7 +335,7 @@ public void onEviction(CacheFile evictedCacheFile) {
public static void assertNumberOfFSyncs(final Path path, final Matcher<Long> matcher) {
    final FSyncTrackingFileSystemProvider provider = (FSyncTrackingFileSystemProvider) path.getFileSystem().provider();
    final AtomicLong fsyncCounter = provider.files.get(path);
-   assertThat("File [" + path + "] was never fsynced", notNullValue());
+   assertThat("File [" + path + "] was never fsynced", fsyncCounter, notNullValue());
🤦
I did an initial read of this and have a few comments that I think need clarification before fully reviewing this.
this.cache = CacheBuilder.<CacheKey, CacheFile>builder()
    .setMaximumWeight(cacheSize.getBytes())
    .weigher((key, entry) -> entry.getLength())
    // NORELEASE This does not immediately free space on disk, as cache file are only deleted when all index inputs
    // are done with reading/writing the cache file
    .removalListener(notification -> IOUtils.closeWhileHandlingException(() -> notification.getValue().startEviction()))
    .build();

if (DiscoveryNode.isDataNode(settings)) {
I wonder if it would be better to not instantiate the CacheService at all on non-data nodes? Having it support non-data nodes without the cacheSyncTask seems counterintuitive (unless there is a good reason). That would allow removing the asserts/ifs on cacheSyncTask != null.
++ I think not instantiating it would be nice. One mild worry, though: it might be a little cumbersome to get right due to transport actions? (I haven't checked this here in detail, but I remember trying a similar thing elsewhere and it turned out to be tricky.)
assert localNode.isDataNode();

final boolean shouldSynchronize = hasSearchableSnapshotShards(clusterState, localNode.getId());
cacheSyncTask.allowReschedule.set(shouldSynchronize);
I think the reason we need this is because cacheSyncTask.cancel does not promise to not reschedule until rescheduleIfNecessary is called again? I wonder if that is a bug we should fix in AbstractAsyncTask? Can certainly be done in a follow-up (or separate PR).
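For reference, the guard under discussion can be sketched like this (a sketch, not the PR's exact code; only allowReschedule and synchronizeCache() appear in the quoted snippets, the rest of the names, including the enclosing class's logger, are assumptions):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: the task consults an AtomicBoolean in mustReschedule(), so the
// cluster state applier can switch rescheduling off even if a previously
// scheduled run is still in flight.
class CacheSynchronizationTask extends AbstractAsyncTask {

    final AtomicBoolean allowReschedule = new AtomicBoolean(false);

    CacheSynchronizationTask(ThreadPool threadPool, TimeValue interval) {
        super(logger, threadPool, interval, true); // autoReschedule = true
    }

    @Override
    protected boolean mustReschedule() {
        return allowReschedule.get(); // flipped from the cluster state applier
    }

    @Override
    protected void runInternal() {
        synchronizeCache();
    }
}
```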
final RoutingNode routingNode = clusterState.getRoutingNodes().node(nodeId);
if (routingNode != null) {
    for (ShardRouting shardRouting : routingNode) {
        if (shardRouting.active()) {
I think this could mean that we will not start fsync'ing during initialization/recovery of the first shard(s) on a starting node?
final long startTimeNanos = threadPool.relativeTimeInNanos();
for (ShardRouting shardRouting : routingNode) {
    if (shardRouting.active()) {
I think we also want to fsync initializing shards?
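If initializing shards should be included, the check could be broadened along these lines (a sketch; ShardRouting.active() covers started and relocating shards, initializing() the recovering ones):

```java
for (ShardRouting shardRouting : routingNode) {
    // active() covers STARTED and RELOCATING; initializing() additionally
    // catches shards that are still recovering onto this node.
    if (shardRouting.active() || shardRouting.initializing()) {
        // ... take this shard's cache files into account for fsync
    }
}
```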
);

boolean syncDirectory = false;
for (Tuple<CacheKey, CacheFile> entry : cache.entries()) {
This seems like an n² algorithm. Since we iterate the cache entries, could we not just fsync the cache files individually as long as they are in the cache?
++, I would also think there's no need to be tricky here. We already have all the tricky logic for handling a cache file's life-cycle in CacheFile => I think I like the idea of just iterating over all CacheFiles here, combined with Henning's other point of having the CacheFile register itself for fsync in some form. That seems like it wouldn't add new complexities around synchronization and life-cycle beyond what we already have in CacheFile?
/**
 * An LRU sequencing of the entries in the cache. This sequence is not protected from mutations
 * to the cache (except for {@link Iterator#remove()}. The result of iteration under any other mutation is
 * undefined.
I think this is true. In fact, it looks like the lru-chain would be unsafely published with no happens-before to the reader. I think the iteration of the entries done in this PR is thus unsafe; at least it might skip some entries. I think the same applies to iterating over keys(), which we do in a few places. Together with other comments in this PR, this makes me think that perhaps it would be easier to let CacheFiles that need fsync register with the CacheService (or a registry in between)?
final DiscoveryNode localNode = clusterState.getNodes().getLocalNode();
assert localNode.isDataNode();

final boolean shouldSynchronize = hasSearchableSnapshotShards(clusterState, localNode.getId());
I wonder if we need to couple this to cluster state updates? Instead we could trigger either on the presence of any cache files or, as proposed in another comment, on an explicit fsync-needed registration?
I read over this quickly now after our meeting and I'm +1 on Henning's suggestions as commented on inline :)
Thanks Henning and Armin for your valuable feedback. @henningandersen I implemented your idea of having the CacheFile register itself with the CacheService when it needs to be fsynced. I'd be happy if you can have another look. Let me know if it corresponds to what you had in mind. Thanks!
Thanks @tlrx, this direction looks good to me. I did a quick initial read and thought I would relay my initial comments now. I will spend a bit more time on this later today or tomorrow.
 *
 * @return an LRU-ordered {@link Iterable} over the entries in the cache
 */
public Iterable<Tuple<K,V>> entries() {
This looks unused now?
Just a drive by comment @tlrx this is still unused and can probably go away, just in case you missed it :)
I'll take a proper look at this PR in general now
> Just a drive by comment @tlrx this is still unused and can probably go away, just in case you missed it :)

Thanks! I'll remove it.

> I'll take a proper look at this PR in general now

I was about to ping you once Henning approved the direction :)
I removed the unused entries() method in 6b238d6
    Setting.Property.Dynamic
);

private static final Supplier<Set<CacheFile>> SUPPLIER_OF_CACHE_FILES_TO_SYNC = ConcurrentCollections::newConcurrentSet;
Looks like this might as well be a method (or inlined)?
I removed this when moving to a Queue implementation.
 * @param cacheFile the instance that needs to be fsync
 */
void onCacheFileUpdate(CacheFile cacheFile) {
    final boolean added = cacheFilesToSyncRef.get().add(cacheFile);
There is a race condition here, since we first get the set and then add to it. The synchronizeCache method might read the set and check that it is empty before the add is done here, risking that we miss an fsync. The same is true for the non-empty case, in that it could start iterating over the set before the add is executed. Maybe we can use a queue instead? Since CacheFile ensures it is only registered once, we might not need the set semantics except when removing a cache file (where we could just iterate the queue, which seems ok; we could even just leave it in the queue).
You're perfectly right; I'm sorry I did not catch it myself, as it is obvious. I pushed 684e01e to use a ConcurrentLinkedQueue, which should give us the right semantics.
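The queue-based registration can be sketched as follows (assumed names; the single-registration guarantee lives in CacheFile, see the sketch further down):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: add() is a single atomic step on the queue, so there is no
// get-then-add window in which synchronizeCache() could observe an empty
// collection and miss a freshly registered file.
private final Queue<CacheFile> cacheFilesToSync = new ConcurrentLinkedQueue<>();

void onCacheFileUpdate(CacheFile cacheFile) {
    cacheFilesToSync.add(cacheFile);
}
```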
        success = true;
        return completedRanges;
    } finally {
        if (success == false) {
-           needsFsync.set(true);
+           markAsNeedsFSync();
        }
    }
}
If the compareAndSet above does not succeed, should we then perhaps fail when running tests (using an assert)?
I think we can, but the existing tests assume that fsync can be executed at any time, even when an fsync is not needed. Those tests should be adapted as well, maybe in a follow-up?
👍
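Putting these pieces together, the CacheFile side might look roughly like this (a sketch: needsFsync, markAsNeedsFSync() and the tracker's completed-ranges accessor are inferred from the snippets quoted in this review, and the listener uses the Runnable form the thread settles on later):

```java
private final AtomicBoolean needsFsync = new AtomicBoolean();

// Claims the "dirty" flag at most once; the listener re-registers this file
// with the CacheService queue.
private void markAsNeedsFSync() {
    if (needsFsync.compareAndSet(false, true)) {
        fsyncListener.run();
    }
}

// Returns the ranges persisted by this fsync, or an empty set if the file was
// clean or has been evicted. On failure the flag is restored in the finally
// block, so the next synchronization round retries this file.
public SortedSet<Tuple<Long, Long>> fsync() throws IOException {
    if (needsFsync.compareAndSet(true, false)) {
        boolean success = false;
        try {
            final SortedSet<Tuple<Long, Long>> completedRanges = tracker.getCompletedRanges();
            IOUtils.fsync(file, false, false); // fsync the file itself, not a directory
            success = true;
            return completedRanges;
        } finally {
            if (success == false) {
                markAsNeedsFSync();
            }
        }
    }
    return Collections.emptySortedSet();
}
```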
Thanks @henningandersen. I've updated the code again.
Looks really nice already thanks Tanguy! I mainly have a question on error handling and some smaller points :)
this.tracker = new SparseFileTracker(file.toString(), length);
this.description = Objects.requireNonNull(description);
this.file = Objects.requireNonNull(file);
this.needsFsyncListener = fsyncListener != null ? fsyncListener : cacheFile -> {};
Looks like we're only ever passing null here in tests; maybe it would be cleaner to just pass the no-op consumer in tests than to have this conditional?
Done in af47aaa
        count += 1L;
    }
} catch (Exception e) {
    logger.warn(() -> new ParameterizedMessage("failed to fsync cache file [{}]", cacheFilePath.getFileName()), e);
I wonder if we should be this heroic? I guess I could see us running into an IOException here when exceeding the FD limit, but beyond that there seem to be very few valid reasons to keep going with a cache file after we fail to fsync it. Should we have a notion of forcefully dropping such a file from the cache, since we can't trust it any longer?
I agree, my intention was to not add the cache file to the Lucene index if the fsync failed.

> Should we have a notion of forcefully dropping such a file from the cache since we can't trust it any longer?

I think we need such a mechanism, because we should also discard a cache file when we fail to write a range to it. I can try to tackle this in a follow-up, but I expect the tests to be quite complex.
    IOUtils.fsync(cacheDir, true, false);
    logger.trace("cache directory [{}] synchronized", cacheDir);
} catch (Exception e) {
    logger.warn(() -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir), e);
Same as the other comment, maybe even more pronounced here: what does it mean if we fail to fsync the directory? Doesn't it at least mean that we failed to fsync the file as well? (it's certainly not guaranteed to be safely persisted if it was just created on many Linux FS)
> Doesn't it at least mean that we failed to fsync the file as well? (it's certainly not guaranteed to be safely persisted if it was just created on many Linux FS)

That's my understanding as well, meaning that the cache file should not be added to the Lucene index.
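For context, the directory half of the sync round looks roughly like this (a sketch; the Set<Path> collection and the IOUtils.fsync(dir, true, false) call appear in snippets quoted in this review, while the surrounding structure is assumed):

```java
// While fsyncing individual files, remember each distinct parent directory ...
final Set<Path> cacheDirs = new HashSet<>();
cacheDirs.add(cacheFilePath.toAbsolutePath().getParent());

// ... then fsync every touched directory once at the end of the round, so that
// newly created cache files survive a crash on filesystems that require it.
for (Path cacheDir : cacheDirs) {
    try {
        IOUtils.fsync(cacheDir, true, false);
        logger.trace("cache directory [{}] synchronized", cacheDir);
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir), e);
    }
}
```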
 */
void onCacheFileRemoval(CacheFile cacheFile) {
    IOUtils.closeWhileHandlingException(cacheFile::startEviction);
    cacheFilesToSync.remove(cacheFile);
Should we first remove the file from the queue and then evict to have less of a race here with concurrent fsync runs?
I saw it the other way: startEviction() should prevent more ranges from being written, and therefore prevent the cache file from registering itself back into the queue.
I removed the remove() call in eabe562
 */
void onCacheFileRemoval(CacheFile cacheFile) {
    IOUtils.closeWhileHandlingException(cacheFile::startEviction);
    cacheFilesToSync.remove(cacheFile);
Also, calling remove (O(n)) on a ConcurrentLinkedQueue feels like it may bring trouble if we're dealing with a large number of queued files? Do we even need to do this, when we could simply skip evicted files while polling the fsync queue?
Agree - Henning also raised this point. The CacheFile#fsync() method should return an empty set of ranges if the cache file is evicted and deleted from disk; we can rely on this to skip removed files.
I removed the remove() call in eabe562
Thanks for the extra iteration, this is looking good. I added a number of smaller comments.
final Set<Path> cacheDirs = new HashSet<>();
final long startTimeNanos = threadPool.relativeTimeInNanos();
final int maxCacheFilesToSync = this.maxCacheFilesToSyncAtOnce;
for (long i = 0L; i < maxCacheFilesToSync; i++) {
It would be nice to ensure we never loop further than the set of cache files already in the queue at the beginning of this method. Can we cap the number of iterations by the size of the queue before we start any fsync'ing? It is O(n) on the linked-queue implementation, but I think that is fine.
If we just continue looping, we risk an oscillating effect of writing another block of data and then fsync'ing it multiple times, resulting in much more fsync'ing than desired.
This is a good suggestion, I pushed 437c8ea.
Are you sure this is ok? The size() call isn't just O(n), it also has no accuracy guarantees. Should we maybe use org.elasticsearch.common.util.concurrent.SizeBlockingQueue here to make this a little safer?
I'd prefer not to use a BlockingQueue at all. An alternative could be to maintain an atomic cacheFilesToSync counter that is incremented when a cache file registers itself for fsync (in onCacheFileUpdate) and decremented every time an fsync is executed. The current size would be captured before we start any fsync'ing.
Ah right, this queue isn't blocking :) -> counter sounds good to me
I pushed 395845d
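The counter-based cap could look like this (a sketch; numberOfCacheFilesToSync is an assumed name):

```java
private final AtomicLong numberOfCacheFilesToSync = new AtomicLong();

void onCacheFileUpdate(CacheFile cacheFile) {
    cacheFilesToSync.add(cacheFile);
    numberOfCacheFilesToSync.incrementAndGet();
}

protected void synchronizeCache() {
    // capture the count up front: files re-queued during this round have to
    // wait for the next one, which avoids the oscillating re-fsync effect
    final long max = Math.min(numberOfCacheFilesToSync.get(), maxCacheFilesToSyncAtOnce);
    for (long i = 0L; i < max; i++) {
        final CacheFile cacheFile = cacheFilesToSync.poll();
        if (cacheFile == null) {
            break; // defensive: the counter can briefly run ahead of the queue
        }
        numberOfCacheFilesToSync.decrementAndGet();
        // ... fsync cacheFile, skipping it if it has been evicted ...
    }
}
```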
        count += 1L;
    }
} catch (Exception e) {
    logger.warn(() -> new ParameterizedMessage("failed to fsync cache file [{}]", cacheFilePath.getFileName()), e);
I think we only expect IOException here. Perhaps we can assert that to ensure tests will fail?
Similar assert a few lines up.
Makes sense, I added c20bf75
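The suggested assertion might look like this (sketch): production behaviour is unchanged, but test runs, which enable assertions, fail fast on anything other than an IOException:

```java
try {
    final SortedSet<Tuple<Long, Long>> ranges = cacheFile.fsync();
    if (ranges.isEmpty() == false) {
        count += 1L;
    }
} catch (Exception e) {
    // only I/O failures are expected; anything else fails tests via the assert
    assert e instanceof IOException : e;
    logger.warn(() -> new ParameterizedMessage("failed to fsync cache file [{}]", cacheFilePath.getFileName()), e);
}
```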
@@ -117,10 +124,11 @@ protected void closeInternal() {
    @Nullable
    private volatile FileChannelReference channelRef;

-   public CacheFile(String description, long length, Path file) {
+   public CacheFile(String description, long length, Path file, Consumer<CacheFile> fsyncListener) {
nit: I find it nicer to just pass in a Runnable here, to make it clear that a CacheFile can only request an fsync of itself.
I pushed 74f728f to use a Runnable
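With the Runnable, the constructor from the diff above becomes something like this (sketch; field names follow the snippets quoted earlier):

```java
public CacheFile(String description, long length, Path file, Runnable fsyncListener) {
    this.tracker = new SparseFileTracker(file.toString(), length);
    this.description = Objects.requireNonNull(description);
    this.file = Objects.requireNonNull(file);
    // a Runnable can only say "fsync me"; it cannot reach other cache files
    this.fsyncListener = Objects.requireNonNull(fsyncListener);
}
```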
    success = true;
    return completedRanges;
} finally {
    if (success == false) {
-       needsFsync.set(true);
+       markAsNeedsFSync();
This will add it back on the queue, so we continually try to fsync a bad file. I saw the comments Armin made in other places about this, we can tackle this in the same follow-up.
Let's do that 👍
@@ -52,40 +68,78 @@
    Setting.Property.NodeScope
);

public static final TimeValue MIN_SNAPSHOT_CACHE_SYNC_INTERVAL = TimeValue.timeValueSeconds(10L);
I wonder if we should lower this to 1 second.
I can imagine us using this for a poor-mans rate limiter - set the interval to 1 second and number of files to sync to 10 on a spinning disk setup in case fsync'ing causes issues.
Makes sense, I pushed 365812d
private final FileSystem delegateInstance;
private final Path rootDir;
These fields are unused. Seems like you intended the rootDir to have significance?
This should have been shared with some other tests; it is now factored out in 5091a5d. Thanks for spotting this.
}

logger.trace("--> evicting random cache files");
for (CacheKey evictedCacheKey : randomSubsetOf(Sets.union(previous.keySet(), updates.keySet()))) {
Would be good to verify that once we evicted, we do not fsync these CacheFiles anymore.
Right - I added f69dde9
if (randomBoolean()) {
    cacheSettings.put(
        CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.getKey(),
        TimeValue.timeValueSeconds(randomLongBetween(MIN_SNAPSHOT_CACHE_SYNC_INTERVAL.getSeconds(), Long.MAX_VALUE))
I think we should bias towards smaller values, perhaps using scaledRandomInt?
I pushed 8fe920b to randomize between 1 and 120 seconds.
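A sketch of the biased randomization, assuming the test framework's scaledRandomIntBetween (which favours values near the lower bound in normal runs):

```java
if (randomBoolean()) {
    cacheSettings.put(
        CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.getKey(),
        TimeValue.timeValueSeconds(scaledRandomIntBetween(1, 120)) // biased towards small intervals
    );
}
```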
I went through this in more detail, looks good. I have a number of minor comments only.
LGTM, thanks for the extra iterations @tlrx .
    Setting.Property.NodeScope,
    Setting.Property.Dynamic
);

public static final Setting<TimeValue> SNAPSHOT_CACHE_SYNC_SHUTDOWN_TIMEOUT = Setting.timeSetting(
    SETTINGS_PREFIX + "sync.shutdown_timeout",
    TimeValue.timeValueSeconds(60L), // default
Perhaps just 10 seconds default, since we only need to wait for one fsync and if it takes more than 10s to do one, we really want to continue shutting down the node anyway? The other doStop timeouts that I found (did not search thoroughly though) are in the 10-30s range.
Agreed.
    cache.invalidateAll();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    logger.warn("interrupted while waiting for cache sync lock", e);
I think we need to also do cacheSyncTask.close() and cache.invalidateAll() in this case? Possibly better to surround the tryLock with a separate try/catch for InterruptedException.
Oh right, I'll surround the tryLock
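The restructured doStop could be sketched like this (assumed names: cacheSyncLock, shutdownTimeout), with the tryLock isolated so that the close and invalidation always run:

```java
@Override
protected void doStop() {
    boolean acquired = false;
    try {
        acquired = cacheSyncLock.tryLock(shutdownTimeout.millis(), TimeUnit.MILLISECONDS);
        if (acquired == false) {
            logger.warn("waited [{}] but cache sync lock is still held, shutting down anyway", shutdownTimeout);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.warn("interrupted while waiting for cache sync lock", e);
    }
    try {
        // always executed, even when the lock was not acquired or we were interrupted
        cacheSyncTask.close();
        cache.invalidateAll();
    } finally {
        if (acquired) {
            cacheSyncLock.unlock();
        }
    }
}
```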
protected void synchronizeCache() {
    cacheSyncLock.lock();
    try {
        if (lifecycleState() != Lifecycle.State.STARTED) {
I would prefer to keep this inside the loop to ensure we break out as soon as possible when shutting down.
Ok
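With the check inside the loop, the sync round bails out between files during shutdown (a sketch, assumed names):

```java
protected void synchronizeCache() {
    cacheSyncLock.lock();
    try {
        for (long i = 0L; i < max; i++) {
            if (lifecycleState() != Lifecycle.State.STARTED) {
                logger.debug("stopping cache synchronization (service is closing)");
                return; // break out as soon as possible when shutting down
            }
            // ... poll and fsync the next cache file ...
        }
    } finally {
        cacheSyncLock.unlock();
    }
}
```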
Thanks Henning and Armin!
…iles (elastic#64696) This commit changes the searchable snapshot's CacheService so that it now periodically fsyncs cache files using the method introduced in elastic#64201. The synchronization is executed every 10 seconds by default (this interval can be changed using a new xpack.searchable.snapshot.cache.sync.interval setting).
The searchable snapshots cache implemented in 7.10 is not persisted across node restarts, forcing data nodes to download files from the snapshot repository again once the node is restarted. This commit introduces a new Lucene index that is used to store information about cache files. The information about cache files is periodically updated and committed in this index as part of the cache synchronization task added in #64696. When the data node starts, the Lucene index is used to load the cache file information into memory; this information is then used to repopulate the searchable snapshots cache with the cache files that exist on disk. Since data nodes can have one or more data paths, this change introduces a Lucene index per data path. Information about cache files is updated in the Lucene index located on the same data path as the cache files.
…files #64696 (#66216) This commit changes the searchable snapshot's CacheService so that it now periodically fsyncs cache files using the method introduced in #64201. The synchronization is executed every 10 seconds by default (this interval can be changed using a new xpack.searchable.snapshot.cache.sync.interval setting). Backport of #64696 for 7.11
The searchable snapshots cache implemented in 7.10 is not persisted across node restarts, forcing data nodes to download files from the snapshot repository again once the node is restarted. This commit introduces a new Lucene index that is used to store information about cache files. The information about cache files is periodically updated and committed in this index as part of the cache synchronization task added in #64696. When the data node starts, the Lucene index is used to load the cache file information into memory; this information is then used to repopulate the searchable snapshots cache with the cache files that exist on disk. Since data nodes can have one or more data paths, this change introduces a Lucene index per data path. Information about cache files is updated in the Lucene index located on the same data path as the cache files. Backport of #65725 for 7.11
This pull request changes the searchable snapshot's CacheService so that it now periodically fsyncs cache files using the method introduced in #64201. The synchronization is executed every 60 seconds by default (this interval can be changed using a new xpack.searchable.snapshot.cache.sync_interval setting). It is only executed on data nodes that have at least one searchable snapshot shard assigned. On a data node, cache file fsyncs are serialized and executed on a per-shard basis, where the order of shards is undefined.
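For reference, the interval setting described above could be declared along these lines (a sketch; the constant name and the 1-second floor are inferred from the review discussion, not confirmed against the final code):

```java
public static final Setting<TimeValue> SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING = Setting.timeSetting(
    "xpack.searchable.snapshot.cache.sync_interval",
    TimeValue.timeValueSeconds(60L),  // default, as described above
    TimeValue.timeValueSeconds(1L),   // minimum, lowered from 10s during review
    Setting.Property.NodeScope,
    Setting.Property.Dynamic          // can be updated on a running node
);
```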