From 561546cad56d358e7d54e147126f678f798f0acf Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 3 Nov 2017 13:28:59 +0100 Subject: [PATCH 01/15] Skip shard refreshes if shard is `search idle` Today we refresh automatically in the backgroud by default very second. This default behavior has a significant impact on indexing performance if the refreshes are not needed. This change introduces a notion of a shard being `search idle` which a shard transitions to after (default) `30s` without any access to an external searcher. Once a shard is search idle all scheduled refreshes will be skipped unless there are any refresh listeners registered. If a search happens on a `serach idle` shard the search request _park_ on a refresh listener and will be executed once the next scheduled refresh occurs. This will also turn the shard into the `non-idle` state immediately. This behavior is only applied if there is no explicit refresh interval set. --- .../explain/TransportExplainAction.java | 17 +++ .../action/get/TransportGetAction.java | 18 +++ .../shard/TransportSingleShardAction.java | 40 +++++- .../TransportTermVectorsAction.java | 16 +++ .../common/settings/IndexScopedSettings.java | 2 +- .../org/elasticsearch/index/IndexService.java | 12 +- .../elasticsearch/index/IndexSettings.java | 21 +++ .../elasticsearch/index/shard/IndexShard.java | 65 ++++++++- .../elasticsearch/search/SearchService.java | 39 ++++-- .../index/shard/IndexShardTests.java | 131 ++++++++++++++++++ 10 files changed, 334 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index c30dfd360a08b..c8f48bfbde71e 100644 --- a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; @@ -33,8 +35,10 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.internal.AliasFilter; @@ -86,6 +90,19 @@ protected void resolveRequest(ClusterState state, InternalRequest request) { } } + @Override + protected void shardOperation(ExplainRequest request, ShardId shardId, ActionListener listener) throws IOException { + IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + indexShard.awaitPendingRefresh(b -> { + try { + super.shardOperation(request, shardId, listener); + } catch (IOException ex) { + listener.onFailure(ex); + } + }); + } + @Override protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId) throws IOException { ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId, diff --git a/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 884af4a3af998..4772f06deeb5b 100644 --- a/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -19,9 +19,12 @@ package org.elasticsearch.action.get; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.action.termvectors.TermVectorsRequest; +import org.elasticsearch.action.termvectors.TermVectorsResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -38,6 +41,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + /** * Performs the get operation. */ @@ -76,6 +81,19 @@ protected void resolveRequest(ClusterState state, InternalRequest request) { } } + @Override + protected void shardOperation(GetRequest request, ShardId shardId, ActionListener listener) throws IOException { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + indexShard.awaitPendingRefresh(b -> { + try { + super.shardOperation(request, shardId, listener); + } catch (IOException ex) { + listener.onFailure(ex); + } + }); + } + @Override protected GetResponse shardOperation(GetRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); diff --git a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 811dcbed3dcf9..4c780647479f8 100644 --- a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -47,6 +48,8 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.concurrent.Executor; import java.util.function.Supplier; import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; @@ -78,7 +81,7 @@ protected TransportSingleShardAction(Settings settings, String actionName, Threa if (!isSubAction()) { transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler()); } - transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler()); + transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler()); } /** @@ -97,6 +100,19 @@ protected void doExecute(Request request, ActionListener listener) { protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException; + protected void shardOperation(Request request, ShardId shardId, ActionListener listener) throws IOException { + threadPool.executor(this.executor).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + listener.onResponse(shardOperation(request, shardId)); + } + }); + } protected abstract Response newResponse(); protected abstract boolean resolveIndex(Request request); @@ -291,11 +307,27 @@ public void messageReceived(final Request request, final TransportChannel channe if (logger.isTraceEnabled()) { logger.trace("executing [{}] on shard [{}]", request, request.internalShardId); } - Response response = shardOperation(request, request.internalShardId); - channel.sendResponse(response); + shardOperation(request, request.internalShardId, new ActionListener() { + @Override + public void onResponse(Response response) { + try { + channel.sendResponse(response); + } catch (IOException e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + }); } } - /** * Internal request class that gets built on each node. Holds the original request plus additional info. */ diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index 5ff55a6fa552a..f22b7342ba1c4 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.termvectors; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; @@ -37,6 +38,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + /** * Performs the get operation. */ @@ -82,6 +85,19 @@ protected void resolveRequest(ClusterState state, InternalRequest request) { } } + @Override + protected void shardOperation(TermVectorsRequest request, ShardId shardId, ActionListener listener) throws IOException { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + IndexShard indexShard = indexService.getShard(shardId.id()); + indexShard.awaitPendingRefresh(b -> { + try { + super.shardOperation(request, shardId, listener); + } catch (IOException ex) { + listener.onFailure(ex); + } + }); + } + @Override protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index d40488eaa34f8..cbf783fdde9c7 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -36,7 +36,6 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.FsDirectoryService; import org.elasticsearch.index.store.Store; @@ -135,6 +134,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, + IndexSettings.INDEX_SEARCH_IDLE_AFTER, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index d192e8781d6da..2887e9a0b26c7 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -689,14 +689,10 @@ private void maybeFSyncTranslogs() { private void maybeRefreshEngine() { if (indexSettings.getRefreshInterval().millis() > 0) { for (IndexShard shard : this.shards.values()) { - if (shard.isReadAllowed()) { - try { - if (shard.isRefreshNeeded()) { - shard.refresh("schedule"); - } - } catch (IndexShardClosedException | AlreadyClosedException ex) { - // fine - continue; - } + try { + shard.scheduledRefresh(); + } catch (IndexShardClosedException | AlreadyClosedException ex) { + // fine - continue; } } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 9e390fb5b22cf..bf498d3d07d0e 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -62,6 +62,9 @@ public final class IndexSettings { public static final Setting INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100), Property.IndexScope); + public static final Setting INDEX_SEARCH_IDLE_AFTER = + Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30), + TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic); public static final Setting INDEX_TRANSLOG_DURABILITY_SETTING = new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(), (value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), Property.Dynamic, Property.IndexScope); @@ -262,6 +265,8 @@ public final class IndexSettings { private volatile int maxNgramDiff; private volatile int maxShingleDiff; private volatile boolean TTLPurgeDisabled; + private volatile TimeValue searchIdleAfter; + /** * The maximum number of refresh listeners allows on this shard. */ @@ -371,6 +376,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL); this.mergePolicyConfig = new MergePolicyConfig(logger, this); this.indexSortConfig = new IndexSortConfig(this); + searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER); singleType = INDEX_MAPPING_SINGLE_TYPE_SETTING.get(indexMetaData.getSettings()); // get this from metadata - it's not registered if ((singleType || version.before(Version.V_6_0_0_alpha1)) == false) { throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: [" @@ -411,8 +417,11 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields); + scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); } + private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } + private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { this.flushThresholdSize = byteSizeValue; } @@ -752,4 +761,16 @@ public IndexSortConfig getIndexSortConfig() { } public IndexScopedSettings getScopedSettings() { return scopedSettings;} + + /** + * Returns true iff the refresh setting exists or in other words is explicitly set. + */ + public boolean isExplicitRefresh() { + return INDEX_REFRESH_INTERVAL_SETTING.exists(settings); + } + + /** + * Returns the time that an index shard becomes search idle unless it's accessed in between + */ + public TimeValue getSearchIdleAfter() { return searchIdleAfter; } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 304764656b73f..8502c68a2fe73 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -234,6 +234,9 @@ Runnable getGlobalCheckpointSyncer() { */ private final RefreshListeners refreshListeners; + private final AtomicLong lastSearcherAccess = new AtomicLong(); + private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); + public IndexShard( ShardRouting shardRouting, IndexSettings indexSettings, @@ -298,6 +301,7 @@ public IndexShard( searcherWrapper = indexSearcherWrapper; primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); refreshListeners = buildRefreshListeners(); + lastSearcherAccess.set(threadPool.relativeTimeInMillis()); persistMetadata(path, indexSettings, shardRouting, null, logger); } @@ -1120,6 +1124,7 @@ public Engine.Searcher acquireSearcher(String source) { private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) { readAllowed(); + lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis()); final Engine engine = getEngine(); final Engine.Searcher searcher = engine.acquireSearcher(source, scope); boolean success = false; @@ -2418,6 +2423,25 @@ EngineFactory getEngineFactory() { return engineFactory; } + /** + * Executes a scheduled refresh if necessary + */ + public boolean scheduledRefresh() { + if (isReadAllowed() && isRefreshNeeded()) { + if (refreshListeners.refreshNeeded() == false // if we have a listener that is waiting for a refresh we need to force it + && isSearchIdle() && indexSettings.isExplicitRefresh() == false) { + // lets skip this refresh since we are search idle and + // don't necessarily need to refresh. the next search execute cause a + setRefreshPending(); + return false; + } else { + refresh("schedule"); + return true; + } + } + return false; + } + /** * Returns true iff one or more changes to the engine are not visible to via the current searcher *or* there are pending * refresh listeners. @@ -2425,8 +2449,45 @@ EngineFactory getEngineFactory() { * * @throws AlreadyClosedException if the engine or internal indexwriter in the engine is already closed */ - public boolean isRefreshNeeded() { - return getEngine().refreshNeeded() || (refreshListeners != null && refreshListeners.refreshNeeded()); + final boolean isRefreshNeeded() { + return refreshListeners.refreshNeeded() // lets check the cheaper one first + || getEngine().refreshNeeded(); + } + + /** + * Returns true if this shards is search idle + * @see {@link IndexSettings#getSearchIdleAfter()} + */ + final boolean isSearchIdle() { + return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) + >= indexSettings.getSearchIdleAfter().getMillis(); + } + + private void setRefreshPending() { + Engine engine = getEngine(); + if (isSearchIdle()) { + acquireSearcher("setRefreshPending").close(); // move the shard into non-search idle + } + Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation(); + Translog.Location location; + do { + location = this.pendingRefreshLocation.get(); + if (location != null && lastWriteLocation.compareTo(location) <= 0) { + break; + } + } while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false); + } + + public void awaitPendingRefresh(Consumer listener) { + final Translog.Location location = pendingRefreshLocation.get(); + if (location != null) { + addRefreshListener(location, (b) -> { + pendingRefreshLocation.compareAndSet(location, null); + listener.accept(true); + }); + } else { + listener.accept(false); + } } /** diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 49ab665295793..1f60756a65f02 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -53,6 +53,7 @@ import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.SearchOperationListener; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.elasticsearch.node.ResponseCollectorService; @@ -582,6 +583,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time throws IOException { return createSearchContext(request, timeout, true); } + private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, boolean assertAsyncActions) throws IOException { @@ -979,22 +981,31 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) { * The action listener is guaranteed to be executed on the search thread-pool */ private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { + ActionListener actionListener = ActionListener.wrap(r -> + threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + listener.onResponse(request); + } + }), listener::onFailure); + IndexShard shardOrNull = indicesService.getShardOrNull(request.shardId()); + if (shardOrNull != null) { + // now we need to check if there is a pending refresh and register + ActionListener finalListener = actionListener; + actionListener = ActionListener.wrap(r -> + shardOrNull.awaitPendingRefresh(b -> finalListener.onResponse(r)), finalListener::onFailure); + } // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead - Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), - ActionListener.wrap(r -> - threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } + Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener); + - @Override - protected void doRun() throws Exception { - listener.onResponse(request); - } - }), listener::onFailure)); } /** @@ -1003,4 +1014,8 @@ protected void doRun() throws Exception { public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) { return indicesService.getRewriteContext(nowInMillis); } + + public IndicesService getIndicesService() { + return indicesService; + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 89e2f8441741d..e48d5ec4816cb 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -62,7 +62,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -70,6 +72,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; @@ -2565,4 +2568,132 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve public void verify(String verificationToken, DiscoveryNode localNode) { } } + + public void testIsSearchIdle() throws IOException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.scheduledRefresh()); + assertFalse(primary.isSearchIdle()); + + IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings(); + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build(); + scopedSettings.applySettings(settings); + assertTrue(primary.isSearchIdle()); + + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMinutes(1)) + .build(); + scopedSettings.applySettings(settings); + assertFalse(primary.isSearchIdle()); + + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(10)) + .build(); + scopedSettings.applySettings(settings); + while (primary.isSearchIdle() == false) { + // wait for it to become idle + } + do { + // now loop until we are fast enough... shouldn't take long + primary.acquireSearcher("test").close(); + } while (primary.isSearchIdle()); + closeShards(primary); + } + + public void testScheduledRefresh() throws IOException, InterruptedException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.scheduledRefresh()); + IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings(); + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build(); + scopedSettings.applySettings(settings); + + assertFalse(primary.isRefreshNeeded()); + indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}"); + assertTrue(primary.isRefreshNeeded()); + assertFalse(primary.scheduledRefresh()); + CountDownLatch latch = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + primary.awaitPendingRefresh(refreshed -> { + assertTrue(refreshed); + try (Engine.Searcher searcher = primary.acquireSearcher("test")) { + assertEquals(2, searcher.reader().numDocs()); + } finally { + latch.countDown(); + } + }); + } + try (Engine.Searcher searcher = primary.acquireSearcher("test")) { + assertEquals(1, searcher.reader().numDocs()); + } + assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.scheduledRefresh()); + latch.await(); + CountDownLatch latch1 = new CountDownLatch(1); + primary.awaitPendingRefresh(refreshed -> { + assertFalse(refreshed); + try (Engine.Searcher searcher = primary.acquireSearcher("test")) { + assertEquals(2, searcher.reader().numDocs()); + } finally { + latch1.countDown(); + } + + }); + latch1.await(); + closeShards(primary); + } + + public void testRefreshIsNeededWithRefreshListeners() throws IOException, InterruptedException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.scheduledRefresh()); + Engine.IndexResult doc = indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}"); + CountDownLatch latch = new CountDownLatch(1); + primary.addRefreshListener(doc.getTranslogLocation(), r -> latch.countDown()); + assertEquals(1, latch.getCount()); + assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.scheduledRefresh()); + latch.await(); + + IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings(); + settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build(); + scopedSettings.applySettings(settings); + + doc = indexDoc(primary, "test", "2", "{\"foo\" : \"bar\"}"); + CountDownLatch latch1 = new CountDownLatch(1); + primary.addRefreshListener(doc.getTranslogLocation(), r -> latch1.countDown()); + assertEquals(1, latch1.getCount()); + assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.scheduledRefresh()); + latch1.await(); + closeShards(primary); + } } From 6e6edf26a8fe3ac10b9e0ee4fb381d79f5845829 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 22 Nov 2017 23:22:04 +0100 Subject: [PATCH 02/15] remove unnecessary javadoc link --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8502c68a2fe73..208b5474df884 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2456,11 +2456,9 @@ final boolean isRefreshNeeded() { /** * Returns true if this shards is search idle - * @see {@link IndexSettings#getSearchIdleAfter()} */ final boolean isSearchIdle() { - return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) - >= indexSettings.getSearchIdleAfter().getMillis(); + return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis(); } private void setRefreshPending() { From 52fc2868c19a0260f33cc200ab85ea0ae14920fb Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Nov 2017 15:10:17 +0100 Subject: [PATCH 03/15] add javadocs --- .../elasticsearch/index/shard/IndexShard.java | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 208b5474df884..0ca462522ab30 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2424,14 +2424,17 @@ EngineFactory getEngineFactory() { } /** - * Executes a scheduled refresh if necessary + * Executes a scheduled refresh if necessary. + * + * @return true iff the engine got refreshed otherwise false */ public boolean scheduledRefresh() { - if (isReadAllowed() && isRefreshNeeded()) { + if (isReadAllowed() && getEngine().refreshNeeded()) { if (refreshListeners.refreshNeeded() == false // if we have a listener that is waiting for a refresh we need to force it && isSearchIdle() && indexSettings.isExplicitRefresh() == false) { // lets skip this refresh since we are search idle and - // don't necessarily need to refresh. the next search execute cause a + // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will + // cause the next schedule to refresh. setRefreshPending(); return false; } else { @@ -2442,18 +2445,6 @@ && isSearchIdle() && indexSettings.isExplicitRefresh() == false) { return false; } - /** - * Returns true iff one or more changes to the engine are not visible to via the current searcher *or* there are pending - * refresh listeners. - * Otherwise false. - * - * @throws AlreadyClosedException if the engine or internal indexwriter in the engine is already closed - */ - final boolean isRefreshNeeded() { - return refreshListeners.refreshNeeded() // lets check the cheaper one first - || getEngine().refreshNeeded(); - } - /** * Returns true if this shards is search idle */ @@ -2476,6 +2467,12 @@ private void setRefreshPending() { } while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false); } + /** + * Registers the given listener and invokes it once the pending refresh translog location has been refreshed. If there is no pending + * refresh location registered the listener will be invoked immediately. + * @param listener the listener to invoke once the pending refresh location is visible. The listener will be called with + * true if the listener was registered to wait for a refresh. + */ public void awaitPendingRefresh(Consumer listener) { final Translog.Location location = pendingRefreshLocation.get(); if (location != null) { From 47dd97f7354e8c2e3cbe08e30be4393b0016a5a5 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Nov 2017 15:10:53 +0100 Subject: [PATCH 04/15] add integration test that checks that we are actually refreshing in the backgroud --- .../index/shard/IndexShardIT.java | 91 +++++++++++++------ .../index/shard/IndexShardTests.java | 16 ++-- 2 files changed, 73 insertions(+), 34 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index b29ba2d9efcb5..ba451257649e9 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -18,14 +18,14 @@ */ package org.elasticsearch.index.shard; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterInfoService; @@ -41,11 +41,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; @@ -56,11 +56,6 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.flush.FlushStats; -import org.elasticsearch.index.mapper.IdFieldMapper; -import org.elasticsearch.index.mapper.Mapping; -import org.elasticsearch.index.mapper.ParseContext; -import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; @@ -70,6 +65,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import java.io.IOException; import java.io.UncheckedIOException; @@ -82,8 +78,10 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -94,9 +92,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; import static org.hamcrest.Matchers.equalTo; public class IndexShardIT extends ESSingleNodeTestCase { @@ -106,21 +102,6 @@ protected Collection> getPlugins() { return pluginList(InternalSettingsPlugin.class); } - private ParsedDocument testParsedDocument(String id, String type, String routing, long seqNo, - ParseContext.Document document, BytesReference source, XContentType xContentType, - Mapping mappingUpdate) { - Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); - Field versionField = new NumericDocValuesField("_version", 0); - SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - document.add(uidField); - document.add(versionField); - document.add(seqID.seqNo); - document.add(seqID.seqNoDocValue); - document.add(seqID.primaryTerm); - return new ParsedDocument(versionField, seqID, id, type, routing, - Collections.singletonList(document), source, xContentType, mappingUpdate); - } - public void testLockTryingToDelete() throws Exception { createIndex("test"); ensureGreen(); @@ -550,4 +531,62 @@ private static ShardRouting getInitializingShardRouting(ShardRouting existingSha RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE); return shardRouting; } + + public void testAutomaticRefresh() throws InterruptedException { + TimeValue randomTimeValue = randomFrom(random(), null, TimeValue.ZERO, TimeValue.timeValueMillis(randomIntBetween(0, 1000))); + Settings.Builder builder = Settings.builder(); + if (randomTimeValue != null) { + builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), randomTimeValue); + } + IndexService indexService = createIndex("test", builder.build()); + assertFalse(indexService.getIndexSettings().isExplicitRefresh()); + ensureGreen(); + AtomicInteger totalNumDocs = new AtomicInteger(Integer.MAX_VALUE); + CountDownLatch started = new CountDownLatch(1); + Thread t = new Thread(() -> { + SearchResponse searchResponse; + started.countDown(); + do { + searchResponse = client().prepareSearch().get(); + } while (searchResponse.getHits().totalHits != totalNumDocs.get()); + }); + t.start(); + started.await(); + assertNoSearchHits(client().prepareSearch().get()); + int numDocs = scaledRandomIntBetween(25, 100); + totalNumDocs.set(numDocs); + CountDownLatch indexingDone = new CountDownLatch(numDocs); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + indexingDone.countDown(); // one doc is indexed above blocking + IndexShard shard = indexService.getShard(0); + boolean hasRefreshed = shard.scheduledRefresh(); + if (randomTimeValue == TimeValue.ZERO) { + // with ZERO we are guaranteed to see the doc since we will wait for a refresh in the background + assertFalse(hasRefreshed); + assertTrue(shard.isSearchIdle()); + } else if (randomTimeValue == null){ + // with null we are guaranteed to see the doc since do execute the refresh. + // we can't assert on hasRefreshed since it might have been refreshed in the background on the shard concurrently + assertFalse(shard.isSearchIdle()); + } + ElasticsearchAssertions.assertHitCount(client().prepareSearch().get(), 1l); + for (int i = 1; i < numDocs; i++) { + client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON) + .execute(new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + indexingDone.countDown(); + } + + @Override + public void onFailure(Exception e) { + indexingDone.countDown(); + throw new AssertionError(e); + } + }); + } + indexingDone.await(); + t.join(); + + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e48d5ec4816cb..d206947f65345 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2581,7 +2581,7 @@ public void testIsSearchIdle() throws IOException { IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); recoverShardFromStore(primary); indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); - assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.getEngine().refreshNeeded()); assertTrue(primary.scheduledRefresh()); assertFalse(primary.isSearchIdle()); @@ -2620,15 +2620,15 @@ public void testScheduledRefresh() throws IOException, InterruptedException { IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); recoverShardFromStore(primary); indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); - assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.getEngine().refreshNeeded()); assertTrue(primary.scheduledRefresh()); IndexScopedSettings scopedSettings = primary.indexSettings().getScopedSettings(); settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build(); scopedSettings.applySettings(settings); - assertFalse(primary.isRefreshNeeded()); + assertFalse(primary.getEngine().refreshNeeded()); indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}"); - assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.getEngine().refreshNeeded()); assertFalse(primary.scheduledRefresh()); CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { @@ -2644,7 +2644,7 @@ public void testScheduledRefresh() throws IOException, InterruptedException { try (Engine.Searcher searcher = primary.acquireSearcher("test")) { assertEquals(1, searcher.reader().numDocs()); } - assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.getEngine().refreshNeeded()); assertTrue(primary.scheduledRefresh()); latch.await(); CountDownLatch latch1 = new CountDownLatch(1); @@ -2673,13 +2673,13 @@ public void testRefreshIsNeededWithRefreshListeners() throws IOException, Interr IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); recoverShardFromStore(primary); indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); - assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.getEngine().refreshNeeded()); assertTrue(primary.scheduledRefresh()); Engine.IndexResult doc = indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}"); CountDownLatch latch = new CountDownLatch(1); primary.addRefreshListener(doc.getTranslogLocation(), r -> latch.countDown()); assertEquals(1, latch.getCount()); - assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.getEngine().refreshNeeded()); assertTrue(primary.scheduledRefresh()); latch.await(); @@ -2691,7 +2691,7 @@ public void testRefreshIsNeededWithRefreshListeners() throws IOException, Interr CountDownLatch latch1 = new CountDownLatch(1); primary.addRefreshListener(doc.getTranslogLocation(), r -> latch1.countDown()); assertEquals(1, latch1.getCount()); - assertTrue(primary.isRefreshNeeded()); + assertTrue(primary.getEngine().refreshNeeded()); assertTrue(primary.scheduledRefresh()); latch1.await(); closeShards(primary); From 046c6be0241f41281c91f156369032e8bee9bd4c Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Nov 2017 15:36:14 +0100 Subject: [PATCH 05/15] ensure we refresh again after we switch the refresh interval --- .../org/elasticsearch/index/IndexService.java | 27 ++++++++++++-- .../index/shard/IndexShardIT.java | 36 ++++++++++++++++++- 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 2887e9a0b26c7..03744d6f3ad45 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.NodeEnvironment; @@ -624,6 +625,26 @@ public synchronized void updateMetaData(final IndexMetaData metadata) { } } if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { + // once we change the refresh interval we schedule yet another refresh + // to ensure we are in a clean and predictable state. + // it doesn't matter if we move from or to -1 in both cases we want + // docs to become visible immediately + threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("forced refresh failed after interval change", e); + } + + @Override + protected void doRun() throws Exception { + maybeRefreshEngine(true); + } + + @Override + public boolean isForceExecution() { + return true; + } + }); rescheduleRefreshTasks(); } final Translog.Durability durability = indexSettings.getTranslogDurability(); @@ -686,8 +707,8 @@ private void maybeFSyncTranslogs() { } } - private void maybeRefreshEngine() { - if (indexSettings.getRefreshInterval().millis() > 0) { + private void maybeRefreshEngine(boolean force) { + if (indexSettings.getRefreshInterval().millis() > 0 || force) { for (IndexShard shard : this.shards.values()) { try { shard.scheduledRefresh(); @@ -892,7 +913,7 @@ final class AsyncRefreshTask extends BaseAsyncTask { @Override protected void runInternal() { - indexService.maybeRefreshEngine(); + indexService.maybeRefreshEngine(false); } @Override diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index ba451257649e9..ca098f97878f4 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -569,7 +569,7 @@ public void testAutomaticRefresh() throws InterruptedException { // we can't assert on hasRefreshed since it might have been refreshed in the background on the shard concurrently assertFalse(shard.isSearchIdle()); } - ElasticsearchAssertions.assertHitCount(client().prepareSearch().get(), 1l); + assertHitCount(client().prepareSearch().get(), 1l); for (int i = 1; i < numDocs; i++) { client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON) .execute(new ActionListener() { @@ -587,6 +587,40 @@ public void onFailure(Exception e) { } indexingDone.await(); t.join(); + } + public void testPendingRefreshWithIntervalChange() throws InterruptedException { + Settings.Builder builder = Settings.builder(); + builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO); + IndexService indexService = createIndex("test", builder.build()); + assertFalse(indexService.getIndexSettings().isExplicitRefresh()); + ensureGreen(); + assertNoSearchHits(client().prepareSearch().get()); + client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + IndexShard shard = indexService.getShard(0); + // with ZERO we are guaranteed to see the doc since we will wait for a refresh in the background + assertFalse(shard.scheduledRefresh()); + assertTrue(shard.isSearchIdle()); + CountDownLatch refreshLatch = new CountDownLatch(1); + client().admin().indices().prepareRefresh().execute(ActionListener.wrap(refreshLatch::countDown)); // async on purpose + assertHitCount(client().prepareSearch().get(), 1l); + client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + assertFalse(shard.scheduledRefresh()); + + // now disable background refresh and make sure the refresh happens + CountDownLatch updateSettingsLatch = new CountDownLatch(1); + client().admin().indices() + .prepareUpdateSettings("test") + .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build()) + .execute(ActionListener.wrap(updateSettingsLatch::countDown)); + assertHitCount(client().prepareSearch().get(), 2l); + // wait for both to ensure we don't have in-flight operations + updateSettingsLatch.await(); + refreshLatch.await(); + + client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); + assertTrue(shard.scheduledRefresh()); + assertTrue(shard.isSearchIdle()); + assertHitCount(client().prepareSearch().get(), 3l); } } From 69583a0ab542360fed6c4dd5385b1c4627bc3e92 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Nov 2017 16:21:03 +0100 Subject: [PATCH 06/15] remove unnecessary long markers --- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index ca098f97878f4..8e599ff98688b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -569,7 +569,7 @@ public void testAutomaticRefresh() throws InterruptedException { // we can't assert on hasRefreshed since it might have been refreshed in the background on the shard concurrently assertFalse(shard.isSearchIdle()); } - assertHitCount(client().prepareSearch().get(), 1l); + assertHitCount(client().prepareSearch().get(), 1); for (int i = 1; i < numDocs; i++) { client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON) .execute(new ActionListener() { @@ -603,7 +603,7 @@ public void testPendingRefreshWithIntervalChange() throws InterruptedException { assertTrue(shard.isSearchIdle()); CountDownLatch refreshLatch = new CountDownLatch(1); client().admin().indices().prepareRefresh().execute(ActionListener.wrap(refreshLatch::countDown)); // async on purpose - assertHitCount(client().prepareSearch().get(), 1l); + assertHitCount(client().prepareSearch().get(), 1); client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); assertFalse(shard.scheduledRefresh()); @@ -613,7 +613,7 @@ public void testPendingRefreshWithIntervalChange() throws InterruptedException { .prepareUpdateSettings("test") .setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build()) .execute(ActionListener.wrap(updateSettingsLatch::countDown)); - assertHitCount(client().prepareSearch().get(), 2l); + assertHitCount(client().prepareSearch().get(), 2); // wait for both to ensure we don't have in-flight operations updateSettingsLatch.await(); refreshLatch.await(); @@ -621,6 +621,6 @@ public void testPendingRefreshWithIntervalChange() throws InterruptedException { client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); assertTrue(shard.scheduledRefresh()); assertTrue(shard.isSearchIdle()); - assertHitCount(client().prepareSearch().get(), 3l); + assertHitCount(client().prepareSearch().get(), 3); } } From c0e4176b9a92e189a8af4ee2e22d825980637e7d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Nov 2017 16:36:03 +0100 Subject: [PATCH 07/15] add docs --- docs/reference/index-modules.asciidoc | 14 ++++++++++++-- .../migration/migrate_7_0/indices.asciidoc | 9 ++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index bf93f62847fb6..19b0ebfe991e7 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -107,11 +107,21 @@ specific index module: Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all` for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled). +`index.search.idle.after`:: + How long a shard can not receive a search or get request until it's considered + search idle. (default is `30s`) + `index.refresh_interval`:: How often to perform a refresh operation, which makes recent changes to the - index visible to search. Defaults to `1s`. Can be set to `-1` to disable - refresh. + index visible to search. Defaults to `1s`. Can be set to `-1` to disable + refresh. If this setting is not explicitly set, shards that haven't seen + search traffic for at least `index.search.idle.after` won't receive + background refreshes until they are not idle anymore. Searches that hit + an idle shard where a refresh is pending will wait in the background + for the next background refresh. This behavior aims to automatically optimize + bulk indexing in the default case when no searches are performed. In order to + opt out of this behavior an explicit value of `1s` should set as the refresh interval. `index.max_result_window`:: diff --git a/docs/reference/migration/migrate_7_0/indices.asciidoc b/docs/reference/migration/migrate_7_0/indices.asciidoc index 92f56a2ddbb17..ce14f7449788d 100644 --- a/docs/reference/migration/migrate_7_0/indices.asciidoc +++ b/docs/reference/migration/migrate_7_0/indices.asciidoc @@ -44,4 +44,11 @@ Indices created with version `7.0.0` onwards will have an automatic `index.numbe value set. This might change how documents are distributed across shards depending on how many shards the index has. In order to maintain the exact same distribution as a pre `7.0.0` index, the `index.number_of_routing_shards` must be set to the `index.number_of_shards` at index creation time. -Note: if the number of routing shards equals the number of shards `_split` operations are not supported. \ No newline at end of file +Note: if the number of routing shards equals the number of shards `_split` operations are not supported. + +==== Skipped background refresh on search idle shards. + + Shards that haven't seen any search traffic for `index.search.idle.after` (defaults to `30s`) + won't receive background refreshes until they become non-idle and unless a requires waits for a refresh. + Searches that access a search idle shard will be `parked` until the next refresh happens. This feature is only + enabled unless an explicit background refresh interval via `index.refresh_interval` is set. From ca92314cfb5b6f3760518f5fe7a9366df6841c50 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Nov 2017 16:39:15 +0100 Subject: [PATCH 08/15] fix imports --- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 8e599ff98688b..485b5aaffcff1 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -65,7 +65,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; -import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import java.io.IOException; import java.io.UncheckedIOException; @@ -92,7 +91,10 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits; import static org.hamcrest.Matchers.equalTo; public class IndexShardIT extends ESSingleNodeTestCase { From b2f77a457f49c5857f6a2251759a9dfbfdcb30fc Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Nov 2017 16:57:50 +0100 Subject: [PATCH 09/15] improve docs --- docs/reference/index-modules.asciidoc | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index 19b0ebfe991e7..6dd0fad436592 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -116,12 +116,13 @@ specific index module: How often to perform a refresh operation, which makes recent changes to the index visible to search. Defaults to `1s`. Can be set to `-1` to disable refresh. If this setting is not explicitly set, shards that haven't seen - search traffic for at least `index.search.idle.after` won't receive - background refreshes until they are not idle anymore. Searches that hit - an idle shard where a refresh is pending will wait in the background - for the next background refresh. This behavior aims to automatically optimize - bulk indexing in the default case when no searches are performed. In order to - opt out of this behavior an explicit value of `1s` should set as the refresh interval. + search traffic for at least `index.search.idle.after` seconds will not receive + background refreshes until they receive a search request. Searches that hit an + idle shard where a refresh is pending will wait for the next background + refresh (within `1s`). This behavior aims to automatically optimize bulk + indexing in the default case when no searches are performed. In order to opt + out of this behavior an explicit value of `1s` should set as the refresh + interval. `index.max_result_window`:: From 20f5c21f324c3c1f49e4433208ede3ae926d4075 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Nov 2017 17:04:05 +0100 Subject: [PATCH 10/15] more docs --- docs/reference/migration/migrate_7_0/indices.asciidoc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/docs/reference/migration/migrate_7_0/indices.asciidoc b/docs/reference/migration/migrate_7_0/indices.asciidoc index ce14f7449788d..16e437b4156e8 100644 --- a/docs/reference/migration/migrate_7_0/indices.asciidoc +++ b/docs/reference/migration/migrate_7_0/indices.asciidoc @@ -48,7 +48,10 @@ Note: if the number of routing shards equals the number of shards `_split` opera ==== Skipped background refresh on search idle shards. - Shards that haven't seen any search traffic for `index.search.idle.after` (defaults to `30s`) - won't receive background refreshes until they become non-idle and unless a requires waits for a refresh. - Searches that access a search idle shard will be `parked` until the next refresh happens. This feature is only - enabled unless an explicit background refresh interval via `index.refresh_interval` is set. +Shards belonging to an index that does not have an explicit +`index.refresh_interval` configured will no longer refresh in the background +once the shard becomes "search idle", ie the shard hasn't seen any search +traffic for `index.search.idle.after` seconds (defaults to `30s`). Searches +that access a search idle shard will be "parked" until the next refresh +happens. Indexing requests with `wait_for_refresh` will also trigger +a background refresh. From bdf9a7e4a84a7ecdfebbabb1162ea6cb1be8819a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 23 Nov 2017 20:01:14 +0100 Subject: [PATCH 11/15] bring back listeners check --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0ca462522ab30..611ecd80b6710 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2429,8 +2429,9 @@ EngineFactory getEngineFactory() { * @return true iff the engine got refreshed otherwise false */ public boolean scheduledRefresh() { - if (isReadAllowed() && getEngine().refreshNeeded()) { - if (refreshListeners.refreshNeeded() == false // if we have a listener that is waiting for a refresh we need to force it + boolean listenerNeedsRefresh = refreshListeners.refreshNeeded(); + if (isReadAllowed() && (listenerNeedsRefresh || getEngine().refreshNeeded())) { + if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it && isSearchIdle() && indexSettings.isExplicitRefresh() == false) { // lets skip this refresh since we are search idle and // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will From 88a8ec0e78d45fbf4849c2f1b79d2ac30b7cc773 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 25 Nov 2017 14:44:01 +0100 Subject: [PATCH 12/15] fix import after merge with master --- core/src/main/java/org/elasticsearch/index/shard/IndexShard.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ba1f8c7f5795e..d84b310881f68 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -154,6 +154,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; From 1e1d739322c2467a8a57aa3d2dce933470e40788 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 27 Nov 2017 14:46:45 +0100 Subject: [PATCH 13/15] ensure we set shard non idle on awaitPendingRefresh --- .../elasticsearch/index/shard/IndexShard.java | 19 +++++++++++++++---- .../index/shard/IndexShardTests.java | 7 +++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d84b310881f68..6c937eb930b21 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1137,9 +1137,13 @@ public Engine.Searcher acquireSearcher(String source) { return acquireSearcher(source, Engine.SearcherScope.EXTERNAL); } + private void markSearcherAccessed() { + lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis()); + } + private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) { readAllowed(); - lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis()); + markSearcherAccessed(); final Engine engine = getEngine(); final Engine.Searcher searcher = engine.acquireSearcher(source, scope); boolean success = false; @@ -2468,11 +2472,15 @@ final boolean isSearchIdle() { return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis(); } + /** + * Returns the last timestamp the searcher was accessed. This is a relative timestamp in milliseconds. + */ + final long getLastSearcherAccess() { + return lastSearcherAccess.get(); + } + private void setRefreshPending() { Engine engine = getEngine(); - if (isSearchIdle()) { - acquireSearcher("setRefreshPending").close(); // move the shard into non-search idle - } Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation(); Translog.Location location; do { @@ -2490,6 +2498,9 @@ private void setRefreshPending() { * true if the listener was registered to wait for a refresh. */ public void awaitPendingRefresh(Consumer listener) { + if (isSearchIdle()) { + markSearcherAccessed(); // move the shard into non-search idle + } final Translog.Location location = pendingRefreshLocation.get(); if (location != null) { addRefreshListener(location, (b) -> { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 84727761a530d..f72dd89cb4ad0 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2649,7 +2649,11 @@ public void testScheduledRefresh() throws IOException, InterruptedException { assertFalse(primary.getEngine().refreshNeeded()); indexDoc(primary, "test", "1", "{\"foo\" : \"bar\"}"); assertTrue(primary.getEngine().refreshNeeded()); + long lastSearchAccess = primary.getLastSearcherAccess(); assertFalse(primary.scheduledRefresh()); + assertEquals(lastSearchAccess, primary.getLastSearcherAccess()); + // wait until the thread-pool has moved the timestamp otherwise we can't assert on this below + awaitBusy(() -> primary.getThreadPool().relativeTimeInMillis() > lastSearchAccess); CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { primary.awaitPendingRefresh(refreshed -> { @@ -2661,6 +2665,9 @@ public void testScheduledRefresh() throws IOException, InterruptedException { } }); } + assertNotEquals("awaitPendingRefresh must access a searcher to remove search idle state", lastSearchAccess, + primary.getLastSearcherAccess()); + assertTrue(lastSearchAccess < primary.getLastSearcherAccess()); try (Engine.Searcher searcher = primary.acquireSearcher("test")) { assertEquals(1, searcher.reader().numDocs()); } From dd977971f5006f2d2cbd8e995c538add4f4a89a5 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 27 Nov 2017 15:27:57 +0100 Subject: [PATCH 14/15] apply feedback from @bleskes --- .../explain/TransportExplainAction.java | 6 +++--- .../action/get/TransportGetAction.java | 20 +++++++++++-------- .../shard/TransportSingleShardAction.java | 4 ++-- .../TransportTermVectorsAction.java | 20 +++++++++++-------- .../org/elasticsearch/index/IndexService.java | 3 ++- .../index/shard/IndexShardIT.java | 4 ++-- .../index/shard/IndexShardTests.java | 6 ++---- 7 files changed, 35 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index c8f48bfbde71e..39adadd54097d 100644 --- a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -91,13 +91,13 @@ protected void resolveRequest(ClusterState state, InternalRequest request) { } @Override - protected void shardOperation(ExplainRequest request, ShardId shardId, ActionListener listener) throws IOException { + protected void asyncShardOperation(ExplainRequest request, ShardId shardId, ActionListener listener) throws IOException { IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); indexShard.awaitPendingRefresh(b -> { try { - super.shardOperation(request, shardId, listener); - } catch (IOException ex) { + super.asyncShardOperation(request, shardId, listener); + } catch (Exception ex) { listener.onFailure(ex); } }); diff --git a/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index 4772f06deeb5b..cddeb4ee0dcef 100644 --- a/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -82,16 +82,20 @@ protected void resolveRequest(ClusterState state, InternalRequest request) { } @Override - protected void shardOperation(GetRequest request, ShardId shardId, ActionListener listener) throws IOException { + protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener listener) throws IOException { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); - indexShard.awaitPendingRefresh(b -> { - try { - super.shardOperation(request, shardId, listener); - } catch (IOException ex) { - listener.onFailure(ex); - } - }); + if (request.realtime()) { // we are not tied to a refresh cycle here anyway + listener.onResponse(shardOperation(request, shardId)); + } else { + indexShard.awaitPendingRefresh(b -> { + try { + super.asyncShardOperation(request, shardId, listener); + } catch (Exception ex) { + listener.onFailure(ex); + } + }); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 4c780647479f8..f2b2090dc286e 100644 --- a/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -100,7 +100,7 @@ protected void doExecute(Request request, ActionListener listener) { protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException; - protected void shardOperation(Request request, ShardId shardId, ActionListener listener) throws IOException { + protected void asyncShardOperation(Request request, ShardId shardId, ActionListener listener) throws IOException { threadPool.executor(this.executor).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -307,7 +307,7 @@ public void messageReceived(final Request request, final TransportChannel channe if (logger.isTraceEnabled()) { logger.trace("executing [{}] on shard [{}]", request, request.internalShardId); } - shardOperation(request, request.internalShardId, new ActionListener() { + asyncShardOperation(request, request.internalShardId, new ActionListener() { @Override public void onResponse(Response response) { try { diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index f22b7342ba1c4..bc9f640c35744 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -86,16 +86,20 @@ protected void resolveRequest(ClusterState state, InternalRequest request) { } @Override - protected void shardOperation(TermVectorsRequest request, ShardId shardId, ActionListener listener) throws IOException { + protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId, ActionListener listener) throws IOException { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); - indexShard.awaitPendingRefresh(b -> { - try { - super.shardOperation(request, shardId, listener); - } catch (IOException ex) { - listener.onFailure(ex); - } - }); + if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles + listener.onResponse(shardOperation(request, shardId)); + } else { + indexShard.awaitPendingRefresh(b -> { + try { + super.asyncShardOperation(request, shardId, listener); + } catch (Exception ex) { + listener.onFailure(ex); + } + }); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 03744d6f3ad45..78489965e3944 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -628,7 +628,8 @@ public synchronized void updateMetaData(final IndexMetaData metadata) { // once we change the refresh interval we schedule yet another refresh // to ensure we are in a clean and predictable state. // it doesn't matter if we move from or to -1 in both cases we want - // docs to become visible immediately + // docs to become visible immediately. This also flushes all pending indexing / search reqeusts + // that are waiting for a refresh. threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 485b5aaffcff1..7c38b7c211f06 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -600,11 +600,11 @@ public void testPendingRefreshWithIntervalChange() throws InterruptedException { assertNoSearchHits(client().prepareSearch().get()); client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); IndexShard shard = indexService.getShard(0); - // with ZERO we are guaranteed to see the doc since we will wait for a refresh in the background assertFalse(shard.scheduledRefresh()); assertTrue(shard.isSearchIdle()); CountDownLatch refreshLatch = new CountDownLatch(1); - client().admin().indices().prepareRefresh().execute(ActionListener.wrap(refreshLatch::countDown)); // async on purpose + client().admin().indices().prepareRefresh() + .execute(ActionListener.wrap(refreshLatch::countDown));// async on purpose to make sure it happens concurrently assertHitCount(client().prepareSearch().get(), 1); client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get(); assertFalse(shard.scheduledRefresh()); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f72dd89cb4ad0..6993fc3d1af11 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2589,7 +2589,7 @@ public void verify(String verificationToken, DiscoveryNode localNode) { } } - public void testIsSearchIdle() throws IOException { + public void testIsSearchIdle() throws Exception { Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) @@ -2618,9 +2618,7 @@ public void testIsSearchIdle() throws IOException { settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMillis(10)) .build(); scopedSettings.applySettings(settings); - while (primary.isSearchIdle() == false) { - // wait for it to become idle - } + assertBusy(() -> assertFalse(primary.isSearchIdle())); do { // now loop until we are fast enough... shouldn't take long primary.acquireSearcher("test").close(); From cba052b27ac5917f0f062864155c0f0f64480407 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 27 Nov 2017 16:39:21 +0100 Subject: [PATCH 15/15] fix naming and more shard active out of acquireSearcher --- .../action/explain/TransportExplainAction.java | 4 +--- .../action/get/TransportGetAction.java | 5 +---- .../termvectors/TransportTermVectorsAction.java | 2 +- .../org/elasticsearch/index/shard/IndexShard.java | 14 ++++++++++---- .../org/elasticsearch/search/SearchService.java | 3 +-- .../elasticsearch/index/shard/IndexShardTests.java | 6 +++--- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index 39adadd54097d..5b20b848f0b04 100644 --- a/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -24,8 +24,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; @@ -94,7 +92,7 @@ protected void resolveRequest(ClusterState state, InternalRequest request) { protected void asyncShardOperation(ExplainRequest request, ShardId shardId, ActionListener listener) throws IOException { IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); - indexShard.awaitPendingRefresh(b -> { + indexShard.awaitShardSearchActive(b -> { try { super.asyncShardOperation(request, shardId, listener); } catch (Exception ex) { diff --git a/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java b/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java index cddeb4ee0dcef..d14db67744d3c 100644 --- a/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java +++ b/core/src/main/java/org/elasticsearch/action/get/TransportGetAction.java @@ -23,12 +23,9 @@ import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; -import org.elasticsearch.action.termvectors.TermVectorsRequest; -import org.elasticsearch.action.termvectors.TermVectorsResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -88,7 +85,7 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi if (request.realtime()) { // we are not tied to a refresh cycle here anyway listener.onResponse(shardOperation(request, shardId)); } else { - indexShard.awaitPendingRefresh(b -> { + indexShard.awaitShardSearchActive(b -> { try { super.asyncShardOperation(request, shardId, listener); } catch (Exception ex) { diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index bc9f640c35744..289f40f1a34a8 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -92,7 +92,7 @@ protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId, if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles listener.onResponse(shardOperation(request, shardId)); } else { - indexShard.awaitPendingRefresh(b -> { + indexShard.awaitShardSearchActive(b -> { try { super.asyncShardOperation(request, shardId, listener); } catch (Exception ex) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6c937eb930b21..dc144c13d50a4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -872,6 +872,9 @@ public DocsStats docStats() { long numDeletedDocs = 0; long sizeInBytes = 0; try (Engine.Searcher searcher = acquireSearcher("docStats", Engine.SearcherScope.INTERNAL)) { + // we don't wait for a pending refreshes here since it's a stats call instead we mark it as accesssed only which will cause + // the next scheduled refresh to go through and refresh the stats as well + markSearcherAccessed(); for (LeafReaderContext reader : searcher.reader().leaves()) { // we go on the segment level here to get accurate numbers final SegmentReader segmentReader = Lucene.segmentReader(reader.reader()); @@ -968,6 +971,9 @@ public TranslogStats translogStats() { public CompletionStats completionStats(String... fields) { CompletionStats completionStats = new CompletionStats(); try (Engine.Searcher currentSearcher = acquireSearcher("completion_stats")) { + // we don't wait for a pending refreshes here since it's a stats call instead we mark it as accesssed only which will cause + // the next scheduled refresh to go through and refresh the stats as well + markSearcherAccessed(); completionStats.add(CompletionFieldStats.completionStats(currentSearcher.reader(), fields)); } return completionStats; @@ -1143,7 +1149,6 @@ private void markSearcherAccessed() { private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) { readAllowed(); - markSearcherAccessed(); final Engine engine = getEngine(); final Engine.Searcher searcher = engine.acquireSearcher(source, scope); boolean success = false; @@ -2492,12 +2497,13 @@ private void setRefreshPending() { } /** - * Registers the given listener and invokes it once the pending refresh translog location has been refreshed. If there is no pending - * refresh location registered the listener will be invoked immediately. + * Registers the given listener and invokes it once the shard is active again and all + * pending refresh translog location has been refreshed. If there is no pending refresh location registered the listener will be + * invoked immediately. * @param listener the listener to invoke once the pending refresh location is visible. The listener will be called with * true if the listener was registered to wait for a refresh. */ - public void awaitPendingRefresh(Consumer listener) { + public final void awaitShardSearchActive(Consumer listener) { if (isSearchIdle()) { markSearcherAccessed(); // move the shard into non-search idle } diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 1f60756a65f02..117a979639bb1 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -53,7 +53,6 @@ import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.SearchOperationListener; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.elasticsearch.node.ResponseCollectorService; @@ -998,7 +997,7 @@ protected void doRun() throws Exception { // now we need to check if there is a pending refresh and register ActionListener finalListener = actionListener; actionListener = ActionListener.wrap(r -> - shardOrNull.awaitPendingRefresh(b -> finalListener.onResponse(r)), finalListener::onFailure); + shardOrNull.awaitShardSearchActive(b -> finalListener.onResponse(r)), finalListener::onFailure); } // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 6993fc3d1af11..9f7cdf884707b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2654,7 +2654,7 @@ public void testScheduledRefresh() throws IOException, InterruptedException { awaitBusy(() -> primary.getThreadPool().relativeTimeInMillis() > lastSearchAccess); CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { - primary.awaitPendingRefresh(refreshed -> { + primary.awaitShardSearchActive(refreshed -> { assertTrue(refreshed); try (Engine.Searcher searcher = primary.acquireSearcher("test")) { assertEquals(2, searcher.reader().numDocs()); @@ -2663,7 +2663,7 @@ public void testScheduledRefresh() throws IOException, InterruptedException { } }); } - assertNotEquals("awaitPendingRefresh must access a searcher to remove search idle state", lastSearchAccess, + assertNotEquals("awaitShardSearchActive must access a searcher to remove search idle state", lastSearchAccess, primary.getLastSearcherAccess()); assertTrue(lastSearchAccess < primary.getLastSearcherAccess()); try (Engine.Searcher searcher = primary.acquireSearcher("test")) { @@ -2673,7 +2673,7 @@ public void testScheduledRefresh() throws IOException, InterruptedException { assertTrue(primary.scheduledRefresh()); latch.await(); CountDownLatch latch1 = new CountDownLatch(1); - primary.awaitPendingRefresh(refreshed -> { + primary.awaitShardSearchActive(refreshed -> { assertFalse(refreshed); try (Engine.Searcher searcher = primary.acquireSearcher("test")) { assertEquals(2, searcher.reader().numDocs());