diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index 7bccf98191ebb..1bf5ac4e17940 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -77,6 +77,12 @@ protected void closeInternal() { **/ private final AtomicBoolean needsFsync = new AtomicBoolean(); + /** + * A runnable that is executed every time the {@link #needsFsync} flag is toggled to {@code true}, which indicates that the cache file + * has been updated. See {@link #markAsNeedsFSync()} method. + */ + private final Runnable needsFsyncRunnable; + /** * A reference counted holder for the current channel to the physical file backing this cache file instance. * By guarding access to the file channel by ref-counting and giving the channel its own life-cycle we remove all need for @@ -117,10 +123,11 @@ protected void closeInternal() { @Nullable private volatile FileChannelReference channelRef; - public CacheFile(String description, long length, Path file) { + public CacheFile(String description, long length, Path file, Runnable onNeedFSync) { this.tracker = new SparseFileTracker(file.toString(), length); this.description = Objects.requireNonNull(description); this.file = Objects.requireNonNull(file); + this.needsFsyncRunnable = Objects.requireNonNull(onNeedFSync); assert invariant(); } @@ -320,7 +327,7 @@ protected void doRun() throws Exception { reference.decRef(); } gap.onCompletion(); - needsFsync.set(true); + markAsNeedsFSync(); } @Override @@ -420,6 +427,15 @@ boolean needsFsync() { return needsFsync.get(); } + /** + * Marks the current cache file as "fsync needed" and notifies the corresponding listener. + */ + private void markAsNeedsFSync() { + if (needsFsync.getAndSet(true) == false) { + needsFsyncRunnable.run(); + } + } + /** * Ensure that all ranges of data written to the cache file are written to the storage device that contains it. This method performs * synchronization only if data has been written to the cache since the last time the method was called. If calling this method @@ -444,12 +460,12 @@ public SortedSet> fsync() throws IOException { assert completedRanges != null; assert completedRanges.isEmpty() == false; - IOUtils.fsync(file, false, false); // TODO don't forget to fsync parent directory + IOUtils.fsync(file, false, false); success = true; return completedRanges; } finally { if (success == false) { - needsFsync.set(true); + markAsNeedsFSync(); } } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheKey.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheKey.java index a84b28ca53770..ba4b4c315c2ef 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheKey.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheKey.java @@ -25,19 +25,19 @@ public CacheKey(SnapshotId snapshotId, IndexId indexId, ShardId shardId, String this.fileName = Objects.requireNonNull(fileName); } - SnapshotId getSnapshotId() { + public SnapshotId getSnapshotId() { return snapshotId; } - IndexId getIndexId() { + public IndexId getIndexId() { return indexId; } - ShardId getShardId() { + public ShardId getShardId() { return shardId; } - String getFileName() { + public String getFileName() { return fileName; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 5433ea37ec634..bf5a869d44fe6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -11,6 +11,7 @@ import org.elasticsearch.blobstore.cache.BlobStoreCacheService; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; @@ -67,6 +68,7 @@ import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction; import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -182,7 +184,10 @@ public List> getSettings() { SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING, SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING, CacheService.SNAPSHOT_CACHE_SIZE_SETTING, - CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING + CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING, + CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING, + CacheService.SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING, + CacheService.SNAPSHOT_CACHE_SYNC_SHUTDOWN_TIMEOUT ); } @@ -200,19 +205,29 @@ public Collection createComponents( final IndexNameExpressionResolver resolver, final Supplier repositoriesServiceSupplier ) { - final CacheService cacheService = new CacheService(new NodeEnvironmentCacheCleaner(nodeEnvironment), settings); - this.cacheService.set(cacheService); + final List components = new ArrayList<>(); this.repositoriesServiceSupplier = repositoriesServiceSupplier; this.threadPool.set(threadPool); - final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( - clusterService, - threadPool, - client, - SNAPSHOT_BLOB_CACHE_INDEX - ); - this.blobStoreCacheService.set(blobStoreCacheService); this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService())); - return List.of(cacheService, blobStoreCacheService); + if (DiscoveryNode.isDataNode(settings)) { + final CacheService cacheService = new CacheService( + settings, + clusterService, + threadPool, + new NodeEnvironmentCacheCleaner(nodeEnvironment) + ); + this.cacheService.set(cacheService); + components.add(cacheService); + final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( + clusterService, + threadPool, + client, + SNAPSHOT_BLOB_CACHE_INDEX + ); + this.blobStoreCacheService.set(blobStoreCacheService); + components.add(blobStoreCacheService); + } + return Collections.unmodifiableList(components); } @Override diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index 7f743b4f5a6c0..6afe9c3b9f273 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -5,30 +5,50 @@ */ package org.elasticsearch.xpack.searchablesnapshots.cache; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.cache.Cache; import org.elasticsearch.common.cache.CacheBuilder; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.store.cache.CacheFile; import org.elasticsearch.index.store.cache.CacheKey; +import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; /** - * {@link CacheService} maintains a cache entry for all files read from searchable snapshot directories ( - * see {@link org.elasticsearch.index.store.SearchableSnapshotDirectory}) + * {@link CacheService} maintains a cache entry for all files read from searchable snapshot directories (see + * {@link org.elasticsearch.index.store.SearchableSnapshotDirectory}). + * + * Cache files created by this service are periodically synchronized on disk in order to make the cached data durable + * (see {@link #synchronizeCache()} for more information). */ public class CacheService extends AbstractLifecycleComponent { @@ -52,41 +72,108 @@ public class CacheService extends AbstractLifecycleComponent { Setting.Property.NodeScope ); + public static final TimeValue MIN_SNAPSHOT_CACHE_SYNC_INTERVAL = TimeValue.timeValueSeconds(1L); + public static final Setting SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING = Setting.timeSetting( + SETTINGS_PREFIX + "sync.interval", + TimeValue.timeValueSeconds(60L), // default + MIN_SNAPSHOT_CACHE_SYNC_INTERVAL, // min + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final Setting SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING = Setting.intSetting( + SETTINGS_PREFIX + "sync.max_files", + 10_000, // default + 0, // min + Integer.MAX_VALUE, // max + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final Setting SNAPSHOT_CACHE_SYNC_SHUTDOWN_TIMEOUT = Setting.timeSetting( + SETTINGS_PREFIX + "sync.shutdown_timeout", + TimeValue.timeValueSeconds(10L), // default + TimeValue.ZERO, // min + Setting.Property.NodeScope + ); + + private static final Logger logger = LogManager.getLogger(CacheService.class); + + private final ThreadPool threadPool; + private final ConcurrentLinkedQueue cacheFilesToSync; + private final AtomicLong numberOfCacheFilesToSync; + private final CacheSynchronizationTask cacheSyncTask; + private final TimeValue cacheSyncStopTimeout; + private final ReentrantLock cacheSyncLock; private final Cache cache; private final ByteSizeValue cacheSize; private final Runnable cacheCleaner; private final ByteSizeValue rangeSize; - public CacheService(final Runnable cacheCleaner, final Settings settings) { - this(cacheCleaner, SNAPSHOT_CACHE_SIZE_SETTING.get(settings), SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings)); - } + private volatile int maxCacheFilesToSyncAtOnce; - // exposed for tests - public CacheService(final Runnable cacheCleaner, final ByteSizeValue cacheSize, final ByteSizeValue rangeSize) { - this.cacheSize = Objects.requireNonNull(cacheSize); + public CacheService( + final Settings settings, + final ClusterService clusterService, + final ThreadPool threadPool, + final Runnable cacheCleaner + ) { + this.threadPool = Objects.requireNonNull(threadPool); + this.cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings); this.cacheCleaner = Objects.requireNonNull(cacheCleaner); - this.rangeSize = Objects.requireNonNull(rangeSize); + this.rangeSize = SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings); this.cache = CacheBuilder.builder() .setMaximumWeight(cacheSize.getBytes()) .weigher((key, entry) -> entry.getLength()) // NORELEASE This does not immediately free space on disk, as cache file are only deleted when all index inputs // are done with reading/writing the cache file - .removalListener(notification -> IOUtils.closeWhileHandlingException(() -> notification.getValue().startEviction())) + .removalListener(notification -> onCacheFileRemoval(notification.getValue())) .build(); + this.numberOfCacheFilesToSync = new AtomicLong(); + this.cacheSyncLock = new ReentrantLock(); + this.cacheFilesToSync = new ConcurrentLinkedQueue<>(); + final ClusterSettings clusterSettings = clusterService.getClusterSettings(); + this.maxCacheFilesToSyncAtOnce = SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING, this::setMaxCacheFilesToSyncAtOnce); + this.cacheSyncTask = new CacheSynchronizationTask(threadPool, SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.get(settings)); + clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING, this::setCacheSyncInterval); + this.cacheSyncStopTimeout = SNAPSHOT_CACHE_SYNC_SHUTDOWN_TIMEOUT.get(settings); } public static Path getShardCachePath(ShardPath shardPath) { - return shardPath.getDataPath().resolve("snapshot_cache"); + return resolveSnapshotCache(shardPath.getDataPath()); + } + + static Path resolveSnapshotCache(Path path) { + return path.resolve("snapshot_cache"); } @Override protected void doStart() { + cacheSyncTask.rescheduleIfNecessary(); cacheCleaner.run(); } @Override protected void doStop() { - cache.invalidateAll(); + boolean acquired = false; + try { + try { + acquired = cacheSyncLock.tryLock(cacheSyncStopTimeout.duration(), cacheSyncStopTimeout.timeUnit()); + if (acquired == false) { + logger.warn("failed to acquire cache sync lock in [{}], cache might be partially persisted", cacheSyncStopTimeout); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("interrupted while waiting for cache sync lock", e); + } + cacheSyncTask.close(); + cache.invalidateAll(); + } finally { + if (acquired) { + cacheSyncLock.unlock(); + } + } } @Override @@ -124,7 +211,9 @@ public CacheFile get(final CacheKey cacheKey, final long fileLength, final Path final Path path = cacheDir.resolve(uuid); assert Files.notExists(path) : "cache file already exists " + path; - return new CacheFile(key.toString(), fileLength, path); + final SetOnce cacheFile = new SetOnce<>(); + cacheFile.set(new CacheFile(key.toString(), fileLength, path, () -> onCacheFileUpdate(cacheFile.get()))); + return cacheFile.get(); }); } @@ -141,4 +230,138 @@ public void removeFromCache(final Predicate predicate) { } cache.refresh(); } + + void setCacheSyncInterval(TimeValue interval) { + cacheSyncTask.setInterval(interval); + } + + private void setMaxCacheFilesToSyncAtOnce(int maxCacheFilesToSyncAtOnce) { + this.maxCacheFilesToSyncAtOnce = maxCacheFilesToSyncAtOnce; + } + + /** + * This method is invoked when a {@link CacheFile} notifies the current {@link CacheService} that it needs to be fsync on disk. + *

+ * It adds the {@link CacheFile} instance to current set of cache files to synchronize. + * + * @param cacheFile the instance that needs to be fsync + */ + private void onCacheFileUpdate(CacheFile cacheFile) { + assert cacheFile != null; + cacheFilesToSync.offer(cacheFile); + numberOfCacheFilesToSync.incrementAndGet(); + } + + /** + * This method is invoked after a {@link CacheFile} is evicted from the cache. + *

+ * It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted. + * + * @param cacheFile the evicted instance + */ + private void onCacheFileRemoval(CacheFile cacheFile) { + IOUtils.closeWhileHandlingException(cacheFile::startEviction); + } + + // used in tests + boolean isCacheFileToSync(CacheFile cacheFile) { + return cacheFilesToSync.contains(cacheFile); + } + + /** + * Synchronize the cache files and their parent directories on disk. + * + * This method synchronizes the cache files that have been updated since the last time the method was invoked. To be able to do this, + * the cache files must notify the {@link CacheService} when they need to be fsync. When a {@link CacheFile} notifies the service the + * {@link CacheFile} instance is added to the current queue of cache files to synchronize referenced by {@link #cacheFilesToSync}. + * + * Cache files are serially synchronized using the {@link CacheFile#fsync()} method. When the {@link CacheFile#fsync()} call returns a + * non empty set of completed ranges this method also fsync the shard's snapshot cache directory, which is the parent directory of the + * cache entry. Note that cache files might be evicted during the synchronization. + */ + protected void synchronizeCache() { + cacheSyncLock.lock(); + try { + long count = 0L; + final Set cacheDirs = new HashSet<>(); + final long startTimeNanos = threadPool.relativeTimeInNanos(); + final long maxCacheFilesToSync = Math.min(numberOfCacheFilesToSync.get(), this.maxCacheFilesToSyncAtOnce); + + for (long i = 0L; i < maxCacheFilesToSync; i++) { + if (lifecycleState() != Lifecycle.State.STARTED) { + logger.debug("stopping cache synchronization (cache service is closing)"); + return; + } + + final CacheFile cacheFile = cacheFilesToSync.poll(); + assert cacheFile != null; + + final long value = numberOfCacheFilesToSync.decrementAndGet(); + assert value >= 0 : value; + final Path cacheFilePath = cacheFile.getFile(); + try { + final SortedSet> ranges = cacheFile.fsync(); + if (ranges.isEmpty() == false) { + logger.trace( + "cache file [{}] synchronized with [{}] completed range(s)", + cacheFilePath.getFileName(), + ranges.size() + ); + final Path cacheDir = cacheFilePath.toAbsolutePath().getParent(); + if (cacheDirs.add(cacheDir)) { + try { + IOUtils.fsync(cacheDir, true, false); + logger.trace("cache directory [{}] synchronized", cacheDir); + } catch (Exception e) { + assert e instanceof IOException : e; + logger.warn(() -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir), e); + } + } + // TODO Index searchable snapshot shard information + cache file ranges in Lucene + count += 1L; + } + } catch (Exception e) { + assert e instanceof IOException : e; + logger.warn(() -> new ParameterizedMessage("failed to fsync cache file [{}]", cacheFilePath.getFileName()), e); + } + } + if (logger.isDebugEnabled()) { + final long elapsedNanos = threadPool.relativeTimeInNanos() - startTimeNanos; + logger.debug( + "cache files synchronization is done ([{}] cache files synchronized in [{}])", + count, + TimeValue.timeValueNanos(elapsedNanos) + ); + } + } finally { + cacheSyncLock.unlock(); + } + } + + class CacheSynchronizationTask extends AbstractAsyncTask { + + CacheSynchronizationTask(ThreadPool threadPool, TimeValue interval) { + super(logger, Objects.requireNonNull(threadPool), Objects.requireNonNull(interval), true); + } + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + public void runInternal() { + synchronizeCache(); + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + + @Override + public String toString() { + return "cache_synchronization_task"; + } + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index 187abd7240380..931c5f9f9ccbd 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -5,15 +5,14 @@ */ package org.elasticsearch.index.store.cache; -import org.apache.lucene.mockfile.FilterFileChannel; -import org.apache.lucene.mockfile.FilterFileSystemProvider; -import org.apache.lucene.mockfile.FilterPath; import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtilsForTesting; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.index.store.cache.CacheFile.EvictionListener; +import org.elasticsearch.index.store.cache.TestUtils.FSyncTrackingFileSystemProvider; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matcher; @@ -22,28 +21,19 @@ import java.nio.channels.FileChannel; import java.nio.file.FileSystem; import java.nio.file.Files; -import java.nio.file.OpenOption; import java.nio.file.Path; -import java.nio.file.attribute.FileAttribute; -import java.nio.file.spi.FileSystemProvider; import java.util.ArrayList; -import java.util.Comparator; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import static java.util.Collections.synchronizedNavigableSet; import static org.elasticsearch.common.settings.Settings.builder; -import static org.elasticsearch.index.store.cache.TestUtils.mergeContiguousRanges; +import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -53,9 +43,11 @@ public class CacheFileTests extends ESTestCase { + private static final Runnable NOOP = () -> {}; + public void testAcquireAndRelease() throws Exception { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file); + final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, NOOP); assertThat("Cache file is not acquired: no channel exists", cacheFile.getChannel(), nullValue()); assertThat("Cache file is not acquired: file does not exist", Files.exists(file), is(false)); @@ -94,7 +86,7 @@ public void testAcquireAndRelease() throws Exception { public void testCacheFileNotAcquired() throws IOException { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file); + final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, NOOP); assertThat(Files.exists(file), is(false)); assertThat(cacheFile.getChannel(), nullValue()); @@ -116,7 +108,7 @@ public void testCacheFileNotAcquired() throws IOException { public void testDeleteOnCloseAfterLastRelease() throws Exception { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file); + final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, NOOP); final List acquiredListeners = new ArrayList<>(); for (int i = 0; i < randomIntBetween(1, 20); i++) { @@ -148,7 +140,7 @@ public void testDeleteOnCloseAfterLastRelease() throws Exception { public void testConcurrentAccess() throws Exception { final Path file = createTempDir().resolve("file.cache"); - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file); + final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1, 100), file, NOOP); final TestEvictionListener evictionListener = new TestEvictionListener(); cacheFile.acquire(evictionListener); @@ -194,9 +186,17 @@ public void testConcurrentAccess() throws Exception { } public void testFSync() throws Exception { - try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(100, 1000), fileSystem.resolve("test")); + final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem(); + try { + final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); + final CacheFile cacheFile = new CacheFile( + "test", + randomLongBetween(100, 1000), + fileSystem.resolve("test"), + () -> assertFalse(needsFSyncCalled.getAndSet(true)) + ); assertFalse(cacheFile.needsFsync()); + assertFalse(needsFSyncCalled.get()); final TestEvictionListener listener = new TestEvictionListener(); cacheFile.acquire(listener); @@ -204,65 +204,100 @@ public void testFSync() throws Exception { try { if (randomBoolean()) { final SortedSet> completedRanges = cacheFile.fsync(); - assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0L)); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0)); assertThat(completedRanges, hasSize(0)); assertFalse(cacheFile.needsFsync()); + assertFalse(needsFSyncCalled.get()); } final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); if (expectedCompletedRanges.isEmpty() == false) { assertTrue(cacheFile.needsFsync()); + assertTrue(needsFSyncCalled.getAndSet(false)); } else { assertFalse(cacheFile.needsFsync()); + assertFalse(needsFSyncCalled.get()); } final SortedSet> completedRanges = cacheFile.fsync(); - assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L)); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0 : 1)); assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); assertFalse(cacheFile.needsFsync()); + assertFalse(needsFSyncCalled.get()); } finally { cacheFile.release(listener); } + } finally { + fileSystem.tearDown(); } } public void testFSyncOnEvictedFile() throws Exception { - try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1L, 1000L), fileSystem.resolve("test")); + final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem(); + try { + final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); + final CacheFile cacheFile = new CacheFile( + "test", + randomLongBetween(1L, 1000L), + fileSystem.resolve("test"), + () -> assertFalse(needsFSyncCalled.getAndSet(true)) + ); assertFalse(cacheFile.needsFsync()); + assertFalse(needsFSyncCalled.get()); final TestEvictionListener listener = new TestEvictionListener(); cacheFile.acquire(listener); + final RunOnce releaseOnce = new RunOnce(() -> cacheFile.release(listener)); try { final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); if (expectedCompletedRanges.isEmpty() == false) { assertTrue(cacheFile.needsFsync()); + assertTrue(needsFSyncCalled.getAndSet(false)); + final SortedSet> completedRanges = cacheFile.fsync(); assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); - assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L)); - assertFalse(cacheFile.needsFsync()); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1)); } assertFalse(cacheFile.needsFsync()); + assertFalse(needsFSyncCalled.get()); cacheFile.startEviction(); + if (rarely()) { + assertThat("New ranges should not be written after cache file eviction", randomPopulateAndReads(cacheFile), hasSize(0)); + } + if (randomBoolean()) { + releaseOnce.run(); + } + final SortedSet> completedRangesAfterEviction = cacheFile.fsync(); - assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L)); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0 : 1)); assertThat(completedRangesAfterEviction, hasSize(0)); assertFalse(cacheFile.needsFsync()); + assertFalse(needsFSyncCalled.get()); } finally { - cacheFile.release(listener); + releaseOnce.run(); } + } finally { + fileSystem.tearDown(); } } public void testFSyncFailure() throws Exception { - try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) { - fileSystem.failFSyncs.set(true); - - final CacheFile cacheFile = new CacheFile("test", randomLongBetween(1L, 1000L), fileSystem.resolve("test")); + final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem(); + try { + fileSystem.failFSyncs(true); + + final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); + final CacheFile cacheFile = new CacheFile( + "test", + randomLongBetween(1L, 1000L), + fileSystem.resolve("test"), + () -> assertFalse(needsFSyncCalled.getAndSet(true)) + ); assertFalse(cacheFile.needsFsync()); + assertFalse(needsFSyncCalled.get()); final TestEvictionListener listener = new TestEvictionListener(); cacheFile.acquire(listener); @@ -271,23 +306,30 @@ public void testFSyncFailure() throws Exception { final SortedSet> expectedCompletedRanges = randomPopulateAndReads(cacheFile); if (expectedCompletedRanges.isEmpty() == false) { assertTrue(cacheFile.needsFsync()); - expectThrows(IOException.class, cacheFile::fsync); + assertTrue(needsFSyncCalled.getAndSet(false)); + IOException exception = expectThrows(IOException.class, cacheFile::fsync); + assertThat(exception.getMessage(), containsString("simulated")); + assertTrue(cacheFile.needsFsync()); + assertTrue(needsFSyncCalled.getAndSet(false)); } else { assertFalse(cacheFile.needsFsync()); final SortedSet> completedRanges = cacheFile.fsync(); assertTrue(completedRanges.isEmpty()); } - assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0L)); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0)); - fileSystem.failFSyncs.set(false); + fileSystem.failFSyncs(false); final SortedSet> completedRanges = cacheFile.fsync(); assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new)); - assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L)); + assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0 : 1)); assertFalse(cacheFile.needsFsync()); + assertFalse(needsFSyncCalled.get()); } finally { cacheFile.release(listener); } + } finally { + fileSystem.tearDown(); } } @@ -309,34 +351,11 @@ public void onEviction(CacheFile evictedCacheFile) { } } - private SortedSet> randomPopulateAndReads(final CacheFile cacheFile) { - final SortedSet> ranges = synchronizedNavigableSet(new TreeSet<>(Comparator.comparingLong(Tuple::v1))); - final List> futures = new ArrayList<>(); - final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( - builder().put(NODE_NAME_SETTING.getKey(), getTestName()).build(), - random() - ); - for (int i = 0; i < between(0, 10); i++) { - final long start = randomLongBetween(0L, cacheFile.getLength() - 1L); - final long end = randomLongBetween(start + 1L, cacheFile.getLength()); - final Tuple range = Tuple.tuple(start, end); - futures.add( - cacheFile.populateAndRead(range, range, channel -> Math.toIntExact(end - start), (channel, from, to, progressUpdater) -> { - ranges.add(Tuple.tuple(from, to)); - progressUpdater.accept(to); - }, deterministicTaskQueue.getThreadPool().generic()) - ); - } - deterministicTaskQueue.runAllRunnableTasks(); - assertTrue(futures.stream().allMatch(Future::isDone)); - return mergeContiguousRanges(ranges); - } - - public static void assertNumberOfFSyncs(final Path path, final Matcher matcher) { + public static void assertNumberOfFSyncs(final Path path, final Matcher matcher) { final FSyncTrackingFileSystemProvider provider = (FSyncTrackingFileSystemProvider) path.getFileSystem().provider(); - final AtomicLong fsyncCounter = provider.files.get(path); - assertThat("File [" + path + "] was never fsynced", notNullValue()); - assertThat("Mismatching number of fsync for [" + path + "]", fsyncCounter.get(), matcher); + final Integer fsyncCount = provider.getNumberOfFSyncs(path); + assertThat("File [" + path + "] was never fsynced", fsyncCount, notNullValue()); + assertThat("Mismatching number of fsync for [" + path + "]", fsyncCount, matcher); } private static FSyncTrackingFileSystemProvider setupFSyncCountingFileSystem() { @@ -345,48 +364,4 @@ private static FSyncTrackingFileSystemProvider setupFSyncCountingFileSystem() { PathUtilsForTesting.installMock(provider.getFileSystem(null)); return provider; } - - /** - * A {@link FileSystemProvider} that counts the number of times the method {@link FileChannel#force(boolean)} is executed on every - * files. It reinstates the default file system when this file system provider is closed. - */ - private static class FSyncTrackingFileSystemProvider extends FilterFileSystemProvider implements AutoCloseable { - - private final Map files = new ConcurrentHashMap<>(); - private final AtomicBoolean failFSyncs = new AtomicBoolean(); - private final FileSystem delegateInstance; - private final Path rootDir; - - FSyncTrackingFileSystemProvider(FileSystem delegate, Path rootDir) { - super("fsynccounting://", delegate); - this.rootDir = new FilterPath(rootDir, this.fileSystem); - this.delegateInstance = delegate; - } - - public Path resolve(String other) { - return rootDir.resolve(other); - } - - @Override - public FileChannel newFileChannel(Path path, Set options, FileAttribute... attrs) throws IOException { - return new FilterFileChannel(delegate.newFileChannel(toDelegate(path), options, attrs)) { - - final AtomicLong counter = files.computeIfAbsent(path, p -> new AtomicLong(0L)); - - @Override - public void force(boolean metaData) throws IOException { - if (failFSyncs.get()) { - throw new IOException("simulated"); - } - super.force(metaData); - counter.incrementAndGet(); - } - }; - } - - @Override - public void close() { - PathUtilsForTesting.installMock(delegateInstance); - } - } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index e120524694f9f..ac856bfc42375 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -5,16 +5,22 @@ */ package org.elasticsearch.index.store.cache; +import org.apache.lucene.mockfile.FilterFileChannel; +import org.apache.lucene.mockfile.FilterFileSystemProvider; +import org.apache.lucene.mockfile.FilterPath; import org.elasticsearch.action.ActionListener; import org.elasticsearch.blobstore.cache.BlobStoreCacheService; import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.common.io.Streams; import org.elasticsearch.index.store.IndexInputStats; @@ -22,20 +28,62 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.file.FileSystem; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.spi.FileSystemProvider; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import static java.util.Collections.synchronizedNavigableSet; +import static org.apache.lucene.util.LuceneTestCase.random; +import static org.elasticsearch.common.settings.Settings.builder; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.elasticsearch.test.ESTestCase.between; +import static org.elasticsearch.test.ESTestCase.randomLongBetween; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; public final class TestUtils { private TestUtils() {} + public static SortedSet> randomPopulateAndReads(final CacheFile cacheFile) { + final SortedSet> ranges = synchronizedNavigableSet(new TreeSet<>(Comparator.comparingLong(Tuple::v1))); + final List> futures = new ArrayList<>(); + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( + builder().put(NODE_NAME_SETTING.getKey(), "_node").build(), + random() + ); + for (int i = 0; i < between(0, 10); i++) { + final long start = randomLongBetween(0L, cacheFile.getLength() - 1L); + final long end = randomLongBetween(start + 1L, cacheFile.getLength()); + final Tuple range = Tuple.tuple(start, end); + futures.add( + cacheFile.populateAndRead(range, range, channel -> Math.toIntExact(end - start), (channel, from, to, progressUpdater) -> { + ranges.add(Tuple.tuple(from, to)); + progressUpdater.accept(to); + }, deterministicTaskQueue.getThreadPool().generic()) + ); + } + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(futures.stream().allMatch(Future::isDone)); + return mergeContiguousRanges(ranges); + } + public static long numberOfRanges(long fileSize, long rangeSize) { return numberOfRanges(toIntBytes(fileSize), toIntBytes(rangeSize)); } @@ -51,7 +99,7 @@ static long numberOfRanges(int fileSize, int rangeSize) { return numberOfRanges; } - static SortedSet> mergeContiguousRanges(final SortedSet> ranges) { + public static SortedSet> mergeContiguousRanges(final SortedSet> ranges) { // Eclipse needs the TreeSet type to be explicit (see https://bugs.eclipse.org/bugs/show_bug.cgi?id=568600) return ranges.stream().collect(() -> new TreeSet>(Comparator.comparingLong(Tuple::v1)), (gaps, gap) -> { if (gaps.isEmpty()) { @@ -245,4 +293,56 @@ public void putAsync( listener.onResponse(null); } } + + /** + * A {@link FileSystemProvider} that counts the number of times the method {@link FileChannel#force(boolean)} is executed on every + * files. + */ + public static class FSyncTrackingFileSystemProvider extends FilterFileSystemProvider { + + private final Map files = new ConcurrentHashMap<>(); + private final AtomicBoolean failFSyncs = new AtomicBoolean(); + private final FileSystem delegateInstance; + private final Path rootDir; + + public FSyncTrackingFileSystemProvider(FileSystem delegate, Path rootDir) { + super("fsynccounting://", delegate); + this.rootDir = new FilterPath(rootDir, this.fileSystem); + this.delegateInstance = delegate; + } + + public void failFSyncs(boolean shouldFail) { + failFSyncs.set(shouldFail); + } + + public Path resolve(String other) { + return rootDir.resolve(other); + } + + @Nullable + public Integer getNumberOfFSyncs(Path path) { + final AtomicInteger counter = files.get(path); + return counter != null ? counter.get() : null; + } + + @Override + public FileChannel newFileChannel(Path path, Set options, FileAttribute... attrs) throws IOException { + final AtomicInteger counter = files.computeIfAbsent(path, p -> new AtomicInteger(0)); + return new FilterFileChannel(delegate.newFileChannel(toDelegate(path), options, attrs)) { + + @Override + public void force(boolean metaData) throws IOException { + if (failFSyncs.get()) { + throw new IOException("simulated"); + } + super.force(metaData); + counter.incrementAndGet(); + } + }; + } + + public void tearDown() { + PathUtilsForTesting.installMock(delegateInstance); + } + } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index db347b423b75b..8edf4aa3f7865 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -8,20 +8,29 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolStats; @@ -29,19 +38,46 @@ import org.junit.After; import org.junit.Before; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.TimeUnit; public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTestCase { + private static final ClusterSettings CLUSTER_SETTINGS = new ClusterSettings( + Settings.EMPTY, + Sets.union( + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS, + Set.of( + CacheService.SNAPSHOT_CACHE_SIZE_SETTING, + CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING, + CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING, + CacheService.SNAPSHOT_CACHE_MAX_FILES_TO_SYNC_AT_ONCE_SETTING + ) + ) + ); + protected ThreadPool threadPool; + protected ClusterService clusterService; + protected NodeEnvironment nodeEnvironment; @Before - public void setUpTest() { + public void setUpTest() throws Exception { + final DiscoveryNode node = new DiscoveryNode( + "node", + ESTestCase.buildNewFakeTransportAddress(), + Collections.emptyMap(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilders()); + clusterService = ClusterServiceUtils.createClusterService(threadPool, node, CLUSTER_SETTINGS); + nodeEnvironment = newNodeEnvironment(); } @After - public void tearDownTest() { + public void tearDownTest() throws Exception { + IOUtils.close(nodeEnvironment, clusterService); assertTrue(ThreadPool.terminate(threadPool, 30L, TimeUnit.SECONDS)); } @@ -49,7 +85,7 @@ public void tearDownTest() { * @return a new {@link CacheService} instance configured with default settings */ protected CacheService defaultCacheService() { - return new CacheService(AbstractSearchableSnapshotsTestCase::noOpCacheCleaner, Settings.EMPTY); + return new CacheService(Settings.EMPTY, clusterService, threadPool, AbstractSearchableSnapshotsTestCase::noOpCacheCleaner); } /** @@ -63,7 +99,13 @@ protected CacheService randomCacheService() { if (randomBoolean()) { cacheSettings.put(CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), randomCacheRangeSize()); } - return new CacheService(AbstractSearchableSnapshotsTestCase::noOpCacheCleaner, cacheSettings.build()); + if (randomBoolean()) { + cacheSettings.put( + CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueSeconds(scaledRandomIntBetween(1, 120)) + ); + } + return new CacheService(cacheSettings.build(), clusterService, threadPool, AbstractSearchableSnapshotsTestCase::noOpCacheCleaner); } /** @@ -71,11 +113,13 @@ protected CacheService randomCacheService() { */ protected CacheService createCacheService(final ByteSizeValue cacheSize, final ByteSizeValue cacheRangeSize) { return new CacheService( - AbstractSearchableSnapshotsTestCase::noOpCacheCleaner, Settings.builder() .put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), cacheSize) .put(CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), cacheRangeSize) - .build() + .build(), + clusterService, + threadPool, + AbstractSearchableSnapshotsTestCase::noOpCacheCleaner ); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java new file mode 100644 index 0000000000000..130068ebe3ec1 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java @@ -0,0 +1,164 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.io.PathUtilsForTesting; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.cache.CacheFile; +import org.elasticsearch.index.store.cache.CacheKey; +import org.elasticsearch.index.store.cache.TestUtils.FSyncTrackingFileSystemProvider; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.SortedSet; + +import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class CacheServiceTests extends AbstractSearchableSnapshotsTestCase { + + private static FSyncTrackingFileSystemProvider fileSystemProvider; + + @BeforeClass + public static void installFileSystem() { + fileSystemProvider = new FSyncTrackingFileSystemProvider(PathUtils.getDefaultFileSystem(), createTempDir()); + PathUtilsForTesting.installMock(fileSystemProvider.getFileSystem(null)); + } + + @AfterClass + public static void removeFileSystem() { + fileSystemProvider.tearDown(); + } + + public void testCacheSynchronization() throws Exception { + final int numShards = randomIntBetween(1, 3); + final Index index = new Index(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + final SnapshotId snapshotId = new SnapshotId("_snapshot_name", UUIDs.randomBase64UUID(random())); + final IndexId indexId = new IndexId("_index_name", UUIDs.randomBase64UUID(random())); + + logger.debug("--> creating shard cache directories on disk"); + final Path[] shardsCacheDirs = new Path[numShards]; + for (int i = 0; i < numShards; i++) { + final Path shardDataPath = randomFrom(nodeEnvironment.availableShardPaths(new ShardId(index, i))); + assertFalse(Files.exists(shardDataPath)); + + logger.debug("--> creating directories [{}] for shard [{}]", shardDataPath.toAbsolutePath(), i); + shardsCacheDirs[i] = Files.createDirectories(CacheService.resolveSnapshotCache(shardDataPath).resolve(snapshotId.getUUID())); + } + + try (CacheService cacheService = defaultCacheService()) { + logger.debug("--> setting large cache sync interval (explicit cache synchronization calls in test)"); + cacheService.setCacheSyncInterval(TimeValue.timeValueMillis(Long.MAX_VALUE)); + cacheService.start(); + + // Keep a count of the number of writes for every cache file existing in the cache + final Map> previous = new HashMap<>(); + + for (int iteration = 0; iteration < between(1, 10); iteration++) { + + final Map> updates = new HashMap<>(); + + logger.trace("--> more random reads/writes from existing cache files"); + for (Map.Entry> cacheEntry : randomSubsetOf(previous.entrySet())) { + final CacheKey cacheKey = cacheEntry.getKey(); + final CacheFile cacheFile = cacheEntry.getValue().v1(); + + final CacheFile.EvictionListener listener = evictedCacheFile -> {}; + cacheFile.acquire(listener); + + final SortedSet> newCacheRanges = randomPopulateAndReads(cacheFile); + assertThat(cacheService.isCacheFileToSync(cacheFile), is(newCacheRanges.isEmpty() == false)); + if (newCacheRanges.isEmpty() == false) { + final int numberOfWrites = cacheEntry.getValue().v2() + 1; + updates.put(cacheKey, Tuple.tuple(cacheFile, numberOfWrites)); + } + cacheFile.release(listener); + } + + logger.trace("--> creating new cache files and randomly read/write them"); + for (int i = 0; i < between(1, 25); i++) { + final ShardId shardId = new ShardId(index, randomIntBetween(0, numShards - 1)); + final String fileName = String.format(Locale.ROOT, "file_%d_%d", iteration, i); + final CacheKey cacheKey = new CacheKey(snapshotId, indexId, shardId, fileName); + final CacheFile cacheFile = cacheService.get(cacheKey, randomIntBetween(1, 10_000), shardsCacheDirs[shardId.id()]); + + final CacheFile.EvictionListener listener = evictedCacheFile -> {}; + cacheFile.acquire(listener); + + final SortedSet> newRanges = randomPopulateAndReads(cacheFile); + assertThat(cacheService.isCacheFileToSync(cacheFile), is(newRanges.isEmpty() == false)); + updates.put(cacheKey, Tuple.tuple(cacheFile, newRanges.isEmpty() ? 0 : 1)); + cacheFile.release(listener); + } + + logger.trace("--> evicting random cache files"); + final Map evictions = new HashMap<>(); + for (CacheKey evictedCacheKey : randomSubsetOf(Sets.union(previous.keySet(), updates.keySet()))) { + cacheService.removeFromCache(evictedCacheKey::equals); + Tuple evicted = previous.remove(evictedCacheKey); + if (evicted != null) { + evictions.put(evicted.v1(), evicted.v2()); + updates.remove(evictedCacheKey); + } else { + evicted = updates.remove(evictedCacheKey); + evictions.put(evicted.v1(), 0); + } + } + + logger.trace("--> capturing expected number of fsyncs per cache directory before synchronization"); + final Map cacheDirFSyncs = new HashMap<>(); + for (int i = 0; i < shardsCacheDirs.length; i++) { + final Path shardCacheDir = shardsCacheDirs[i]; + final ShardId shardId = new ShardId(index, i); + final Integer numberOfFSyncs = fileSystemProvider.getNumberOfFSyncs(shardCacheDir); + if (updates.entrySet() + .stream() + .filter(update -> update.getValue().v2() != null) + .filter(update -> update.getValue().v2() > 0) + .anyMatch(update -> update.getKey().getShardId().equals(shardId))) { + cacheDirFSyncs.put(shardCacheDir, numberOfFSyncs == null ? 1 : numberOfFSyncs + 1); + } else { + cacheDirFSyncs.put(shardCacheDir, numberOfFSyncs); + } + } + + logger.debug("--> synchronizing cache files [#{}]", iteration); + cacheService.synchronizeCache(); + + logger.trace("--> verifying cache synchronization correctness"); + cacheDirFSyncs.forEach( + (dir, expectedNumberOfFSyncs) -> assertThat(fileSystemProvider.getNumberOfFSyncs(dir), equalTo(expectedNumberOfFSyncs)) + ); + evictions.forEach((cacheFile, expectedNumberOfFSyncs) -> { + assertThat(cacheService.isCacheFileToSync(cacheFile), is(false)); + assertThat(fileSystemProvider.getNumberOfFSyncs(cacheFile.getFile()), equalTo(expectedNumberOfFSyncs)); + }); + previous.putAll(updates); + previous.forEach((key, cacheFileAndExpectedNumberOfFSyncs) -> { + CacheFile cacheFile = cacheFileAndExpectedNumberOfFSyncs.v1(); + assertThat(cacheService.isCacheFileToSync(cacheFile), is(false)); + assertThat(fileSystemProvider.getNumberOfFSyncs(cacheFile.getFile()), equalTo(cacheFileAndExpectedNumberOfFSyncs.v2())); + }); + } + } + } +}