Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce point in time APIs in x-pack basic #61062

Merged
merged 19 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions docs/reference/search/point-in-time.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
[role="xpack"]
[testenv="basic"]
[[point-in-time]]
==== Point in time

A search request by default executes against the most recent visible data of
the target indices, which is called point in time. Elasticsearch pit (point in time)
is a lightweight view into the state of the data as it existed when initiated.
In some cases, it's preferred to perform multiple search requests using
the same point in time. For example, if <<indices-refresh,refreshes>> happen between
search_after requests, then the results of those requests might not be consistent as
changes happening between searches are only visible to the more recent point in time.

A point in time must be opened explicitly before being used in search requests. The
keep_alive parameter tells Elasticsearch how long it should keep a point in time alive,
e.g. `?keep_alive=5m`.

[source,console]
--------------------------------------------------
POST /my-index-000001/_pit?keep_alive=1m
--------------------------------------------------
// TEST[setup:my_index]

The result from the above request includes a `id`, which should
be passed to the `id` of the `pit` parameter of a search request.

[source,console]
--------------------------------------------------
POST /_search <1>
{
"size": 100,
"query": {
"match" : {
"title" : "elasticsearch"
}
},
"pit": {
"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
"keep_alive": "1m" <3>
}
}
--------------------------------------------------
// TEST[catch:missing]

<1> A search request with the `pit` parameter must not specify `index`, `routing`,
and {ref}/search-request-body.html#request-body-search-preference[`preference`]
as these parameters are copied from the point in time.
<2> The `id` parameter tells Elasticsearch to execute the request using contexts
from this point int time.
<3> The `keep_alive` parameter tells Elasticsearch how long it should extend
the time to live of the point in time.

IMPORTANT: The open point in time request and each subsequent search request can
return different `id`; thus always use the most recently received `id` for the
next search request.

[[point-in-time-keep-alive]]
===== Keeping point in time alive
The `keep_alive` parameter, which is passed to a open point in time request and
search request, extends the time to live of the corresponding point in time.
The value (e.g. `1m`, see <<time-units>>) does not need to be long enough to
process all data -- it just needs to be long enough for the next request.

Normally, the background merge process optimizes the index by merging together
smaller segments to create new, bigger segments. Once the smaller segments are
no longer needed they are deleted. However, open point-in-times prevent the
old segments from being deleted since they are still in use.

TIP: Keeping older segments alive means that more disk space and file handles
are needed. Ensure that you have configured your nodes to have ample free file
handles. See <<file-descriptors>>.

Additionally, if a segment contains deleted or updated documents then the
point in time must keep track of whether each document in the segment was live at
the time of the initial search request. Ensure that your nodes have sufficient heap
space if you have many open point-in-times on an index that is subject to ongoing
deletes or updates.

You can check how many point-in-times (i.e, search contexts) are open with the
<<cluster-nodes-stats,nodes stats API>>:

[source,console]
---------------------------------------
GET /_nodes/stats/indices/search
---------------------------------------

===== Close point in time API

Point-in-time is automatically closed when its `keep_alive` has
been elapsed. However keeping point-in-times has a cost, as discussed in the
<<point-in-time-keep-alive,previous section>>. Point-in-times should be closed
as soon as they are no longer used in search requests.

[source,console]
---------------------------------------
DELETE /_pit
{
"id" : "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWIBBXV1aWQyAAA="
}
---------------------------------------
// TEST[catch:missing]

The API returns the following response:

[source,console-result]
--------------------------------------------------
{
"succeeded": true, <1>
"num_freed": 3 <2>
}
--------------------------------------------------
// TESTRESPONSE[s/"succeeded": true/"succeeded": $body.succeeded/]
// TESTRESPONSE[s/"num_freed": 3/"num_freed": $body.num_freed/]

<1> If true, all search contexts associated with the point-in-time id are successfully closed
<2> The number of search contexts have been successfully closed
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,8 @@ public TopDocsAndMaxScore topDocs(SearchHit hit) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
try {
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
} finally {
clearReleasables(Lifetime.COLLECTION);
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
TopDocs topDocs = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
search.max_keep_alive: "1m"

- do:
catch: /.*Keep alive for scroll.*is too large.*/
catch: /.*Keep alive for.*is too large.*/
search:
rest_total_hits_as_int: true
index: test_scroll
Expand All @@ -61,7 +61,7 @@
- length: {hits.hits: 1 }

- do:
catch: /.*Keep alive for scroll.*is too large.*/
catch: /.*Keep alive for.*is too large.*/
scroll:
rest_total_hits_as_int: true
scroll_id: $scroll_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ public void testInvalidScrollKeepAlive() throws IOException {
IllegalArgumentException illegalArgumentException =
(IllegalArgumentException) ExceptionsHelper.unwrap(exc, IllegalArgumentException.class);
assertNotNull(illegalArgumentException);
assertThat(illegalArgumentException.getMessage(), containsString("Keep alive for scroll (2h) is too large"));
assertThat(illegalArgumentException.getMessage(), containsString("Keep alive for request (2h) is too large"));

SearchResponse searchResponse = client().prepareSearch()
.setQuery(matchAllQuery())
Expand All @@ -619,7 +619,7 @@ public void testInvalidScrollKeepAlive() throws IOException {
illegalArgumentException =
(IllegalArgumentException) ExceptionsHelper.unwrap(exc, IllegalArgumentException.class);
assertNotNull(illegalArgumentException);
assertThat(illegalArgumentException.getMessage(), containsString("Keep alive for scroll (3h) is too large"));
assertThat(illegalArgumentException.getMessage(), containsString("Keep alive for request (3h) is too large"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.ShardOperationFailedException;
Expand Down Expand Up @@ -163,7 +164,7 @@ public final void start() {
// total hits is null in the response if the tracking of total hits is disabled
boolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED;
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(withTotalHits), null, 0, 0, 0, buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY, clusters));
ShardSearchFailure.EMPTY_ARRAY, clusters, null));
return;
}
executePhase(this);
Expand Down Expand Up @@ -514,22 +515,29 @@ public final SearchRequest getRequest() {
return request;
}

protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse,
String scrollId,
ShardSearchFailure[] failures) {
protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, ShardSearchFailure[] failures,
String scrollId, String searchContextId) {
return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), failures, clusters);
skippedOps.get(), buildTookInMillis(), failures, clusters, searchContextId);
}

boolean includeSearchContextInResponse() {
return request.pointInTimeBuilder() != null;
}

@Override
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray<SearchPhaseResult> queryResults) {
ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
if (request.pointInTimeBuilder() == null && allowPartialResults == false && failures.length > 0) {
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
} else {
listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId, failures));
final Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults, minNodeVersion) : null;
final String searchContextId =
includeSearchContextInResponse() ? SearchContextId.encode(queryResults.asList(), aliasFilter, minNodeVersion) : null;
listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
}
}

Expand Down Expand Up @@ -598,12 +606,13 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
.toArray(new String[0]);
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings);
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings,
shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
// than creating an empty response in the search thread pool.
// Note that, we have to disable this shortcut for scroll queries.
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && request.scroll() == null);
// Note that, we have to disable this shortcut for queries that create a context (scroll and search context).
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null);
return shardRequest;
}

Expand Down Expand Up @@ -673,8 +682,4 @@ private synchronized Runnable tryQueue(Runnable runnable) {
return toExecute;
}
}

protected ClusterState clusterState() {
return clusterState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,28 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportResponse;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;

final class ClearScrollController implements Runnable {
public final class ClearScrollController implements Runnable {
private final DiscoveryNodes nodes;
private final SearchTransportService searchTransportService;
private final CountDown expectedOps;
Expand All @@ -56,19 +64,18 @@ final class ClearScrollController implements Runnable {
expectedOps = nodes.getSize();
runner = this::cleanAllScrolls;
} else {
List<ScrollIdForNode> parsedScrollIds = new ArrayList<>();
for (String parsedScrollId : request.getScrollIds()) {
ScrollIdForNode[] context = parseScrollId(parsedScrollId).getContext();
for (ScrollIdForNode id : context) {
parsedScrollIds.add(id);
}
// TODO: replace this with #closeContexts
List<SearchContextIdForNode> contexts = new ArrayList<>();
for (String scrollId : request.getScrollIds()) {
SearchContextIdForNode[] context = parseScrollId(scrollId).getContext();
Collections.addAll(contexts, context);
}
if (parsedScrollIds.isEmpty()) {
if (contexts.isEmpty()) {
expectedOps = 0;
runner = () -> listener.onResponse(new ClearScrollResponse(true, 0));
} else {
expectedOps = parsedScrollIds.size();
runner = () -> cleanScrollIds(parsedScrollIds);
expectedOps = contexts.size();
runner = () -> cleanScrollIds(contexts);
}
}
this.expectedOps = new CountDown(expectedOps);
Expand Down Expand Up @@ -101,17 +108,17 @@ public void onFailure(Exception e) {
}
}

void cleanScrollIds(List<ScrollIdForNode> parsedScrollIds) {
SearchScrollAsyncAction.collectNodesAndRun(parsedScrollIds, nodes, searchTransportService, ActionListener.wrap(
void cleanScrollIds(List<SearchContextIdForNode> contextIds) {
SearchScrollAsyncAction.collectNodesAndRun(contextIds, nodes, searchTransportService, ActionListener.wrap(
lookup -> {
for (ScrollIdForNode target : parsedScrollIds) {
for (SearchContextIdForNode target : contextIds) {
final DiscoveryNode node = lookup.apply(target.getClusterAlias(), target.getNode());
if (node == null) {
onFreedContext(false);
} else {
try {
Transport.Connection connection = searchTransportService.getConnection(target.getClusterAlias(), node);
searchTransportService.sendFreeContext(connection, target.getContextId(),
searchTransportService.sendFreeContext(connection, target.getSearchContextId(),
ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), e -> onFailedFreedContext(e, node)));
} catch (Exception e) {
onFailedFreedContext(e, node);
Expand Down Expand Up @@ -142,4 +149,45 @@ private void onFailedFreedContext(Throwable e, DiscoveryNode node) {
listener.onResponse(new ClearScrollResponse(false, freedSearchContexts.get()));
}
}

/**
* Closes the given context id and reports the number of freed contexts via the listener
*/
public static void closeContexts(DiscoveryNodes nodes, SearchTransportService searchTransportService,
Collection<SearchContextIdForNode> contextIds,
ActionListener<Integer> listener) {
if (contextIds.isEmpty()) {
listener.onResponse(0);
return;
}
final Set<String> clusters = contextIds.stream()
.filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
.map(SearchContextIdForNode::getClusterAlias).collect(Collectors.toSet());
final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();
if (clusters.isEmpty() == false) {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
} else {
lookupListener.onResponse((cluster, nodeId) -> nodes.get(nodeId));
}
lookupListener.whenComplete(nodeLookup -> {
final GroupedActionListener<Boolean> groupedListener = new GroupedActionListener<>(
ActionListener.delegateFailure(listener, (l, rs) -> l.onResponse(Math.toIntExact(rs.stream().filter(r -> r).count()))),
contextIds.size()
);
for (SearchContextIdForNode contextId : contextIds) {
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
if (node == null) {
groupedListener.onResponse(false);
} else {
try {
final Transport.Connection connection = searchTransportService.getConnection(contextId.getClusterAlias(), node);
searchTransportService.sendFreeContext(connection, contextId.getSearchContextId(),
ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false)));
} catch (Exception e) {
groupedListener.onResponse(false);
}
}
}
}, listener::onFailure);
}
}
Loading