Skip to content

Commit 289b1f4

Browse files
authored
Reduce locking in prewarming (#61837) (#61967)
During prewarming of a Lucene file, a CacheFile is acquired and then locked for the duration of the prewarming, i.e. locked until all the parts of the file have been downloaded and written to the cache on disk. The locking (executed with CacheFile#fileLock()) is there to prevent the cache file from being evicted while it is prewarming. But holding the lock may take a while for large files, especially since restoring snapshot files now respects the indices.recovery.max_bytes_per_sec setting of 40mb (#58658), and this can have bad consequences like preventing the CacheFile from being evicted, opened or closed. In manual tests this bug slows down various requests, like mounting a new searchable snapshot index or deleting an existing one that is still prewarming. This commit reduces the time the lock is held during prewarming so that the read lock is only required when actively writing to the CacheFile.
1 parent 3e6e81c commit 289b1f4

File tree

1 file changed

+42
-43
lines changed

1 file changed

+42
-43
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java

Lines changed: 42 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -453,54 +453,54 @@ public void prefetchPart(final int part) throws IOException {
453453

454454
try {
455455
final CacheFile cacheFile = getCacheFileSafe();
456-
try (Releasable ignored = cacheFile.fileLock()) {
457-
458-
final Tuple<Long, Long> range = cacheFile.getAbsentRangeWithin(partRange.v1(), partRange.v2());
459-
if (range == null) {
460-
logger.trace(
461-
"prefetchPart: part [{}] bytes [{}-{}] is already fully available for cache file [{}]",
462-
part,
463-
partRange.v1(),
464-
partRange.v2(),
465-
cacheFileReference
466-
);
467-
return;
468-
}
469-
470-
final long rangeStart = range.v1();
471-
final long rangeEnd = range.v2();
472-
final long rangeLength = rangeEnd - rangeStart;
473456

457+
final Tuple<Long, Long> range = cacheFile.getAbsentRangeWithin(partRange.v1(), partRange.v2());
458+
if (range == null) {
474459
logger.trace(
475-
"prefetchPart: prewarming part [{}] bytes [{}-{}] by fetching bytes [{}-{}] for cache file [{}]",
460+
"prefetchPart: part [{}] bytes [{}-{}] is already fully available for cache file [{}]",
476461
part,
477462
partRange.v1(),
478463
partRange.v2(),
479-
rangeStart,
480-
rangeEnd,
481464
cacheFileReference
482465
);
466+
return;
467+
}
483468

484-
final FileChannel fc = cacheFile.getChannel();
485-
assert assertFileChannelOpen(fc);
486-
final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, rangeLength))];
469+
final long rangeStart = range.v1();
470+
final long rangeEnd = range.v2();
471+
final long rangeLength = rangeEnd - rangeStart;
472+
473+
logger.trace(
474+
"prefetchPart: prewarming part [{}] bytes [{}-{}] by fetching bytes [{}-{}] for cache file [{}]",
475+
part,
476+
partRange.v1(),
477+
partRange.v2(),
478+
rangeStart,
479+
rangeEnd,
480+
cacheFileReference
481+
);
487482

488-
long totalBytesRead = 0L;
489-
final AtomicLong totalBytesWritten = new AtomicLong();
490-
long remainingBytes = rangeEnd - rangeStart;
491-
final long startTimeNanos = stats.currentTimeNanos();
492-
try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) {
493-
while (remainingBytes > 0L) {
494-
assert totalBytesRead + remainingBytes == rangeLength;
495-
final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference);
483+
final byte[] copyBuffer = new byte[toIntBytes(Math.min(COPY_BUFFER_SIZE, rangeLength))];
496484

497-
// The range to prewarm in cache
498-
final long readStart = rangeStart + totalBytesRead;
499-
final Tuple<Long, Long> rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead);
485+
long totalBytesRead = 0L;
486+
final AtomicLong totalBytesWritten = new AtomicLong();
487+
long remainingBytes = rangeEnd - rangeStart;
488+
final long startTimeNanos = stats.currentTimeNanos();
489+
try (InputStream input = openInputStreamFromBlobStore(rangeStart, rangeLength)) {
490+
while (remainingBytes > 0L) {
491+
assert totalBytesRead + remainingBytes == rangeLength;
492+
final int bytesRead = readSafe(input, copyBuffer, rangeStart, rangeEnd, remainingBytes, cacheFileReference);
500493

501-
// We do not actually read anything, but we want to wait for the write to complete before proceeding.
502-
// noinspection UnnecessaryLocalVariable
503-
final Tuple<Long, Long> rangeToRead = rangeToWrite;
494+
// The range to prewarm in cache
495+
final long readStart = rangeStart + totalBytesRead;
496+
final Tuple<Long, Long> rangeToWrite = Tuple.tuple(readStart, readStart + bytesRead);
497+
498+
// We do not actually read anything, but we want to wait for the write to complete before proceeding.
499+
// noinspection UnnecessaryLocalVariable
500+
final Tuple<Long, Long> rangeToRead = rangeToWrite;
501+
502+
try (Releasable ignored = cacheFile.fileLock()) {
503+
assert assertFileChannelOpen(cacheFile.getChannel());
504504

505505
cacheFile.populateAndRead(
506506
rangeToWrite,
@@ -525,15 +525,14 @@ public void prefetchPart(final int part) throws IOException {
525525
},
526526
directory.cacheFetchAsyncExecutor()
527527
).get();
528-
totalBytesRead += bytesRead;
529-
remainingBytes -= bytesRead;
530528
}
531-
final long endTimeNanos = stats.currentTimeNanos();
532-
stats.addCachedBytesWritten(totalBytesWritten.get(), endTimeNanos - startTimeNanos);
529+
totalBytesRead += bytesRead;
530+
remainingBytes -= bytesRead;
533531
}
534-
535-
assert totalBytesRead == rangeLength;
532+
final long endTimeNanos = stats.currentTimeNanos();
533+
stats.addCachedBytesWritten(totalBytesWritten.get(), endTimeNanos - startTimeNanos);
536534
}
535+
assert totalBytesRead == rangeLength;
537536
} catch (final Exception e) {
538537
throw new IOException("Failed to prefetch file part in cache", e);
539538
}

0 commit comments

Comments
 (0)