Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Verifying Categorical field's type and total number
Browse files Browse the repository at this point in the history
This PR adds categorical fields' number and length check. We only support one categorical field, and the categorical field can only be of type keyword and ip.

Testing done:
1. added unit tests
2. did manual testing.
  • Loading branch information
kaituo committed Oct 7, 2020
1 parent 502c8c5 commit dce7469
Show file tree
Hide file tree
Showing 6 changed files with 580 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public class CommonName {
// index name for anomaly checkpoint of each model. One model one document.
public static final String CHECKPOINT_INDEX_NAME = ".opendistro-anomaly-checkpoints";

// The alias of the index in which to write single-entity AD result history
public static final String ANOMALY_RESULT_INDEX_ALIAS = ".opendistro-anomaly-results";

// ======================================
// Format name
// ======================================
Expand Down Expand Up @@ -55,4 +58,14 @@ public class CommonName {
public static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
public static final String MODELS = "models";
public static final String INIT_PROGRESS = "init_progress";

// Elastic mapping type
public static final String MAPPING_TYPE = "_doc";

// Used to fetch mapping
public static final String TYPE = "type";

public static final String KEYWORD_TYPE = "keyword";

public static final String IP_TYPE = "ip";
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class AnomalyDetector implements ToXContentObject {
private static final String SHINGLE_SIZE_FIELD = "shingle_size";
private static final String LAST_UPDATE_TIME_FIELD = "last_update_time";
public static final String UI_METADATA_FIELD = "ui_metadata";
public static final String CATEGORY_FIELD = "category_field";

private final String detectorId;
private final Long version;
Expand All @@ -87,6 +88,7 @@ public class AnomalyDetector implements ToXContentObject {
private final Map<String, Object> uiMetadata;
private final Integer schemaVersion;
private final Instant lastUpdateTime;
private final List<String> categoryField;

/**
* Constructor function.
Expand All @@ -105,7 +107,62 @@ public class AnomalyDetector implements ToXContentObject {
* @param uiMetadata metadata used by Kibana
* @param schemaVersion anomaly detector index mapping version
* @param lastUpdateTime detector's last update time
* @param categoryField a list of partition fields
*/
public AnomalyDetector(
String detectorId,
Long version,
String name,
String description,
String timeField,
List<String> indices,
List<Feature> features,
QueryBuilder filterQuery,
TimeConfiguration detectionInterval,
TimeConfiguration windowDelay,
Integer shingleSize,
Map<String, Object> uiMetadata,
Integer schemaVersion,
Instant lastUpdateTime,
List<String> categoryField
) {
if (Strings.isBlank(name)) {
throw new IllegalArgumentException("Detector name should be set");
}
if (timeField == null) {
throw new IllegalArgumentException("Time field should be set");
}
if (indices == null || indices.isEmpty()) {
throw new IllegalArgumentException("Indices should be set");
}
if (detectionInterval == null) {
throw new IllegalArgumentException("Detection interval should be set");
}
if (shingleSize != null && shingleSize < 1) {
throw new IllegalArgumentException("Shingle size must be a positive integer");
}
if (categoryField != null && categoryField.size() > 1) {
throw new IllegalArgumentException("We only support filtering data by one categorical variable");
}
this.detectorId = detectorId;
this.version = version;
this.name = name;
this.description = description;
this.timeField = timeField;
this.indices = indices;
this.featureAttributes = features;
this.filterQuery = filterQuery;
this.detectionInterval = detectionInterval;
this.windowDelay = windowDelay;
this.shingleSize = shingleSize;
this.uiMetadata = uiMetadata;
this.schemaVersion = schemaVersion;
this.lastUpdateTime = lastUpdateTime;
this.categoryField = categoryField;
}

// TODO: remove after complete code merges. Created to not to touch too
// many places in one PR.
public AnomalyDetector(
String detectorId,
Long version,
Expand Down Expand Up @@ -151,6 +208,7 @@ public AnomalyDetector(
this.uiMetadata = uiMetadata;
this.schemaVersion = schemaVersion;
this.lastUpdateTime = lastUpdateTime;
this.categoryField = null;
}

public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
Expand Down Expand Up @@ -181,6 +239,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (lastUpdateTime != null) {
xContentBuilder.timeField(LAST_UPDATE_TIME_FIELD, LAST_UPDATE_TIME_FIELD, lastUpdateTime.toEpochMilli());
}
if (categoryField != null) {
xContentBuilder.field(CATEGORY_FIELD, categoryField.toArray());
}
return xContentBuilder.endObject();
}

Expand Down Expand Up @@ -248,6 +309,7 @@ public static AnomalyDetector parse(
int schemaVersion = 0;
Map<String, Object> uiMetadata = null;
Instant lastUpdateTime = null;
List<String> categoryField = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -304,6 +366,9 @@ public static AnomalyDetector parse(
case LAST_UPDATE_TIME_FIELD:
lastUpdateTime = ParseUtils.toInstant(parser);
break;
case CATEGORY_FIELD:
categoryField = (List) parser.list();
break;
default:
parser.skipChildren();
break;
Expand All @@ -323,7 +388,8 @@ public static AnomalyDetector parse(
shingleSize,
uiMetadata,
schemaVersion,
lastUpdateTime
lastUpdateTime,
categoryField
);
}

Expand Down Expand Up @@ -443,4 +509,7 @@ public Instant getLastUpdateTime() {
return lastUpdateTime;
}

public List<String> getCategoryField() {
return this.categoryField;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,23 @@
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetadata;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -44,6 +52,7 @@
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestStatus;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
Expand All @@ -60,6 +69,10 @@
*/
public class IndexAnomalyDetectorJobActionHandler extends AbstractActionHandler {

static final String ONLY_ONE_CATEGORICAL_FIELD_ERR_MSG = "We can have only one categorical field.";
public static final String CATEGORICAL_FIELD_TYPE_ERR_MSG = "A categorical field must be of type keyword or ip.";
static final String NOT_FOUND_ERR_MSG = "Cannot found the categorical field %s";

private final AnomalyDetectionIndices anomalyDetectionIndices;
private final String detectorId;
private final Long seqNo;
Expand Down Expand Up @@ -129,7 +142,10 @@ private void onCreateMappingsResponse(CreateIndexResponse response) throws IOExc

private void prepareAnomalyDetectorJobIndexing() {
GetRequest getRequest = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX).id(detectorId);
client.get(getRequest, ActionListener.wrap(response -> onGetAnomalyDetectorResponse(response), exception -> onFailure(exception)));
client.get(getRequest, ActionListener.wrap(response -> onGetAnomalyDetectorResponse(response), exception -> {
logger.info(new ParameterizedMessage("Exception while indexing job for[{}]", detectorId), exception);
onFailure(exception);
}));
}

private void onGetAnomalyDetectorResponse(GetResponse response) throws IOException {
Expand All @@ -151,29 +167,98 @@ private void onGetAnomalyDetectorResponse(GetResponse response) throws IOExcepti
return;
}

IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getDetectionInterval();
Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit());
Duration duration = Duration.of(interval.getInterval(), interval.getUnit());

AnomalyDetectorJob job = new AnomalyDetectorJob(
detector.getDetectorId(),
schedule,
detector.getWindowDelay(),
true,
Instant.now(),
null,
Instant.now(),
duration.getSeconds()
);

getAnomalyDetectorJobForWrite(job);
validateCategoricalField(detector);
} catch (IOException e) {
String message = "Failed to parse anomaly detector job " + detectorId;
logger.error(message, e);
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, message));
}
}

@SuppressWarnings("unchecked")
private void validateCategoricalField(AnomalyDetector detector) {
List<String> categoryField = detector.getCategoryField();
if (categoryField == null) {
writeJob(detector);
return;
}

// we only support one categorical field
// If there is more than 1 field or none, AnomalyDetector's constructor
// throws IllegalArgumentException before reaching this line
if (categoryField.size() != 1) {
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, ONLY_ONE_CATEGORICAL_FIELD_ERR_MSG));
return;
}

String categoryField0 = categoryField.get(0);

GetFieldMappingsRequest getMappingsRequest = new GetFieldMappingsRequest();
getMappingsRequest.indices(detector.getIndices().toArray(new String[0])).fields(categoryField.toArray(new String[0]));
getMappingsRequest.indicesOptions(IndicesOptions.strictExpand());

ActionListener<GetFieldMappingsResponse> mappingsListener = ActionListener.wrap(getMappingsResponse -> {
// example getMappingsResponse:
// GetFieldMappingsResponse{mappings={server-metrics={_doc={service=FieldMappingMetadata{fullName='service',
// source=org.elasticsearch.common.bytes.BytesArray@7ba87dbd}}}}}
boolean foundField = false;
Map<String, Map<String, Map<String, FieldMappingMetadata>>> mappingsByIndex = getMappingsResponse.mappings();

for (Map<String, Map<String, FieldMappingMetadata>> mappingsByType : mappingsByIndex.values()) {
for (Map<String, FieldMappingMetadata> mappingsByField : mappingsByType.values()) {
for (Map.Entry<String, FieldMappingMetadata> field2Metadata : mappingsByField.entrySet()) {
FieldMappingMetadata fieldMetadata = field2Metadata.getValue();

if (fieldMetadata != null) {
Object metadata = fieldMetadata.sourceAsMap().get(categoryField0);
if (metadata != null && metadata instanceof Map) {
foundField = true;
Map<String, Object> metadataMap = (Map<String, Object>) metadata;
String typeName = (String) metadataMap.get(CommonName.TYPE);
if (!typeName.equals(CommonName.KEYWORD_TYPE) && !typeName.equals(CommonName.IP_TYPE)) {
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, CATEGORICAL_FIELD_TYPE_ERR_MSG));
return;
}
}
}
}
}
}

if (foundField == false) {
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, String.format(NOT_FOUND_ERR_MSG, categoryField0)));
return;
}

writeJob(detector);
}, error -> {
String message = String.format("Fail to get the index mapping of %s", detector.getIndices());
logger.error(message, error);
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, message));
});

client.execute(GetFieldMappingsAction.INSTANCE, getMappingsRequest, mappingsListener);
}

private void writeJob(AnomalyDetector detector) {
IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getDetectionInterval();
Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit());
Duration duration = Duration.of(interval.getInterval(), interval.getUnit());

AnomalyDetectorJob job = new AnomalyDetectorJob(
detector.getDetectorId(),
schedule,
detector.getWindowDelay(),
true,
Instant.now(),
null,
Instant.now(),
duration.getSeconds()
);

getAnomalyDetectorJobForWrite(job);
}

private void getAnomalyDetectorJobForWrite(AnomalyDetectorJob job) {
GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static org.hamcrest.Matchers.containsString;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
Expand All @@ -33,6 +35,8 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.TestThreadPool;
Expand All @@ -42,6 +46,11 @@

import test.com.amazon.opendistroforelasticsearch.ad.util.FakeNode;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState;

public class AbstractADTest extends ESTestCase {

protected static final Logger LOG = (Logger) LogManager.getLogger(AbstractADTest.class);
Expand Down Expand Up @@ -216,4 +225,21 @@ public void assertException(
Exception e = expectThrows(exceptionType, () -> listener.actionGet());
assertThat(e.getMessage(), containsString(msg));
}

@Override
protected NamedXContentRegistry xContentRegistry() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
List<NamedXContentRegistry.Entry> entries = searchModule.getNamedXContents();
entries
.addAll(
Arrays
.asList(
AnomalyDetector.XCONTENT_REGISTRY,
AnomalyResult.XCONTENT_REGISTRY,
DetectorInternalState.XCONTENT_REGISTRY,
AnomalyDetectorJob.XCONTENT_REGISTRY
)
);
return new NamedXContentRegistry(entries);
}
}
Loading

0 comments on commit dce7469

Please sign in to comment.