From 314212ba6f368d50d9989659708641e8ed4987de Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 27 Oct 2020 11:33:57 +0100 Subject: [PATCH 1/6] Add CacheFile#fsync() method to ensure cached data are written on disk --- .../index/store/cache/CacheFile.java | 56 ++++ .../index/store/cache/CacheFileTests.java | 239 ++++++++++++++++++ 2 files changed, 295 insertions(+) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index 6ccada8aa615f..f9ba56f58dfe7 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.core.internal.io.IOUtils; import java.io.IOException; import java.io.UncheckedIOException; @@ -66,6 +67,12 @@ protected void closeInternal() { private final Set listeners = new HashSet<>(); + /** + * Indicates whether the cache file has been synchronized with the storage device that contains it, since the last time data + * were written in cache (or since the creation of the file if no cached data have been written yet). + **/ + private final AtomicBoolean fsynced = new AtomicBoolean(); + /** * A reference counted holder for the current channel to the physical file backing this cache file instance. * By guarding access to the file channel by ref-counting and giving the channel its own life-cycle we remove all need for @@ -305,6 +312,7 @@ protected void doRun() throws Exception { reference.decRef(); } gap.onCompletion(); + fsynced.set(false); } @Override @@ -398,4 +406,52 @@ public Tuple getAbsentRangeWithin(long start, long end) { ensureOpen(); return tracker.getAbsentRangeWithin(start, end); } + + // used in tests + boolean isFSynced() { + return fsynced.get(); + } + + /** + * Ensure that all ranges of data written to the cache file are written to the storage device that contains it. This method returns the + * list of all successfully written ranges of data since the creation of the cache file. + * + * @return the list of ranges of data available in cache at the time this method is invoked + * @throws IOException if the cache file failed to be fsync + * @throws AlreadyClosedException if the cache file is evicted + * @throws java.nio.file.NoSuchFileException if the cache file does not exist + */ + public List> fsync() throws IOException { + ensureOpen(); + if (refCounter.tryIncRef()) { + try { + // Capture the completed ranges before fsyncing; ranges that are completed after this point won't be considered as + // persisted on disk by the caller of this method, even if they are fully written to disk at the time the file + // fsync is effectively executed + final List> completedRanges = tracker.getCompletedRanges(); + assert completedRanges != null; + + if (fsynced.compareAndSet(false, true)) { + boolean success = false; + try { + // check again if the file is evicted before doing expensive I/O + ensureOpen(); + IOUtils.fsync(file, false); // TODO don't forget to fsync parent directory + success = true; + } finally { + if (success == false) { + fsynced.set(false); + } + } + } + return completedRanges; + } finally { + refCounter.decRef(); + } + } else { + assert evicted.get(); + throwAlreadyEvicted(); + } + return List.of(); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index bdf004ece5c26..6bfcf32be57fd 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -5,25 +5,50 @@ */ package org.elasticsearch.index.store.cache; +import org.apache.lucene.mockfile.FilterFileChannel; +import org.apache.lucene.mockfile.FilterFileSystemProvider; +import org.apache.lucene.mockfile.FilterPath; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.index.store.cache.CacheFile.EvictionListener; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; +import org.hamcrest.Matcher; import java.io.IOException; import java.nio.channels.FileChannel; +import java.nio.file.FileSystem; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.OpenOption; import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.spi.FileSystemProvider; import java.util.ArrayList; +import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.StreamSupport; import static org.elasticsearch.common.settings.Settings.builder; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -171,6 +196,151 @@ public void testConcurrentAccess() throws Exception { } } + public void testFSyncOnNonExistentFile() { + try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { + final CacheFile cacheFile = new CacheFile("test", randomNonNegativeLong(), fileSystem.resolve("test")); + expectThrows(NoSuchFileException.class, cacheFile::fsync); + } + } + + public void testFSync() throws Exception { + try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { + final CacheFile cacheFile = new CacheFile("test", randomLongBetween(100, 1000), fileSystem.resolve("test")); + assertFalse(cacheFile.isFSynced()); + + final TestEvictionListener listener = new TestEvictionListener(); + cacheFile.acquire(listener); + + List> completedRanges = cacheFile.fsync(); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); + assertThat(completedRanges, hasSize(0)); + assertTrue(cacheFile.isFSynced()); + + final TestThreadPool threadPool = new TestThreadPool(getTestName()); + try { + final Set> expectedCompletedRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1)); + + for (long i = 0L; i < between(1, 10); i++) { + if (randomBoolean()) { + final long position = i * 10L; // simplify the test by completing small non-contiguous ranges + final Tuple range = Tuple.tuple(position, position + 1L); + cacheFile.populateAndRead( + range, + range, + channel -> Math.toIntExact(range.v2() - range.v1()), + (channel, from, to, progressUpdater) -> progressUpdater.accept(to), + threadPool.generic() + ); + + waitForGenericThreadPool(threadPool, expectedCompletedRanges.size() + 1L); + assertTrue(expectedCompletedRanges.add(range)); + assertFalse(cacheFile.isFSynced()); + } + + completedRanges = cacheFile.fsync(); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L + expectedCompletedRanges.size())); + assertThat(completedRanges.size(), equalTo(expectedCompletedRanges.size())); + assertTrue(cacheFile.isFSynced()); + } + + assertArrayEquals(cacheFile.fsync().toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); + } finally { + cacheFile.release(listener); + terminate(threadPool); + } + } + } + + public void testFSyncOnEvictedFile() throws Exception { + try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { + final CacheFile cacheFile = new CacheFile("test", randomNonNegativeLong(), fileSystem.resolve("test")); + assertFalse(cacheFile.isFSynced()); + + final TestEvictionListener listener = new TestEvictionListener(); + cacheFile.acquire(listener); + + final TestThreadPool threadPool = new TestThreadPool(getTestName()); + try { + final boolean completeRangeBeforeEviction = randomBoolean(); + if (completeRangeBeforeEviction) { + final long start = randomLongBetween(0L, Math.max(0L, cacheFile.getLength() - 1L)); + final long end = randomLongBetween(start, cacheFile.getLength()); + final Tuple range = Tuple.tuple(start, end); + cacheFile.populateAndRead( + range, + range, + channel -> Math.toIntExact(end - start), + (channel, from, to, progressUpdater) -> progressUpdater.accept(to), + threadPool.generic() + ); + + waitForGenericThreadPool(threadPool, 1L); + assertFalse(cacheFile.isFSynced()); + + final List> completedRanges = cacheFile.fsync(); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); + assertThat(completedRanges, hasSize(1)); + assertTrue(cacheFile.isFSynced()); + } + + cacheFile.startEviction(); + expectThrows(AlreadyClosedException.class, cacheFile::fsync); + + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(completeRangeBeforeEviction ? 1L : 0L)); + assertThat(cacheFile.isFSynced(), is(completeRangeBeforeEviction)); + } finally { + cacheFile.release(listener); + terminate(threadPool); + } + } + } + + public void testFSyncFailure() throws Exception { + try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { + fileSystem.failFSyncs.set(true); + + final CacheFile cacheFile = new CacheFile("test", randomNonNegativeLong(), fileSystem.resolve("test")); + assertFalse(cacheFile.isFSynced()); + + final TestEvictionListener listener = new TestEvictionListener(); + cacheFile.acquire(listener); + + final TestThreadPool threadPool = new TestThreadPool(getTestName()); + try { + final boolean hasCompletedRange = randomBoolean(); + if (hasCompletedRange) { + final long start = randomLongBetween(0L, Math.max(0L, cacheFile.getLength() - 1L)); + final long end = randomLongBetween(start, cacheFile.getLength()); + final Tuple range = Tuple.tuple(start, end); + cacheFile.populateAndRead( + range, + range, + channel -> Math.toIntExact(end - start), + (channel, from, to, progressUpdater) -> progressUpdater.accept(to), + threadPool.generic() + ); + + waitForGenericThreadPool(threadPool, 1L); + assertFalse(cacheFile.isFSynced()); + } + + expectThrows(IOException.class, cacheFile::fsync); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0L)); + assertFalse(cacheFile.isFSynced()); + + fileSystem.failFSyncs.set(false); + + final List> completedRanges = cacheFile.fsync(); + assertThat(completedRanges.size(), equalTo(hasCompletedRange ? 1 : 0)); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); + assertTrue(cacheFile.isFSynced()); + } finally { + cacheFile.release(listener); + terminate(threadPool); + } + } + } + static class TestEvictionListener implements EvictionListener { private final SetOnce evicted = new SetOnce<>(); @@ -188,4 +358,73 @@ public void onEviction(CacheFile evictedCacheFile) { evicted.set(Objects.requireNonNull(evictedCacheFile)); } } + + public static void assertNumberOfFSyncs(final Path path, final Matcher matcher) { + final FSyncTrackingFileSystemProvider provider = (FSyncTrackingFileSystemProvider) path.getFileSystem().provider(); + final AtomicLong fsyncCounter = provider.files.get(path); + assertThat("File [" + path + "] was never fsynced", notNullValue()); + assertThat("Mismatching number of fsync for [" + path + "]", fsyncCounter.get(), matcher); + } + + private static FSyncTrackingFileSystemProvider setupFSyncCountingFileSystem() { + final FileSystem defaultFileSystem = PathUtils.getDefaultFileSystem(); + final FSyncTrackingFileSystemProvider provider = new FSyncTrackingFileSystemProvider(defaultFileSystem, createTempDir()); + PathUtilsForTesting.installMock(provider.getFileSystem(null)); + return provider; + } + + private static void waitForGenericThreadPool(final ThreadPool threadPool, final long completedTasks) throws Exception { + assertBusy(() -> { + final ThreadPoolStats.Stats threadPoolStats = StreamSupport.stream(threadPool.stats().spliterator(), false) + .filter(stats -> stats.getName().equals(ThreadPool.Names.GENERIC)) + .findFirst() + .orElseThrow(AssertionError::new); + assertThat(threadPoolStats.getCompleted(), greaterThanOrEqualTo(completedTasks)); + assertThat(threadPoolStats.getQueue(), equalTo(0)); + }); + } + + /** + * A {@link FileSystemProvider} that counts the number of times the method {@link FileChannel#force(boolean)} is executed on every + * files. It reinstates the default file system when this file system provider is closed. + */ + private static class FSyncTrackingFileSystemProvider extends FilterFileSystemProvider implements AutoCloseable { + + private final Map files = new ConcurrentHashMap<>(); + private final AtomicBoolean failFSyncs = new AtomicBoolean(); + private final FileSystem delegateInstance; + private final Path rootDir; + + FSyncTrackingFileSystemProvider(FileSystem delegate, Path rootDir) { + super("fsynccounting://", delegate); + this.rootDir = new FilterPath(rootDir, this.fileSystem); + this.delegateInstance = delegate; + } + + public Path resolve(String other) { + return rootDir.resolve(other); + } + + @Override + public FileChannel newFileChannel(Path path, Set options, FileAttribute... attrs) throws IOException { + return new FilterFileChannel(delegate.newFileChannel(toDelegate(path), options, attrs)) { + + final AtomicLong counter = files.computeIfAbsent(path, p -> new AtomicLong(0L)); + + @Override + public void force(boolean metaData) throws IOException { + if (failFSyncs.get()) { + throw new IOException("simulated"); + } + super.force(metaData); + counter.incrementAndGet(); + } + }; + } + + @Override + public void close() { + PathUtilsForTesting.installMock(delegateInstance); + } + } } From 6eea9006cc32def270240126785637afad5a3720 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 2 Nov 2020 11:14:26 +0100 Subject: [PATCH 2/6] fsynced no metadata --- .../elasticsearch/core/internal/io/IOUtils.java | 17 ++++++++++++++++- .../core/internal/io/IOUtilsTests.java | 14 +++++++++----- .../index/store/cache/CacheFile.java | 2 +- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java index 022d8c90301ed..946c8f43e9ee4 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java +++ b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java @@ -280,6 +280,21 @@ public FileVisitResult visitFileFailed(final Path file, final IOException exc) t * systems and operating systems allow to fsync on a directory) */ public static void fsync(final Path fileToSync, final boolean isDir) throws IOException { + fsync(fileToSync, isDir, true); + } + + /** + * Ensure that any writes to the given file is written to the storage device that contains it. The {@code isDir} parameter specifies + * whether or not the path to sync is a directory. This is needed because we open for read and ignore an {@link IOException} since not + * all filesystems and operating systems support fsyncing on a directory. For regular files we must open for write for the fsync to have + * an effect. + * + * @param fileToSync the file to fsync + * @param isDir if true, the given file is a directory (we open for read and ignore {@link IOException}s, because not all file + * systems and operating systems allow to fsync on a directory) + * @param metaData if {@code true} both the file's content and metadata will be sync, otherwise only the file's content will be sync + */ + public static void fsync(final Path fileToSync, final boolean isDir, final boolean metaData) throws IOException { if (isDir && WINDOWS) { // opening a directory on Windows fails, directories can not be fsynced there if (Files.exists(fileToSync) == false) { @@ -290,7 +305,7 @@ public static void fsync(final Path fileToSync, final boolean isDir) throws IOEx } try (FileChannel file = FileChannel.open(fileToSync, isDir ? StandardOpenOption.READ : StandardOpenOption.WRITE)) { try { - file.force(true); + file.force(metaData); } catch (final IOException e) { if (isDir) { assert (LINUX || MAC_OS_X) == false : diff --git a/libs/core/src/test/java/org/elasticsearch/core/internal/io/IOUtilsTests.java b/libs/core/src/test/java/org/elasticsearch/core/internal/io/IOUtilsTests.java index 882d19cd2c333..a41b41810a602 100644 --- a/libs/core/src/test/java/org/elasticsearch/core/internal/io/IOUtilsTests.java +++ b/libs/core/src/test/java/org/elasticsearch/core/internal/io/IOUtilsTests.java @@ -213,11 +213,15 @@ public void delete(final Path path) throws IOException { } + private void fsync(final Path path, final boolean isDir) throws IOException { + IOUtils.fsync(path, isDir, randomBoolean()); + } + public void testFsyncDirectory() throws Exception { final Path path = createTempDir().toRealPath(); final Path subPath = path.resolve(randomAlphaOfLength(8)); Files.createDirectories(subPath); - IOUtils.fsync(subPath, true); + fsync(subPath, true); // no exception } @@ -246,16 +250,16 @@ public void testFsyncAccessDeniedOpeningDirectory() throws Exception { final Path wrapped = new FilterPath(path, fs); if (Constants.WINDOWS) { // no exception, we early return and do not even try to open the directory - IOUtils.fsync(wrapped, true); + fsync(wrapped, true); } else { - expectThrows(AccessDeniedException.class, () -> IOUtils.fsync(wrapped, true)); + expectThrows(AccessDeniedException.class, () -> fsync(wrapped, true)); } } public void testFsyncNonExistentDirectory() throws Exception { final Path dir = FilterPath.unwrap(createTempDir()).toRealPath(); final Path nonExistentDir = dir.resolve("non-existent"); - expectThrows(NoSuchFileException.class, () -> IOUtils.fsync(nonExistentDir, true)); + expectThrows(NoSuchFileException.class, () -> fsync(nonExistentDir, true)); } public void testFsyncFile() throws IOException { @@ -266,7 +270,7 @@ public void testFsyncFile() throws IOException { try (OutputStream o = Files.newOutputStream(file)) { o.write("0\n".getBytes(StandardCharsets.US_ASCII)); } - IOUtils.fsync(file, false); + fsync(file, false); // no exception } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index f9ba56f58dfe7..8c5aa0e20881c 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -436,7 +436,7 @@ public List> fsync() throws IOException { try { // check again if the file is evicted before doing expensive I/O ensureOpen(); - IOUtils.fsync(file, false); // TODO don't forget to fsync parent directory + IOUtils.fsync(file, false, false); // TODO don't forget to fsync parent directory success = true; } finally { if (success == false) { From 2f8b0e71553feb8c3e6212b4abf0f9ba25c08b22 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 2 Nov 2020 11:15:13 +0100 Subject: [PATCH 3/6] fsynced at creation time --- .../java/org/elasticsearch/index/store/cache/CacheFile.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index 8c5aa0e20881c..9dbfc4413327d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -69,9 +69,11 @@ protected void closeInternal() { /** * Indicates whether the cache file has been synchronized with the storage device that contains it, since the last time data - * were written in cache (or since the creation of the file if no cached data have been written yet). + * were written in cache (or since the creation of the file if no cached data have been written yet). An empty cache file is + * considered as fsynced (the initialization value is {@code true}) when it is created; and writing new data to the cache file + * will toggle the flag to {@code false}. **/ - private final AtomicBoolean fsynced = new AtomicBoolean(); + private final AtomicBoolean fsynced = new AtomicBoolean(true); /** * A reference counted holder for the current channel to the physical file backing this cache file instance. From 7b3590edaa921b328efb5bd5c97162f9795b1c55 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 2 Nov 2020 11:25:10 +0100 Subject: [PATCH 4/6] sorted set --- .../elasticsearch/index/store/cache/CacheFile.java | 10 ++++++---- .../index/store/cache/SparseFileTracker.java | 8 ++++---- .../index/store/cache/CacheFileTests.java | 7 ++++--- .../index/store/cache/SparseFileTrackerTests.java | 14 +++++++------- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index 9dbfc4413327d..88bfa9195082b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -21,10 +21,12 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.SortedSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -418,19 +420,19 @@ boolean isFSynced() { * Ensure that all ranges of data written to the cache file are written to the storage device that contains it. This method returns the * list of all successfully written ranges of data since the creation of the cache file. * - * @return the list of ranges of data available in cache at the time this method is invoked + * @return a sorted set of ranges of data available in cache at the time calling this method resulted in performing a fsync * @throws IOException if the cache file failed to be fsync * @throws AlreadyClosedException if the cache file is evicted * @throws java.nio.file.NoSuchFileException if the cache file does not exist */ - public List> fsync() throws IOException { + public SortedSet> fsync() throws IOException { ensureOpen(); if (refCounter.tryIncRef()) { try { // Capture the completed ranges before fsyncing; ranges that are completed after this point won't be considered as // persisted on disk by the caller of this method, even if they are fully written to disk at the time the file // fsync is effectively executed - final List> completedRanges = tracker.getCompletedRanges(); + final SortedSet> completedRanges = tracker.getCompletedRanges(); assert completedRanges != null; if (fsynced.compareAndSet(false, true)) { @@ -454,6 +456,6 @@ public List> fsync() throws IOException { assert evicted.get(); throwAlreadyEvicted(); } - return List.of(); + return Collections.emptySortedSet(); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java index 2d192d12d3db8..39f566a7195ce 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java @@ -49,8 +49,8 @@ public long getLength() { return length; } - public List> getCompletedRanges() { - List> completedRanges = null; + public SortedSet> getCompletedRanges() { + SortedSet> completedRanges = null; synchronized (mutex) { assert invariant(); for (Range range : ranges) { @@ -58,12 +58,12 @@ public List> getCompletedRanges() { continue; } if (completedRanges == null) { - completedRanges = new ArrayList<>(); + completedRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1)); } completedRanges.add(Tuple.tuple(range.start, range.end)); } } - return completedRanges == null ? Collections.emptyList() : completedRanges; + return completedRanges == null ? Collections.emptySortedSet() : completedRanges; } /** diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index 6bfcf32be57fd..2aa07cac1a487 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -211,7 +212,7 @@ public void testFSync() throws Exception { final TestEvictionListener listener = new TestEvictionListener(); cacheFile.acquire(listener); - List> completedRanges = cacheFile.fsync(); + SortedSet> completedRanges = cacheFile.fsync(); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); assertThat(completedRanges, hasSize(0)); assertTrue(cacheFile.isFSynced()); @@ -277,7 +278,7 @@ public void testFSyncOnEvictedFile() throws Exception { waitForGenericThreadPool(threadPool, 1L); assertFalse(cacheFile.isFSynced()); - final List> completedRanges = cacheFile.fsync(); + final SortedSet> completedRanges = cacheFile.fsync(); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); assertThat(completedRanges, hasSize(1)); assertTrue(cacheFile.isFSynced()); @@ -330,7 +331,7 @@ public void testFSyncFailure() throws Exception { fileSystem.failFSyncs.set(false); - final List> completedRanges = cacheFile.fsync(); + final SortedSet> completedRanges = cacheFile.fsync(); assertThat(completedRanges.size(), equalTo(hasCompletedRange ? 1 : 0)); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); assertTrue(cacheFile.isFSynced()); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java index 9da91388330bc..ccf7075ae99b2 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java @@ -15,9 +15,9 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; @@ -432,13 +432,13 @@ public void testCompletedRanges() { // merge adjacent processed ranges as the SparseFileTracker does internally when a gap is completed // in order to check that SparseFileTracker.getCompletedRanges() returns the expected values - final List> expectedCompletedRanges = gapsProcessed.stream() + final SortedSet> expectedCompletedRanges = gapsProcessed.stream() .map(gap -> Tuple.tuple(gap.start(), gap.end())) - .collect(LinkedList::new, (gaps, gap) -> { + .collect(() -> new TreeSet<>(Comparator.comparingLong(Tuple::v1)), (gaps, gap) -> { if (gaps.isEmpty()) { gaps.add(gap); } else { - final Tuple previous = gaps.removeLast(); + final Tuple previous = gaps.pollLast(); if (previous.v2().equals(gap.v1())) { gaps.add(Tuple.tuple(previous.v1(), gap.v2())); } else { @@ -448,8 +448,8 @@ public void testCompletedRanges() { } }, (gaps1, gaps2) -> { if (gaps1.isEmpty() == false && gaps2.isEmpty() == false) { - final Tuple last = gaps1.removeLast(); - final Tuple first = gaps2.removeFirst(); + final Tuple last = gaps1.pollLast(); + final Tuple first = gaps2.pollFirst(); if (last.v2().equals(first.v1())) { gaps1.add(Tuple.tuple(last.v1(), first.v2())); } else { @@ -460,7 +460,7 @@ public void testCompletedRanges() { gaps1.addAll(gaps2); }); - final List> completedRanges = sparseFileTracker.getCompletedRanges(); + final SortedSet> completedRanges = sparseFileTracker.getCompletedRanges(); assertThat(completedRanges, hasSize(expectedCompletedRanges.size())); assertThat(completedRanges, equalTo(expectedCompletedRanges)); } From d55278ebf5c6f6ccac3e7af06f3124700215a347 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 2 Nov 2020 13:11:08 +0100 Subject: [PATCH 5/6] feedback --- .../index/store/cache/CacheFile.java | 29 ++-- .../index/store/cache/CacheFileTests.java | 160 +++++++----------- .../store/cache/SparseFileTrackerTests.java | 35 +--- .../index/store/cache/TestUtils.java | 32 ++++ 4 files changed, 109 insertions(+), 147 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index 88bfa9195082b..1899d4fcd1f0f 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -417,44 +417,43 @@ boolean isFSynced() { } /** - * Ensure that all ranges of data written to the cache file are written to the storage device that contains it. This method returns the - * list of all successfully written ranges of data since the creation of the cache file. + * Ensure that all ranges of data written to the cache file are written to the storage device that contains it. This method performs + * synchronization only if data has been written to the cache since the last time the method was called. If calling this method + * resulted in performing a synchronization, a sorted set of all successfully written ranges of data since the creation of the cache + * file is returned. If the cache file is evicted or if a concurrent thread is already fsyncing the file this method returns an empty + * set of ranges. * - * @return a sorted set of ranges of data available in cache at the time calling this method resulted in performing a fsync + * @return a sorted set of ranges of data available in cache iff calling this method resulted in performing a fsync * @throws IOException if the cache file failed to be fsync - * @throws AlreadyClosedException if the cache file is evicted * @throws java.nio.file.NoSuchFileException if the cache file does not exist */ public SortedSet> fsync() throws IOException { - ensureOpen(); if (refCounter.tryIncRef()) { try { - // Capture the completed ranges before fsyncing; ranges that are completed after this point won't be considered as - // persisted on disk by the caller of this method, even if they are fully written to disk at the time the file - // fsync is effectively executed - final SortedSet> completedRanges = tracker.getCompletedRanges(); - assert completedRanges != null; - if (fsynced.compareAndSet(false, true)) { boolean success = false; try { - // check again if the file is evicted before doing expensive I/O - ensureOpen(); + // Capture the completed ranges before fsyncing; ranges that are completed after this point won't be considered as + // persisted on disk by the caller of this method, even if they are fully written to disk at the time the file + // fsync is effectively executed + final SortedSet> completedRanges = tracker.getCompletedRanges(); + assert completedRanges != null; + assert completedRanges.isEmpty() == false; + IOUtils.fsync(file, false, false); // TODO don't forget to fsync parent directory success = true; + return completedRanges; } finally { if (success == false) { fsynced.set(false); } } } - return completedRanges; } finally { refCounter.decRef(); } } else { assert evicted.get(); - throwAlreadyEvicted(); } return Collections.emptySortedSet(); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index 2aa07cac1a487..e241f76b4b48e 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -8,7 +8,6 @@ import org.apache.lucene.mockfile.FilterFileChannel; import org.apache.lucene.mockfile.FilterFileSystemProvider; import org.apache.lucene.mockfile.FilterPath; -import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.common.collect.Tuple; @@ -16,16 +15,13 @@ import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.index.store.cache.CacheFile.EvictionListener; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPoolStats; import org.hamcrest.Matcher; import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.file.FileSystem; import java.nio.file.Files; -import java.nio.file.NoSuchFileException; import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.attribute.FileAttribute; @@ -43,12 +39,12 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.StreamSupport; +import static java.util.Collections.synchronizedNavigableSet; import static org.elasticsearch.common.settings.Settings.builder; +import static org.elasticsearch.index.store.cache.TestUtils.mergeContiguousRanges; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -197,101 +193,61 @@ public void testConcurrentAccess() throws Exception { } } - public void testFSyncOnNonExistentFile() { - try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { - final CacheFile cacheFile = new CacheFile("test", randomNonNegativeLong(), fileSystem.resolve("test")); - expectThrows(NoSuchFileException.class, cacheFile::fsync); - } - } - public void testFSync() throws Exception { try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { final CacheFile cacheFile = new CacheFile("test", randomLongBetween(100, 1000), fileSystem.resolve("test")); - assertFalse(cacheFile.isFSynced()); + assertTrue(cacheFile.isFSynced()); final TestEvictionListener listener = new TestEvictionListener(); cacheFile.acquire(listener); - SortedSet> completedRanges = cacheFile.fsync(); - assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); - assertThat(completedRanges, hasSize(0)); - assertTrue(cacheFile.isFSynced()); - - final TestThreadPool threadPool = new TestThreadPool(getTestName()); try { - final Set> expectedCompletedRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1)); - - for (long i = 0L; i < between(1, 10); i++) { - if (randomBoolean()) { - final long position = i * 10L; // simplify the test by completing small non-contiguous ranges - final Tuple range = Tuple.tuple(position, position + 1L); - cacheFile.populateAndRead( - range, - range, - channel -> Math.toIntExact(range.v2() - range.v1()), - (channel, from, to, progressUpdater) -> progressUpdater.accept(to), - threadPool.generic() - ); - - waitForGenericThreadPool(threadPool, expectedCompletedRanges.size() + 1L); - assertTrue(expectedCompletedRanges.add(range)); - assertFalse(cacheFile.isFSynced()); - } - - completedRanges = cacheFile.fsync(); - assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L + expectedCompletedRanges.size())); - assertThat(completedRanges.size(), equalTo(expectedCompletedRanges.size())); + if (randomBoolean()) { + final SortedSet> completedRanges = cacheFile.fsync(); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0L)); + assertThat(completedRanges, hasSize(0)); assertTrue(cacheFile.isFSynced()); } - assertArrayEquals(cacheFile.fsync().toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); + final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); + assertThat(cacheFile.isFSynced(), equalTo(expectedCompletedRanges.isEmpty())); + + final SortedSet> completedRanges = cacheFile.fsync(); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L)); + assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); + assertTrue(cacheFile.isFSynced()); } finally { cacheFile.release(listener); - terminate(threadPool); } } } public void testFSyncOnEvictedFile() throws Exception { try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { - final CacheFile cacheFile = new CacheFile("test", randomNonNegativeLong(), fileSystem.resolve("test")); - assertFalse(cacheFile.isFSynced()); + final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1L, 1000L), fileSystem.resolve("test")); + assertTrue(cacheFile.isFSynced()); final TestEvictionListener listener = new TestEvictionListener(); cacheFile.acquire(listener); - final TestThreadPool threadPool = new TestThreadPool(getTestName()); try { - final boolean completeRangeBeforeEviction = randomBoolean(); - if (completeRangeBeforeEviction) { - final long start = randomLongBetween(0L, Math.max(0L, cacheFile.getLength() - 1L)); - final long end = randomLongBetween(start, cacheFile.getLength()); - final Tuple range = Tuple.tuple(start, end); - cacheFile.populateAndRead( - range, - range, - channel -> Math.toIntExact(end - start), - (channel, from, to, progressUpdater) -> progressUpdater.accept(to), - threadPool.generic() - ); - - waitForGenericThreadPool(threadPool, 1L); - assertFalse(cacheFile.isFSynced()); - + final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); + if (expectedCompletedRanges.isEmpty() == false) { final SortedSet> completedRanges = cacheFile.fsync(); + assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); - assertThat(completedRanges, hasSize(1)); assertTrue(cacheFile.isFSynced()); } + assertTrue(cacheFile.isFSynced()); cacheFile.startEviction(); - expectThrows(AlreadyClosedException.class, cacheFile::fsync); - assertNumberOfFSyncs(cacheFile.getFile(), equalTo(completeRangeBeforeEviction ? 1L : 0L)); - assertThat(cacheFile.isFSynced(), is(completeRangeBeforeEviction)); + final SortedSet> completedRangesAfterEviction = cacheFile.fsync(); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L)); + assertThat(completedRangesAfterEviction, hasSize(0)); + assertTrue(cacheFile.isFSynced()); } finally { cacheFile.release(listener); - terminate(threadPool); } } } @@ -300,44 +256,32 @@ public void testFSyncFailure() throws Exception { try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { fileSystem.failFSyncs.set(true); - final CacheFile cacheFile = new CacheFile("test", randomNonNegativeLong(), fileSystem.resolve("test")); - assertFalse(cacheFile.isFSynced()); + final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1L, 1000L), fileSystem.resolve("test")); + assertTrue(cacheFile.isFSynced()); final TestEvictionListener listener = new TestEvictionListener(); cacheFile.acquire(listener); - final TestThreadPool threadPool = new TestThreadPool(getTestName()); try { - final boolean hasCompletedRange = randomBoolean(); - if (hasCompletedRange) { - final long start = randomLongBetween(0L, Math.max(0L, cacheFile.getLength() - 1L)); - final long end = randomLongBetween(start, cacheFile.getLength()); - final Tuple range = Tuple.tuple(start, end); - cacheFile.populateAndRead( - range, - range, - channel -> Math.toIntExact(end - start), - (channel, from, to, progressUpdater) -> progressUpdater.accept(to), - threadPool.generic() - ); - - waitForGenericThreadPool(threadPool, 1L); + final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); + if (expectedCompletedRanges.isEmpty()) { + assertTrue(cacheFile.isFSynced()); + final SortedSet> completedRanges = cacheFile.fsync(); + assertTrue(completedRanges.isEmpty()); + } else { assertFalse(cacheFile.isFSynced()); + expectThrows(IOException.class, cacheFile::fsync); } - - expectThrows(IOException.class, cacheFile::fsync); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0L)); - assertFalse(cacheFile.isFSynced()); fileSystem.failFSyncs.set(false); final SortedSet> completedRanges = cacheFile.fsync(); - assertThat(completedRanges.size(), equalTo(hasCompletedRange ? 1 : 0)); - assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); + assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L)); assertTrue(cacheFile.isFSynced()); } finally { cacheFile.release(listener); - terminate(threadPool); } } } @@ -360,6 +304,29 @@ public void onEviction(CacheFile evictedCacheFile) { } } + private SortedSet> randomPopulateAndReads(final CacheFile cacheFile) { + final SortedSet> ranges = synchronizedNavigableSet(new TreeSet<>(Comparator.comparingLong(Tuple::v1))); + final List> futures = new ArrayList<>(); + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( + builder().put(NODE_NAME_SETTING.getKey(), getTestName()).build(), + random() + ); + for (int i = 0; i < between(0, 10); i++) { + final long start = randomLongBetween(0L, cacheFile.getLength() - 1L); + final long end = randomLongBetween(start + 1L, cacheFile.getLength()); + final Tuple range = Tuple.tuple(start, end); + futures.add( + cacheFile.populateAndRead(range, range, channel -> Math.toIntExact(end - start), (channel, from, to, progressUpdater) -> { + ranges.add(Tuple.tuple(from, to)); + progressUpdater.accept(to); + }, deterministicTaskQueue.getThreadPool().generic()) + ); + } + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(futures.stream().allMatch(Future::isDone)); + return mergeContiguousRanges(ranges); + } + public static void assertNumberOfFSyncs(final Path path, final Matcher matcher) { final FSyncTrackingFileSystemProvider provider = (FSyncTrackingFileSystemProvider) path.getFileSystem().provider(); final AtomicLong fsyncCounter = provider.files.get(path); @@ -374,17 +341,6 @@ private static FSyncTrackingFileSystemProvider setupFSyncCountingFileSystem() { return provider; } - private static void waitForGenericThreadPool(final ThreadPool threadPool, final long completedTasks) throws Exception { - assertBusy(() -> { - final ThreadPoolStats.Stats threadPoolStats = StreamSupport.stream(threadPool.stats().spliterator(), false) - .filter(stats -> stats.getName().equals(ThreadPool.Names.GENERIC)) - .findFirst() - .orElseThrow(AssertionError::new); - assertThat(threadPoolStats.getCompleted(), greaterThanOrEqualTo(completedTasks)); - assertThat(threadPoolStats.getQueue(), equalTo(0)); - }); - } - /** * A {@link FileSystemProvider} that counts the number of times the method {@link FileChannel#force(boolean)} is executed on every * files. It reinstates the default file system when this file system provider is closed. diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java index ccf7075ae99b2..393611765c61e 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java @@ -25,6 +25,7 @@ import java.util.function.Consumer; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; +import static org.elasticsearch.index.store.cache.TestUtils.mergeContiguousRanges; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -418,13 +419,13 @@ public void testCompletedRanges() { final SparseFileTracker sparseFileTracker = new SparseFileTracker("test", fileContents.length); final Set listenersCalled = new HashSet<>(); - final Set gapsProcessed = Collections.synchronizedSet( - new TreeSet<>(Comparator.comparingLong(SparseFileTracker.Gap::start)) + final SortedSet> gapsProcessed = Collections.synchronizedNavigableSet( + new TreeSet<>(Comparator.comparingLong(Tuple::v1)) ); for (int i = between(0, 10); i > 0; i--) { waitForRandomRange(fileContents, sparseFileTracker, listenersCalled::add, gap -> { if (processGap(fileContents, gap)) { - gapsProcessed.add(gap); + gapsProcessed.add(Tuple.tuple(gap.start(), gap.end())); } }); assertTrue(listenersCalled.stream().allMatch(AtomicBoolean::get)); @@ -432,33 +433,7 @@ public void testCompletedRanges() { // merge adjacent processed ranges as the SparseFileTracker does internally when a gap is completed // in order to check that SparseFileTracker.getCompletedRanges() returns the expected values - final SortedSet> expectedCompletedRanges = gapsProcessed.stream() - .map(gap -> Tuple.tuple(gap.start(), gap.end())) - .collect(() -> new TreeSet<>(Comparator.comparingLong(Tuple::v1)), (gaps, gap) -> { - if (gaps.isEmpty()) { - gaps.add(gap); - } else { - final Tuple previous = gaps.pollLast(); - if (previous.v2().equals(gap.v1())) { - gaps.add(Tuple.tuple(previous.v1(), gap.v2())); - } else { - gaps.add(previous); - gaps.add(gap); - } - } - }, (gaps1, gaps2) -> { - if (gaps1.isEmpty() == false && gaps2.isEmpty() == false) { - final Tuple last = gaps1.pollLast(); - final Tuple first = gaps2.pollFirst(); - if (last.v2().equals(first.v1())) { - gaps1.add(Tuple.tuple(last.v1(), first.v2())); - } else { - gaps1.add(last); - gaps2.add(first); - } - } - gaps1.addAll(gaps2); - }); + final SortedSet> expectedCompletedRanges = mergeContiguousRanges(gapsProcessed); final SortedSet> completedRanges = sparseFileTracker.getCompletedRanges(); assertThat(completedRanges, hasSize(expectedCompletedRanges.size())); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index aa2634a12fc24..1eba2269a47e4 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -25,9 +26,12 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.SortedSet; +import java.util.TreeSet; import static com.carrotsearch.randomizedtesting.generators.RandomNumbers.randomIntBetween; import static com.carrotsearch.randomizedtesting.generators.RandomPicks.randomFrom; @@ -75,6 +79,34 @@ static long numberOfRanges(int fileSize, int rangeSize) { return numberOfRanges; } + static SortedSet> mergeContiguousRanges(final SortedSet> ranges) { + return ranges.stream().collect(() -> new TreeSet<>(Comparator.comparingLong(Tuple::v1)), (gaps, gap) -> { + if (gaps.isEmpty()) { + gaps.add(gap); + } else { + final Tuple previous = gaps.pollLast(); + if (previous.v2().equals(gap.v1())) { + gaps.add(Tuple.tuple(previous.v1(), gap.v2())); + } else { + gaps.add(previous); + gaps.add(gap); + } + } + }, (gaps1, gaps2) -> { + if (gaps1.isEmpty() == false && gaps2.isEmpty() == false) { + final Tuple last = gaps1.pollLast(); + final Tuple first = gaps2.pollFirst(); + if (last.v2().equals(first.v1())) { + gaps1.add(Tuple.tuple(last.v1(), first.v2())); + } else { + gaps1.add(last); + gaps2.add(first); + } + } + gaps1.addAll(gaps2); + }); + } + public static void assertCounter(IndexInputStats.Counter counter, long total, long count, long min, long max) { assertThat(counter.total(), equalTo(total)); assertThat(counter.count(), equalTo(count)); From d8fdaaa2aab25e78033906f2fa8c956a6e340441 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 3 Nov 2020 14:05:19 +0100 Subject: [PATCH 6/6] needsFsync --- .../index/store/cache/CacheFile.java | 19 +++++----- .../index/store/cache/CacheFileTests.java | 35 +++++++++++-------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index 0c1f78b21c6b1..7074205df3088 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -71,12 +71,11 @@ protected void closeInternal() { private final Set listeners = new HashSet<>(); /** - * Indicates whether the cache file has been synchronized with the storage device that contains it, since the last time data - * were written in cache (or since the creation of the file if no cached data have been written yet). An empty cache file is - * considered as fsynced (the initialization value is {@code true}) when it is created; and writing new data to the cache file - * will toggle the flag to {@code false}. + * Indicates whether the cache file requires to be synchronized with the storage device that contains it in order to persist in a + * durable manner its ranges of cached data. An empty cache file does not need to be fsync; and writing new data to the cache file + * will toggle the flag to {@code true}. **/ - private final AtomicBoolean fsynced = new AtomicBoolean(true); + private final AtomicBoolean needsFsync = new AtomicBoolean(); /** * A reference counted holder for the current channel to the physical file backing this cache file instance. @@ -321,7 +320,7 @@ protected void doRun() throws Exception { reference.decRef(); } gap.onCompletion(); - fsynced.set(false); + needsFsync.set(true); } @Override @@ -417,8 +416,8 @@ public Tuple getAbsentRangeWithin(long start, long end) { } // used in tests - boolean isFSynced() { - return fsynced.get(); + boolean needsFsync() { + return needsFsync.get(); } /** @@ -435,7 +434,7 @@ boolean isFSynced() { public SortedSet> fsync() throws IOException { if (refCounter.tryIncRef()) { try { - if (fsynced.compareAndSet(false, true)) { + if (needsFsync.compareAndSet(true, false)) { boolean success = false; try { // Capture the completed ranges before fsyncing; ranges that are completed after this point won't be considered as @@ -450,7 +449,7 @@ public SortedSet> fsync() throws IOException { return completedRanges; } finally { if (success == false) { - fsynced.set(false); + needsFsync.set(true); } } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index e241f76b4b48e..187abd7240380 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -196,7 +196,7 @@ public void testConcurrentAccess() throws Exception { public void testFSync() throws Exception { try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { final CacheFile cacheFile = new CacheFile("test", randomLongBetween(100, 1000), fileSystem.resolve("test")); - assertTrue(cacheFile.isFSynced()); + assertFalse(cacheFile.needsFsync()); final TestEvictionListener listener = new TestEvictionListener(); cacheFile.acquire(listener); @@ -206,16 +206,20 @@ public void testFSync() throws Exception { final SortedSet> completedRanges = cacheFile.fsync(); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0L)); assertThat(completedRanges, hasSize(0)); - assertTrue(cacheFile.isFSynced()); + assertFalse(cacheFile.needsFsync()); } final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); - assertThat(cacheFile.isFSynced(), equalTo(expectedCompletedRanges.isEmpty())); + if (expectedCompletedRanges.isEmpty() == false) { + assertTrue(cacheFile.needsFsync()); + } else { + assertFalse(cacheFile.needsFsync()); + } final SortedSet> completedRanges = cacheFile.fsync(); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L)); assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); - assertTrue(cacheFile.isFSynced()); + assertFalse(cacheFile.needsFsync()); } finally { cacheFile.release(listener); } @@ -225,7 +229,7 @@ public void testFSync() throws Exception { public void testFSyncOnEvictedFile() throws Exception { try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1L, 1000L), fileSystem.resolve("test")); - assertTrue(cacheFile.isFSynced()); + assertFalse(cacheFile.needsFsync()); final TestEvictionListener listener = new TestEvictionListener(); cacheFile.acquire(listener); @@ -233,19 +237,20 @@ public void testFSyncOnEvictedFile() throws Exception { try { final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); if (expectedCompletedRanges.isEmpty() == false) { + assertTrue(cacheFile.needsFsync()); final SortedSet> completedRanges = cacheFile.fsync(); assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); - assertTrue(cacheFile.isFSynced()); + assertFalse(cacheFile.needsFsync()); } - assertTrue(cacheFile.isFSynced()); + assertFalse(cacheFile.needsFsync()); cacheFile.startEviction(); final SortedSet> completedRangesAfterEviction = cacheFile.fsync(); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L)); assertThat(completedRangesAfterEviction, hasSize(0)); - assertTrue(cacheFile.isFSynced()); + assertFalse(cacheFile.needsFsync()); } finally { cacheFile.release(listener); } @@ -257,20 +262,20 @@ public void testFSyncFailure() throws Exception { fileSystem.failFSyncs.set(true); final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1L, 1000L), fileSystem.resolve("test")); - assertTrue(cacheFile.isFSynced()); + assertFalse(cacheFile.needsFsync()); final TestEvictionListener listener = new TestEvictionListener(); cacheFile.acquire(listener); try { final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); - if (expectedCompletedRanges.isEmpty()) { - assertTrue(cacheFile.isFSynced()); + if (expectedCompletedRanges.isEmpty() == false) { + assertTrue(cacheFile.needsFsync()); + expectThrows(IOException.class, cacheFile::fsync); + } else { + assertFalse(cacheFile.needsFsync()); final SortedSet> completedRanges = cacheFile.fsync(); assertTrue(completedRanges.isEmpty()); - } else { - assertFalse(cacheFile.isFSynced()); - expectThrows(IOException.class, cacheFile::fsync); } assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0L)); @@ -279,7 +284,7 @@ public void testFSyncFailure() throws Exception { final SortedSet> completedRanges = cacheFile.fsync(); assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L)); - assertTrue(cacheFile.isFSynced()); + assertFalse(cacheFile.needsFsync()); } finally { cacheFile.release(listener); }