Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close query cache on index service creation failure #48230

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.QueryCache;
Expand Down Expand Up @@ -399,22 +401,35 @@ public IndexService newIndexService(
indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final QueryCache queryCache;
if (indexSettings.getValue(INDEX_QUERY_CACHE_ENABLED_SETTING)) {
BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = forceQueryCacheProvider.get();
if (queryCacheProvider == null) {
queryCache = new IndexQueryCache(indexSettings, indicesQueryCache);
QueryCache queryCache = null;
IndexAnalyzers indexAnalyzers = null;
boolean success = false;
try {
if (indexSettings.getValue(INDEX_QUERY_CACHE_ENABLED_SETTING)) {
BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = forceQueryCacheProvider.get();
if (queryCacheProvider == null) {
queryCache = new IndexQueryCache(indexSettings, indicesQueryCache);
} else {
queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache);
}
} else {
queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache);
queryCache = new DisabledQueryCache(indexSettings);
}
if (IndexService.needsMapperService(indexSettings, indexCreationContext)) {
indexAnalyzers = analysisRegistry.build(indexSettings);
}
final IndexService indexService = new IndexService(indexSettings, indexCreationContext, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities), shardStoreDeleter, indexAnalyzers,
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
indexOperationListeners, namedWriteableRegistry);
success = true;
return indexService;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(queryCache, indexAnalyzers);
}
} else {
queryCache = new DisabledQueryCache(indexSettings);
}
return new IndexService(indexSettings, indexCreationContext, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities),
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
clusterService, client, queryCache, directoryFactory, eventListener, readerWrapperFactory, mapperRegistry,
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
}

private static IndexStorePlugin.DirectoryFactory getDirectoryFactory(
Expand Down
33 changes: 18 additions & 15 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
Expand Down Expand Up @@ -147,8 +146,7 @@ public IndexService(
NamedXContentRegistry xContentRegistry,
SimilarityService similarityService,
ShardStoreDeleter shardStoreDeleter,
AnalysisRegistry registry,
EngineFactory engineFactory,
IndexAnalyzers indexAnalyzers, EngineFactory engineFactory,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
Expand All @@ -163,24 +161,16 @@ public IndexService(
IndicesFieldDataCache indicesFieldDataCache,
List<SearchOperationListener> searchOperationListeners,
List<IndexingOperationListener> indexingOperationListeners,
NamedWriteableRegistry namedWriteableRegistry) throws IOException {
NamedWriteableRegistry namedWriteableRegistry) {
super(indexSettings);
this.indexSettings = indexSettings;
this.xContentRegistry = xContentRegistry;
this.similarityService = similarityService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.circuitBreakerService = circuitBreakerService;
if (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE &&
indexCreationContext == IndexCreationContext.CREATE_INDEX) { // metadata verification needs a mapper service
this.mapperService = null;
this.indexFieldData = null;
this.indexSortSupplier = () -> null;
this.bitsetFilterCache = null;
this.warmer = null;
this.indexCache = null;
} else {
this.mapperService = new MapperService(indexSettings, registry.build(indexSettings), xContentRegistry, similarityService,
mapperRegistry,
if (needsMapperService(indexSettings, indexCreationContext)) {
assert indexAnalyzers != null;
this.mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
// we parse all percolator queries as they would be parsed on shard 0
() -> newQueryShardContext(0, null, System::currentTimeMillis, null));
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
Expand All @@ -198,6 +188,14 @@ public IndexService(
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
this.warmer = new IndexWarmer(threadPool, indexFieldData, bitsetFilterCache.createListener(threadPool));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
} else {
assert indexAnalyzers == null;
this.mapperService = null;
this.indexFieldData = null;
this.indexSortSupplier = () -> null;
this.bitsetFilterCache = null;
this.warmer = null;
this.indexCache = null;
}

this.shardStoreDeleter = shardStoreDeleter;
Expand All @@ -222,6 +220,11 @@ public IndexService(
updateFsyncTaskIfNecessary();
}

static boolean needsMapperService(IndexSettings indexSettings, IndexCreationContext indexCreationContext) {
return false == (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE &&
indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service
}

public enum IndexCreationContext {
CREATE_INDEX,
META_DATA_VERIFICATION
Expand Down
95 changes: 90 additions & 5 deletions server/src/test/java/org/elasticsearch/index/IndexModuleTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.index;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.AssertingDirectoryReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInvertState;
Expand All @@ -40,12 +41,15 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.AnalyzerProvider;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.QueryCache;
Expand All @@ -65,6 +69,7 @@
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
Expand All @@ -84,13 +89,17 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;

Expand Down Expand Up @@ -174,7 +183,7 @@ public void testRegisterIndexStore() throws IOException {
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "foo_store")
.build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = Collections.singletonMap(
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = singletonMap(
"foo_store", new FooFunction());
final IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), indexStoreFactories);

Expand Down Expand Up @@ -354,11 +363,19 @@ public void testForceCustomQueryCache() throws IOException {
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()));
final Set<CustomQueryCache> liveQueryCaches = new HashSet<>();
module.forceQueryCacheProvider((a, b) -> {
final CustomQueryCache customQueryCache = new CustomQueryCache(liveQueryCaches);
liveQueryCaches.add(customQueryCache);
return customQueryCache;
});
expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> {
throw new AssertionError("never called");
}));
IndexService indexService = newIndexService(module);
assertTrue(indexService.cache().query() instanceof CustomQueryCache);
indexService.close("simon says", false);
assertThat(liveQueryCaches, empty());
}

public void testDefaultQueryCacheImplIsSelected() throws IOException {
Expand All @@ -379,12 +396,73 @@ public void testDisableQueryCacheHasPrecedenceOverForceQueryCache() throws IOExc
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache(null));
IndexService indexService = newIndexService(module);
assertTrue(indexService.cache().query() instanceof DisabledQueryCache);
indexService.close("simon says", false);
}

public void testCustomQueryCacheCleanedUpIfIndexServiceCreationFails() {
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
final Set<CustomQueryCache> liveQueryCaches = new HashSet<>();
module.forceQueryCacheProvider((a, b) -> {
final CustomQueryCache customQueryCache = new CustomQueryCache(liveQueryCaches);
liveQueryCaches.add(customQueryCache);
return customQueryCache;
});
threadPool.shutdown(); // causes index service creation to fail
expectThrows(EsRejectedExecutionException.class, () -> newIndexService(module));
assertThat(liveQueryCaches, empty());
}

public void testIndexAnalyzersCleanedUpIfIndexServiceCreationFails() {
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);

final HashSet<Analyzer> openAnalyzers = new HashSet<>();
final AnalysisModule.AnalysisProvider<AnalyzerProvider<?>> analysisProvider = (i,e,n,s) -> new AnalyzerProvider<>() {
@Override
public String name() {
return "test";
}

@Override
public AnalyzerScope scope() {
return AnalyzerScope.INDEX;
}

@Override
public Analyzer get() {
final Analyzer analyzer = new Analyzer() {
@Override
protected TokenStreamComponents createComponents(String fieldName) {
throw new AssertionError("should not be here");
}

@Override
public void close() {
super.close();
openAnalyzers.remove(this);
}
};
openAnalyzers.add(analyzer);
return analyzer;
}
};
final AnalysisRegistry analysisRegistry = new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(),
singletonMap("test", analysisProvider), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap());
IndexModule module = new IndexModule(indexSettings, analysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
threadPool.shutdown(); // causes index service creation to fail
expectThrows(EsRejectedExecutionException.class, () -> newIndexService(module));
assertThat(openAnalyzers, empty());
}

public void testMmapNotAllowed() {
String storeType = randomFrom(IndexModule.Type.HYBRIDFS.getSettingsKey(), IndexModule.Type.MMAPFS.getSettingsKey());
final Settings settings = Settings.builder()
Expand All @@ -403,12 +481,19 @@ public void testMmapNotAllowed() {

class CustomQueryCache implements QueryCache {

private final Set<CustomQueryCache> liveQueryCaches;

CustomQueryCache(Set<CustomQueryCache> liveQueryCaches) {
this.liveQueryCaches = liveQueryCaches;
}

@Override
public void clear(String reason) {
}

@Override
public void close() throws IOException {
public void close() {
assertTrue(liveQueryCaches == null || liveQueryCaches.remove(this));
}

@Override
Expand Down