Make searchable snapshots cache persistent #65725

Merged · 27 commits · Dec 14, 2020
Changes from 15 commits
@@ -4,88 +4,41 @@
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.searchablesnapshots;
package org.elasticsearch.xpack.searchablesnapshots.cache;

import org.apache.lucene.mockfile.FilterFileSystemProvider;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.apache.lucene.document.Document;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.io.PathUtilsForTesting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX;
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;

public class SearchableSnapshotsCacheClearingIntegTests extends BaseSearchableSnapshotsIntegTestCase {

private static DeleteBlockingFileSystemProvider deleteBlockingFileSystemProvider;

@BeforeClass
public static void installDeleteBlockingFileSystemProvider() {
FileSystem current = PathUtils.getDefaultFileSystem();
deleteBlockingFileSystemProvider = new DeleteBlockingFileSystemProvider(current);
PathUtilsForTesting.installMock(deleteBlockingFileSystemProvider.getFileSystem(null));
}

@AfterClass
public static void removeDeleteBlockingFileSystemProvider() {
PathUtilsForTesting.teardown();
}

void startBlockingDeletes() {
deleteBlockingFileSystemProvider.injectFailures.set(true);
}

void stopBlockingDeletes() {
deleteBlockingFileSystemProvider.injectFailures.set(false);
}

private static class DeleteBlockingFileSystemProvider extends FilterFileSystemProvider {

AtomicBoolean injectFailures = new AtomicBoolean();

DeleteBlockingFileSystemProvider(FileSystem inner) {
super("deleteblocking://", inner);
}

@Override
public boolean deleteIfExists(Path path) throws IOException {
if (injectFailures.get()) {
throw new IOException("blocked deletion of " + path);
} else {
return super.deleteIfExists(path);
}
}

}
public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
@@ -96,7 +49,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

public void testCacheDirectoriesRemovedOnStartup() throws Exception {
public void testCacheSurviveRestart() throws Exception {
final String fsRepoName = randomAlphaOfLength(10);
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
@@ -117,18 +70,13 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
final DiscoveryNodes discoveryNodes = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes();
final String dataNode = randomFrom(discoveryNodes.getDataNodes().values().toArray(DiscoveryNode.class)).getName();

final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
restoredIndexName,
mountSnapshot(
fsRepoName,
snapshotName,
indexName,
Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build(),
Strings.EMPTY_ARRAY,
true
restoredIndexName,
Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build()
);

final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
ensureGreen(restoredIndexName);

final Index restoredIndex = client().admin()
@@ -143,7 +91,9 @@
.getIndex();

final IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNode).indexService(restoredIndex);
final Path shardCachePath = CacheService.getShardCachePath(indexService.getShard(0).shardPath());
final ShardPath shardPath = indexService.getShard(0).shardPath();
final Path shardCachePath = CacheService.getShardCachePath(shardPath);

assertTrue(Files.isDirectory(shardCachePath));
final Set<Path> cacheFiles = new HashSet<>();
try (DirectoryStream<Path> snapshotCacheStream = Files.newDirectoryStream(shardCachePath)) {
@@ -159,25 +109,49 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
}
assertFalse("no cache files found", cacheFiles.isEmpty());

startBlockingDeletes();
CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode);
cacheService.synchronizeCache();

PersistentCache persistentCache = cacheService.getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));

internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
assertTrue(Files.isDirectory(shardCachePath));
for (Path cacheFile : cacheFiles) {
assertTrue(cacheFile + " should not have been cleaned up yet", Files.isRegularFile(cacheFile));
try {
assertTrue(Files.isDirectory(shardCachePath));

final Path persistentCacheIndexDir = resolveCacheIndexFolder(shardPath.getRootDataPath());
assertTrue(Files.isDirectory(persistentCacheIndexDir));

final Map<String, Document> documents = PersistentCache.loadDocuments(persistentCacheIndexDir);
assertThat(documents.size(), equalTo(cacheFiles.size()));

for (Path cacheFile : cacheFiles) {
final String cacheFileName = cacheFile.getFileName().toString();
assertTrue(cacheFileName + " should exist on disk", Files.isRegularFile(cacheFile));
assertThat(cacheFileName + " should exist in persistent cache index", documents.get(cacheFileName), notNullValue());
}
} catch (IOException e) {
throw new AssertionError(e);
}
stopBlockingDeletes();
return Settings.EMPTY;
}
});

persistentCache = internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));
ensureGreen(restoredIndexName);

for (Path cacheFile : cacheFiles) {
assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile));
}
cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile)));

assertAcked(client().admin().indices().prepareDelete(restoredIndexName));

assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile))));
cacheService = internalCluster().getInstance(CacheService.class, dataNode);
cacheService.synchronizeCache();

persistentCache = cacheService.getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo(0L));
}
}
@@ -32,6 +32,7 @@

import java.nio.file.Path;

import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
@@ -49,6 +50,13 @@ public SearchableSnapshotIndexEventListener(Settings settings, @Nullable CacheSe
this.cacheService = cacheService;
}

/**
* Called before a searchable snapshot {@link IndexShard} starts to recover. This event is used to trigger the loading of the shard
* snapshot information, which contains the list of the shard's Lucene files.
*
* @param indexShard the shard that is about to recover
* @param indexSettings the shard's index settings
*/
@Override
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
@@ -57,7 +65,7 @@ public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexS
}

private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
final SearchableSnapshotDirectory directory = SearchableSnapshotDirectory.unwrapDirectory(indexShard.store().directory());
final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory());
assert directory != null;
final StepListener<Void> preWarmListener = new StepListener<>();
final boolean success = directory.loadSnapshot(indexShard.recoveryState(), preWarmListener);
@@ -63,7 +63,7 @@
import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.NodeEnvironmentCacheCleaner;
import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction;
@@ -210,12 +210,7 @@ public Collection<Object> createComponents(
this.threadPool.set(threadPool);
this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService()));
if (DiscoveryNode.isDataNode(settings)) {
final CacheService cacheService = new CacheService(
settings,
clusterService,
threadPool,
new NodeEnvironmentCacheCleaner(nodeEnvironment)
);
final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment));
this.cacheService.set(cacheService);
components.add(cacheService);
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
@@ -226,6 +221,8 @@
);
this.blobStoreCacheService.set(blobStoreCacheService);
components.add(blobStoreCacheService);
} else {
PersistentCache.cleanUp(settings, nodeEnvironment);
}
return Collections.unmodifiableList(components);
}
@@ -114,9 +114,9 @@ public class CacheService extends AbstractLifecycleComponent {
private final CacheSynchronizationTask cacheSyncTask;
private final TimeValue cacheSyncStopTimeout;
private final ReentrantLock cacheSyncLock;
private final PersistentCache persistentCache;
private final Cache<CacheKey, CacheFile> cache;
private final ByteSizeValue cacheSize;
private final Runnable cacheCleaner;
private final ByteSizeValue rangeSize;
private final KeyedLock<ShardEviction> shardsEvictionLock;
private final Set<ShardEviction> evictedShards;
@@ -127,11 +127,10 @@ public CacheService(
final Settings settings,
final ClusterService clusterService,
final ThreadPool threadPool,
final Runnable cacheCleaner
final PersistentCache persistentCache
) {
this.threadPool = Objects.requireNonNull(threadPool);
this.cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings);
this.cacheCleaner = Objects.requireNonNull(cacheCleaner);
this.rangeSize = SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings);
this.cache = CacheBuilder.<CacheKey, CacheFile>builder()
.setMaximumWeight(cacheSize.getBytes())
@@ -140,6 +139,7 @@
// are done with reading/writing the cache file
.removalListener(notification -> onCacheFileRemoval(notification.getValue()))
.build();
this.persistentCache = Objects.requireNonNull(persistentCache);
this.shardsEvictionLock = new KeyedLock<>();
this.evictedShards = ConcurrentCollections.newConcurrentSet();
this.numberOfCacheFilesToSync = new AtomicLong();
@@ -163,8 +163,8 @@ static Path resolveSnapshotCache(Path path) {

@Override
protected void doStart() {
persistentCache.loadCacheFiles(this);
cacheSyncTask.rescheduleIfNecessary();
cacheCleaner.run();
}

@Override
@@ -181,10 +181,15 @@ protected void doStop() {
logger.warn("interrupted while waiting for cache sync lock", e);
}
cacheSyncTask.close();
cache.invalidateAll();
} finally {
if (acquired) {
cacheSyncLock.unlock();
try {
persistentCache.close();
} catch (Exception e) {
logger.warn("failed to close persistent cache", e);
} finally {
if (acquired) {
cacheSyncLock.unlock();
}
}
}
}
@@ -397,19 +402,29 @@ private void onCacheFileUpdate(CacheFile cacheFile) {
/**
* This method is invoked after a {@link CacheFile} is evicted from the cache.
* <p>
* It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted.
* It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted and removes it from the persistent cache.
*
* @param cacheFile the evicted instance
*/
private void onCacheFileRemoval(CacheFile cacheFile) {
IOUtils.closeWhileHandlingException(cacheFile::startEviction);
try {
persistentCache.removeCacheFile(cacheFile);
} catch (Exception e) {
logger.warn("failed to remove cache file from persistent cache", e);
Contributor: Would it be possible to add assert e instanceof IOException here? It is OK to warn in production, but it would be good for tests to catch cases where this fails for something other than an IO problem.

Member Author: Yes, that sounds like a good thing to do, so I pushed 5aa545a.
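
A minimal sketch of the suggested assertion, following the assert-then-warn pattern used elsewhere in this change (the actual fix is in commit 5aa545a and may differ):

    private void onCacheFileRemoval(CacheFile cacheFile) {
        IOUtils.closeWhileHandlingException(cacheFile::startEviction);
        try {
            persistentCache.removeCacheFile(cacheFile);
        } catch (Exception e) {
            // trip assertions in tests for unexpected (non-IO) failures, but only warn in production
            assert e instanceof IOException : e;
            logger.warn("failed to remove cache file from persistent cache", e);
        }
    }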

}
}

// used in tests
boolean isCacheFileToSync(CacheFile cacheFile) {
return cacheFilesToSync.contains(cacheFile);
}

// used in tests
PersistentCache getPersistentCache() {
return persistentCache;
}

/**
* Synchronize the cache files and their parent directories on disk.
*
@@ -459,21 +474,28 @@ protected void synchronizeCache() {
final Path cacheDir = cacheFilePath.toAbsolutePath().getParent();
if (cacheDirs.add(cacheDir)) {
try {
IOUtils.fsync(cacheDir, true, false);
IOUtils.fsync(cacheDir, true, false); // TODO evict cache file if fsync failed
Contributor: In relation to the TODO, could we instead avoid persisting the files that fail into the PersistentCache? We should be able to check cacheDirs.contains(cacheDir) before fsync'ing the directory, and only add it to cacheDirs if the fsync is successful.

Member Author: Makes sense, I pushed eab0a0c.
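
A rough sketch of the suggested restructuring (the actual change is in commit eab0a0c and may differ; the cacheDirSynced flag is introduced here only for illustration): a directory is added to cacheDirs only after a successful fsync, and a cache file is persisted only when its directory was synced.

    final Path cacheDir = cacheFilePath.toAbsolutePath().getParent();
    boolean cacheDirSynced = cacheDirs.contains(cacheDir);
    if (cacheDirSynced == false) {
        try {
            IOUtils.fsync(cacheDir, true, false);
            cacheDirs.add(cacheDir); // remember only directories that were successfully fsync'ed
            cacheDirSynced = true;
            logger.trace("cache directory [{}] synchronized", cacheDir);
        } catch (Exception e) {
            assert e instanceof IOException : e;
            logger.warn(() -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir), e);
        }
    }
    if (cacheDirSynced) {
        // skip persisting cache files whose parent directory could not be fsync'ed
        persistentCache.addCacheFile(cacheFile, ranges);
        count += 1L;
    }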

logger.trace("cache directory [{}] synchronized", cacheDir);
} catch (Exception e) {
assert e instanceof IOException : e;
logger.warn(() -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir), e);
}
}
// TODO Index searchable snapshot shard information + cache file ranges in Lucene
persistentCache.addCacheFile(cacheFile, ranges);
count += 1L;
}
} catch (Exception e) {
assert e instanceof IOException : e;
logger.warn(() -> new ParameterizedMessage("failed to fsync cache file [{}]", cacheFilePath.getFileName()), e);
}
}
if (count > 0 || persistentCache.hasDeletions()) {
try {
persistentCache.commit();
} catch (IOException e) {
logger.error("failed to commit persistent cache after synchronization", e);
}
}
if (logger.isDebugEnabled()) {
final long elapsedNanos = threadPool.relativeTimeInNanos() - startTimeNanos;
logger.debug(