Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bbeda3c
Implement adaptive replica selection
dakrone Jun 29, 2017
adc6f43
Randomly use adaptive replica selection for internal test cluster
dakrone Aug 10, 2017
62fe599
Use an action name *prefix* for retrieving pending requests
dakrone Aug 11, 2017
34d38e4
Add unit test for replica selection
dakrone Aug 14, 2017
e16754d
don't use adaptive replica selection in SearchPreferenceIT
dakrone Aug 14, 2017
c7122f8
Track client connections in a SearchTransportService instead of Trans…
dakrone Aug 21, 2017
2ba499e
Bind `entry` pieces in local variables
dakrone Aug 22, 2017
4f60243
Add javadoc link to C3 paper and javadocs for stat adjustments
dakrone Aug 22, 2017
b3dc3d9
Bind entry's key and value to local variables
dakrone Aug 22, 2017
205d78f
Merge remote-tracking branch 'origin/master' into adaptive-replica-se…
dakrone Aug 22, 2017
a41c75a
Remove unneeded actionNamePrefix parameter
dakrone Aug 22, 2017
546f5fb
Use conns.longValue() instead of cached Long
dakrone Aug 22, 2017
91f7a12
Add comments about removing entries from the map
dakrone Aug 22, 2017
3ddc0ac
Pull out bindings for `entry` in IndexShardRoutingTable
dakrone Aug 22, 2017
a451763
Use .compareTo instead of manually comparing
dakrone Aug 22, 2017
8674249
add assert for connections not being null and gte to 1
dakrone Aug 23, 2017
fc59d53
Copy map for pending search connections instead of "live" map
dakrone Aug 23, 2017
1beca3b
Merge remote-tracking branch 'origin/master' into adaptive-replica-se…
dakrone Aug 28, 2017
e945a5d
Increase the number of pending search requests used for calculating r…
dakrone Aug 29, 2017
6df44f4
Remove unused HashMap import
dakrone Aug 30, 2017
62747bf
Rename rank -> rankShardsAndUpdateStats
dakrone Aug 30, 2017
e289581
Rename rankedActiveInitializingShardsIt -> activeInitializingShardsRa…
dakrone Aug 30, 2017
dcae338
Instead of precalculating winning node, use "winning" shard from rank…
dakrone Aug 30, 2017
3d1dd2b
Sort null ranked nodes before nodes that have a rank
dakrone Aug 30, 2017
9c55c64
Merge remote-tracking branch 'origin/master' into adaptive-replica-se…
dakrone Aug 31, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void onResponse(SearchPhaseResult response) {
final int queueSize = queryResult.nodeQueueSize();
final long responseDuration = System.nanoTime() - startNanos;
// EWMA/queue size may be -1 if the query node doesn't support capturing it
if (serviceTimeEWMA > 0 && queueSize > 0) {
if (serviceTimeEWMA > 0 && queueSize >= 0) {
collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsSearchResult;
Expand All @@ -50,13 +51,17 @@
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

Expand All @@ -80,6 +85,7 @@ public class SearchTransportService extends AbstractComponent {

private final TransportService transportService;
private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
private final Map<String, Long> clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

public SearchTransportService(Settings settings, TransportService transportService,
BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper) {
Expand Down Expand Up @@ -131,7 +137,7 @@ public void sendClearAllScrollContexts(Transport.Connection connection, final Ac
public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
final SearchActionListener<DfsSearchResult> listener) {
transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, DfsSearchResult::new));
new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId()));
}

public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
Expand All @@ -143,25 +149,26 @@ public void sendExecuteQuery(Transport.Connection connection, final ShardSearchT

final ActionListener handler = responseWrapper.apply(connection, listener);
transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(handler, supplier));
new ConnectionCountingHandler<>(handler, supplier, clientConnections, connection.getNode().getId()));
}

public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task,
final SearchActionListener<QuerySearchResult> listener) {
transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId()));
}

public void sendExecuteScrollQuery(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task,
final SearchActionListener<ScrollQuerySearchResult> listener) {
transportService.sendChildRequest(connection, QUERY_SCROLL_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new));
new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId()));
}

public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task,
final SearchActionListener<ScrollQueryFetchSearchResult> listener) {
transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new));
new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new,
clientConnections, connection.getNode().getId()));
}

public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task,
Expand All @@ -177,22 +184,31 @@ public void sendExecuteFetchScroll(Transport.Connection connection, final ShardF
private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task,
final SearchActionListener<FetchSearchResult> listener) {
transportService.sendChildRequest(connection, action, request, task,
new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId()));
}

/**
* Used by {@link TransportSearchAction} to send the expand queries (field collapsing).
*/
void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task,
final ActionListener<MultiSearchResponse> listener) {
transportService.sendChildRequest(transportService.getConnection(transportService.getLocalNode()), MultiSearchAction.NAME, request,
task, new ActionListenerResponseHandler<>(listener, MultiSearchResponse::new));
final ActionListener<MultiSearchResponse> listener) {
final Transport.Connection connection = transportService.getConnection(transportService.getLocalNode());
transportService.sendChildRequest(connection, MultiSearchAction.NAME, request, task,
new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId()));
}

public RemoteClusterService getRemoteClusterService() {
return transportService.getRemoteClusterService();
}

/**
* Return a map of nodeId to pending number of search requests.
* This is a snapshot of the current pending search and not a live map.
*/
public Map<String, Long> getPendingSearchRequests() {
return new HashMap<>(clientConnections);
}

static class ScrollFreeContextRequest extends TransportRequest {
private long id;

Expand Down Expand Up @@ -486,4 +502,47 @@ Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
return transportService.getRemoteClusterService().getConnection(node, clusterAlias);
}
}

final class ConnectionCountingHandler<Response extends TransportResponse> extends ActionListenerResponseHandler<Response> {
private final Map<String, Long> clientConnections;
private final String nodeId;

ConnectionCountingHandler(final ActionListener<? super Response> listener, final Supplier<Response> responseSupplier,
final Map<String, Long> clientConnections, final String nodeId) {
super(listener, responseSupplier);
this.clientConnections = clientConnections;
this.nodeId = nodeId;
// Increment the number of connections for this node by one
clientConnections.compute(nodeId, (id, conns) -> conns == null ? 1 : conns + 1);
}

@Override
public void handleResponse(Response response) {
super.handleResponse(response);
// Decrement the number of connections or remove it entirely if there are no more connections
// We need to remove the entry here so we don't leak when nodes go away forever
assert assertNodePresent();
clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
}

@Override
public void handleException(TransportException e) {
super.handleException(e);
// Decrement the number of connections or remove it entirely if there are no more connections
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe mention that it is important to remove entries that have a value of zero to avoid memory leaks

// We need to remove the entry here so we don't leak when nodes go away forever
assert assertNodePresent();
clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
}

private boolean assertNodePresent() {
clientConnections.compute(nodeId, (id, conns) -> {
assert conns != null : "number of connections for " + id + " is null, but should be an integer";
assert conns >= 1 : "number of connections for " + id + " should be >= 1 but was " + conns;
return conns;
});
// Always return true, there is additional asserting here, the boolean is just so this
// can be skipped when assertions are not enabled
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,9 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
}
Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference());
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
remoteShardIterators);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,24 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.ResponseCollectorService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;

Expand Down Expand Up @@ -261,6 +267,165 @@ public ShardIterator activeInitializingShardsIt(int seed) {
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an iterator over active and initializing shards, ordered by the adaptive replica
* selection forumla. Making sure though that its random within the active shards of the same
* (or missing) rank, and initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsRankedIt(@Nullable ResponseCollectorService collector,
@Nullable Map<String, Long> nodeSearchCounts) {
final int seed = shuffler.nextSeed();
if (allInitializingShards.isEmpty()) {
return new PlainShardIterator(shardId,
rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts));
}

ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
List<ShardRouting> rankedActiveShards =
rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts);
ordered.addAll(rankedActiveShards);
List<ShardRouting> rankedInitializingShards =
rankShardsAndUpdateStats(allInitializingShards, collector, nodeSearchCounts);
ordered.addAll(rankedInitializingShards);
return new PlainShardIterator(shardId, ordered);
}

private static Set<String> getAllNodeIds(final List<ShardRouting> shards) {
final Set<String> nodeIds = new HashSet<>();
for (ShardRouting shard : shards) {
nodeIds.add(shard.currentNodeId());
}
return nodeIds;
}

private static Map<String, Optional<ResponseCollectorService.ComputedNodeStats>>
getNodeStats(final Set<String> nodeIds, final ResponseCollectorService collector) {

final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = new HashMap<>(nodeIds.size());
for (String nodeId : nodeIds) {
nodeStats.put(nodeId, collector.getNodeStatistics(nodeId));
}
return nodeStats;
}

private static Map<String, Double> rankNodes(final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats,
final Map<String, Long> nodeSearchCounts) {
final Map<String, Double> nodeRanks = new HashMap<>(nodeStats.size());
for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) {
Optional<ResponseCollectorService.ComputedNodeStats> maybeStats = entry.getValue();
maybeStats.ifPresent(stats -> {
final String nodeId = entry.getKey();
nodeRanks.put(nodeId, stats.rank(nodeSearchCounts.getOrDefault(nodeId, 1L)));
});
}
return nodeRanks;
}

/**
* Adjust the for all other nodes' collected stats. In the original ranking paper there is no need to adjust other nodes' stats because
* Cassandra sends occasional requests to all copies of the data, so their stats will be updated during that broadcast phase. In
* Elasticsearch, however, we do not have that sort of broadcast-to-all behavior. In order to prevent a node that gets a high score and
* then never gets any more requests, we must ensure it eventually returns to a more normal score and can be a candidate for serving
* requests.
*
* This adjustment takes the "winning" node's statistics and adds the average of those statistics with each non-winning node. Let's say
* the winning node had a queue size of 10 and a non-winning node had a queue of 18. The average queue size is (10 + 18) / 2 = 14 so the
* non-winning node will have statistics added for a queue size of 14. This is repeated for the response time and service times as well.
*/
private static void adjustStats(final ResponseCollectorService collector,
final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats,
final String minNodeId,
final ResponseCollectorService.ComputedNodeStats minStats) {
if (minNodeId != null) {
for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) {
final String nodeId = entry.getKey();
final Optional<ResponseCollectorService.ComputedNodeStats> maybeStats = entry.getValue();
if (nodeId.equals(minNodeId) == false && maybeStats.isPresent()) {
final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get();
final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the casts should not be necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are required, without them you get error: incompatible types: possible lossy conversion from double to long

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I had not realized we stored those times as doubles

collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService);
}
}
}
}

private static List<ShardRouting> rankShardsAndUpdateStats(List<ShardRouting> shards, final ResponseCollectorService collector,
final Map<String, Long> nodeSearchCounts) {
if (collector == null || nodeSearchCounts == null || shards.size() <= 1) {
return shards;
}

// Retrieve which nodes we can potentially send the query to
final Set<String> nodeIds = getAllNodeIds(shards);
final int nodeCount = nodeIds.size();

final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector);

// Retrieve all the nodes the shards exist on
final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts);

// sort all shards based on the shard rank
ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards);
Collections.sort(sortedShards, new NodeRankComparator(nodeRanks));

// adjust the non-winner nodes' stats so they will get a chance to receive queries
if (sortedShards.size() > 1) {
ShardRouting minShard = sortedShards.get(0);
// If the winning shard is not started we are ranking initializing
// shards, don't bother to do adjustments
if (minShard.started()) {
String minNodeId = minShard.currentNodeId();
Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId);
if (maybeMinStats.isPresent()) {
adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get());
// Increase the number of searches for the "winning" node by one.
// Note that this doesn't actually affect the "real" counts, instead
// it only affects the captured node search counts, which is
// captured once for each query in TransportSearchAction
nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1);
}
}
}

return sortedShards;
}

private static class NodeRankComparator implements Comparator<ShardRouting> {
private final Map<String, Double> nodeRanks;

NodeRankComparator(Map<String, Double> nodeRanks) {
this.nodeRanks = nodeRanks;
}

@Override
public int compare(ShardRouting s1, ShardRouting s2) {
if (s1.currentNodeId().equals(s2.currentNodeId())) {
// these shards on the the same node
return 0;
}
Double shard1rank = nodeRanks.get(s1.currentNodeId());
Double shard2rank = nodeRanks.get(s2.currentNodeId());
if (shard1rank != null) {
if (shard2rank != null) {
return shard1rank.compareTo(shard2rank);
} else {
// place non-nulls after null values
return 1;
}
} else {
if (shard2rank != null) {
// place nulls before non-null values
return -1;
} else {
// Both nodes do not have stats, they are equal
return 0;
}
}
}
}

/**
* Returns true if no primaries are active or initializing for this shard
*/
Expand Down
Loading