commit e5b6ce5
Author: Kaituo Li <kaituo@amazon.com>
Date:   Wed Apr 15 15:45:13 2020 -0700

    Add state and error to profile API (opendistro-for-elasticsearch#84)

    * Add state and error to profile API

    We want to make it easy for customers and oncalls to identify a detector’s state and error if any. This PR adds such information to our new profile API.

    We expect three kinds of states:
    - Disabled: if the get AD job API says the job is disabled;
    - Init: if anomaly score after the last update time of the detector is larger than 0;
    - Running: if neither of the above applies and there are no exceptions.

    Error is populated if the error of the latest anomaly result is not empty.
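
The state rules above can be sketched as a small pure function. This is only an illustration, not the code in this PR: `DetectorStateSketch`, `resolveState`, `resolveError`, and the boolean inputs are hypothetical names, and the precedence (Disabled, then Init, then Running) is an assumption read off the order of the list.

```java
import java.util.Optional;

// Hypothetical sketch; none of these names come from the plugin itself.
final class DetectorStateSketch {
    enum DetectorState { DISABLED, INIT, RUNNING }

    /**
     * @param jobEnabled         what the get AD job API reports for the detector's job
     * @param initConditionHolds whether the anomaly-score condition described for Init holds
     */
    static DetectorState resolveState(boolean jobEnabled, boolean initConditionHolds) {
        if (!jobEnabled) {
            return DetectorState.DISABLED; // assumed to take precedence over the other states
        }
        if (initConditionHolds) {
            return DetectorState.INIT;
        }
        return DetectorState.RUNNING; // neither of the above applies
    }

    /** The error is reported only when the latest anomaly result carries a non-empty error. */
    static Optional<String> resolveError(String latestResultError) {
        return Optional.ofNullable(latestResultError).filter(s -> !s.isEmpty());
    }

    public static void main(String[] args) {
        System.out.println(resolveState(true, false));
        System.out.println(resolveError("").isPresent());
    }
}
```

Keeping the decision free of I/O makes each life-cycle stage listed under "Testing done" easy to cover with a unit test.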

    Testing done:
    - Manual testing during a detector’s life cycle: not created, created but not started, started, during initialization, after initialization, stopped, restarted
    - Added unit tests to cover the above scenarios

commit 0c33050
Author: Kaituo Li <kaituo@amazon.com>
Date:   Tue Apr 14 11:52:20 2020 -0700

    Use callbacks and bug fix (opendistro-for-elasticsearch#83)

    * Use callbacks and bug fix

    This PR includes the following changes:

    1. Remove classes that no longer need to be in jacocoExclusions, since we now have enough coverage for them.
    2. Use ClientUtil instead of Elasticsearch’s client in the AD job runner.
    3. Use one function to get the number of partitioned forests. Previously, we had redundant code in both ModelManager and ADStateManager.
    4. Change ADStateManager.getAnomalyDetector to use a callback.
    5. Change AnomalyResultTransportAction to use a callback to get features.
    6. Add handling in AnomalyResultTransportAction for the cases where all features have been disabled and where the users' index does not exist.
    7. Change the methods that get RCF and threshold results to use callbacks, and add handling of IndexNotFoundException as a result of the change. Previously, these methods did not throw IndexNotFoundException.
    8. Remove unused fields in StopDetectorTransportAction and AnomalyResultTransportAction.
    9. Unwrap EsRejectedExecutionException, as it can be nested inside RemoteTransportException. Previously, we did not recognize the nested EsRejectedExecutionException and thus missed retrying anomaly result writes.
    10. Add error to the anomaly result schema.
    11. Fix tests broken by these changes.
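
The unwrapping in item 9 comes down to walking an exception's cause chain until the type of interest is found. Below is a minimal sketch in plain Java. `RemoteTransportLikeException` and `RejectedLikeException` are hypothetical stand-ins for Elasticsearch's `RemoteTransportException` and `EsRejectedExecutionException`, and `findCause` is an illustrative helper, not the method added in this PR.

```java
// Hypothetical sketch; the exception classes below only imitate the Elasticsearch ones.
final class UnwrapSketch {
    static class RemoteTransportLikeException extends RuntimeException {
        RemoteTransportLikeException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static class RejectedLikeException extends RuntimeException {
        RejectedLikeException(String message) {
            super(message);
        }
    }

    /** Returns the first throwable of the given type in the cause chain, or null if there is none. */
    static <T extends Throwable> T findCause(Throwable t, Class<T> type) {
        int depth = 0;
        // The depth bound guards against pathological (cyclic) cause chains.
        for (Throwable cur = t; cur != null && depth < 10; cur = cur.getCause(), depth++) {
            if (type.isInstance(cur)) {
                return type.cast(cur);
            }
        }
        return null;
    }

    /** A write is worth retrying when a rejection is found anywhere in the chain. */
    static boolean isRetryable(Throwable t) {
        return findCause(t, RejectedLikeException.class) != null;
    }

    public static void main(String[] args) {
        Throwable nested = new RemoteTransportLikeException("remote", new RejectedLikeException("queue full"));
        System.out.println(isRetryable(nested));
    }
}
```

Checking only the outermost exception with `instanceof` misses the nested case, which matches the missed retries described in item 9.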

    Testing done:
    1. Unit/integration tests pass.
    2. Ran end-to-end tests to make sure the fix achieves its purpose:
       * the timeout issue is gone
       * when all features have been disabled or the index does not exist, we retry a few more times and then disable the AD job.
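
The "retry a few more times, then disable" behavior can be pictured as a per-detector failure counter. The sketch below is an assumption-laden illustration: `EndRunRetrySketch`, `recordFailure`, and `recordSuccess` are hypothetical names, and the exact threshold semantics in the job runner may differ.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of counting consecutive failures per detector.
final class EndRunRetrySketch {
    private final ConcurrentHashMap<String, Integer> failureCount = new ConcurrentHashMap<>();
    private final int maxRetries;

    EndRunRetrySketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * Records a failed run.
     *
     * @return true when the job should be stopped, either because the failure
     *         demands it immediately or because the retry budget is exhausted
     */
    boolean recordFailure(String detectorId, boolean endNow) {
        if (endNow) {
            failureCount.remove(detectorId);
            return true;
        }
        int count = failureCount.merge(detectorId, 1, Integer::sum);
        if (count > maxRetries) {
            failureCount.remove(detectorId);
            return true;
        }
        return false;
    }

    /** A successful run resets the counter, so only consecutive failures count. */
    void recordSuccess(String detectorId) {
        failureCount.remove(detectorId);
    }

    public static void main(String[] args) {
        EndRunRetrySketch sketch = new EndRunRetrySketch(2);
        for (int i = 0; i < 3; i++) {
            System.out.println(sketch.recordFailure("detector-1", false));
        }
    }
}
```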
vamshin committed Apr 16, 2020
1 parent e9d4a46 commit b541442
Showing 34 changed files with 1,583 additions and 380 deletions.
18 changes: 6 additions & 12 deletions build.gradle
@@ -207,23 +207,17 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException',
'com.amazon.opendistroforelasticsearch.ad.util.ClientUtil',

'com.amazon.opendistroforelasticsearch.ad.ml.*',
'com.amazon.opendistroforelasticsearch.ad.feature.*',
'com.amazon.opendistroforelasticsearch.ad.dataprocessor.*',
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner',
'com.amazon.opendistroforelasticsearch.ad.resthandler.RestGetAnomalyResultAction',
'com.amazon.opendistroforelasticsearch.ad.metrics.MetricFactory',
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.transport.ForwardAction',
'com.amazon.opendistroforelasticsearch.ad.transport.ForwardTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction',
'com.amazon.opendistroforelasticsearch.ad.transport.CronRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorAction',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils'
'com.amazon.opendistroforelasticsearch.ad.transport.CronTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.CronRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction',
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner',
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils',
]

jacocoTestCoverageVerification {
@@ -28,6 +28,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.LockModel;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter;
@@ -39,7 +40,9 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
@@ -71,6 +74,7 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private Settings settings;
private int maxRetryForEndRunException;
private Client client;
private ClientUtil clientUtil;
private ThreadPool threadPool;
private AnomalyResultHandler anomalyResultHandler;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
@@ -97,6 +101,10 @@ public void setClient(Client client) {
this.client = client;
}

public void setClientUtil(ClientUtil clientUtil) {
this.clientUtil = clientUtil;
}

public void setThreadPool(ThreadPool threadPool) {
this.threadPool = threadPool;
}
@@ -258,7 +266,7 @@ protected void handleAdException(
) {
String detectorId = jobParameter.getName();
if (exception instanceof EndRunException) {
log.error("EndRunException happened when executed anomaly result action for " + detectorId, exception);
log.error("EndRunException happened when executing anomaly result action for " + detectorId, exception);

if (((EndRunException) exception).isEndNow()) {
// Stop AD job if EndRunException shows we should end job now.
@@ -349,9 +357,8 @@ private void stopAdJob(String detectorId) {
try {
GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);

client.get(getRequest, ActionListener.wrap(response -> {
clientUtil.<GetRequest, GetResponse>asyncRequest(getRequest, client::get, ActionListener.wrap(response -> {
if (response.isExists()) {
String s = response.getSourceAsString();
try (
XContentParser parser = XContentType.JSON
.xContent()
@@ -374,14 +381,19 @@ private void stopAdJob(String detectorId) {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(newJob.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), XCONTENT_WITH_TYPE))
.id(detectorId);
client.index(indexRequest, ActionListener.wrap(indexResponse -> {
if (indexResponse != null
&& (indexResponse.getResult() == CREATED || indexResponse.getResult() == UPDATED)) {
log.info("AD Job was disabled by JobRunner for " + detectorId);
} else {
log.warn("Failed to disable AD job for " + detectorId);
}
}, exception -> log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception)));
clientUtil
.<IndexRequest, IndexResponse>asyncRequest(
indexRequest,
client::index,
ActionListener.wrap(indexResponse -> {
if (indexResponse != null
&& (indexResponse.getResult() == CREATED || indexResponse.getResult() == UPDATED)) {
log.info("AD Job was disabled by JobRunner for " + detectorId);
} else {
log.warn("Failed to disable AD job for " + detectorId);
}
}, exception -> log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception))
);
}
} catch (IOException e) {
log.error("JobRunner failed to stop detector job " + detectorId, e);
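
The `clientUtil.asyncRequest(request, client::get, listener)` calls in this hunk pass the client method as a function reference and deliver the outcome through a listener. The sketch below shows that callback shape in plain Java. `Listener`, `wrap`, `asyncRequest`, and `fakeGet` are hypothetical stand-ins that only imitate `ActionListener` and `ClientUtil`; they are not the plugin's implementation.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical sketch of a callback-style request helper.
final class AsyncRequestSketch {
    /** Stand-in for a response/failure listener. */
    interface Listener<R> {
        void onResponse(R response);

        void onFailure(Exception e);
    }

    /** Builds a listener from two lambdas; exceptions thrown by the response handler go to the failure handler. */
    static <R> Listener<R> wrap(Consumer<R> responseHandler, Consumer<Exception> failureHandler) {
        return new Listener<R>() {
            @Override
            public void onResponse(R response) {
                try {
                    responseHandler.accept(response);
                } catch (Exception e) {
                    failureHandler.accept(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                failureHandler.accept(e);
            }
        };
    }

    /** Hands the request and listener to the supplied call; a failure to dispatch is reported to the listener. */
    static <Q, R> void asyncRequest(Q request, BiConsumer<Q, Listener<R>> call, Listener<R> listener) {
        try {
            call.accept(request, listener);
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    /** Fake client call that answers immediately; a real client would answer from another thread. */
    static void fakeGet(String id, Listener<String> listener) {
        if (id.isEmpty()) {
            listener.onFailure(new IllegalArgumentException("empty id"));
        } else {
            listener.onResponse("doc:" + id);
        }
    }

    public static void main(String[] args) {
        AsyncRequestSketch.<String, String>asyncRequest(
            "detector-1",
            AsyncRequestSketch::fakeGet,
            AsyncRequestSketch.<String>wrap(
                r -> System.out.println("got " + r),
                e -> System.out.println("failed: " + e.getMessage())
            )
        );
    }
}
```

Routing every call through one helper gives a single place to add cross-cutting behavior, which is one plausible reason for preferring it over calling the client directly.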
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.ProfileName;
import com.amazon.opendistroforelasticsearch.ad.rest.RestAnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestDeleteAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction;
@@ -141,8 +142,9 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private Client client;
private ClusterService clusterService;
private ThreadPool threadPool;
private IndexNameExpressionResolver indexNameExpressionResolver;
private ADStats adStats;
private NamedXContentRegistry xContentRegistry;
private ClientUtil clientUtil;

static {
SpecialPermission.check();
@@ -163,7 +165,6 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
AnomalyResultHandler anomalyResultHandler = new AnomalyResultHandler(
client,
settings,
@@ -174,11 +175,17 @@ public List<RestHandler> getRestHandlers(
);
AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
jobRunner.setClientUtil(clientUtil);
jobRunner.setThreadPool(threadPool);
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);

RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(restController);
AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(
restController,
profileRunner,
ProfileName.getNames()
);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
restController,
@@ -237,10 +244,11 @@ public Collection<Object> createComponents(
Settings settings = environment.settings();
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
ClientUtil clientUtil = new ClientUtil(settings, client, throttler, threadPool);
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil);
this.clusterService = clusterService;
this.xContentRegistry = xContentRegistry;

SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
@@ -272,7 +280,8 @@ public Collection<Object> createComponents(
HybridThresholdingModel.class,
AnomalyDetectorSettings.MIN_PREVIEW_SIZE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
AnomalyDetectorSettings.SHINGLE_SIZE
);

HashRing hashRing = new HashRing(clusterService, clock, settings);
@@ -389,7 +398,7 @@ public List<Setting<?>> getSettings() {

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, ADMetaData.XCONTENT_REGISTRY);
return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, ADMetaData.XCONTENT_REGISTRY, AnomalyResult.XCONTENT_REGISTRY);
}

@Override