Skip to content

Commit

Permalink
Add state and error to profile API (opendistro-for-elasticsearch#84)
Browse files Browse the repository at this point in the history
* Add state and error to profile API

We want to make it easy for customers and oncalls to identify a detector’s state and error if any. This PR adds such information to our new profile API.

We expect three kinds of states:
-Disabled: if get ad job api says the job is disabled;
-Init: if anomaly score after the last update time of the detector is larger than 0
-Running: if neither of the above applies and no exceptions.

Error is populated if error of the latest anomaly result is not empty.

Testing done:
-manual testing during a detector’s life cycle: not created, created but not started, started, during initialization, after initialization, stopped, restarted
-added unit tests to cover above scenario
  • Loading branch information
kaituo authored Apr 15, 2020
1 parent 0c33050 commit e5b6ce5
Show file tree
Hide file tree
Showing 14 changed files with 1,154 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.ProfileName;
import com.amazon.opendistroforelasticsearch.ad.rest.RestAnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestDeleteAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction;
Expand Down Expand Up @@ -141,8 +142,8 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private Client client;
private ClusterService clusterService;
private ThreadPool threadPool;
private IndexNameExpressionResolver indexNameExpressionResolver;
private ADStats adStats;
private NamedXContentRegistry xContentRegistry;
private ClientUtil clientUtil;

static {
Expand All @@ -164,7 +165,6 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
AnomalyResultHandler anomalyResultHandler = new AnomalyResultHandler(
client,
settings,
Expand All @@ -180,7 +180,12 @@ public List<RestHandler> getRestHandlers(
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);

RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(restController);
AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(
restController,
profileRunner,
ProfileName.getNames()
);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
restController,
Expand Down Expand Up @@ -243,6 +248,7 @@ public Collection<Object> createComponents(
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil);
this.clusterService = clusterService;
this.xContentRegistry = xContentRegistry;

SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Expand Down Expand Up @@ -392,7 +398,7 @@ public List<Setting<?>> getSettings() {

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, ADMetaData.XCONTENT_REGISTRY);
return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, ADMetaData.XCONTENT_REGISTRY, AnomalyResult.XCONTENT_REGISTRY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad;

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import java.util.Set;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfile;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorState;
import com.amazon.opendistroforelasticsearch.ad.model.ProfileName;
import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener;

public class AnomalyDetectorProfileRunner {
private final Logger logger = LogManager.getLogger(AnomalyDetectorProfileRunner.class);
private Client client;
private NamedXContentRegistry xContentRegistry;
static String FAIL_TO_FIND_DETECTOR_MSG = "Fail to find detector with id: ";
static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector ";

public AnomalyDetectorProfileRunner(Client client, NamedXContentRegistry xContentRegistry) {
this.client = client;
this.xContentRegistry = xContentRegistry;
}

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<ProfileName> profiles) {

if (profiles.isEmpty()) {
listener.onFailure(new RuntimeException("Unsupported profile types."));
return;
}

MultiResponsesDelegateActionListener<DetectorProfile> delegateListener = new MultiResponsesDelegateActionListener<DetectorProfile>(
listener,
profiles.size(),
"Fail to fetch profile for " + detectorId
);

prepareProfile(detectorId, delegateListener, profiles);
}

private void prepareProfile(
String detectorId,
MultiResponsesDelegateActionListener<DetectorProfile> listener,
Set<ProfileName> profiles
) {
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

if (profiles.contains(ProfileName.STATE)) {
profileState(detectorId, enabledTimeMs, listener, job.isEnabled());
}
if (profiles.contains(ProfileName.ERROR)) {
profileError(detectorId, enabledTimeMs, listener);
}
} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
listener.failImmediately(FAIL_TO_GET_PROFILE_MSG, e);
}
} else {
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
client.get(getDetectorRequest, onGetDetectorResponse(listener, detectorId, profiles));
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
logger.info(exception.getMessage());
GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId);
client.get(getDetectorRequest, onGetDetectorResponse(listener, detectorId, profiles));
} else {
logger.error(FAIL_TO_GET_PROFILE_MSG + detectorId);
listener.onFailure(exception);
}
}));
}

private ActionListener<GetResponse> onGetDetectorResponse(
MultiResponsesDelegateActionListener<DetectorProfile> listener,
String detectorId,
Set<ProfileName> profiles
) {
return ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
DetectorProfile profile = new DetectorProfile();
if (profiles.contains(ProfileName.STATE)) {
profile.setState(DetectorState.DISABLED);
}
listener.respondImmediately(profile);
} else {
listener.failImmediately(FAIL_TO_FIND_DETECTOR_MSG + detectorId);
}
}, exception -> { listener.failImmediately(FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception); });
}

/**
* We expect three kinds of states:
* -Disabled: if get ad job api says the job is disabled;
* -Init: if anomaly score after the last update time of the detector is larger than 0
* -Running: if neither of the above applies and no exceptions.
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @param listener listener to process the returned state or exception
* @param enabled whether the detector job is enabled or not
*/
private void profileState(
String detectorId,
long enabledTime,
MultiResponsesDelegateActionListener<DetectorProfile> listener,
boolean enabled
) {
if (enabled) {
SearchRequest searchLatestResult = createInittedEverRequest(detectorId, enabledTime);
client.search(searchLatestResult, onInittedEver(listener, detectorId, enabledTime));
} else {
DetectorProfile profile = new DetectorProfile();
profile.setState(DetectorState.DISABLED);
listener.onResponse(profile);
}
}

private ActionListener<SearchResponse> onInittedEver(
MultiResponsesDelegateActionListener<DetectorProfile> listener,
String detectorId,
long lastUpdateTimeMs
) {
return ActionListener.wrap(searchResponse -> {
SearchHits hits = searchResponse.getHits();
DetectorProfile profile = new DetectorProfile();
if (hits.getTotalHits().value == 0L) {
profile.setState(DetectorState.INIT);
} else {
profile.setState(DetectorState.RUNNING);
}

listener.onResponse(profile);

}, exception -> {
if (exception instanceof IndexNotFoundException) {
DetectorProfile profile = new DetectorProfile();
// anomaly result index is not created yet
profile.setState(DetectorState.INIT);
listener.onResponse(profile);
} else {
logger
.error(
"Fail to find any anomaly result with anomaly score larger than 0 after AD job enabled time for detector {}",
detectorId
);
listener.onFailure(new RuntimeException("Fail to find detector state: " + detectorId, exception));
}
});
}

/**
* Error is populated if error of the latest anomaly result is not empty.
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @param listener listener to process the returned error or exception
*/
private void profileError(String detectorId, long enabledTime, MultiResponsesDelegateActionListener<DetectorProfile> listener) {
SearchRequest searchLatestResult = createLatestAnomalyResultRequest(detectorId, enabledTime);
client.search(searchLatestResult, onGetLatestAnomalyResult(listener, detectorId));
}

private ActionListener<SearchResponse> onGetLatestAnomalyResult(ActionListener<DetectorProfile> listener, String detectorId) {
return ActionListener.wrap(searchResponse -> {
SearchHits hits = searchResponse.getHits();
if (hits.getTotalHits().value == 0L) {
listener.onResponse(new DetectorProfile());
} else {
SearchHit hit = hits.getAt(0);

try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyResult result = parser.namedObject(AnomalyResult.class, AnomalyResult.PARSE_FIELD_NAME, null);

DetectorProfile profile = new DetectorProfile();
if (result.getError() != null) {
profile.setError(result.getError());
}
listener.onResponse(profile);
} catch (IOException | XContentParseException | NullPointerException e) {
logger.error("Fail to parse anomaly result with " + hit.toString());
listener.onFailure(new RuntimeException("Fail to find detector error: " + detectorId, e));
}
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
listener.onResponse(new DetectorProfile());
} else {
logger.error("Fail to find any anomaly result after AD job enabled time for detector {}", detectorId);
listener.onFailure(new RuntimeException("Fail to find detector error: " + detectorId, exception));
}
});
}

/**
* Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @return the search request
*/
private SearchRequest createInittedEverRequest(String detectorId, long enabledTime) {
BoolQueryBuilder filterQuery = new BoolQueryBuilder();
filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);

SearchRequest request = new SearchRequest(AnomalyResult.ANOMALY_RESULT_INDEX);
request.source(source);
return request;
}

/**
* Create search request to get the latest anomaly result after AD job enabled time
* @param detectorId detector id
* @param enabledTime the time when AD job is enabled in milliseconds
* @return the search request
*/
private SearchRequest createLatestAnomalyResultRequest(String detectorId, long enabledTime) {
BoolQueryBuilder filterQuery = new BoolQueryBuilder();
filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId));
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime));

FieldSortBuilder sortQuery = new FieldSortBuilder(AnomalyResult.EXECUTION_END_TIME_FIELD).order(SortOrder.DESC);

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1).sort(sortQuery);

SearchRequest request = new SearchRequest(AnomalyResult.ANOMALY_RESULT_INDEX);
request.source(source);
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException
String name = null;
Schedule schedule = null;
TimeConfiguration windowDelay = null;
Boolean isEnabled = null;
// we cannot set it to null as isEnabled() would do the unboxing and results in null pointer exception
Boolean isEnabled = Boolean.FALSE;
Instant enabledTime = null;
Instant disabledTime = null;
Instant lastUpdateTime = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import com.amazon.opendistroforelasticsearch.ad.annotation.Generated;
import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils;
import com.google.common.base.Objects;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -35,10 +38,17 @@
*/
public class AnomalyResult implements ToXContentObject {

public static final String PARSE_FIELD_NAME = "AnomalyResult";
public static final NamedXContentRegistry.Entry XCONTENT_REGISTRY = new NamedXContentRegistry.Entry(
AnomalyResult.class,
new ParseField(PARSE_FIELD_NAME),
it -> parse(it)
);

public static final String ANOMALY_RESULT_INDEX = ".opendistro-anomaly-results";

public static final String DETECTOR_ID_FIELD = "detector_id";
private static final String ANOMALY_SCORE_FIELD = "anomaly_score";
public static final String ANOMALY_SCORE_FIELD = "anomaly_score";
private static final String ANOMALY_GRADE_FIELD = "anomaly_grade";
private static final String CONFIDENCE_FIELD = "confidence";
private static final String FEATURE_DATA_FIELD = "feature_data";
Expand Down
Loading

0 comments on commit e5b6ce5

Please sign in to comment.