Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Stats API: moved detector count call outside transport layer and make asynchronous #108

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorAction',
'com.amazon.opendistroforelasticsearch.ad.transport.CronTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.CronRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction',
'com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction',
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner',
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.CounterSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.DocumentCountSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.IndexStatusSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.CronAction;
Expand Down Expand Up @@ -201,7 +201,11 @@ public List<RestHandler> getRestHandlers(
clusterService,
anomalyDetectorRunner
);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(restController, adStats);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(
restController,
clusterService,
adStats
);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(
settings,
restController,
Expand Down Expand Up @@ -329,10 +333,7 @@ public Collection<Object> createComponents(
StatNames.MODELS_CHECKPOINT_INDEX_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.CHECKPOINT_INDEX_NAME))
)
.put(
StatNames.DETECTOR_COUNT.getName(),
new ADStat<>(true, new DocumentCountSupplier(indexUtils, AnomalyDetector.ANOMALY_DETECTORS_INDEX))
)
.put(StatNames.DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.build();

adStats = new ADStats(indexUtils, modelManager, stats);
Expand Down Expand Up @@ -424,7 +425,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new ActionHandler<>(ThresholdResultAction.INSTANCE, ThresholdResultTransportAction.class),
new ActionHandler<>(AnomalyResultAction.INSTANCE, AnomalyResultTransportAction.class),
new ActionHandler<>(CronAction.INSTANCE, CronTransportAction.class),
new ActionHandler<>(ADStatsAction.INSTANCE, ADStatsTransportAction.class)
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,31 @@

package com.amazon.opendistroforelasticsearch.ad.rest;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsRequest;
import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

Expand All @@ -39,18 +52,21 @@ public class RestStatsAnomalyDetectorAction extends BaseRestHandler {

private static final String STATS_ANOMALY_DETECTOR_ACTION = "stats_anomaly_detector";
private ADStats adStats;
private ClusterService clusterService;

/**
* Constructor
*
* @param controller Rest Controller
* @param adStats ADStats object
* @param clusterService ClusterService
* @param adStats ADStats object
*/
public RestStatsAnomalyDetectorAction(RestController controller, ADStats adStats) {
public RestStatsAnomalyDetectorAction(RestController controller, ClusterService clusterService, ADStats adStats) {
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/{nodeId}/stats/", this);
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/{nodeId}/stats/{stat}", this);
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/stats/", this);
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/stats/{stat}", this);
this.clusterService = clusterService;
this.adStats = adStats;
}

Expand All @@ -62,14 +78,14 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
ADStatsRequest adStatsRequest = getRequest(request);
return channel -> client.execute(ADStatsAction.INSTANCE, adStatsRequest, new RestActions.NodesResponseRestListener<>(channel));
return channel -> getStats(client, channel, adStatsRequest);
}

/**
* Creates a ADStatsRequest from a RestRequest
*
* @param request RestRequest
* @return ADStatsRequest
* @return ADStatsRequest Request containing stats to be retrieved
*/
private ADStatsRequest getRequest(RestRequest request) {
// parse the nodes the user wants to query the stats for
Expand Down Expand Up @@ -115,4 +131,112 @@ private ADStatsRequest getRequest(RestRequest request) {
}
return adStatsRequest;
}

/**
* Make the 2 requests to get the node and cluster statistics
*
* @param client Client
* @param channel Channel to send response
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getStats(Client client, RestChannel channel, ADStatsRequest adStatsRequest) {
// Use MultiResponsesDelegateActionListener to execute 2 async requests and create the response once they finish
MultiResponsesDelegateActionListener<ADStatsResponse> delegateListener = new MultiResponsesDelegateActionListener<>(
getRestStatsListener(channel),
2,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it always 2? If customers don't want to get detector count or only want to get detector count, this value should be 1.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is always 2. We still call the listeners onResponse method even if the calls are not made. For example, here

"Unable to return AD Stats"
);

getClusterStats(client, delegateListener, adStatsRequest);
getNodeStats(client, delegateListener, adStatsRequest);
}

/**
* Make async request to get the number of detectors in AnomalyDetector.ANOMALY_DETECTORS_INDEX if necessary
* and, onResponse, gather the cluster statistics
*
* @param client Client
* @param listener MultiResponsesDelegateActionListener to be used once both requests complete
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getClusterStats(
Client client,
MultiResponsesDelegateActionListener<ADStatsResponse> listener,
ADStatsRequest adStatsRequest
) {
ADStatsResponse adStatsResponse = new ADStatsResponse();
if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) {
if (clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().docs(true);
client.execute(IndicesStatsAction.INSTANCE, indicesStatsRequest, ActionListener.wrap(indicesStatsResponse -> {
adStats
.getStat(StatNames.DETECTOR_COUNT.getName())
.setValue(indicesStatsResponse.getIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX).getPrimaries().docs.getCount());
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}, e -> listener.onFailure(new RuntimeException("Failed to get AD cluster stats", e))));
} else {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(0L);
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}
} else {
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}
}

/**
* Make async request to get the Anomaly Detection statistics from each node and, onResponse, set the
* ADStatsNodesResponse field of ADStatsResponse
*
* @param client Client
* @param listener MultiResponsesDelegateActionListener to be used once both requests complete
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getNodeStats(
Client client,
MultiResponsesDelegateActionListener<ADStatsResponse> listener,
ADStatsRequest adStatsRequest
) {
client.execute(ADStatsNodesAction.INSTANCE, adStatsRequest, ActionListener.wrap(adStatsResponse -> {
ADStatsResponse restADStatsResponse = new ADStatsResponse();
restADStatsResponse.setADStatsNodesResponse(adStatsResponse);
listener.onResponse(restADStatsResponse);
}, listener::onFailure));
}

/**
* Collect Cluster Stats into map to be retrieved
*
* @param adStatsRequest Request containing stats to be retrieved
* @return Map containing Cluster Stats
*/
private Map<String, Object> getClusterStatsMap(ADStatsRequest adStatsRequest) {
Map<String, Object> clusterStats = new HashMap<>();
Set<String> statsToBeRetrieved = adStatsRequest.getStatsToBeRetrieved();
adStats
.getClusterStats()
.entrySet()
.stream()
.filter(s -> statsToBeRetrieved.contains(s.getKey()))
.forEach(s -> clusterStats.put(s.getKey(), s.getValue().getValue()));
return clusterStats;
}

/**
* Listener sends response once Node Stats and Cluster Stats are gathered
*
* @param channel Channel
* @return ActionListener for ADStatsResponse
*/
private ActionListener<ADStatsResponse> getRestStatsListener(RestChannel channel) {
return ActionListener
.wrap(
adStatsResponse -> {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, adStatsResponse.toXContent(channel.newBuilder())));
},
exception -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, exception.getMessage()))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.ad.stats;

import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.CounterSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;

import java.util.function.Supplier;

Expand Down Expand Up @@ -55,6 +56,17 @@ public T getValue() {
return supplier.get();
}

/**
* Set the value of the statistic
*
* @param value set value
*/
public void setValue(Long value) {
if (supplier instanceof SettableSupplier) {
((SettableSupplier) supplier).set(value);
}
}

/**
* Increments the supplier if it can be incremented
*/
Expand Down
Loading