Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

backport change: filter out exceptions which should not be counted in failure stats #347

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ private ActionListener<GetResponse> onGetDetectorResponse(String adID, ActionLis
AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId());
// end execution if all features are disabled
if (detector.getEnabledFeatureIds().isEmpty()) {
listener.onFailure(new EndRunException(adID, CommonErrorMessages.ALL_FEATURES_DISABLED_ERR_MSG, true));
listener
.onFailure(
new EndRunException(adID, CommonErrorMessages.ALL_FEATURES_DISABLED_ERR_MSG, true).countedInStats(false)
);
return;
}
NodeState state = states.computeIfAbsent(adID, id -> new NodeState(id, clock));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@
*/
public class AnomalyDetectionException extends RuntimeException {

private final String anomalyDetectorId;
private String anomalyDetectorId;
// countedInStats will be used to tell whether the exception should be
// counted in failure stats.
private boolean countedInStats = true;

public AnomalyDetectionException(String message) {
super(message);
}

/**
* Constructor with an anomaly detector ID and a message.
Expand All @@ -38,6 +45,10 @@ public AnomalyDetectionException(String adID, String message, Throwable cause) {
this.anomalyDetectorId = adID;
}

public AnomalyDetectionException(Throwable cause) {
super(cause);
}

public AnomalyDetectionException(String adID, Throwable cause) {
super(cause);
this.anomalyDetectorId = adID;
Expand All @@ -52,6 +63,26 @@ public String getAnomalyDetectorId() {
return this.anomalyDetectorId;
}

/**
* Returns if the exception should be counted in stats.
*
* @return true if should count the exception in stats; otherwise return false
*/
public boolean isCountedInStats() {
return countedInStats;
}

/**
* Set if the exception should be counted in stats.
*
* @param countInStats count the exception in stats
* @return the exception itself
*/
public AnomalyDetectionException countedInStats(boolean countInStats) {
this.countedInStats = countInStats;
return this;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
package com.amazon.opendistroforelasticsearch.ad.common.exception;

/**
* All exception visible to AD transport layer's client is under ClientVisible.
*
* All exception visible to AD transport layer's client is under ClientException.
*/
public class ClientException extends AnomalyDetectionException {

public ClientException(String message) {
super(message);
}

public ClientException(String anomalyDetectorId, String message) {
super(anomalyDetectorId, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
public class EndRunException extends ClientException {
private boolean endNow;

public EndRunException(String message, boolean endNow) {
super(message);
this.endNow = endNow;
}

public EndRunException(String anomalyDetectorId, String message, boolean endNow) {
super(anomalyDetectorId, message);
this.endNow = endNow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class LimitExceededException extends EndRunException {
*/
public LimitExceededException(String anomalyDetectorId, String message) {
super(anomalyDetectorId, message, true);
this.countedInStats(false);
}

/**
Expand All @@ -39,5 +40,6 @@ public LimitExceededException(String anomalyDetectorId, String message) {
*/
public LimitExceededException(String anomalyDetectorId, String message, boolean stopNow) {
super(anomalyDetectorId, message, stopNow);
this.countedInStats(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ public class ResourceNotFoundException extends AnomalyDetectionException {
*/
public ResourceNotFoundException(String detectorId, String message) {
super(detectorId, message);
countedInStats(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,9 @@ private double parseAggregation(Aggregation aggregation) {
result = percentile.next().getValue();
}
}
return Optional.ofNullable(result).orElseThrow(() -> new IllegalStateException("Failed to parse aggregation " + aggregation));
return Optional
.ofNullable(result)
.orElseThrow(() -> new EndRunException("Failed to parse aggregation " + aggregation, true).countedInStats(false));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ private void onGetAnomalyDetectorResponse(GetResponse response) throws IOExcepti
);
return;
}
if (detector.getEnabledFeatureIds().size() == 0) {
listener
.onFailure(
new ElasticsearchStatusException(
"Can't start detector job as no enabled features configured",
RestStatus.BAD_REQUEST
)
);
return;
}

IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getDetectionInterval();
Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.amazon.opendistroforelasticsearch.ad.transport;

import static com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages.INVALID_SEARCH_QUERY_MSG;

import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -37,6 +39,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -55,6 +59,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
Expand Down Expand Up @@ -228,16 +233,20 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<
hcDetectors.remove(adID);
original.onResponse(r);
}, e -> {
adStats.getStat(StatNames.AD_EXECUTE_FAIL_COUNT.getName()).increment();
if (hcDetectors.contains(adID)) {
adStats.getStat(StatNames.AD_HC_EXECUTE_FAIL_COUNT.getName()).increment();
// If exception is AnomalyDetectionException and it should not be counted in stats,
// we will not count it in failure stats.
if (!(e instanceof AnomalyDetectionException) || ((AnomalyDetectionException) e).isCountedInStats()) {
adStats.getStat(StatNames.AD_EXECUTE_FAIL_COUNT.getName()).increment();
if (hcDetectors.contains(adID)) {
adStats.getStat(StatNames.AD_HC_EXECUTE_FAIL_COUNT.getName()).increment();
}
}
hcDetectors.remove(adID);
original.onFailure(e);
});

if (!EnabledSetting.isADPluginEnabled()) {
throw new EndRunException(adID, CommonErrorMessages.DISABLED_ERR_MSG, true);
throw new EndRunException(adID, CommonErrorMessages.DISABLED_ERR_MSG, true).countedInStats(false);
}

adStats.getStat(StatNames.AD_EXECUTE_REQUEST_COUNT.getName()).increment();
Expand Down Expand Up @@ -501,7 +510,7 @@ private ActionListener<SinglePointFeatures> onFeatureResponse(

private void handleFailure(Exception exception, ActionListener<AnomalyResultResponse> listener, String adID) {
if (exception instanceof IndexNotFoundException) {
listener.onFailure(new EndRunException(adID, TROUBLE_QUERYING_ERR_MSG + exception.getMessage(), true));
listener.onFailure(new EndRunException(adID, TROUBLE_QUERYING_ERR_MSG + exception.getMessage(), true).countedInStats(false));
} else if (exception instanceof EndRunException) {
// invalid feature query
listener.onFailure(exception);
Expand Down Expand Up @@ -598,12 +607,36 @@ void handleExecuteException(Exception ex, ActionListener<AnomalyResultResponse>
listener.onFailure(ex);
} else if (ex instanceof AnomalyDetectionException) {
listener.onFailure(new InternalFailure((AnomalyDetectionException) ex));
} else if (ex instanceof SearchPhaseExecutionException && invalidQuery((SearchPhaseExecutionException) ex)) {
// This is to catch invalid aggregation on wrong field type. For example,
// sum aggregation on text field. We should end detector run for such case.
listener
.onFailure(
new EndRunException(
adID,
INVALID_SEARCH_QUERY_MSG + ((SearchPhaseExecutionException) ex).getDetailedMessage(),
ex,
true
).countedInStats(false)
);
} else {
Throwable cause = ExceptionsHelper.unwrapCause(ex);
listener.onFailure(new InternalFailure(adID, cause));
}
}

private boolean invalidQuery(SearchPhaseExecutionException ex) {
boolean invalidQuery = true;
// If all shards return bad request and failure cause is IllegalArgumentException, we
// consider the feature query is invalid and will not count the error in failure stats.
for (ShardSearchFailure failure : ex.shardFailures()) {
if (RestStatus.BAD_REQUEST != failure.status() || !(failure.getCause() instanceof IllegalArgumentException)) {
invalidQuery = false;
}
}
return invalidQuery;
}

class RCFActionListener implements ActionListener<RCFResultResponse> {
private List<RCFResultResponse> rcfResults;
private String modelID;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad;

import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Before;

import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;

public abstract class ADIntegTestCase extends ESIntegTestCase {

private long timeout = 5_000;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(AnomalyDetectorPlugin.class);
}

@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.singletonList(AnomalyDetectorPlugin.class);
}

@Override
@Before
public void setUp() throws Exception {
super.setUp();
}

public void createDetectors(List<AnomalyDetector> detectors, boolean createIndexFirst) throws IOException {
if (createIndexFirst) {
createIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX, AnomalyDetectionIndices.getAnomalyDetectorMappings());
}

for (AnomalyDetector detector : detectors) {
indexDoc(AnomalyDetector.ANOMALY_DETECTORS_INDEX, detector.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITH_TYPE));
}
}

public void createDetectorIndex() throws IOException {
createIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX, AnomalyDetectionIndices.getAnomalyDetectorMappings());
}

public String createDetectors(AnomalyDetector detector) throws IOException {
return indexDoc(AnomalyDetector.ANOMALY_DETECTORS_INDEX, detector.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITH_TYPE));
}

public void createIndex(String indexName, String mappings) {
CreateIndexResponse createIndexResponse = TestHelpers.createIndex(admin(), indexName, mappings);
assertEquals(true, createIndexResponse.isAcknowledged());
}

public String indexDoc(String indexName, XContentBuilder source) {
IndexRequest indexRequest = new IndexRequest(indexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).source(source);
IndexResponse indexResponse = client().index(indexRequest).actionGet(timeout);
assertEquals(RestStatus.CREATED, indexResponse.status());
return indexResponse.getId();
}

public String indexDoc(String indexName, Map<String, ?> source) {
IndexRequest indexRequest = new IndexRequest(indexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).source(source);
IndexResponse indexResponse = client().index(indexRequest).actionGet(timeout);
assertEquals(RestStatus.CREATED, indexResponse.status());
return indexResponse.getId();
}

public GetResponse getDoc(String indexName, String id) {
GetRequest getRequest = new GetRequest(indexName).id(id);
return client().get(getRequest).actionGet(timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,16 @@ protected Settings restClientSettings() {
}

protected AnomalyDetector createRandomAnomalyDetector(Boolean refresh, Boolean withMetadata) throws IOException {
return createRandomAnomalyDetector(refresh, withMetadata, true);
}

protected AnomalyDetector createRandomAnomalyDetector(Boolean refresh, Boolean withMetadata, boolean featureEnabled)
throws IOException {
Map<String, Object> uiMetadata = null;
if (withMetadata) {
uiMetadata = TestHelpers.randomUiMetadata();
}
AnomalyDetector detector = TestHelpers.randomAnomalyDetector(uiMetadata, null);
AnomalyDetector detector = TestHelpers.randomAnomalyDetector(uiMetadata, null, featureEnabled);
String indexName = detector.getIndices().get(0);
TestHelpers
.makeRequest(
Expand Down Expand Up @@ -185,6 +190,11 @@ public ToXContentObject[] getAnomalyDetector(String detectorId, BasicHeader head
detectorJob };
}

protected Response startAnomalyDetector(String detectorId) throws IOException {
return TestHelpers
.makeRequest(client(), "POST", TestHelpers.AD_BASE_DETECTORS_URI + "/" + detectorId + "/_start", ImmutableMap.of(), "", null);
}

protected HttpEntity toHttpEntity(ToXContentObject object) throws IOException {
return new StringEntity(toJsonString(object), APPLICATION_JSON);
}
Expand Down
Loading