This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Use callbacks and bug fix #83

Merged
18 changes: 6 additions & 12 deletions build.gradle
@@ -251,23 +251,17 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException',
'com.amazon.opendistroforelasticsearch.ad.util.ClientUtil',

'com.amazon.opendistroforelasticsearch.ad.ml.*',
'com.amazon.opendistroforelasticsearch.ad.feature.*',
'com.amazon.opendistroforelasticsearch.ad.dataprocessor.*',
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner',
'com.amazon.opendistroforelasticsearch.ad.resthandler.RestGetAnomalyResultAction',
'com.amazon.opendistroforelasticsearch.ad.metrics.MetricFactory',
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.transport.ForwardAction',
'com.amazon.opendistroforelasticsearch.ad.transport.ForwardTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction',
'com.amazon.opendistroforelasticsearch.ad.transport.CronRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorAction',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils'
'com.amazon.opendistroforelasticsearch.ad.transport.CronTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.CronRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction',
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner',
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils',
]

jacocoTestCoverageVerification {
AnomalyDetectorJobRunner.java
@@ -28,6 +28,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultResponse;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.LockModel;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter;
@@ -39,7 +40,9 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
@@ -71,6 +74,7 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private Settings settings;
private int maxRetryForEndRunException;
private Client client;
private ClientUtil clientUtil;
private ThreadPool threadPool;
private AnomalyResultHandler anomalyResultHandler;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
@@ -97,6 +101,10 @@ public void setClient(Client client) {
this.client = client;
}

public void setClientUtil(ClientUtil clientUtil) {
this.clientUtil = clientUtil;
}

public void setThreadPool(ThreadPool threadPool) {
this.threadPool = threadPool;
}
@@ -258,7 +266,7 @@ protected void handleAdException(
) {
String detectorId = jobParameter.getName();
if (exception instanceof EndRunException) {
log.error("EndRunException happened when executed anomaly result action for " + detectorId, exception);
log.error("EndRunException happened when executing anomaly result action for " + detectorId, exception);

if (((EndRunException) exception).isEndNow()) {
// Stop AD job if EndRunException shows we should end job now.
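For context on the retry handling around this hunk: the runner tracks consecutive EndRunExceptions per detector in detectorEndRunExceptionCount and gives up after maxRetryForEndRunException attempts (both fields are declared earlier in this file). A minimal sketch of that decision logic follows; the exact bookkeeping here is an assumption, not a verbatim excerpt from the plugin:

// Hedged sketch, not a verbatim excerpt: stop the job immediately when the
// exception demands it, otherwise stop only once the per-detector retry
// budget is exhausted.
private void handleEndRunException(String detectorId, EndRunException exception) {
    if (exception.isEndNow()) {
        // EndRunException says the job must end now; no retries.
        stopAdJob(detectorId);
        return;
    }
    // ConcurrentHashMap.merge atomically increments the per-detector count.
    int count = detectorEndRunExceptionCount.merge(detectorId, 1, Integer::sum);
    if (count > maxRetryForEndRunException) {
        stopAdJob(detectorId);
    }
}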
@@ -349,9 +357,8 @@ private void stopAdJob(String detectorId) {
try {
GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);

client.get(getRequest, ActionListener.wrap(response -> {
clientUtil.<GetRequest, GetResponse>asyncRequest(getRequest, client::get, ActionListener.wrap(response -> {
if (response.isExists()) {
String s = response.getSourceAsString();
try (
XContentParser parser = XContentType.JSON
.xContent()
@@ -374,14 +381,19 @@
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(newJob.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), XCONTENT_WITH_TYPE))
.id(detectorId);
client.index(indexRequest, ActionListener.wrap(indexResponse -> {
if (indexResponse != null
&& (indexResponse.getResult() == CREATED || indexResponse.getResult() == UPDATED)) {
log.info("AD Job was disabled by JobRunner for " + detectorId);
} else {
log.warn("Failed to disable AD job for " + detectorId);
}
}, exception -> log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception)));
clientUtil
.<IndexRequest, IndexResponse>asyncRequest(
indexRequest,
client::index,
ActionListener.wrap(indexResponse -> {
if (indexResponse != null
&& (indexResponse.getResult() == CREATED || indexResponse.getResult() == UPDATED)) {
log.info("AD Job was disabled by JobRunner for " + detectorId);
} else {
log.warn("Failed to disable AD job for " + detectorId);
}
}, exception -> log.error("JobRunner failed to update AD job as disabled for " + detectorId, exception))
);
}
} catch (IOException e) {
log.error("JobRunner failed to stop detector job " + detectorId, e);
AnomalyDetectorPlugin.java
@@ -143,6 +143,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ThreadPool threadPool;
private IndexNameExpressionResolver indexNameExpressionResolver;
private ADStats adStats;
private ClientUtil clientUtil;

static {
SpecialPermission.check();
@@ -174,6 +175,7 @@ public List<RestHandler> getRestHandlers(
);
AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
jobRunner.setClientUtil(clientUtil);
jobRunner.setThreadPool(threadPool);
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);
@@ -237,7 +239,7 @@ public Collection<Object> createComponents(
Settings settings = environment.settings();
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
ClientUtil clientUtil = new ClientUtil(settings, client, throttler, threadPool);
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService);
anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil);
this.clusterService = clusterService;
@@ -272,7 +274,8 @@ public Collection<Object> createComponents(
HybridThresholdingModel.class,
AnomalyDetectorSettings.MIN_PREVIEW_SIZE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
AnomalyDetectorSettings.SHINGLE_SIZE
);

HashRing hashRing = new HashRing(clusterService, clock, settings);
AnomalyDetectorRunner.java
@@ -58,6 +58,7 @@ public AnomalyDetectorRunner(ModelManager modelManager, FeatureManager featureMa
* @param startTime detection period start time
* @param endTime detection period end time
* @param listener handle anomaly result
* @throws IOException - if a user gives wrong query input when defining a detector
*/
public void executeDetector(AnomalyDetector detector, Instant startTime, Instant endTime, ActionListener<List<AnomalyResult>> listener)
throws IOException {
FeatureManager.java
@@ -256,6 +256,52 @@ public Optional<double[][]> getColdStartData(AnomalyDetector detector) {
.map(points -> batchShingle(points, shingleSize));
}

/**
* Returns to listener data for cold-start training.
*
* Training data starts with getting samples from (costly) search.
* Samples are increased in size via interpolation and then
* in dimension via shingling.
*
* @param detector contains data info (indices, documents, etc)
* @param listener onResponse is called with data for cold-start training, or empty if unavailable
*/
public void getColdStartData(AnomalyDetector detector, ActionListener<Optional<double[][]>> listener) {
searchFeatureDao
.getLatestDataTime(
detector,
ActionListener.wrap(latest -> getColdStartSamples(latest, detector, listener), listener::onFailure)
);
}

private void getColdStartSamples(Optional<Long> latest, AnomalyDetector detector, ActionListener<Optional<double[][]>> listener) {
if (latest.isPresent()) {
searchFeatureDao
.getFeaturesForSampledPeriods(
detector,
maxTrainSamples,
maxSampleStride,
latest.get(),
ActionListener.wrap(samples -> processColdStartSamples(samples, listener), listener::onFailure)
);
} else {
listener.onResponse(Optional.empty());
}
}

private void processColdStartSamples(Optional<Entry<double[][], Integer>> samples, ActionListener<Optional<double[][]>> listener) {
listener
.onResponse(
samples
.map(
results -> transpose(
interpolator.interpolate(transpose(results.getKey()), results.getValue() * (results.getKey().length - 1) + 1)
)
)
.map(points -> batchShingle(points, shingleSize))
);
}
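A hedged usage sketch of the callback-based API added above: unlike the synchronous getColdStartData it mirrors, this version returns immediately and delivers the shingled training points through the listener once the (costly) search completes. The featureManager instance and logger below are illustrative assumptions, not code from this PR:

// Hedged usage sketch, assuming the enclosing class is exposed as
// featureManager and a log4j Logger named logger is in scope.
featureManager.getColdStartData(detector, ActionListener.wrap(trainingData -> {
    if (trainingData.isPresent()) {
        double[][] shingledPoints = trainingData.get();
        // hand the shingled samples to model training
    } else {
        // no historical data yet; skip or defer cold start
    }
}, exception -> logger.error("Failed to get cold start data", exception)));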

/**
* Shingles a batch of data points by concatenating neighboring data points.
*