Skip to content

Commit

Permalink
[Feature/extensions] Get detector stats api (#857)
Browse files Browse the repository at this point in the history
* Modifying RestStatsAnomalyDetectorAction

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Uncommenting test cases

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Get Detector Stats

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Removing unwanted log statements

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Addressing comments

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Addressing comments

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Addressing Dan's comments

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Addressing comments

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Addressed Owais Comments

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Addressing owais comments

Signed-off-by: Varun Jain <varunudr@amazon.com>

* Addressing Owais Comments

Signed-off-by: Varun Jain <varunudr@amazon.com>

---------

Signed-off-by: Varun Jain <varunudr@amazon.com>
  • Loading branch information
vibrantvarun authored Apr 18, 2023
1 parent 8b31c13 commit 45251ac
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 143 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,8 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.AnomalyDetectorExtension.1',
'org.opensearch.ad.AnomalyDetectorExtension.1.1',
'org.opensearch.ad.EntityProfileRunner',
'org.opensearch.ad.caching.CacheProvider'
'org.opensearch.ad.caching.CacheProvider',
'org.opensearch.ad.util.RestHandlerUtils'
]


Expand Down
20 changes: 15 additions & 5 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.ad.rest.RestDeleteAnomalyDetectorAction;
import org.opensearch.ad.rest.RestGetAnomalyDetectorAction;
import org.opensearch.ad.rest.RestIndexAnomalyDetectorAction;
import org.opensearch.ad.rest.RestStatsAnomalyDetectorAction;
import org.opensearch.ad.rest.RestValidateAnomalyDetectorAction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
Expand All @@ -72,6 +73,8 @@
import org.opensearch.ad.transport.ADJobParameterTransportAction;
import org.opensearch.ad.transport.ADJobRunnerAction;
import org.opensearch.ad.transport.ADJobRunnerTransportAction;
import org.opensearch.ad.transport.ADStatsNodesAction;
import org.opensearch.ad.transport.ADStatsNodesTransportAction;
import org.opensearch.ad.transport.AnomalyDetectorJobAction;
import org.opensearch.ad.transport.AnomalyDetectorJobTransportAction;
import org.opensearch.ad.transport.DeleteAnomalyDetectorAction;
Expand All @@ -80,6 +83,8 @@
import org.opensearch.ad.transport.GetAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.IndexAnomalyDetectorAction;
import org.opensearch.ad.transport.IndexAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.StatsAnomalyDetectorAction;
import org.opensearch.ad.transport.StatsAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.ValidateAnomalyDetectorAction;
import org.opensearch.ad.transport.ValidateAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
Expand Down Expand Up @@ -137,6 +142,8 @@ public class AnomalyDetectorExtension extends BaseExtension implements ActionExt
private static Gson gson;
// package private for testing
GenericObjectPool<LinkedBuffer> serializeRCFBufferPool;
private ADStats adStats;
private DiscoveryNodeFilterer nodeFilter;

static {
SpecialPermission.check();
Expand All @@ -157,7 +164,8 @@ public List<ExtensionRestHandler> getExtensionRestHandlers() {
new RestValidateAnomalyDetectorAction(extensionsRunner(), restClient()),
new RestGetAnomalyDetectorAction(extensionsRunner(), restClient()),
new RestAnomalyDetectorJobAction(extensionsRunner(), restClient()),
new RestDeleteAnomalyDetectorAction(extensionsRunner(), restClient())
new RestDeleteAnomalyDetectorAction(extensionsRunner(), restClient()),
new RestStatsAnomalyDetectorAction(extensionsRunner(), restClient(), adStats, nodeFilter)
);
}

Expand All @@ -178,8 +186,8 @@ public Collection<Object> createComponents(ExtensionsRunner runner) {

Throttler throttler = new Throttler(getClock());
ClientUtil clientUtil = new ClientUtil(environmentSettings, restClient(), throttler);
IndexUtils indexUtils = new IndexUtils(restClient(), clientUtil, sdkClusterService, indexNameExpressionResolver);
DiscoveryNodeFilterer nodeFilter = new DiscoveryNodeFilterer(sdkClusterService);
IndexUtils indexUtils = new IndexUtils(restClient(), clientUtil, sdkClusterService, indexNameExpressionResolver, javaAsyncClient());
nodeFilter = new DiscoveryNodeFilterer(sdkClusterService);
AnomalyDetectionIndices anomalyDetectionIndices = new AnomalyDetectionIndices(
sdkRestClient,
sdkJavaAsyncClient,
Expand Down Expand Up @@ -405,7 +413,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
.put(StatNames.MODEL_COUNT.getName(), new ADStat<>(false, new ModelsOnNodeCountSupplier(modelManager, cacheProvider)))
.build();

ADStats adStats = new ADStats(stats);
adStats = new ADStats(stats);

ADTaskCacheManager adTaskCacheManager = new ADTaskCacheManager(environmentSettings, sdkClusterService, memoryTracker);
ADTaskManager adTaskManager = new ADTaskManager(
Expand Down Expand Up @@ -587,7 +595,9 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
new ActionHandler<>(ADJobRunnerAction.INSTANCE, ADJobRunnerTransportAction.class),
new ActionHandler<>(ADJobParameterAction.INSTANCE, ADJobParameterTransportAction.class),
new ActionHandler<>(AnomalyDetectorJobAction.INSTANCE, AnomalyDetectorJobTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class)
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class)
// TODO : Register AnomalyResultAction/TransportAction here :
// https://github.com/opensearch-project/opensearch-sdk-java/issues/626
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public static String getTooManyCategoricalFieldErr(int limit) {
public static String FAIL_TO_DELETE_AD_RESULT = "Fail to delete anomaly result";
public static String FAIL_TO_GET_STATS = "Fail to get stats";
public static String FAIL_TO_SEARCH = "Fail to search";
public static String FAIL_TO_GET_NODE_STATS = "Failed to get node stats";

public static String CAN_NOT_FIND_RESULT_INDEX = "Can't find result index ";
public static String INVALID_RESULT_INDEX_PREFIX = "Result index must start with " + CUSTOM_RESULT_INDEX_PREFIX;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,64 +11,139 @@

package org.opensearch.ad.rest;

import static org.opensearch.ad.AnomalyDetectorPlugin.AD_BASE_URI;
import static org.opensearch.ad.AnomalyDetectorPlugin.LEGACY_AD_BASE;
import static org.opensearch.ad.util.RestHandlerUtils.NODE_ID;
import static org.opensearch.ad.util.RestHandlerUtils.STAT;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.ad.AnomalyDetectorExtension;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.ad.stats.ADStats;
import org.opensearch.ad.transport.ADStatsRequest;
import org.opensearch.ad.transport.StatsAnomalyDetectorAction;
import org.opensearch.ad.transport.StatsAnomalyDetectorResponse;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.extensions.rest.ExtensionRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.rest.RestStatus;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.rest.BaseExtensionRestHandler;
import org.opensearch.sdk.rest.ReplacedRouteHandler;

import com.google.common.collect.ImmutableList;

/**
* RestStatsAnomalyDetectorAction consists of the REST handler to get the stats from the anomaly detector plugin.
* RestStatsAnomalyDetectorAction consists of the REST handler to get the stats from the anomaly detector extension.
*/
public class RestStatsAnomalyDetectorAction extends BaseRestHandler {
public class RestStatsAnomalyDetectorAction extends BaseExtensionRestHandler {

private static final String STATS_ANOMALY_DETECTOR_ACTION = "stats_anomaly_detector";
private final Logger logger = LogManager.getLogger(RestStatsAnomalyDetectorAction.class);
private ADStats adStats;
private ClusterService clusterService;
private DiscoveryNodeFilterer nodeFilter;

/**
* Constructor
*
* @param adStats ADStats object
* @param nodeFilter util class to get eligible data nodes
*/
public RestStatsAnomalyDetectorAction(ADStats adStats, DiscoveryNodeFilterer nodeFilter) {
private SDKRestClient sdkRestClient;
private Settings settings;

public RestStatsAnomalyDetectorAction(
ExtensionsRunner extensionsRunner,
SDKRestClient sdkRestClient,
ADStats adStats,
DiscoveryNodeFilterer nodeFilter
) {
this.sdkRestClient = sdkRestClient;
this.settings = extensionsRunner.getEnvironmentSettings();
this.adStats = adStats;
this.nodeFilter = nodeFilter;
}

@Override
public String getName() {
return STATS_ANOMALY_DETECTOR_ACTION;
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
public List<ReplacedRouteHandler> replacedRouteHandlers() {
return ImmutableList
.of(
new ReplacedRouteHandler(
RestRequest.Method.GET,
"/{nodeId}/stats/",
RestRequest.Method.GET,
AnomalyDetectorExtension.LEGACY_AD_BASE + "/{nodeId}/stats/",
handleRequest
),
new ReplacedRouteHandler(
RestRequest.Method.GET,
"/{nodeId}/stats/{stat}",
RestRequest.Method.GET,
AnomalyDetectorExtension.LEGACY_AD_BASE + "/{nodeId}/stats/{stat}",
handleRequest
),
new ReplacedRouteHandler(
RestRequest.Method.GET,
"/stats/",
RestRequest.Method.GET,
AnomalyDetectorExtension.LEGACY_AD_BASE + "/stats/",
handleRequest
),
new ReplacedRouteHandler(
RestRequest.Method.GET,
"/stats/{stat}",
RestRequest.Method.GET,
AnomalyDetectorExtension.LEGACY_AD_BASE + "/stats/{stat}",
handleRequest
)
);
}

private Function<RestRequest, ExtensionRestResponse> handleRequest = (request) -> {
try {
return prepareRequest(request);
} catch (Exception e) {
// TODO: handle the AD-specific exceptions separately
return exceptionalRequest(request, e);
}
};

protected ExtensionRestResponse prepareRequest(RestRequest request) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
ADStatsRequest adStatsRequest = getRequest(request);
return channel -> client.execute(StatsAnomalyDetectorAction.INSTANCE, adStatsRequest, new RestToXContentListener<>(channel));
CompletableFuture<StatsAnomalyDetectorResponse> statsFutureResponse = new CompletableFuture<>();
sdkRestClient
.execute(
StatsAnomalyDetectorAction.INSTANCE,
adStatsRequest,
ActionListener.wrap(response -> statsFutureResponse.complete(response), ex -> statsFutureResponse.completeExceptionally(ex))
);

StatsAnomalyDetectorResponse statsAnomalyDetectorResponse = statsFutureResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.join();

XContentBuilder statsAnomalyDetectorResponseBuilder = statsAnomalyDetectorResponse
.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS);
return new ExtensionRestResponse(request, RestStatus.OK, statsAnomalyDetectorResponseBuilder);
}

/**
Expand All @@ -79,7 +154,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
*/
private ADStatsRequest getRequest(RestRequest request) {
// parse the nodes the user wants to query the stats for
String nodesIdsStr = request.param("nodeId");
String nodesIdsStr = request.param(NODE_ID);
Set<String> validStats = adStats.getStats().keySet();

ADStatsRequest adStatsRequest = null;
Expand Down Expand Up @@ -119,41 +194,10 @@ private ADStatsRequest getRequest(RestRequest request) {
}

if (!invalidStats.isEmpty()) {
throw new IllegalArgumentException(unrecognized(request, invalidStats, adStatsRequest.getStatsToBeRetrieved(), "stat"));
throw new IllegalArgumentException(unrecognized(request, invalidStats, adStatsRequest.getStatsToBeRetrieved(), STAT));
}
}
return adStatsRequest;
}

@Override
public List<Route> routes() {
return ImmutableList.of();
}

@Override
public List<ReplacedRoute> replacedRoutes() {
return ImmutableList
.of(
// delete anomaly detector document
new ReplacedRoute(
RestRequest.Method.GET,
AD_BASE_URI + "/{nodeId}/stats/",
RestRequest.Method.GET,
LEGACY_AD_BASE + "/{nodeId}/stats/"
),
new ReplacedRoute(
RestRequest.Method.GET,
AD_BASE_URI + "/{nodeId}/stats/{stat}",
RestRequest.Method.GET,
LEGACY_AD_BASE + "/{nodeId}/stats/{stat}"
),
new ReplacedRoute(RestRequest.Method.GET, AD_BASE_URI + "/stats/", RestRequest.Method.GET, LEGACY_AD_BASE + "/stats/"),
new ReplacedRoute(
RestRequest.Method.GET,
AD_BASE_URI + "/stats/{stat}",
RestRequest.Method.GET,
LEGACY_AD_BASE + "/stats/{stat}"
)
);
}
}
Loading

0 comments on commit 45251ac

Please sign in to comment.