Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

start historical detector #355

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.IndexStatusSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.task.ADBatchTaskRunner;
import com.amazon.opendistroforelasticsearch.ad.task.ADTaskCacheManager;
import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
Expand Down Expand Up @@ -148,6 +155,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultBulkIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectionStateHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.MultiEntityResultHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
Expand All @@ -173,7 +181,9 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip

public static final String AD_BASE_URI = "/_opendistro/_anomaly_detection";
public static final String AD_BASE_DETECTORS_URI = AD_BASE_URI + "/detectors";
public static final String AD_THREAD_POOL_PREFIX = "opendistro.ad.";
public static final String AD_THREAD_POOL_NAME = "ad-threadpool";
public static final String AD_BATCH_TASK_THREAD_POOL_NAME = "ad-batch-task-threadpool";
public static final String AD_JOB_TYPE = "opendistro_anomaly_detector";
private static Gson gson;
private AnomalyDetectionIndices anomalyDetectionIndices;
Expand All @@ -186,6 +196,9 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private DetectionStateHandler detectorStateHandler;
private ADTaskCacheManager adTaskCacheManager;
private ADTaskManager adTaskManager;
private ADBatchTaskRunner adBatchTaskRunner;

static {
SpecialPermission.check();
Expand Down Expand Up @@ -473,7 +486,7 @@ public Collection<Object> createComponents(
client,
settings,
threadPool,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initDetectorStateIndex),
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initDetectionStateIndex),
anomalyDetectionIndices::doesDetectorStateIndexExist,
this.clientUtil,
this.indexUtils,
Expand All @@ -493,6 +506,35 @@ public Collection<Object> createComponents(
stateManager
);

adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
adTaskManager = new ADTaskManager(settings, clusterService, client, xContentRegistry, anomalyDetectionIndices);
AnomalyResultBulkIndexHandler anomalyResultBulkIndexHandler = new AnomalyResultBulkIndexHandler(
client,
settings,
threadPool,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
this.clientUtil,
this.indexUtils,
clusterService,
anomalyDetectionIndices
);
adBatchTaskRunner = new ADBatchTaskRunner(
settings,
threadPool,
clusterService,
client,
nodeFilter,
indexNameExpressionResolver,
adCircuitBreakerService,
featureManager,
adTaskManager,
anomalyDetectionIndices,
adStats,
anomalyResultBulkIndexHandler,
adTaskCacheManager
);

// return objects used by Guice to inject dependencies for e.g.,
// transport action handler constructors
return ImmutableList
Expand All @@ -517,7 +559,9 @@ public Collection<Object> createComponents(
multiEntityResultHandler,
checkpoint,
modelPartitioner,
cacheProvider
cacheProvider,
adTaskManager,
adBatchTaskRunner
);
}

Expand All @@ -532,14 +576,21 @@ protected Clock getClock() {

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return Collections
.singletonList(
return ImmutableList
.of(
new FixedExecutorBuilder(
settings,
AD_THREAD_POOL_NAME,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 4),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
"opendistro.ad." + AD_THREAD_POOL_NAME
AD_THREAD_POOL_PREFIX + AD_THREAD_POOL_NAME
),
new FixedExecutorBuilder(
kaituo marked this conversation as resolved.
Show resolved Hide resolved
settings,
AD_BATCH_TASK_THREAD_POOL_NAME,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 8),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
AD_THREAD_POOL_PREFIX + AD_BATCH_TASK_THREAD_POOL_NAME
)
);
}
Expand Down Expand Up @@ -571,7 +622,10 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND,
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE,
AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS,
AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR,
AnomalyDetectorSettings.BATCH_TASK_PIECE_SIZE
);
return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList()));
}
Expand Down Expand Up @@ -613,7 +667,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(ADResultBulkAction.INSTANCE, ADResultBulkTransportAction.class),
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
new ActionHandler<>(EntityProfileAction.INSTANCE, EntityProfileTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class)
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(ADBatchAnomalyResultAction.INSTANCE, ADBatchAnomalyResultTransportAction.class),
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ private SearchRequest createLastSampleTimeRequest(String detectorId, long enable

SearchSourceBuilder source = new SearchSourceBuilder()
.query(boolQueryBuilder)
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX).field(AnomalyResult.EXECUTION_END_TIME_FIELD))
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(AnomalyResult.EXECUTION_END_TIME_FIELD))
.trackTotalHits(false)
.size(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
*/
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {

private static final short defaultThreshold = 85;
public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85;
private final JvmService jvmService;

public MemoryCircuitBreaker(JvmService jvmService) {
super(defaultThreshold);
super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD);
this.jvmService = jvmService;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copy link
Contributor Author

@ylwu-amzn ylwu-amzn Jan 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

./gradlew spotlessApply will reset copyright year as 2020 as configured in file spotless.license.java.

To make this PR clean, will send out a separate PR to update this license file and apply to all files. Will replace 2020 with $YEAR, then spotless can fill as current year automatically.

*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.common.exception;

public class ADTaskCancelledException extends AnomalyDetectionException {
private String cancelledBy;

public ADTaskCancelledException(String msg, String user) {
super(msg);
this.cancelledBy = user;
this.countedInStats(false);
}

public String getCancelledBy() {
return cancelledBy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,17 @@ public LimitExceededException(String anomalyDetectorId, String message) {
* @param message explanation for the limit
*/
public LimitExceededException(String message) {
super(null, message, true);
super(message, true);
}

/**
* Constructor with error message.
*
* @param message explanation for the limit
* @param endRun end detector run or not
*/
public LimitExceededException(String message, boolean endRun) {
super(null, message, endRun);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,11 @@ public class CommonName {
// Query
// ======================================
// Used in finding the max timestamp
public static final String AGG_NAME_MAX = "max_timefield";
public static final String AGG_NAME_MAX_TIME = "max_timefield";
// Used in finding the min timestamp
public static final String AGG_NAME_MIN_TIME = "min_timefield";
// date histogram aggregation name
public static final String DATE_HISTOGRAM = "date_histogram";
// feature aggregation name
public static final String FEATURE_AGGS = "feature_aggs";
}
Original file line number Diff line number Diff line change
Expand Up @@ -699,4 +699,63 @@ public int getShingleSize(String detectorId) {
return -1;
}
}

public void getFeatureDataPoints(
ylwu-amzn marked this conversation as resolved.
Show resolved Hide resolved
AnomalyDetector detector,
long startTime,
long endTime,
ActionListener<Map<Long, Optional<double[]>>> listener
) {
try {
searchFeatureDao.getFeaturesForPeriodByBatch(detector, startTime, endTime, ActionListener.wrap(points -> {
logger.info("features size: {}", points.size());
ylwu-amzn marked this conversation as resolved.
Show resolved Hide resolved
listener.onResponse(points);
}, listener::onFailure));
} catch (Exception e) {
logger.error("Failed to get features for detector: " + detector.getDetectorId());
weicongs-amazon marked this conversation as resolved.
Show resolved Hide resolved
}
}

public SinglePointFeatures getShingledFeature(
AnomalyDetector detector,
Deque<Entry<Long, Optional<double[]>>> shingle,
Map<Long, Optional<double[]>> dataPoints,
long endTime
) {
long maxTimeDifference = detector.getDetectorIntervalInMilliseconds() / 2;
Map<Long, Entry<Long, Optional<double[]>>> featuresMap = getNearbyPointsForShingle(detector, shingle, endTime, maxTimeDifference)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to call getNearbyPointsForShingle which helps real time detectors to deal with uneven arrival of requests? You run in batches and your timestamp within an interval is fixed.

Copy link
Contributor Author

@ylwu-amzn ylwu-amzn Jan 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to handle sparse data, this method is used for imputing missing points in the shingle with neighboring points here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In getNearbyPointsForShingle, the imputing distance is half of the interval, which does not apply to your case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this logic for historical detector?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is from product/user experience consideration, we will not differentiate the historical and realtime detection, will keep the model/algorithm consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to impute missing data which caused by run time jitter for historical detector, just need to impute the data hole in source data.

Discussed with kaituo, will simplify the code for historical detector currently. For the single flow, we can create new function to handle both historical and realtime detection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems that we have lots of ideas for the new universal workflow. Should we create a doc/issue to track them?

Copy link
Contributor Author

@ylwu-amzn ylwu-amzn Jan 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, as universal flow is just started, I will doc these ideas on my notebook and share with internal team first. Don't thinks it will benefit community by sharing such unconnected ideas on Github now as we don't even mention what universal flow is. Will create an RFC issue later once we finish research, and put those ideas on that Github issue. So user can know the big picture of background, our solution, then they can understand why we come up with these ideas.

.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
List<Entry<Long, Long>> missingRanges = getMissingRangesInShingle(detector, featuresMap, endTime);
ylwu-amzn marked this conversation as resolved.
Show resolved Hide resolved
missingRanges.stream().forEach(r -> {
if (dataPoints.containsKey(r.getKey())) {
featuresMap.put(r.getValue(), new SimpleImmutableEntry<>(r.getValue(), dataPoints.get(r.getKey())));
}
});
shingle.clear();
weicongs-amazon marked this conversation as resolved.
Show resolved Hide resolved

shingle.clear();
yizheliu-amazon marked this conversation as resolved.
Show resolved Hide resolved
getFullShingleEndTimes(endTime, detector.getDetectorIntervalInMilliseconds(), detector.getShingleSize())
.mapToObj(time -> featuresMap.getOrDefault(time, new SimpleImmutableEntry<>(time, Optional.empty())))
.forEach(e -> shingle.add(e));

return getProcessedFeatures(shingle, detector, endTime);
}

private SinglePointFeatures getProcessedFeatures(
ylwu-amzn marked this conversation as resolved.
Show resolved Hide resolved
Deque<Entry<Long, Optional<double[]>>> shingle,
AnomalyDetector detector,
long endTime
) {
int shingleSize = detector.getShingleSize();
Optional<double[]> currentPoint = shingle.peekLast().getValue();
return new SinglePointFeatures(
currentPoint,
Optional
// if current point is not present or current shingle has more missing data points than
// max missing rate, will return null
.ofNullable(currentPoint.isPresent() ? filterAndFill(shingle, endTime, detector) : null)
.map(points -> batchShingle(points, shingleSize)[0])
);
}

}
Loading