Skip to content

Commit

Permalink
override indices of search request if point-in-time specified
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Aug 20, 2020
1 parent 2ba2113 commit 8a7eb6a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.search;

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand All @@ -31,14 +32,17 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.RemoteClusterAware;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SearchContextId {
private final Map<ShardId, SearchContextIdForNode> shards;
Expand Down Expand Up @@ -95,4 +99,18 @@ public static SearchContextId decode(NamedWriteableRegistry namedWriteableRegist
throw new IllegalArgumentException(e);
}
}

public String[] getActualIndices() {
final Set<String> indices = new HashSet<>();
for (Map.Entry<ShardId, SearchContextIdForNode> entry : shards().entrySet()) {
final String indexName = entry.getKey().getIndexName();
final String clusterAlias = entry.getValue().getClusterAlias();
if (Strings.isEmpty(clusterAlias)) {
indices.add(indexName);
} else {
indices.add(clusterAlias + RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR + indexName);
}
}
return indices.toArray(String[]::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package org.elasticsearch.rest.action.search;

import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchContextId;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -98,8 +100,12 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
* company.
*/
IntConsumer setSize = size -> searchRequest.source().size(size);
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser, setSize));
request.withContentOrSourceParamParserOrNull(parser -> {
parseSearchRequest(searchRequest, request, parser, setSize);
if (searchRequest.pointInTimeBuilder() != null) {
preparePointInTime(searchRequest, client.getNamedWriteableRegistry());
}
});

return channel -> {
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
Expand Down Expand Up @@ -283,6 +289,17 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
}
}

static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) {
assert request.pointInTimeBuilder() != null;
final IndicesOptions indicesOptions = request.indicesOptions();
final IndicesOptions stricterIndicesOptions = IndicesOptions.fromOptions(
indicesOptions.ignoreUnavailable(), indicesOptions.allowNoIndices(), false, false, false,
true, true, indicesOptions.ignoreThrottled());
request.indicesOptions(stricterIndicesOptions);
final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.pointInTimeBuilder().getId());
request.indices(searchContextId.getActualIndices());
}

/**
* Modify the search request to accurately count the total hits that match the query
* if {@link #TOTAL_HITS_AS_INT_PARAM} is set.
Expand Down

0 comments on commit 8a7eb6a

Please sign in to comment.