From e29fcfdf6108a98d33ebd7b697544b95717508e8 Mon Sep 17 00:00:00 2001 From: Yaliang Wu Date: Tue, 29 Dec 2020 03:35:45 -0800 Subject: [PATCH] filter out exceptions which should not be counted in failure stats --- .../ad/NodeStateManager.java | 5 +- .../exception/AnomalyDetectionException.java | 33 +++- .../ad/common/exception/ClientException.java | 7 +- .../ad/common/exception/EndRunException.java | 5 + .../exception/LimitExceededException.java | 2 + .../exception/ResourceNotFoundException.java | 1 + .../ad/feature/SearchFeatureDao.java | 4 +- .../IndexAnomalyDetectorJobActionHandler.java | 10 + .../AnomalyResultTransportAction.java | 43 ++++- .../ad/ADIntegTestCase.java | 103 +++++++++++ .../ad/AnomalyDetectorRestTestCase.java | 12 +- .../ad/TestHelpers.java | 57 ++++-- .../ad/feature/SearchFeatureDaoTests.java | 2 +- .../ad/rest/AnomalyDetectorRestApiIT.java | 7 + .../AnomalyResultTransportActionTests.java | 174 ++++++++++++++++++ 15 files changed, 440 insertions(+), 25 deletions(-) create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/ADIntegTestCase.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportActionTests.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/NodeStateManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/NodeStateManager.java index 104be239..c30659ec 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/NodeStateManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/NodeStateManager.java @@ -159,7 +159,10 @@ private ActionListener onGetDetectorResponse(String adID, ActionLis AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId()); // end execution if all features are disabled if (detector.getEnabledFeatureIds().isEmpty()) { - listener.onFailure(new EndRunException(adID, CommonErrorMessages.ALL_FEATURES_DISABLED_ERR_MSG, true)); + listener + .onFailure( + new EndRunException(adID, CommonErrorMessages.ALL_FEATURES_DISABLED_ERR_MSG, true).countedInStats(false) + ); return; } NodeState state = states.computeIfAbsent(adID, id -> new NodeState(id, clock)); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/AnomalyDetectionException.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/AnomalyDetectionException.java index 2f06ff34..a7f1b15e 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/AnomalyDetectionException.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/AnomalyDetectionException.java @@ -20,7 +20,14 @@ */ public class AnomalyDetectionException extends RuntimeException { - private final String anomalyDetectorId; + private String anomalyDetectorId; + // countedInStats will be used to tell whether the exception should be + // counted in failure stats. + private boolean countedInStats = true; + + public AnomalyDetectionException(String message) { + super(message); + } /** * Constructor with an anomaly detector ID and a message. @@ -38,6 +45,10 @@ public AnomalyDetectionException(String adID, String message, Throwable cause) { this.anomalyDetectorId = adID; } + public AnomalyDetectionException(Throwable cause) { + super(cause); + } + public AnomalyDetectionException(String adID, Throwable cause) { super(cause); this.anomalyDetectorId = adID; @@ -52,6 +63,26 @@ public String getAnomalyDetectorId() { return this.anomalyDetectorId; } + /** + * Returns if the exception should be counted in stats. + * + * @return true if should count the exception in stats; otherwise return false + */ + public boolean isCountedInStats() { + return countedInStats; + } + + /** + * Set if the exception should be counted in stats. + * + * @param countInStats count the exception in stats + * @return the exception itself + */ + public AnomalyDetectionException countedInStats(boolean countInStats) { + this.countedInStats = countInStats; + return this; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/ClientException.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/ClientException.java index 4faa2d66..e50c0dbc 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/ClientException.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/ClientException.java @@ -16,11 +16,14 @@ package com.amazon.opendistroforelasticsearch.ad.common.exception; /** - * All exception visible to AD transport layer's client is under ClientVisible. - * + * All exception visible to AD transport layer's client is under ClientException. */ public class ClientException extends AnomalyDetectionException { + public ClientException(String message) { + super(message); + } + public ClientException(String anomalyDetectorId, String message) { super(anomalyDetectorId, message); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/EndRunException.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/EndRunException.java index 118a6869..452f6746 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/EndRunException.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/EndRunException.java @@ -22,6 +22,11 @@ public class EndRunException extends ClientException { private boolean endNow; + public EndRunException(String message, boolean endNow) { + super(message); + this.endNow = endNow; + } + public EndRunException(String anomalyDetectorId, String message, boolean endNow) { super(anomalyDetectorId, message); this.endNow = endNow; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/LimitExceededException.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/LimitExceededException.java index 1ee0f28e..bb155265 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/LimitExceededException.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/LimitExceededException.java @@ -28,6 +28,7 @@ public class LimitExceededException extends EndRunException { */ public LimitExceededException(String anomalyDetectorId, String message) { super(anomalyDetectorId, message, true); + this.countedInStats(false); } /** @@ -39,5 +40,6 @@ public LimitExceededException(String anomalyDetectorId, String message) { */ public LimitExceededException(String anomalyDetectorId, String message, boolean stopNow) { super(anomalyDetectorId, message, stopNow); + this.countedInStats(false); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/ResourceNotFoundException.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/ResourceNotFoundException.java index e69f81fd..2f23d452 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/ResourceNotFoundException.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/common/exception/ResourceNotFoundException.java @@ -28,5 +28,6 @@ public class ResourceNotFoundException extends AnomalyDetectionException { */ public ResourceNotFoundException(String detectorId, String message) { super(detectorId, message); + countedInStats(false); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java index bb9ba881..0ac9626d 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java @@ -313,7 +313,9 @@ private double parseAggregation(Aggregation aggregation) { result = percentile.next().getValue(); } } - return Optional.ofNullable(result).orElseThrow(() -> new IllegalStateException("Failed to parse aggregation " + aggregation)); + return Optional + .ofNullable(result) + .orElseThrow(() -> new EndRunException("Failed to parse aggregation " + aggregation, true).countedInStats(false)); } /** diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index 2483893b..6ea1ecc3 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -162,6 +162,16 @@ private void onGetAnomalyDetectorResponse(GetResponse response) throws IOExcepti ); return; } + if (detector.getEnabledFeatureIds().size() == 0) { + listener + .onFailure( + new ElasticsearchStatusException( + "Can't start detector job as no enabled features configured", + RestStatus.BAD_REQUEST + ) + ); + return; + } IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getDetectionInterval(); Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit()); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java index 409dcdcd..46ff2fb3 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java @@ -15,6 +15,8 @@ package com.amazon.opendistroforelasticsearch.ad.transport; +import static com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages.INVALID_SEARCH_QUERY_MSG; + import java.net.ConnectException; import java.util.ArrayList; import java.util.HashSet; @@ -37,6 +39,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; @@ -55,6 +59,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; @@ -228,16 +233,20 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< hcDetectors.remove(adID); original.onResponse(r); }, e -> { - adStats.getStat(StatNames.AD_EXECUTE_FAIL_COUNT.getName()).increment(); - if (hcDetectors.contains(adID)) { - adStats.getStat(StatNames.AD_HC_EXECUTE_FAIL_COUNT.getName()).increment(); + // If exception is AnomalyDetectionException and it should not be counted in stats, + // we will not count it in failure stats. + if (!(e instanceof AnomalyDetectionException) || ((AnomalyDetectionException) e).isCountedInStats()) { + adStats.getStat(StatNames.AD_EXECUTE_FAIL_COUNT.getName()).increment(); + if (hcDetectors.contains(adID)) { + adStats.getStat(StatNames.AD_HC_EXECUTE_FAIL_COUNT.getName()).increment(); + } } hcDetectors.remove(adID); original.onFailure(e); }); if (!EnabledSetting.isADPluginEnabled()) { - throw new EndRunException(adID, CommonErrorMessages.DISABLED_ERR_MSG, true); + throw new EndRunException(adID, CommonErrorMessages.DISABLED_ERR_MSG, true).countedInStats(false); } adStats.getStat(StatNames.AD_EXECUTE_REQUEST_COUNT.getName()).increment(); @@ -501,7 +510,7 @@ private ActionListener onFeatureResponse( private void handleFailure(Exception exception, ActionListener listener, String adID) { if (exception instanceof IndexNotFoundException) { - listener.onFailure(new EndRunException(adID, TROUBLE_QUERYING_ERR_MSG + exception.getMessage(), true)); + listener.onFailure(new EndRunException(adID, TROUBLE_QUERYING_ERR_MSG + exception.getMessage(), true).countedInStats(false)); } else if (exception instanceof EndRunException) { // invalid feature query listener.onFailure(exception); @@ -598,12 +607,36 @@ void handleExecuteException(Exception ex, ActionListener listener.onFailure(ex); } else if (ex instanceof AnomalyDetectionException) { listener.onFailure(new InternalFailure((AnomalyDetectionException) ex)); + } else if (ex instanceof SearchPhaseExecutionException && invalidQuery((SearchPhaseExecutionException) ex)) { + // This is to catch invalid aggregation on wrong field type. For example, + // sum aggregation on text field. We should end detector run for such case. + listener + .onFailure( + new EndRunException( + adID, + INVALID_SEARCH_QUERY_MSG + ((SearchPhaseExecutionException) ex).getDetailedMessage(), + ex, + true + ).countedInStats(false) + ); } else { Throwable cause = ExceptionsHelper.unwrapCause(ex); listener.onFailure(new InternalFailure(adID, cause)); } } + private boolean invalidQuery(SearchPhaseExecutionException ex) { + boolean invalidQuery = true; + // If all shards return bad request and failure cause is IllegalArgumentException, we + // consider the feature query is invalid and will not count the error in failure stats. + for (ShardSearchFailure failure : ex.shardFailures()) { + if (RestStatus.BAD_REQUEST != failure.status() || !(failure.getCause() instanceof IllegalArgumentException)) { + invalidQuery = false; + } + } + return invalidQuery; + } + class RCFActionListener implements ActionListener { private List rcfResults; private String modelID; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/ADIntegTestCase.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/ADIntegTestCase.java new file mode 100644 index 00000000..e54c05c1 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/ADIntegTestCase.java @@ -0,0 +1,103 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad; + +import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Before; + +import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; + +public abstract class ADIntegTestCase extends ESIntegTestCase { + + private long timeout = 5_000; + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(AnomalyDetectorPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Collections.singletonList(AnomalyDetectorPlugin.class); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + public void createDetectors(List detectors, boolean createIndexFirst) throws IOException { + if (createIndexFirst) { + createIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX, AnomalyDetectionIndices.getAnomalyDetectorMappings()); + } + + for (AnomalyDetector detector : detectors) { + indexDoc(AnomalyDetector.ANOMALY_DETECTORS_INDEX, detector.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITH_TYPE)); + } + } + + public void createDetectorIndex() throws IOException { + createIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX, AnomalyDetectionIndices.getAnomalyDetectorMappings()); + } + + public String createDetectors(AnomalyDetector detector) throws IOException { + return indexDoc(AnomalyDetector.ANOMALY_DETECTORS_INDEX, detector.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITH_TYPE)); + } + + public void createIndex(String indexName, String mappings) { + CreateIndexResponse createIndexResponse = TestHelpers.createIndex(admin(), indexName, mappings); + assertEquals(true, createIndexResponse.isAcknowledged()); + } + + public String indexDoc(String indexName, XContentBuilder source) { + IndexRequest indexRequest = new IndexRequest(indexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).source(source); + IndexResponse indexResponse = client().index(indexRequest).actionGet(timeout); + assertEquals(RestStatus.CREATED, indexResponse.status()); + return indexResponse.getId(); + } + + public String indexDoc(String indexName, Map source) { + IndexRequest indexRequest = new IndexRequest(indexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).source(source); + IndexResponse indexResponse = client().index(indexRequest).actionGet(timeout); + assertEquals(RestStatus.CREATED, indexResponse.status()); + return indexResponse.getId(); + } + + public GetResponse getDoc(String indexName, String id) { + GetRequest getRequest = new GetRequest(indexName).id(id); + return client().get(getRequest).actionGet(timeout); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRestTestCase.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRestTestCase.java index 1c196887..6cd0372d 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRestTestCase.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRestTestCase.java @@ -61,11 +61,16 @@ protected Settings restClientSettings() { } protected AnomalyDetector createRandomAnomalyDetector(Boolean refresh, Boolean withMetadata) throws IOException { + return createRandomAnomalyDetector(refresh, withMetadata, true); + } + + protected AnomalyDetector createRandomAnomalyDetector(Boolean refresh, Boolean withMetadata, boolean featureEnabled) + throws IOException { Map uiMetadata = null; if (withMetadata) { uiMetadata = TestHelpers.randomUiMetadata(); } - AnomalyDetector detector = TestHelpers.randomAnomalyDetector(uiMetadata, null); + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(uiMetadata, null, featureEnabled); String indexName = detector.getIndices().get(0); TestHelpers .makeRequest( @@ -185,6 +190,11 @@ public ToXContentObject[] getAnomalyDetector(String detectorId, BasicHeader head detectorJob }; } + protected Response startAnomalyDetector(String detectorId) throws IOException { + return TestHelpers + .makeRequest(client(), "POST", TestHelpers.AD_BASE_DETECTORS_URI + "/" + detectorId + "/_start", ImmutableMap.of(), "", null); + } + protected HttpEntity toHttpEntity(ToXContentObject object) throws IOException { return new StringEntity(toJsonString(object), APPLICATION_JSON); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index 7f6e2dd1..0a8c99a2 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -19,6 +19,7 @@ import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; +import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.elasticsearch.test.ESTestCase.randomDouble; import static org.elasticsearch.test.ESTestCase.randomInt; import static org.elasticsearch.test.ESTestCase.randomIntBetween; @@ -49,11 +50,13 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetadata; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; @@ -194,13 +197,23 @@ public static AnomalyDetector randomAnomalyDetector(Map uiMetada public static AnomalyDetector randomAnomalyDetector(List features, Map uiMetadata, Instant lastUpdateTime) throws IOException { + List indices = ImmutableList.of(randomAlphaOfLength(10).toLowerCase()); + return randomAnomalyDetector(indices, features, uiMetadata, lastUpdateTime); + } + + public static AnomalyDetector randomAnomalyDetector( + List indices, + List features, + Map uiMetadata, + Instant lastUpdateTime + ) throws IOException { return new AnomalyDetector( randomAlphaOfLength(10), randomLong(), randomAlphaOfLength(20), randomAlphaOfLength(30), randomAlphaOfLength(5), - ImmutableList.of(randomAlphaOfLength(10).toLowerCase()), + indices, features, randomQuery(), randomIntervalTimeConfiguration(), @@ -321,6 +334,10 @@ public static QueryBuilder randomQuery() throws IOException { String query = "{\"bool\":{\"must\":{\"term\":{\"user\":\"kimchy\"}},\"filter\":{\"term\":{\"tag\":" + "\"tech\"}},\"must_not\":{\"range\":{\"age\":{\"gte\":10,\"lte\":20}}},\"should\":[{\"term\":" + "{\"tag\":\"wow\"}},{\"term\":{\"tag\":\"elasticsearch\"}}],\"minimum_should_match\":1,\"boost\":1}}"; + return randomQuery(query); + } + + public static QueryBuilder randomQuery(String query) throws IOException { XContentParser parser = TestHelpers.parser(query); return parseInnerQueryBuilder(parser); } @@ -336,6 +353,22 @@ public static AggregationBuilder randomAggregation(String aggregationName) throw return parsed.getAggregatorFactories().iterator().next(); } + /** + * Parse string aggregation query into {@link AggregationBuilder} + * Sample input: + * "{\"test\":{\"value_count\":{\"field\":\"ok\"}}}" + * + * @param aggregationQuery aggregation builder + * @return aggregation builder + * @throws IOException IO exception + */ + public static AggregationBuilder parseAggregation(String aggregationQuery) throws IOException { + XContentParser parser = parser(aggregationQuery); + + AggregatorFactories.Builder parsed = AggregatorFactories.parseAggregators(parser); + return parsed.getAggregatorFactories().iterator().next(); + } + public static Map randomUiMetadata() { return ImmutableMap.of(randomAlphaOfLength(5), randomFeature()); } @@ -353,24 +386,17 @@ public static IntervalSchedule randomIntervalSchedule() { } public static Feature randomFeature() { - return randomFeature(randomAlphaOfLength(5), randomAlphaOfLength(5)); - } - - public static Feature randomFeature(String featureName, String aggregationName) { - AggregationBuilder testAggregation = null; - try { - testAggregation = randomAggregation(aggregationName); - } catch (IOException e) { - logger.error("Fail to generate test aggregation"); - throw new RuntimeException(); - } - return new Feature(randomAlphaOfLength(5), featureName, ESRestTestCase.randomBoolean(), testAggregation); + return randomFeature(randomAlphaOfLength(5), randomAlphaOfLength(5), randomBoolean()); } public static Feature randomFeature(boolean enabled) { return randomFeature(randomAlphaOfLength(5), randomAlphaOfLength(5), enabled); } + public static Feature randomFeature(String featureName, String aggregationName) { + return randomFeature(featureName, aggregationName, randomBoolean()); + } + public static Feature randomFeature(String featureName, String aggregationName, boolean enabled) { AggregationBuilder testAggregation = null; try { @@ -566,6 +592,11 @@ public static ThreadPool createThreadPool() { return pool; } + public static CreateIndexResponse createIndex(AdminClient adminClient, String indexName, String indexMapping) { + CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(AnomalyDetector.TYPE, indexMapping, XContentType.JSON); + return adminClient.indices().create(request).actionGet(5_000); + } + public static void createIndex(RestClient client, String indexName, HttpEntity data) throws IOException { TestHelpers .makeRequest( diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java index fb8c11e2..0b9fd53e 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java @@ -418,7 +418,7 @@ private Object[] getFeaturesForPeriodThrowIllegalStateData() { new Object[] { asList(multiBucket), asList(aggName), null }, }; } - @Test(expected = IllegalStateException.class) + @Test(expected = EndRunException.class) @Parameters(method = "getFeaturesForPeriodThrowIllegalStateData") public void getFeaturesForPeriod_throwIllegalState_forUnknownAggregation( List aggs, diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/AnomalyDetectorRestApiIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/AnomalyDetectorRestApiIT.java index f4975315..6b39b3b5 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/AnomalyDetectorRestApiIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/AnomalyDetectorRestApiIT.java @@ -1078,4 +1078,11 @@ public void testSearchAnomalyDetectorMatch() throws Exception { boolean nameExists = (boolean) responseMap.get("match"); assertEquals(nameExists, false); } + + public void testRunDetectorWithNoEnabledFeature() throws Exception { + AnomalyDetector detector = createRandomAnomalyDetector(true, true, false); + assertNotNull(detector.getDetectorId()); + ResponseException e = expectThrows(ResponseException.class, () -> startAnomalyDetector(detector.getDetectorId())); + assertTrue(e.getMessage().contains("Can't start detector job as no enabled features configured")); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportActionTests.java new file mode 100644 index 00000000..38f025d9 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportActionTests.java @@ -0,0 +1,174 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static com.amazon.opendistroforelasticsearch.ad.TestHelpers.randomQuery; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; + +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.junit.Before; + +import com.amazon.opendistroforelasticsearch.ad.ADIntegTestCase; +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.Feature; +import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class AnomalyResultTransportActionTests extends ADIntegTestCase { + private String testIndex; + private Instant testDataTimeStamp; + private long start; + private long end; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + testIndex = "test_data"; + testDataTimeStamp = Instant.now(); + start = testDataTimeStamp.minus(10, ChronoUnit.MINUTES).toEpochMilli(); + end = testDataTimeStamp.plus(10, ChronoUnit.MINUTES).toEpochMilli(); + ingestTestData(); + } + + private void ingestTestData() throws IOException { + String mappings = "{\"properties\":{\"timestamp\":{\"type\":\"date\",\"format\":\"strict_date_time||epoch_millis\"}," + + "\"value\":{\"type\":\"double\"}, \"type\":{\"type\":\"keyword\"}," + + "\"is_error\":{\"type\":\"boolean\"}, \"message\":{\"type\":\"text\"}}}"; + createIndex(testIndex, mappings); + double value = randomDouble(); + String type = randomAlphaOfLength(5); + boolean isError = randomBoolean(); + String message = randomAlphaOfLength(10); + String id = indexDoc( + testIndex, + ImmutableMap + .of("timestamp", testDataTimeStamp.toEpochMilli(), "value", value, "type", type, "is_error", isError, "message", message) + ); + GetResponse doc = getDoc(testIndex, id); + Map sourceAsMap = doc.getSourceAsMap(); + assertEquals(testDataTimeStamp.toEpochMilli(), sourceAsMap.get("timestamp")); + assertEquals(value, sourceAsMap.get("value")); + assertEquals(type, sourceAsMap.get("type")); + assertEquals(isError, sourceAsMap.get("is_error")); + assertEquals(message, sourceAsMap.get("message")); + createDetectorIndex(); + } + + public void testFeatureQueryWithTermsAggregation() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"terms\":{\"field\":\"type\"}}}"); + assertErrorMessage(adId, "Failed to parse aggregation"); + } + + public void testFeatureWithSumOfTextField() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"sum\":{\"field\":\"message\"}}}"); + assertErrorMessage(adId, "Text fields are not optimised for operations"); + } + + public void testFeatureWithSumOfTypeField() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"sum\":{\"field\":\"type\"}}}"); + assertErrorMessage(adId, "is not supported for aggregation [sum]"); + } + + public void testFeatureWithMaxOfTextField() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"max\":{\"field\":\"message\"}}}"); + assertErrorMessage(adId, "Text fields are not optimised for operations"); + } + + public void testFeatureWithMaxOfTypeField() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"max\":{\"field\":\"type\"}}}"); + assertErrorMessage(adId, "is not supported for aggregation [max]"); + } + + public void testFeatureWithMinOfTextField() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"min\":{\"field\":\"message\"}}}"); + assertErrorMessage(adId, "Text fields are not optimised for operations"); + } + + public void testFeatureWithMinOfTypeField() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"min\":{\"field\":\"type\"}}}"); + assertErrorMessage(adId, "is not supported for aggregation [min]"); + } + + public void testFeatureWithAvgOfTextField() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"avg\":{\"field\":\"message\"}}}"); + assertErrorMessage(adId, "Text fields are not optimised for operations"); + } + + public void testFeatureWithAvgOfTypeField() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"avg\":{\"field\":\"type\"}}}"); + assertErrorMessage(adId, "is not supported for aggregation [avg]"); + } + + public void testFeatureWithCountOfTextField() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"value_count\":{\"field\":\"message\"}}}"); + assertErrorMessage(adId, "Text fields are not optimised for operations"); + } + + public void testFeatureWithCardinalityOfTextField() throws IOException { + String adId = createDetectorWithFeatureAgg("{\"test\":{\"cardinality\":{\"field\":\"message\"}}}"); + assertErrorMessage(adId, "Text fields are not optimised for operations"); + } + + private String createDetectorWithFeatureAgg(String aggQuery) throws IOException { + AggregationBuilder aggregationBuilder = TestHelpers.parseAggregation(aggQuery); + Feature feature = new Feature(randomAlphaOfLength(5), randomAlphaOfLength(10), true, aggregationBuilder); + AnomalyDetector detector = randomDetector(ImmutableList.of(testIndex), ImmutableList.of(feature)); + String adId = createDetectors(detector); + return adId; + } + + private AnomalyDetector randomDetector(List indices, List features) throws IOException { + return new AnomalyDetector( + randomAlphaOfLength(10), + randomLong(), + randomAlphaOfLength(20), + randomAlphaOfLength(30), + "timestamp", + indices, + features, + randomQuery("{\"bool\":{\"filter\":[{\"exists\":{\"field\":\"value\"}}]}}"), + new IntervalTimeConfiguration(ESRestTestCase.randomLongBetween(1, 5), ChronoUnit.MINUTES), + new IntervalTimeConfiguration(ESRestTestCase.randomLongBetween(1, 5), ChronoUnit.MINUTES), + 8, + null, + randomInt(), + Instant.now(), + null, + null + ); + } + + private void assertErrorMessage(String adId, String errorMessage) { + AnomalyResultRequest resultRequest = new AnomalyResultRequest(adId, start, end); + RuntimeException e = expectThrowsAnyOf( + ImmutableList.of(NotSerializableExceptionWrapper.class, AnomalyDetectionException.class), + () -> client().execute(AnomalyResultAction.INSTANCE, resultRequest).actionGet(30_000) + ); + assertTrue(e.getMessage().contains(errorMessage)); + } +}