Skip shard refreshes if shard is search idle #27500

Merged: 22 commits, Nov 27, 2017
Changes from 3 commits
@@ -24,6 +24,8 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
@@ -33,8 +35,10 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
@@ -86,6 +90,19 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
}
}

@Override
protected void shardOperation(ExplainRequest request, ShardId shardId, ActionListener<ExplainResponse> listener) throws IOException {
IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.awaitPendingRefresh(b -> {
try {
super.shardOperation(request, shardId, listener);
} catch (IOException ex) {
listener.onFailure(ex);
}
});
}

@Override
protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId) throws IOException {
ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId,

@@ -19,9 +19,12 @@

package org.elasticsearch.action.get;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.action.termvectors.TermVectorsRequest;
import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -38,6 +41,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Performs the get operation.
*/
@@ -76,6 +81,19 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
}
}

@Override
protected void shardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.awaitPendingRefresh(b -> {
Review comment (Contributor): I'm on the fence as to whether we should only do this for non-realtime gets. Realtime gets don't really relate to refresh cycles (they force a refresh if needed). They are already "efficient" in the sense that they only refresh if they need to (i.e., there's a pending doc change in the version map).

try {
super.shardOperation(request, shardId, listener);
} catch (IOException ex) {
listener.onFailure(ex);
}
});
}
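
The reviewer's suggestion above (wait for a pending refresh only on non-realtime requests) could be sketched roughly as follows. This is a minimal illustration, not code from the PR: `PendingRefreshAwaiter` and `RealtimeAwareDispatch` are hypothetical names, and the interface is a stand-in that models only the `awaitPendingRefresh` hook shown in the diff.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for IndexShard; it models only the pending-refresh hook. */
interface PendingRefreshAwaiter {
    void awaitPendingRefresh(Consumer<Boolean> listener);
}

/** Hypothetical helper illustrating the reviewer's idea; not part of the PR. */
final class RealtimeAwareDispatch {
    private RealtimeAwareDispatch() {}

    /**
     * Runs the operation right away for realtime requests, and only after the
     * pending-refresh callback fires for non-realtime requests.
     */
    static void dispatch(boolean realtime, PendingRefreshAwaiter shard, Runnable operation) {
        if (realtime) {
            // Realtime reads do not depend on the refresh cycle, so skip the wait.
            operation.run();
        } else {
            // Non-realtime reads want to see the refreshed searcher first.
            shard.awaitPendingRefresh(refreshed -> operation.run());
        }
    }
}
```

Whether realtime gets should bypass the wait is exactly the open question in the comment; the sketch only shows what the branch would look like.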

@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());

@@ -38,6 +38,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
@@ -47,6 +48,8 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
@@ -78,7 +81,7 @@ protected TransportSingleShardAction(Settings settings, String actionName, Threa
if (!isSubAction()) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
}
- transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
+ transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
}

/**
@@ -97,6 +100,19 @@ protected void doExecute(Request request, ActionListener<Response> listener) {

protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;

protected void shardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
Review comment (Contributor): Maybe call this asyncShardOperation to avoid confusion? Also, can you please add Javadoc noting that this is still called on the networking thread?

threadPool.executor(this.executor).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
protected void doRun() throws Exception {
listener.onResponse(shardOperation(request, shardId));
}
});
}
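
The default implementation above bridges a blocking shard operation to a listener by forking onto an executor. A minimal standalone sketch of that pattern, using only `java.util.concurrent` types, might look like this. `Listener` and `AsyncBridge` are hypothetical names standing in for Elasticsearch's `ActionListener` and the transport action. Unlike `AbstractRunnable`, this sketch does not route an exception thrown by `onResponse` back to `onFailure`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

/** Hypothetical, simplified stand-in for ActionListener. */
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

/** Hypothetical helper: runs a blocking operation on an executor and reports via a listener. */
final class AsyncBridge {
    private AsyncBridge() {}

    static <T> void executeAsync(Executor executor, Callable<T> blockingOperation, Listener<T> listener) {
        // The caller (for example a networking thread) returns immediately;
        // the blocking work happens on whatever thread the executor provides.
        executor.execute(() -> {
            final T result;
            try {
                result = blockingOperation.call();
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            listener.onResponse(result);
        });
    }
}
```

Passing `Runnable::run` as the executor runs the operation on the calling thread, which is handy for tests but defeats the purpose in production code.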
protected abstract Response newResponse();

protected abstract boolean resolveIndex(Request request);
@@ -291,11 +307,27 @@ public void messageReceived(final Request request, final TransportChannel channe
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
- Response response = shardOperation(request, request.internalShardId);
- channel.sendResponse(response);
+ shardOperation(request, request.internalShardId, new ActionListener<Response>() {
+ @Override
+ public void onResponse(Response response) {
+ try {
+ channel.sendResponse(response);
+ } catch (IOException e) {
+ onFailure(e);
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ try {
+ channel.sendResponse(e);
+ } catch (IOException e1) {
+ throw new UncheckedIOException(e1);
+ }
+ }
+ });
}
}

/**
* Internal request class that gets built on each node. Holds the original request plus additional info.
*/

@@ -19,6 +19,7 @@

package org.elasticsearch.action.termvectors;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
@@ -37,6 +38,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Performs the get operation.
*/
@@ -82,6 +85,19 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
}
}

@Override
protected void shardOperation(TermVectorsRequest request, ShardId shardId, ActionListener<TermVectorsResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.awaitPendingRefresh(b -> {
Review comment (Contributor): Same comment here w.r.t. realtime requests.

try {
super.shardOperation(request, shardId, listener);
} catch (IOException ex) {
listener.onFailure(ex);
}
});
}

@Override
protected TermVectorsResponse shardOperation(TermVectorsRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());

@@ -36,7 +36,6 @@
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.Store;
@@ -135,6 +134,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,

12 changes: 4 additions & 8 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -689,14 +689,10 @@ private void maybeFSyncTranslogs() {
private void maybeRefreshEngine() {
if (indexSettings.getRefreshInterval().millis() > 0) {
for (IndexShard shard : this.shards.values()) {
- if (shard.isReadAllowed()) {
- try {
- if (shard.isRefreshNeeded()) {
- shard.refresh("schedule");
- }
- } catch (IndexShardClosedException | AlreadyClosedException ex) {
- // fine - continue;
- }
+ try {
+ shard.scheduledRefresh();
+ } catch (IndexShardClosedException | AlreadyClosedException ex) {
+ // fine - continue;
}
}
}

21 changes: 21 additions & 0 deletions core/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -62,6 +62,9 @@ public final class IndexSettings {
public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING =
Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100),
Property.IndexScope);
public static final Setting<TimeValue> INDEX_SEARCH_IDLE_AFTER =
Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30),
TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic);
public static final Setting<Translog.Durability> INDEX_TRANSLOG_DURABILITY_SETTING =
new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(),
(value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), Property.Dynamic, Property.IndexScope);
@@ -262,6 +265,8 @@ public final class IndexSettings {
private volatile int maxNgramDiff;
private volatile int maxShingleDiff;
private volatile boolean TTLPurgeDisabled;
private volatile TimeValue searchIdleAfter;

/**
* The maximum number of refresh listeners allows on this shard.
*/
@@ -371,6 +376,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL);
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
this.indexSortConfig = new IndexSortConfig(this);
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
singleType = INDEX_MAPPING_SINGLE_TYPE_SETTING.get(indexMetaData.getSettings()); // get this from metadata - it's not registered
if ((singleType || version.before(Version.V_6_0_0_alpha1)) == false) {
throw new AssertionError(index.toString() + "multiple types are only allowed on pre 6.x indices but version is: ["
@@ -411,8 +417,11 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }

private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}
@@ -752,4 +761,16 @@ public IndexSortConfig getIndexSortConfig() {
}

public IndexScopedSettings getScopedSettings() { return scopedSettings;}

/**
* Returns true iff the refresh setting exists or in other words is explicitly set.
*/
public boolean isExplicitRefresh() {
return INDEX_REFRESH_INTERVAL_SETTING.exists(settings);
}

/**
* Returns the time that an index shard becomes search idle unless it's accessed in between
*/
public TimeValue getSearchIdleAfter() { return searchIdleAfter; }
}

63 changes: 61 additions & 2 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -234,6 +234,9 @@ Runnable getGlobalCheckpointSyncer() {
*/
private final RefreshListeners refreshListeners;

private final AtomicLong lastSearcherAccess = new AtomicLong();
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();

public IndexShard(
ShardRouting shardRouting,
IndexSettings indexSettings,
@@ -298,6 +301,7 @@ public IndexShard(
searcherWrapper = indexSearcherWrapper;
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
refreshListeners = buildRefreshListeners();
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
persistMetadata(path, indexSettings, shardRouting, null, logger);
}

@@ -1120,6 +1124,7 @@ public Engine.Searcher acquireSearcher(String source) {

private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) {
readAllowed();
lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis());
Review comment (Contributor): I spent some cycles reading this and convincing myself that we can do this regardless of the searcher scope, i.e., that if we end up here with an internal searcher it will be OK (it is!). I personally think it would be simpler to follow if we marked access in awaitPendingRefresh. That would mean all flow/state management is around the engine and you don't have to go through engine code to see how things work. This is of course subjective, so just a suggestion.

Reply (Contributor Author): We have to do this here, otherwise we can't decide whether a shard is idle or not. Sorry, your comment doesn't make sense with respect to the engine; the engine is not involved here.

final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
boolean success = false;
@@ -2418,15 +2423,69 @@ EngineFactory getEngineFactory() {
return engineFactory;
}

/**
* Executes a scheduled refresh if necessary
*/
public boolean scheduledRefresh() {
if (isReadAllowed() && isRefreshNeeded()) {
if (refreshListeners.refreshNeeded() == false // if we have a listener that is waiting for a refresh we need to force it
&& isSearchIdle() && indexSettings.isExplicitRefresh() == false) {
// lets skip this refresh since we are search idle and
// don't necessarily need to refresh. the next search execute cause a
Review comment (Member): nit: truncated comment

setRefreshPending();
return false;
} else {
refresh("schedule");
return true;
}
}
return false;
}
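
The decision made by `scheduledRefresh()` and `isSearchIdle()` can be restated as two pure functions, which makes the skip conditions easier to check in isolation. The sketch below is an illustration under assumptions: `SearchIdlePolicy` and its parameter names are hypothetical. The single `refreshNeeded` flag stands for the combined `isReadAllowed() && isRefreshNeeded()` check from the diff.

```java
/** Hypothetical restatement of the scheduled-refresh decision; not code from the PR. */
final class SearchIdlePolicy {
    private SearchIdlePolicy() {}

    /** A shard is search idle once no searcher has been acquired for at least idleAfterMillis. */
    static boolean isSearchIdle(long nowMillis, long lastSearcherAccessMillis, long idleAfterMillis) {
        return nowMillis - lastSearcherAccessMillis >= idleAfterMillis;
    }

    /**
     * Returns true if the scheduled refresh should run now, false if it can be
     * skipped (or if there is nothing to refresh at all).
     */
    static boolean shouldRefresh(boolean refreshNeeded, boolean listenersWaiting,
                                 boolean searchIdle, boolean explicitRefreshInterval) {
        if (!refreshNeeded) {
            return false;
        }
        // Skip only when nobody waits on a refresh, the shard is search idle,
        // and the user has not set a refresh interval explicitly.
        boolean canSkip = !listenersWaiting && searchIdle && !explicitRefreshInterval;
        return !canSkip;
    }
}
```

In the diff, the skipped case additionally records a pending refresh location so that the next search can wait for it; that side effect is left out here.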

/**
* Returns <code>true</code> iff one or more changes to the engine are not visible to via the current searcher *or* there are pending
* refresh listeners.
* Otherwise <code>false</code>.
*
* @throws AlreadyClosedException if the engine or internal indexwriter in the engine is already closed
*/
- public boolean isRefreshNeeded() {
- return getEngine().refreshNeeded() || (refreshListeners != null && refreshListeners.refreshNeeded());
+ final boolean isRefreshNeeded() {
+ return refreshListeners.refreshNeeded() // lets check the cheaper one first
+ || getEngine().refreshNeeded();
}

/**
* Returns true if this shards is search idle
*/
final boolean isSearchIdle() {
return (threadPool.relativeTimeInMillis() - lastSearcherAccess.get()) >= indexSettings.getSearchIdleAfter().getMillis();
}

private void setRefreshPending() {
Engine engine = getEngine();
if (isSearchIdle()) {
acquireSearcher("setRefreshPending").close(); // move the shard into non-search idle
Review comment (Contributor): Why is this needed?

Reply (Contributor Author): It's a bug. I removed it and added a test.

}
Translog.Location lastWriteLocation = engine.getTranslog().getLastWriteLocation();
Translog.Location location;
do {
location = this.pendingRefreshLocation.get();
if (location != null && lastWriteLocation.compareTo(location) <= 0) {
break;
}
} while (pendingRefreshLocation.compareAndSet(location, lastWriteLocation) == false);
}
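
The `do`/`while` loop in `setRefreshPending()` is a compare-and-set retry loop that only ever moves the stored location forward. A generic sketch of the same pattern is shown below, assuming any `Comparable` value in place of `Translog.Location`. `MonotonicReference` and `advanceTo` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper showing a CAS loop that never moves a reference backwards. */
final class MonotonicReference {
    private MonotonicReference() {}

    static <T extends Comparable<T>> void advanceTo(AtomicReference<T> ref, T candidate) {
        T current;
        do {
            current = ref.get();
            if (current != null && candidate.compareTo(current) <= 0) {
                // The stored value is already at or past the candidate; keep it.
                return;
            }
            // Retry if another thread changed the reference between get() and compareAndSet().
        } while (!ref.compareAndSet(current, candidate));
    }
}
```

`compareAndSet` compares references rather than calling `equals`, which is fine here because `current` is the exact object returned by `get()`.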

public void awaitPendingRefresh(Consumer<Boolean> listener) {
Review comment (Contributor): Please add javadocs.

Review comment (Contributor): nit: maybe call this awaitSearchActive (or markSearchActive if my other suggestion to move setting the timer here is accepted)? Pending refresh is an internal implementation detail.

Reply (Contributor Author): I disagree actually, we are waiting for the pending refresh so I think it's a good name?

final Translog.Location location = pendingRefreshLocation.get();
if (location != null) {
addRefreshListener(location, (b) -> {
pendingRefreshLocation.compareAndSet(location, null);
Review comment (Contributor): Can we move this part to the setRefreshPending method? This will come at the expense of a dedicated listener, but all the code that changes pendingRefreshLocation will be in one place, making it easier to figure out.

listener.accept(true);
});
} else {
listener.accept(false);
}
}
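
The contract of `awaitPendingRefresh` (call the listener with `false` right away when nothing is pending, otherwise with `true` once the refresh has happened) can be modelled with a small toy class. This is a simplified sketch under assumptions: `PendingRefreshModel` is hypothetical, it uses a plain `long` in place of `Translog.Location`, and it takes a coarse lock rather than the atomic reference and refresh listeners used in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical toy model of the pending-refresh contract; not the IndexShard implementation. */
final class PendingRefreshModel {
    private Long pendingLocation; // null means no refresh was skipped
    private final List<Consumer<Boolean>> waiters = new ArrayList<>();

    /** Records that a scheduled refresh was skipped up to the given location. */
    synchronized void markRefreshPending(long location) {
        if (pendingLocation == null || location > pendingLocation) {
            pendingLocation = location;
        }
    }

    /** Calls back with false immediately if nothing is pending, else with true after refresh(). */
    void awaitPendingRefresh(Consumer<Boolean> listener) {
        synchronized (this) {
            if (pendingLocation != null) {
                waiters.add(listener);
                return;
            }
        }
        listener.accept(false);
    }

    /** Simulates a refresh: clears the pending marker and notifies everyone who waited. */
    void refresh() {
        final List<Consumer<Boolean>> toNotify;
        synchronized (this) {
            pendingLocation = null;
            toNotify = new ArrayList<>(waiters);
            waiters.clear();
        }
        for (Consumer<Boolean> waiter : toNotify) {
            waiter.accept(true);
        }
    }
}
```

Listeners are invoked outside the lock so that a slow callback cannot block other callers.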

/**