Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Verifying multi-entity detectors (#240)
Browse files Browse the repository at this point in the history
* Verifying multi-entity detectors

This PR adds checks on the number and length of categorical fields. We support only one categorical field, and it must be of type keyword or ip. We also limit the maximum number of multi-entity detectors to 10.

Testing done:
1. added unit tests
2. did manual testing.
  • Loading branch information
kaituo authored Oct 13, 2020
1 parent aaba77d commit 85e38ac
Show file tree
Hide file tree
Showing 16 changed files with 1,034 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,8 @@ public List<Setting<?>> getSettings() {

List<Setting<?>> systemSetting = ImmutableList
.of(
AnomalyDetectorSettings.MAX_ANOMALY_DETECTORS,
AnomalyDetectorSettings.MAX_SINGLE_ENTITY_ANOMALY_DETECTORS,
AnomalyDetectorSettings.MAX_MULTI_ENTITY_ANOMALY_DETECTORS,
AnomalyDetectorSettings.MAX_ANOMALY_FEATURES,
AnomalyDetectorSettings.REQUEST_TIMEOUT,
AnomalyDetectorSettings.DETECTION_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public class CommonName {
// index name for anomaly checkpoint of each model. One model one document.
public static final String CHECKPOINT_INDEX_NAME = ".opendistro-anomaly-checkpoints";

// The alias of the index in which to write AD result history
public static final String ANOMALY_RESULT_INDEX_ALIAS = ".opendistro-anomaly-results";

// ======================================
// Format name
// ======================================
Expand Down Expand Up @@ -55,4 +58,14 @@ public class CommonName {
public static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
public static final String MODELS = "models";
public static final String INIT_PROGRESS = "init_progress";

// Elastic mapping type
public static final String MAPPING_TYPE = "_doc";

// Used to fetch mapping
public static final String TYPE = "type";

public static final String KEYWORD_TYPE = "keyword";

public static final String IP_TYPE = "ip";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@

package com.amazon.opendistroforelasticsearch.ad.model;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.CATEGORY_FIELD_LIMIT;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.DEFAULT_MULTI_ENTITY_SHINGLE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.DEFAULT_SHINGLE_SIZE;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
Expand Down Expand Up @@ -76,6 +79,7 @@ public class AnomalyDetector implements Writeable, ToXContentObject {
private static final String SHINGLE_SIZE_FIELD = "shingle_size";
private static final String LAST_UPDATE_TIME_FIELD = "last_update_time";
public static final String UI_METADATA_FIELD = "ui_metadata";
public static final String CATEGORY_FIELD = "category_field";

private final String detectorId;
private final Long version;
Expand All @@ -91,6 +95,7 @@ public class AnomalyDetector implements Writeable, ToXContentObject {
private final Map<String, Object> uiMetadata;
private final Integer schemaVersion;
private final Instant lastUpdateTime;
private final List<String> categoryFields;

/**
* Constructor function.
Expand All @@ -109,6 +114,7 @@ public class AnomalyDetector implements Writeable, ToXContentObject {
* @param uiMetadata metadata used by Kibana
* @param schemaVersion anomaly detector index mapping version
* @param lastUpdateTime detector's last update time
* @param categoryField a list of partition fields
*/
public AnomalyDetector(
String detectorId,
Expand All @@ -124,7 +130,8 @@ public AnomalyDetector(
Integer shingleSize,
Map<String, Object> uiMetadata,
Integer schemaVersion,
Instant lastUpdateTime
Instant lastUpdateTime,
List<String> categoryField
) {
if (Strings.isBlank(name)) {
throw new IllegalArgumentException("Detector name should be set");
Expand All @@ -141,6 +148,9 @@ public AnomalyDetector(
if (shingleSize != null && shingleSize < 1) {
throw new IllegalArgumentException("Shingle size must be a positive integer");
}
if (categoryField != null && categoryField.size() > CATEGORY_FIELD_LIMIT) {
throw new IllegalArgumentException("We only support filtering data by one categorical variable");
}
this.detectorId = detectorId;
this.version = version;
this.name = name;
Expand All @@ -155,6 +165,44 @@ public AnomalyDetector(
this.uiMetadata = uiMetadata;
this.schemaVersion = schemaVersion;
this.lastUpdateTime = lastUpdateTime;
this.categoryFields = categoryField;
}

/**
 * Transitional constructor that takes no category fields. It forwards every
 * argument unchanged to the full constructor and passes {@code null} as the
 * category-field list.
 *
 * TODO: remove after complete code merges. Created so as not to touch too
 * many places in one PR.
 */
public AnomalyDetector(
    String detectorId,
    Long version,
    String name,
    String description,
    String timeField,
    List<String> indices,
    List<Feature> features,
    QueryBuilder filterQuery,
    TimeConfiguration detectionInterval,
    TimeConfiguration windowDelay,
    Integer shingleSize,
    Map<String, Object> uiMetadata,
    Integer schemaVersion,
    Instant lastUpdateTime
) {
    // The trailing null stands in for the absent category-field list.
    this(
        detectorId, version, name, description, timeField,
        indices, features, filterQuery,
        detectionInterval, windowDelay, shingleSize,
        uiMetadata, schemaVersion, lastUpdateTime,
        null
    );
}

public AnomalyDetector(StreamInput input) throws IOException {
Expand Down Expand Up @@ -188,6 +236,7 @@ public AnomalyDetector(StreamInput input) throws IOException {
uiMetadata = input.readMap();
schemaVersion = input.readInt();
lastUpdateTime = input.readInstant();
this.categoryFields = input.readStringList();
}

public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
Expand All @@ -210,6 +259,7 @@ public void writeTo(StreamOutput output) throws IOException {
output.writeMap(uiMetadata);
output.writeInt(schemaVersion);
output.writeInstant(lastUpdateTime);
output.writeStringCollection(categoryFields);
}

@Override
Expand All @@ -236,6 +286,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (lastUpdateTime != null) {
xContentBuilder.timeField(LAST_UPDATE_TIME_FIELD, LAST_UPDATE_TIME_FIELD, lastUpdateTime.toEpochMilli());
}
if (categoryFields != null) {
xContentBuilder.field(CATEGORY_FIELD, categoryFields.toArray());
}
return xContentBuilder.endObject();
}

Expand Down Expand Up @@ -264,7 +317,7 @@ public static AnomalyDetector parse(XContentParser parser, String detectorId) th
* @throws IOException IOException if content can't be parsed correctly
*/
public static AnomalyDetector parse(XContentParser parser, String detectorId, Long version) throws IOException {
    // No default detection interval or window delay is supplied here. The
    // stale six-argument call (which also passed a default shingle size) is
    // dropped: it made the statement below unreachable.
    return parse(parser, detectorId, version, null, null);
}

/**
Expand All @@ -275,7 +328,6 @@ public static AnomalyDetector parse(XContentParser parser, String detectorId, Lo
* @param version detector document version
* @param defaultDetectionInterval default detection interval
* @param defaultDetectionWindowDelay default detection window delay
* @param defaultShingleSize default number of intervals in shingle
* @return anomaly detector instance
* @throws IOException IOException if content can't be parsed correctly
*/
Expand All @@ -284,8 +336,7 @@ public static AnomalyDetector parse(
String detectorId,
Long version,
TimeValue defaultDetectionInterval,
TimeValue defaultDetectionWindowDelay,
Integer defaultShingleSize
TimeValue defaultDetectionWindowDelay
) throws IOException {
String name = null;
String description = null;
Expand All @@ -298,12 +349,14 @@ public static AnomalyDetector parse(
TimeConfiguration windowDelay = defaultDetectionWindowDelay == null
? null
: new IntervalTimeConfiguration(defaultDetectionWindowDelay.getSeconds(), ChronoUnit.SECONDS);
Integer shingleSize = defaultShingleSize;
Integer shingleSize = null;
List<Feature> features = new ArrayList<>();
int schemaVersion = 0;
Map<String, Object> uiMetadata = null;
Instant lastUpdateTime = null;

List<String> categoryField = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
Expand Down Expand Up @@ -359,6 +412,9 @@ public static AnomalyDetector parse(
case LAST_UPDATE_TIME_FIELD:
lastUpdateTime = ParseUtils.toInstant(parser);
break;
case CATEGORY_FIELD:
categoryField = (List) parser.list();
break;
default:
parser.skipChildren();
break;
Expand All @@ -375,10 +431,11 @@ public static AnomalyDetector parse(
filterQuery,
detectionInterval,
windowDelay,
shingleSize,
getShingleSize(shingleSize, categoryField),
uiMetadata,
schemaVersion,
lastUpdateTime
lastUpdateTime,
categoryField
);
}

Expand Down Expand Up @@ -483,7 +540,20 @@ public TimeConfiguration getWindowDelay() {
}

/**
 * Returns the shingle size of this detector.
 *
 * Delegates to the static {@code getShingleSize(Integer, List)} helper, passing
 * this detector's configured shingle size and category fields. The earlier
 * unconditional fallback to {@code DEFAULT_SHINGLE_SIZE} is removed because it
 * made this delegation unreachable.
 *
 * @return the shingle size computed by the helper
 */
public Integer getShingleSize() {
    return getShingleSize(shingleSize, categoryFields);
}

/**
 * Resolves the shingle size to use.
 *
 * An explicitly supplied shingle size always wins. When none is supplied, the
 * default is chosen by detector kind: {@code DEFAULT_MULTI_ENTITY_SHINGLE} if
 * at least one category field is present, {@code DEFAULT_SHINGLE_SIZE} otherwise.
 *
 * @param customShingleSize Given shingle size, may be {@code null}
 * @param categoryField Used to tell a multi-entity detector from a single-entity one, may be {@code null}
 * @return Shingle size
 */
private static Integer getShingleSize(Integer customShingleSize, List<String> categoryField) {
    if (customShingleSize != null) {
        return customShingleSize;
    }
    boolean multiEntity = categoryField != null && categoryField.size() > 0;
    if (multiEntity) {
        return DEFAULT_MULTI_ENTITY_SHINGLE;
    }
    return DEFAULT_SHINGLE_SIZE;
}

public Map<String, Object> getUiMetadata() {
Expand All @@ -498,4 +568,19 @@ public Instant getLastUpdateTime() {
return lastUpdateTime;
}

/**
 * Returns the category fields exactly as stored on this detector.
 *
 * @return the stored category-field list; {@code null} if the detector was built without one
 */
public List<String> getCategoryField() {
    return categoryFields;
}

/**
 * Returns the detection interval in milliseconds.
 *
 * The detection interval is cast to {@code IntervalTimeConfiguration}, so any
 * other {@code TimeConfiguration} implementation fails with a
 * {@code ClassCastException}.
 *
 * @return detection interval length in milliseconds
 */
public long getDetectorIntervalInMilliseconds() {
    IntervalTimeConfiguration interval = (IntervalTimeConfiguration) getDetectionInterval();
    return interval.toDuration().toMillis();
}

/**
 * Returns the detection interval in whole seconds.
 *
 * Computed by integer division of the millisecond value, so any fractional
 * second is truncated.
 *
 * @return detection interval length in seconds
 */
public long getDetectorIntervalInSeconds() {
    long intervalMillis = getDetectorIntervalInMilliseconds();
    return intervalMillis / 1000;
}

/**
 * Returns the detection interval as a {@link Duration}.
 *
 * The detection interval is cast to {@code IntervalTimeConfiguration} before
 * conversion.
 *
 * @return detection interval as a duration
 */
public Duration getDetectionIntervalDuration() {
    IntervalTimeConfiguration interval = (IntervalTimeConfiguration) getDetectionInterval();
    return interval.toDuration();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

package com.amazon.opendistroforelasticsearch.ad.rest;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.DEFAULT_SHINGLE_SIZE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.DETECTION_INTERVAL;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.DETECTION_WINDOW_DELAY;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_ANOMALY_DETECTORS;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_ANOMALY_FEATURES;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_MULTI_ENTITY_ANOMALY_DETECTORS;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_SINGLE_ENTITY_ANOMALY_DETECTORS;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.IF_PRIMARY_TERM;
Expand Down Expand Up @@ -65,36 +65,37 @@
public class RestIndexAnomalyDetectorAction extends BaseRestHandler {

private static final String INDEX_ANOMALY_DETECTOR_ACTION = "index_anomaly_detector_action";
private final AnomalyDetectionIndices anomalyDetectionIndices;
private final Logger logger = LogManager.getLogger(RestIndexAnomalyDetectorAction.class);
private final ClusterService clusterService;
private final Settings settings;

private volatile TimeValue requestTimeout;
private volatile TimeValue detectionInterval;
private volatile TimeValue detectionWindowDelay;
private volatile Integer maxAnomalyDetectors;
private volatile Integer maxSingleEntityDetectors;
private volatile Integer maxMultiEntityDetectors;
private volatile Integer maxAnomalyFeatures;

/**
 * Builds the REST handler for indexing anomaly detectors.
 *
 * Stores the given collaborators, reads the initial value of each tunable
 * (request timeout, detection interval, window delay, detector and feature
 * limits) from {@code settings}, and registers update consumers on the cluster
 * settings that reassign the matching volatile field.
 *
 * @param settings settings from which the initial values are read
 * @param clusterService source of the ClusterSettings on which update consumers are registered
 * @param anomalyDetectionIndices stored on the handler; not otherwise used in this constructor
 */
public RestIndexAnomalyDetectorAction(
Settings settings,
ClusterService clusterService,
AnomalyDetectionIndices anomalyDetectionIndices
) {
this.settings = settings;
this.anomalyDetectionIndices = anomalyDetectionIndices;
// Initial values, read once from the supplied settings.
this.requestTimeout = REQUEST_TIMEOUT.get(settings);
this.detectionInterval = DETECTION_INTERVAL.get(settings);
this.detectionWindowDelay = DETECTION_WINDOW_DELAY.get(settings);
this.maxAnomalyDetectors = MAX_ANOMALY_DETECTORS.get(settings);
this.maxSingleEntityDetectors = MAX_SINGLE_ENTITY_ANOMALY_DETECTORS.get(settings);
this.maxMultiEntityDetectors = MAX_MULTI_ENTITY_ANOMALY_DETECTORS.get(settings);
this.maxAnomalyFeatures = MAX_ANOMALY_FEATURES.get(settings);
this.clusterService = clusterService;
// TODO: will add more cluster setting consumer later
// TODO: inject ClusterSettings only if clusterService is only used to get ClusterSettings
// Each consumer below overwrites its field when the corresponding setting is updated.
clusterService.getClusterSettings().addSettingsUpdateConsumer(REQUEST_TIMEOUT, it -> requestTimeout = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(DETECTION_INTERVAL, it -> detectionInterval = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(DETECTION_WINDOW_DELAY, it -> detectionWindowDelay = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_ANOMALY_DETECTORS, it -> maxAnomalyDetectors = it);
clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(MAX_SINGLE_ENTITY_ANOMALY_DETECTORS, it -> maxSingleEntityDetectors = it);
clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(MAX_MULTI_ENTITY_ANOMALY_DETECTORS, it -> maxMultiEntityDetectors = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_ANOMALY_FEATURES, it -> maxAnomalyFeatures = it);
}

Expand All @@ -115,8 +116,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
XContentParser parser = request.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
// TODO: check detection interval < modelTTL
AnomalyDetector detector = AnomalyDetector
.parse(parser, detectorId, null, detectionInterval, detectionWindowDelay, DEFAULT_SHINGLE_SIZE);
AnomalyDetector detector = AnomalyDetector.parse(parser, detectorId, null, detectionInterval, detectionWindowDelay);

long seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO);
long primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
Expand All @@ -131,7 +131,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
primaryTerm,
refreshPolicy,
detector,
method
method,
requestTimeout,
maxSingleEntityDetectors,
maxMultiEntityDetectors,
maxAnomalyFeatures
);

return channel -> client
Expand Down
Loading

0 comments on commit 85e38ac

Please sign in to comment.