From e5b6ce5f47590e9c451c02a2b81d1c40a5f2b6d6 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Wed, 15 Apr 2020 15:45:13 -0700 Subject: [PATCH] Add state and error to profile API (#84) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add state and error to profile API We want to make it easy for customers and oncalls to identify a detector’s state and error if any. This PR adds such information to our new profile API. We expect three kinds of states: -Disabled: if get ad job api says the job is disabled; -Init: if anomaly score after the last update time of the detector is larger than 0 -Running: if neither of the above applies and no exceptions. Error is populated if error of the latest anomaly result is not empty. Testing done: -manual testing during a detector’s life cycle: not created, created but not started, started, during initialization, after initialization, stopped, restarted -added unit tests to cover above scenario --- .../ad/AnomalyDetectorPlugin.java | 16 +- .../ad/AnomalyDetectorProfileRunner.java | 287 ++++++++++++++++ .../ad/model/AnomalyDetectorJob.java | 3 +- .../ad/model/AnomalyResult.java | 12 +- .../ad/model/DetectorProfile.java | 107 ++++++ .../ad/model/DetectorState.java | 22 ++ .../ad/model/Mergeable.java | 20 ++ .../ad/model/ProfileName.java | 73 ++++ .../ad/rest/AbstractSearchAction.java | 17 +- .../ad/rest/RestGetAnomalyDetectorAction.java | 100 ++++-- .../MultiResponsesDelegateActionListener.java | 112 +++++++ .../ad/util/RestHandlerUtils.java | 2 + .../ad/AnomalyDetectorProfileRunnerTests.java | 317 ++++++++++++++++++ .../ad/TestHelpers.java | 104 +++++- 14 files changed, 1154 insertions(+), 38 deletions(-) create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorProfile.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorState.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/model/Mergeable.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/model/ProfileName.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 9744ed76..285ab608 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; +import com.amazon.opendistroforelasticsearch.ad.model.ProfileName; import com.amazon.opendistroforelasticsearch.ad.rest.RestAnomalyDetectorJobAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestDeleteAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction; @@ -141,8 +142,8 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip private Client client; private ClusterService clusterService; private ThreadPool threadPool; - private IndexNameExpressionResolver indexNameExpressionResolver; private ADStats adStats; + private NamedXContentRegistry xContentRegistry; private ClientUtil clientUtil; static { @@ -164,7 +165,6 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - this.indexNameExpressionResolver = indexNameExpressionResolver; AnomalyResultHandler anomalyResultHandler = new AnomalyResultHandler( client, settings, @@ -180,7 +180,12 @@ public List getRestHandlers( jobRunner.setAnomalyResultHandler(anomalyResultHandler); jobRunner.setSettings(settings); - RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(restController); + AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry); + RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction( + restController, + profileRunner, + ProfileName.getNames() + ); RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction( settings, restController, @@ -243,6 +248,7 @@ public Collection createComponents( IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService); anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil); this.clusterService = clusterService; + this.xContentRegistry = xContentRegistry; SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator = new IntegerSensitiveSingleFeatureLinearUniformInterpolator(); @@ -392,7 +398,7 @@ public List> getSettings() { @Override public List getNamedXContent() { - return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, ADMetaData.XCONTENT_REGISTRY); + return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, ADMetaData.XCONTENT_REGISTRY, AnomalyResult.XCONTENT_REGISTRY); } @Override diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java new file mode 100644 index 00000000..1cf75cda --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java @@ -0,0 +1,287 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad; + +import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; +import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +import java.io.IOException; +import java.util.Set; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParseException; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortOrder; + +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; +import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfile; +import com.amazon.opendistroforelasticsearch.ad.model.DetectorState; +import com.amazon.opendistroforelasticsearch.ad.model.ProfileName; +import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener; + +public class AnomalyDetectorProfileRunner { + private final Logger logger = LogManager.getLogger(AnomalyDetectorProfileRunner.class); + private Client client; + private NamedXContentRegistry xContentRegistry; + static String FAIL_TO_FIND_DETECTOR_MSG = "Fail to find detector with id: "; + static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector "; + + public AnomalyDetectorProfileRunner(Client client, NamedXContentRegistry xContentRegistry) { + this.client = client; + this.xContentRegistry = xContentRegistry; + } + + public void profile(String detectorId, ActionListener listener, Set profiles) { + + if (profiles.isEmpty()) { + listener.onFailure(new RuntimeException("Unsupported profile types.")); + return; + } + + MultiResponsesDelegateActionListener delegateListener = new MultiResponsesDelegateActionListener( + listener, + profiles.size(), + "Fail to fetch profile for " + detectorId + ); + + prepareProfile(detectorId, delegateListener, profiles); + } + + private void prepareProfile( + String detectorId, + MultiResponsesDelegateActionListener listener, + Set profiles + ) { + GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId); + client.get(getRequest, ActionListener.wrap(getResponse -> { + if (getResponse != null && getResponse.isExists()) { + try ( + XContentParser parser = XContentType.JSON + .xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString()) + ) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser); + long enabledTimeMs = job.getEnabledTime().toEpochMilli(); + + if (profiles.contains(ProfileName.STATE)) { + profileState(detectorId, enabledTimeMs, listener, job.isEnabled()); + } + if (profiles.contains(ProfileName.ERROR)) { + profileError(detectorId, enabledTimeMs, listener); + } + } catch (IOException | XContentParseException | NullPointerException e) { + logger.error(e); + listener.failImmediately(FAIL_TO_GET_PROFILE_MSG, e); + } + } else { + GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId); + client.get(getDetectorRequest, onGetDetectorResponse(listener, detectorId, profiles)); + } + }, exception -> { + if (exception instanceof IndexNotFoundException) { + logger.info(exception.getMessage()); + GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId); + client.get(getDetectorRequest, onGetDetectorResponse(listener, detectorId, profiles)); + } else { + logger.error(FAIL_TO_GET_PROFILE_MSG + detectorId); + listener.onFailure(exception); + } + })); + } + + private ActionListener onGetDetectorResponse( + MultiResponsesDelegateActionListener listener, + String detectorId, + Set profiles + ) { + return ActionListener.wrap(getResponse -> { + if (getResponse != null && getResponse.isExists()) { + DetectorProfile profile = new DetectorProfile(); + if (profiles.contains(ProfileName.STATE)) { + profile.setState(DetectorState.DISABLED); + } + listener.respondImmediately(profile); + } else { + listener.failImmediately(FAIL_TO_FIND_DETECTOR_MSG + detectorId); + } + }, exception -> { listener.failImmediately(FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception); }); + } + + /** + * We expect three kinds of states: + * -Disabled: if get ad job api says the job is disabled; + * -Init: if anomaly score after the last update time of the detector is larger than 0 + * -Running: if neither of the above applies and no exceptions. + * @param detectorId detector id + * @param enabledTime the time when AD job is enabled in milliseconds + * @param listener listener to process the returned state or exception + * @param enabled whether the detector job is enabled or not + */ + private void profileState( + String detectorId, + long enabledTime, + MultiResponsesDelegateActionListener listener, + boolean enabled + ) { + if (enabled) { + SearchRequest searchLatestResult = createInittedEverRequest(detectorId, enabledTime); + client.search(searchLatestResult, onInittedEver(listener, detectorId, enabledTime)); + } else { + DetectorProfile profile = new DetectorProfile(); + profile.setState(DetectorState.DISABLED); + listener.onResponse(profile); + } + } + + private ActionListener onInittedEver( + MultiResponsesDelegateActionListener listener, + String detectorId, + long lastUpdateTimeMs + ) { + return ActionListener.wrap(searchResponse -> { + SearchHits hits = searchResponse.getHits(); + DetectorProfile profile = new DetectorProfile(); + if (hits.getTotalHits().value == 0L) { + profile.setState(DetectorState.INIT); + } else { + profile.setState(DetectorState.RUNNING); + } + + listener.onResponse(profile); + + }, exception -> { + if (exception instanceof IndexNotFoundException) { + DetectorProfile profile = new DetectorProfile(); + // anomaly result index is not created yet + profile.setState(DetectorState.INIT); + listener.onResponse(profile); + } else { + logger + .error( + "Fail to find any anomaly result with anomaly score larger than 0 after AD job enabled time for detector {}", + detectorId + ); + listener.onFailure(new RuntimeException("Fail to find detector state: " + detectorId, exception)); + } + }); + } + + /** + * Error is populated if error of the latest anomaly result is not empty. + * @param detectorId detector id + * @param enabledTime the time when AD job is enabled in milliseconds + * @param listener listener to process the returned error or exception + */ + private void profileError(String detectorId, long enabledTime, MultiResponsesDelegateActionListener listener) { + SearchRequest searchLatestResult = createLatestAnomalyResultRequest(detectorId, enabledTime); + client.search(searchLatestResult, onGetLatestAnomalyResult(listener, detectorId)); + } + + private ActionListener onGetLatestAnomalyResult(ActionListener listener, String detectorId) { + return ActionListener.wrap(searchResponse -> { + SearchHits hits = searchResponse.getHits(); + if (hits.getTotalHits().value == 0L) { + listener.onResponse(new DetectorProfile()); + } else { + SearchHit hit = hits.getAt(0); + + try ( + XContentParser parser = XContentType.JSON + .xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString()) + ) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + AnomalyResult result = parser.namedObject(AnomalyResult.class, AnomalyResult.PARSE_FIELD_NAME, null); + + DetectorProfile profile = new DetectorProfile(); + if (result.getError() != null) { + profile.setError(result.getError()); + } + listener.onResponse(profile); + } catch (IOException | XContentParseException | NullPointerException e) { + logger.error("Fail to parse anomaly result with " + hit.toString()); + listener.onFailure(new RuntimeException("Fail to find detector error: " + detectorId, e)); + } + } + }, exception -> { + if (exception instanceof IndexNotFoundException) { + listener.onResponse(new DetectorProfile()); + } else { + logger.error("Fail to find any anomaly result after AD job enabled time for detector {}", detectorId); + listener.onFailure(new RuntimeException("Fail to find detector error: " + detectorId, exception)); + } + }); + } + + /** + * Create search request to check if we have at least 1 anomaly score larger than 0 after AD job enabled time + * @param detectorId detector id + * @param enabledTime the time when AD job is enabled in milliseconds + * @return the search request + */ + private SearchRequest createInittedEverRequest(String detectorId, long enabledTime) { + BoolQueryBuilder filterQuery = new BoolQueryBuilder(); + filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId)); + filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime)); + filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0)); + + SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1); + + SearchRequest request = new SearchRequest(AnomalyResult.ANOMALY_RESULT_INDEX); + request.source(source); + return request; + } + + /** + * Create search request to get the latest anomaly result after AD job enabled time + * @param detectorId detector id + * @param enabledTime the time when AD job is enabled in milliseconds + * @return the search request + */ + private SearchRequest createLatestAnomalyResultRequest(String detectorId, long enabledTime) { + BoolQueryBuilder filterQuery = new BoolQueryBuilder(); + filterQuery.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId)); + filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.EXECUTION_END_TIME_FIELD).gte(enabledTime)); + + FieldSortBuilder sortQuery = new FieldSortBuilder(AnomalyResult.EXECUTION_END_TIME_FIELD).order(SortOrder.DESC); + + SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1).sort(sortQuery); + + SearchRequest request = new SearchRequest(AnomalyResult.ANOMALY_RESULT_INDEX); + request.source(source); + return request; + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorJob.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorJob.java index 62a42cf7..bf552941 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorJob.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorJob.java @@ -96,7 +96,8 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException String name = null; Schedule schedule = null; TimeConfiguration windowDelay = null; - Boolean isEnabled = null; + // we cannot set it to null as isEnabled() would do the unboxing and results in null pointer exception + Boolean isEnabled = Boolean.FALSE; Instant enabledTime = null; Instant disabledTime = null; Instant lastUpdateTime = null; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyResult.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyResult.java index 077ba33d..d0881e3b 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyResult.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyResult.java @@ -18,6 +18,9 @@ import com.amazon.opendistroforelasticsearch.ad.annotation.Generated; import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils; import com.google.common.base.Objects; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -35,10 +38,17 @@ */ public class AnomalyResult implements ToXContentObject { + public static final String PARSE_FIELD_NAME = "AnomalyResult"; + public static final NamedXContentRegistry.Entry XCONTENT_REGISTRY = new NamedXContentRegistry.Entry( + AnomalyResult.class, + new ParseField(PARSE_FIELD_NAME), + it -> parse(it) + ); + public static final String ANOMALY_RESULT_INDEX = ".opendistro-anomaly-results"; public static final String DETECTOR_ID_FIELD = "detector_id"; - private static final String ANOMALY_SCORE_FIELD = "anomaly_score"; + public static final String ANOMALY_SCORE_FIELD = "anomaly_score"; private static final String ANOMALY_GRADE_FIELD = "anomaly_grade"; private static final String CONFIDENCE_FIELD = "confidence"; private static final String FEATURE_DATA_FIELD = "feature_data"; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorProfile.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorProfile.java new file mode 100644 index 00000000..30650cbe --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorProfile.java @@ -0,0 +1,107 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.model; + +import java.io.IOException; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +public class DetectorProfile implements ToXContentObject, Mergeable { + private DetectorState state; + private String error; + + private static final String STATE_FIELD = "state"; + private static final String ERROR_FIELD = "error"; + + public XContentBuilder toXContent(XContentBuilder builder) throws IOException { + return toXContent(builder, ToXContent.EMPTY_PARAMS); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject(); + + if (state != null) { + xContentBuilder.field(STATE_FIELD, state); + } + if (error != null) { + xContentBuilder.field(ERROR_FIELD, error); + } + return xContentBuilder.endObject(); + } + + public DetectorState getState() { + return state; + } + + public void setState(DetectorState state) { + this.state = state; + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + @Override + public void merge(Mergeable other) { + if (this == other || other == null || getClass() != other.getClass()) { + return; + } + DetectorProfile otherProfile = (DetectorProfile) other; + if (otherProfile.getState() != null) { + this.state = otherProfile.getState(); + } + if (otherProfile.getError() != null) { + this.error = otherProfile.getError(); + } + + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + if (obj instanceof DetectorProfile) { + DetectorProfile other = (DetectorProfile) obj; + + return new EqualsBuilder().append(state, other.state).append(error, other.error).isEquals(); + } + return false; + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(state).append(error).toHashCode(); + } + + @Override + public String toString() { + return new ToStringBuilder(this).append("state", state).append("error", error).toString(); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorState.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorState.java new file mode 100644 index 00000000..08307942 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/DetectorState.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.model; + +public enum DetectorState { + DISABLED, + INIT, + RUNNING +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/Mergeable.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/Mergeable.java new file mode 100644 index 00000000..7093af99 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/Mergeable.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.model; + +public interface Mergeable { + void merge(Mergeable other); +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/ProfileName.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/ProfileName.java new file mode 100644 index 00000000..ea0be275 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/ProfileName.java @@ -0,0 +1,73 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.model; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +public enum ProfileName { + STATE("state"), + ERROR("error"); + + private String name; + + ProfileName(String name) { + this.name = name; + } + + /** + * Get profile name + * + * @return name + */ + public String getName() { + return name; + } + + /** + * Get set of profile names + * + * @return set of profile names + */ + public static Set getNames() { + Set names = new HashSet<>(); + + for (ProfileName statName : ProfileName.values()) { + names.add(statName.getName()); + } + return names; + } + + public static ProfileName getName(String name) { + switch (name) { + case "state": + return STATE; + case "error": + return ERROR; + default: + throw new IllegalArgumentException("Unsupported profile types"); + } + } + + public static Set getNames(Collection names) { + Set res = new HashSet<>(); + for (String name : names) { + res.add(getName(name)); + } + return res; + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java index ef4a4137..3f98befd 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/AbstractSearchAction.java @@ -80,13 +80,18 @@ public RestResponse buildResponse(SearchResponse response) throws Exception { return new BytesRestResponse(RestStatus.REQUEST_TIMEOUT, response.toString()); } - for (SearchHit hit : response.getHits()) { - XContentParser parser = XContentType.JSON - .xContent() - .createParser(channel.request().getXContentRegistry(), LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString()); - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + if (clazz == AnomalyDetector.class) { + for (SearchHit hit : response.getHits()) { + XContentParser parser = XContentType.JSON + .xContent() + .createParser( + channel.request().getXContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + hit.getSourceAsString() + ); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); - if (clazz == AnomalyDetector.class) { + // write back id and version to anomaly detector object ToXContentObject xContentObject = AnomalyDetector.parse(parser, hit.getId(), hit.getVersion()); XContentBuilder builder = xContentObject.toXContent(jsonBuilder(), EMPTY_PARAMS); hit.sourceRef(BytesReference.bytes(builder)); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java index 82582ad7..21e142e6 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. @@ -17,8 +17,13 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; +import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfile; +import com.amazon.opendistroforelasticsearch.ad.model.ProfileName; import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; +import com.google.common.collect.Sets; import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; +import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorProfileRunner; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -27,6 +32,7 @@ import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.BaseRestHandler; @@ -40,11 +46,16 @@ import org.elasticsearch.rest.action.RestResponseListener; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Locale; +import java.util.Set; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.DETECTOR_ID; +import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.PROFILE; +import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.TYPE; import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.createXContentParser; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -55,11 +66,36 @@ public class RestGetAnomalyDetectorAction extends BaseRestHandler { private static final String GET_ANOMALY_DETECTOR_ACTION = "get_anomaly_detector"; private static final Logger logger = LogManager.getLogger(RestGetAnomalyDetectorAction.class); + private final AnomalyDetectorProfileRunner profileRunner; + private final Set allProfileTypeStrs; + private final Set allProfileTypes; + + public RestGetAnomalyDetectorAction( + RestController controller, + AnomalyDetectorProfileRunner profileRunner, + Set allProfileTypeStrs + ) { + this.profileRunner = profileRunner; + this.allProfileTypes = new HashSet(Arrays.asList(ProfileName.values())); + this.allProfileTypeStrs = ProfileName.getNames(); - public RestGetAnomalyDetectorAction(RestController controller) { String path = String.format(Locale.ROOT, "%s/{%s}", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, DETECTOR_ID); controller.registerHandler(RestRequest.Method.GET, path, this); controller.registerHandler(RestRequest.Method.HEAD, path, this); + controller + .registerHandler( + RestRequest.Method.GET, + String.format(Locale.ROOT, "%s/{%s}/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, DETECTOR_ID, PROFILE), + this + ); + // types is a profile names. See a complete list of supported profiles names in + // com.amazon.opendistroforelasticsearch.ad.model.ProfileName. + controller + .registerHandler( + RestRequest.Method.GET, + String.format(Locale.ROOT, "%s/{%s}/%s/{%s}", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, DETECTOR_ID, PROFILE, TYPE), + this + ); } @Override @@ -71,16 +107,23 @@ public String getName() { protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { String detectorId = request.param(DETECTOR_ID); boolean returnJob = request.paramAsBoolean("job", false); - MultiGetRequest.Item adItem = new MultiGetRequest.Item(ANOMALY_DETECTORS_INDEX, detectorId) - .version(RestActions.parseVersion(request)); - MultiGetRequest multiGetRequest = new MultiGetRequest().add(adItem); - if (returnJob) { - MultiGetRequest.Item adJobItem = new MultiGetRequest.Item(ANOMALY_DETECTOR_JOB_INDEX, detectorId) + String typesStr = request.param(TYPE); + String rawPath = request.rawPath(); + if (!Strings.isEmpty(typesStr) || rawPath.endsWith(PROFILE) || rawPath.endsWith(PROFILE + "/")) { + return channel -> profileRunner + .profile(detectorId, getProfileActionListener(channel, detectorId), getProfilesToCollect(typesStr)); + } else { + MultiGetRequest.Item adItem = new MultiGetRequest.Item(ANOMALY_DETECTORS_INDEX, detectorId) .version(RestActions.parseVersion(request)); - multiGetRequest.add(adJobItem); - } + MultiGetRequest multiGetRequest = new MultiGetRequest().add(adItem); + if (returnJob) { + MultiGetRequest.Item adJobItem = new MultiGetRequest.Item(ANOMALY_DETECTOR_JOB_INDEX, detectorId) + .version(RestActions.parseVersion(request)); + multiGetRequest.add(adJobItem); + } - return channel -> client.multiGet(multiGetRequest, onMultiGetResponse(channel, returnJob, detectorId)); + return channel -> client.multiGet(multiGetRequest, onMultiGetResponse(channel, returnJob, detectorId)); + } } private ActionListener onMultiGetResponse(RestChannel channel, boolean returnJob, String detectorId) { @@ -110,12 +153,8 @@ public RestResponse buildResponse(MultiGetResponse multiGetResponse) throws Exce ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); detector = parser.namedObject(AnomalyDetector.class, AnomalyDetector.PARSE_FIELD_NAME, null); - } catch (Throwable t) { - logger.error("Fail to parse detector", t); - return new BytesRestResponse( - RestStatus.INTERNAL_SERVER_ERROR, - "Failed to parse detector with id: " + detectorId - ); + } catch (Exception e) { + return buildInternalServerErrorResponse(e, "Failed to parse detector with id: " + detectorId); } } } @@ -127,12 +166,8 @@ public RestResponse buildResponse(MultiGetResponse multiGetResponse) throws Exce try (XContentParser parser = createXContentParser(channel, response.getResponse().getSourceAsBytesRef())) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); adJob = AnomalyDetectorJob.parse(parser); - } catch (Throwable t) { - logger.error("Fail to parse detector job ", t); - return new BytesRestResponse( - RestStatus.INTERNAL_SERVER_ERROR, - "Failed to parse detector job with id: " + detectorId - ); + } catch (Exception e) { + return buildInternalServerErrorResponse(e, "Failed to parse detector job with id: " + detectorId); } } } @@ -148,4 +183,25 @@ public RestResponse buildResponse(MultiGetResponse multiGetResponse) throws Exce }; } + private ActionListener getProfileActionListener(RestChannel channel, String detectorId) { + return ActionListener + .wrap( + profile -> { channel.sendResponse(new BytesRestResponse(RestStatus.OK, profile.toXContent(channel.newBuilder()))); }, + exception -> { channel.sendResponse(buildInternalServerErrorResponse(exception, exception.getMessage())); } + ); + } + + private RestResponse buildInternalServerErrorResponse(Exception e, String errorMsg) { + logger.error(errorMsg, e); + return new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, errorMsg); + } + + private Set getProfilesToCollect(String typesStr) { + if (Strings.isEmpty(typesStr)) { + return this.allProfileTypes; + } else { + Set typesInRequest = new HashSet<>(Arrays.asList(typesStr.split(","))); + return ProfileName.getNames(Sets.intersection(this.allProfileTypeStrs, typesInRequest)); + } + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java new file mode 100644 index 00000000..3f42a18c --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java @@ -0,0 +1,112 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; + +import com.amazon.opendistroforelasticsearch.ad.model.Mergeable; + +/** + * A listener wrapper to help send multiple requests asynchronously and return one final responses together + */ +public class MultiResponsesDelegateActionListener implements ActionListener { + private static final Logger LOG = LogManager.getLogger(MultiResponsesDelegateActionListener.class); + private final ActionListener delegate; + private final AtomicInteger collectedResponseCount; + private final int maxResponseCount; + // save responses from multiple requests + private final List savedResponses; + private List exceptions; + private String finalErrorMsg; + + public MultiResponsesDelegateActionListener(ActionListener delegate, int maxResponseCount, String finalErrorMsg) { + this.delegate = delegate; + this.collectedResponseCount = new AtomicInteger(0); + this.maxResponseCount = maxResponseCount; + this.savedResponses = Collections.synchronizedList(new ArrayList()); + this.exceptions = Collections.synchronizedList(new ArrayList()); + this.finalErrorMsg = finalErrorMsg; + } + + @Override + public void onResponse(T response) { + try { + if (response != null) { + this.savedResponses.add(response); + } + } finally { + // If expectedResponseCount == 0 , collectedResponseCount.incrementAndGet() will be greater than expectedResponseCount + if (collectedResponseCount.incrementAndGet() >= maxResponseCount) { + finish(); + } + } + + } + + @Override + public void onFailure(Exception e) { + LOG.error(e); + try { + this.exceptions.add(e.getMessage()); + } finally { + // no matter the asynchronous request is a failure or success, we need to increment the count. + // We need finally here to increment the count when there is a failure. + if (collectedResponseCount.incrementAndGet() >= maxResponseCount) { + finish(); + } + } + } + + private void finish() { + if (this.exceptions.size() == 0) { + if (savedResponses.size() == 0) { + this.delegate.onFailure(new RuntimeException("No response collected")); + } else { + T response0 = savedResponses.get(0); + for (int i = 1; i < savedResponses.size(); i++) { + response0.merge(savedResponses.get(i)); + } + this.delegate.onResponse(response0); + } + } else { + this.delegate.onFailure(new RuntimeException(String.format(Locale.ROOT, finalErrorMsg + " Exceptions: %s", exceptions))); + } + } + + public void failImmediately(Exception e) { + this.delegate.onFailure(new RuntimeException(finalErrorMsg, e)); + } + + public void failImmediately(String errMsg) { + this.delegate.onFailure(new RuntimeException(errMsg)); + } + + public void failImmediately(String errMsg, Exception e) { + this.delegate.onFailure(new RuntimeException(errMsg, e)); + } + + public void respondImmediately(T o) { + this.delegate.onResponse(o); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtils.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtils.java index 2d09ea2d..057eeec7 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtils.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtils.java @@ -53,6 +53,8 @@ public final class RestHandlerUtils { public static final String PREVIEW = "_preview"; public static final String START_JOB = "_start"; public static final String STOP_JOB = "_stop"; + public static final String PROFILE = "_profile"; + public static final String TYPE = "type"; public static final ToXContent.MapParams XCONTENT_WITH_TYPE = new ToXContent.MapParams(ImmutableMap.of("with_type", "true")); private static final String KIBANA_USER_AGENT = "Kibana"; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java new file mode 100644 index 00000000..8f23f7ea --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java @@ -0,0 +1,317 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad; + +import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; +import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; +import org.junit.BeforeClass; + +import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; +import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfile; +import com.amazon.opendistroforelasticsearch.ad.model.DetectorState; +import com.amazon.opendistroforelasticsearch.ad.model.ProfileName; + +public class AnomalyDetectorProfileRunnerTests extends ESTestCase { + private static final Logger LOG = LogManager.getLogger(AnomalyDetectorProfileRunnerTests.class); + private AnomalyDetectorProfileRunner runner; + private Client client; + private AnomalyDetector detector; + private static Set stateOnly; + private static Set stateNError; + private static String error = "No full shingle in current detection window"; + + @Override + protected NamedXContentRegistry xContentRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); + List entries = searchModule.getNamedXContents(); + entries.addAll(Arrays.asList(AnomalyDetector.XCONTENT_REGISTRY, ADMetaData.XCONTENT_REGISTRY, AnomalyResult.XCONTENT_REGISTRY)); + return new NamedXContentRegistry(entries); + } + + @BeforeClass + public static void setUpOnce() { + stateOnly = new HashSet(); + stateOnly.add(ProfileName.STATE); + stateNError = new HashSet(); + stateNError.add(ProfileName.ERROR); + stateNError.add(ProfileName.STATE); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + client = mock(Client.class); + runner = new AnomalyDetectorProfileRunner(client, xContentRegistry()); + } + + enum JobStatus { + INDEX_NOT_EXIT, + DISABLED, + ENABLED + } + + enum InittedEverResultStatus { + INDEX_NOT_EXIT, + GREATER_THAN_ZERO, + EMPTY, + EXCEPTION + } + + enum ErrorResultStatus { + INDEX_NOT_EXIT, + NO_ERROR, + ERROR + } + + @SuppressWarnings("unchecked") + private void setUpClientGet(boolean detectorExists, JobStatus jobStatus) throws IOException { + detector = TestHelpers.randomAnomalyDetector(TestHelpers.randomUiMetadata(), Instant.now()); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + GetRequest request = (GetRequest) args[0]; + ActionListener listener = (ActionListener) args[1]; + + if (request.index().equals(ANOMALY_DETECTORS_INDEX)) { + if (detectorExists) { + listener.onResponse(TestHelpers.createGetResponse(detector, detector.getDetectorId())); + } else { + listener.onFailure(new IndexNotFoundException(ANOMALY_DETECTORS_INDEX)); + } + } else { + AnomalyDetectorJob job = null; + switch (jobStatus) { + case INDEX_NOT_EXIT: + listener.onFailure(new IndexNotFoundException(ANOMALY_DETECTOR_JOB_INDEX)); + break; + case DISABLED: + job = TestHelpers.randomAnomalyDetectorJob(false); + listener.onResponse(TestHelpers.createGetResponse(job, detector.getDetectorId())); + break; + case ENABLED: + job = TestHelpers.randomAnomalyDetectorJob(true); + listener.onResponse(TestHelpers.createGetResponse(job, detector.getDetectorId())); + break; + default: + assertTrue("should not reach here", false); + break; + } + } + + return null; + }).when(client).get(any(), any()); + } + + @SuppressWarnings("unchecked") + private void setUpClientSearch(InittedEverResultStatus inittedEverResultStatus, ErrorResultStatus errorResultStatus) { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + SearchRequest request = (SearchRequest) args[0]; + ActionListener listener = (ActionListener) args[1]; + if (errorResultStatus == ErrorResultStatus.INDEX_NOT_EXIT + || inittedEverResultStatus == InittedEverResultStatus.INDEX_NOT_EXIT) { + listener.onFailure(new IndexNotFoundException(AnomalyResult.ANOMALY_RESULT_INDEX)); + return null; + } + AnomalyResult result = null; + if (request.source().query().toString().contains(AnomalyResult.ANOMALY_SCORE_FIELD)) { + switch (inittedEverResultStatus) { + case GREATER_THAN_ZERO: + result = TestHelpers.randomAnomalyDetectResult(0.87); + listener.onResponse(TestHelpers.createSearchResponse(result)); + break; + case EMPTY: + listener.onResponse(TestHelpers.createEmptySearchResponse()); + break; + case EXCEPTION: + listener.onFailure(new RuntimeException()); + break; + default: + assertTrue("should not reach here", false); + break; + } + } else { + switch (errorResultStatus) { + case NO_ERROR: + result = TestHelpers.randomAnomalyDetectResult(null); + listener.onResponse(TestHelpers.createSearchResponse(result)); + break; + case ERROR: + result = TestHelpers.randomAnomalyDetectResult(error); + listener.onResponse(TestHelpers.createSearchResponse(result)); + break; + default: + assertTrue("should not reach here", false); + break; + } + } + + return null; + }).when(client).search(any(), any()); + + } + + public void testDetectorNotExist() throws IOException, InterruptedException { + setUpClientGet(false, JobStatus.INDEX_NOT_EXIT); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile("x123", ActionListener.wrap(response -> { + assertTrue("Should not reach here", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue(exception.getMessage().contains(AnomalyDetectorProfileRunner.FAIL_TO_FIND_DETECTOR_MSG)); + inProgressLatch.countDown(); + }), stateNError); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + public void testDisabledJobIndexTemplate(JobStatus status) throws IOException, InterruptedException { + setUpClientGet(true, status); + DetectorProfile expectedProfile = new DetectorProfile(); + expectedProfile.setState(DetectorState.DISABLED); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertEquals(expectedProfile, response); + inProgressLatch.countDown(); + }, exception -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }), stateOnly); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + public void testNoJobIndex() throws IOException, InterruptedException { + testDisabledJobIndexTemplate(JobStatus.INDEX_NOT_EXIT); + } + + public void testJobDisabled() throws IOException, InterruptedException { + testDisabledJobIndexTemplate(JobStatus.DISABLED); + } + + public void testInitOrRunningStateTemplate(InittedEverResultStatus status, DetectorState expectedState) throws IOException, + InterruptedException { + setUpClientGet(true, JobStatus.ENABLED); + setUpClientSearch(status, ErrorResultStatus.NO_ERROR); + DetectorProfile expectedProfile = new DetectorProfile(); + expectedProfile.setState(expectedState); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertEquals(expectedProfile, response); + inProgressLatch.countDown(); + }, exception -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }), stateOnly); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + public void testResultNotExist() throws IOException, InterruptedException { + testInitOrRunningStateTemplate(InittedEverResultStatus.INDEX_NOT_EXIT, DetectorState.INIT); + } + + public void testResultEmpty() throws IOException, InterruptedException { + testInitOrRunningStateTemplate(InittedEverResultStatus.EMPTY, DetectorState.INIT); + } + + public void testResultGreaterThanZero() throws IOException, InterruptedException { + testInitOrRunningStateTemplate(InittedEverResultStatus.GREATER_THAN_ZERO, DetectorState.RUNNING); + } + + public void testErrorStateTemplate(InittedEverResultStatus initStatus, ErrorResultStatus status, DetectorState state, String error) + throws IOException, + InterruptedException { + setUpClientGet(true, JobStatus.ENABLED); + setUpClientSearch(initStatus, status); + DetectorProfile expectedProfile = new DetectorProfile(); + expectedProfile.setState(state); + expectedProfile.setError(error); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertEquals(expectedProfile, response); + inProgressLatch.countDown(); + }, exception -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }), stateNError); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + public void testInitNoError() throws IOException, InterruptedException { + testErrorStateTemplate(InittedEverResultStatus.INDEX_NOT_EXIT, ErrorResultStatus.INDEX_NOT_EXIT, DetectorState.INIT, null); + } + + public void testRunningNoError() throws IOException, InterruptedException { + testErrorStateTemplate(InittedEverResultStatus.GREATER_THAN_ZERO, ErrorResultStatus.NO_ERROR, DetectorState.RUNNING, null); + } + + public void testRunningWithError() throws IOException, InterruptedException { + testErrorStateTemplate(InittedEverResultStatus.GREATER_THAN_ZERO, ErrorResultStatus.ERROR, DetectorState.RUNNING, error); + } + + public void testInitWithError() throws IOException, InterruptedException { + testErrorStateTemplate(InittedEverResultStatus.EMPTY, ErrorResultStatus.ERROR, DetectorState.INIT, error); + } + + public void testExceptionOnStateFetching() throws IOException, InterruptedException { + setUpClientGet(true, JobStatus.ENABLED); + setUpClientSearch(InittedEverResultStatus.EXCEPTION, ErrorResultStatus.NO_ERROR); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue("Unexcpeted exception " + exception.getMessage(), exception instanceof RuntimeException); + inProgressLatch.countDown(); + }), stateOnly); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index b85103c5..3af20295 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -33,9 +33,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.util.Strings; +import org.apache.lucene.search.TotalHits; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Client; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; @@ -58,14 +62,25 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.profile.SearchProfileShardResults; +import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.ESRestTestCase; @@ -82,6 +97,7 @@ import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; import static org.elasticsearch.test.ESTestCase.randomDouble; import static org.elasticsearch.test.ESTestCase.randomInt; @@ -297,9 +313,21 @@ public static FeatureData randomFeatureData() { } public static AnomalyResult randomAnomalyDetectResult() { + return randomAnomalyDetectResult(randomDouble(), randomAlphaOfLength(5)); + } + + public static AnomalyResult randomAnomalyDetectResult(double score) { + return randomAnomalyDetectResult(randomDouble(), null); + } + + public static AnomalyResult randomAnomalyDetectResult(String error) { + return randomAnomalyDetectResult(Double.NaN, error); + } + + public static AnomalyResult randomAnomalyDetectResult(double score, String error) { return new AnomalyResult( randomAlphaOfLength(5), - randomDouble(), + score, randomDouble(), randomDouble(), ImmutableList.of(randomFeatureData(), randomFeatureData()), @@ -307,16 +335,20 @@ public static AnomalyResult randomAnomalyDetectResult() { Instant.now().truncatedTo(ChronoUnit.SECONDS), Instant.now().truncatedTo(ChronoUnit.SECONDS), Instant.now().truncatedTo(ChronoUnit.SECONDS), - randomAlphaOfLength(5) + error ); } public static AnomalyDetectorJob randomAnomalyDetectorJob() { + return randomAnomalyDetectorJob(true); + } + + public static AnomalyDetectorJob randomAnomalyDetectorJob(boolean enabled) { return new AnomalyDetectorJob( randomAlphaOfLength(10), randomIntervalSchedule(), randomIntervalTimeConfiguration(), - true, + enabled, Instant.now().truncatedTo(ChronoUnit.SECONDS), Instant.now().truncatedTo(ChronoUnit.SECONDS), Instant.now().truncatedTo(ChronoUnit.SECONDS), @@ -406,4 +438,70 @@ public static void createIndex(RestClient client, String indexName, HttpEntity d null ); } + + public static GetResponse createGetResponse(ToXContentObject o, String id) throws IOException { + XContentBuilder content = o.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS); + + return new GetResponse( + new GetResult( + AnomalyDetector.ANOMALY_DETECTORS_INDEX, + MapperService.SINGLE_MAPPING_NAME, + id, + UNASSIGNED_SEQ_NO, + 0, + -1, + true, + BytesReference.bytes(content), + Collections.emptyMap(), + Collections.emptyMap() + ) + ); + } + + public static SearchResponse createSearchResponse(ToXContentObject o) throws IOException { + XContentBuilder content = o.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS); + + SearchHit[] hits = new SearchHit[1]; + hits[0] = new SearchHit(0).sourceRef(BytesReference.bytes(content)); + + return new SearchResponse( + new InternalSearchResponse( + new SearchHits(hits, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0f), + new InternalAggregations(Collections.emptyList()), + new Suggest(Collections.emptyList()), + new SearchProfileShardResults(Collections.emptyMap()), + false, + false, + 1 + ), + "", + 5, + 5, + 0, + 100, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY + ); + } + + public static SearchResponse createEmptySearchResponse() throws IOException { + return new SearchResponse( + new InternalSearchResponse( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 1.0f), + new InternalAggregations(Collections.emptyList()), + new Suggest(Collections.emptyList()), + new SearchProfileShardResults(Collections.emptyMap()), + false, + false, + 1 + ), + "", + 5, + 5, + 0, + 100, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY + ); + } }