Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adding RestActions support for Start/Stop Detector API #244

Merged
merged 10 commits into from
Oct 13, 2020
9 changes: 7 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ List<String> jacocoExclusions = [
//TODO: add more test cases later for these package
'com.amazon.opendistroforelasticsearch.ad.model.*',
'com.amazon.opendistroforelasticsearch.ad.rest.*',
'com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectionStateHandler',
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorJobRunner',

// Class containing just constants. Don't need to test
'com.amazon.opendistroforelasticsearch.ad.constant.*',
Expand All @@ -250,7 +252,10 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils',
'com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.DeleteAnomalyDetectorTransportAction*'
'com.amazon.opendistroforelasticsearch.ad.transport.DeleteAnomalyDetectorTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorRequest'
]

jacocoTestCoverageVerification {
Expand Down Expand Up @@ -292,7 +297,7 @@ checkstyle {
dependencies {
compile "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "org.elasticsearch.plugin:elasticsearch-scripting-painless-spi:${versions.elasticsearch}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.10.1.0"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.10.1.1"
compile group: 'com.google.guava', name: 'guava', version:'15.0'
compile group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.CronAction;
Expand All @@ -109,6 +111,10 @@
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelAction;
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction;
Expand Down Expand Up @@ -211,7 +217,7 @@ public List<RestHandler> getRestHandlers(
this.nodeFilter,
AnomalyDetectorSettings.NUM_MIN_SAMPLES
);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(profileRunner);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction();
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
clusterService,
Expand All @@ -226,11 +232,7 @@ public List<RestHandler> getRestHandlers(
anomalyDetectorRunner
);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(
settings,
clusterService,
anomalyDetectionIndices
);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(settings, clusterService);

return ImmutableList
.of(
Expand Down Expand Up @@ -468,7 +470,10 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class),
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class),
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class)
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class),
new ActionHandler<>(IndexAnomalyDetectorAction.INSTANCE, IndexAnomalyDetectorTransportAction.class),
new ActionHandler<>(AnomalyDetectorJobAction.INSTANCE, AnomalyDetectorJobTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@

import org.apache.logging.log4j.util.Strings;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand All @@ -46,7 +50,7 @@
/**
* An AnomalyDetector is used to represent anomaly detection model(RCF) related parameters.
*/
public class AnomalyDetector implements ToXContentObject {
public class AnomalyDetector implements Writeable, ToXContentObject {

public static final String PARSE_FIELD_NAME = "AnomalyDetector";
public static final NamedXContentRegistry.Entry XCONTENT_REGISTRY = new NamedXContentRegistry.Entry(
Expand Down Expand Up @@ -153,10 +157,61 @@ public AnomalyDetector(
this.lastUpdateTime = lastUpdateTime;
}

public AnomalyDetector(StreamInput input) throws IOException {
detectorId = input.readString();
version = input.readLong();
String name = input.readString();
if (Strings.isBlank(name)) {
throw new IllegalArgumentException("Detector name should be set");
}
this.name = name;
description = input.readString();
String timeField = input.readString();
if (timeField == null) {
throw new IllegalArgumentException("Time field should be set");
}
this.timeField = timeField;
List<String> indices = input.readStringList();
if (indices == null || indices.isEmpty()) {
throw new IllegalArgumentException("Indices should be set");
}
this.indices = indices;
featureAttributes = input.readList(Feature::new);
filterQuery = new MatchAllQueryBuilder(input);
detectionInterval = IntervalTimeConfiguration.readFrom(input);
windowDelay = IntervalTimeConfiguration.readFrom(input);
Integer shingleSize = input.readInt();
if (shingleSize != null && shingleSize < 1) {
throw new IllegalArgumentException("Shingle size must be a positive integer");
}
this.shingleSize = shingleSize;
uiMetadata = input.readMap();
schemaVersion = input.readInt();
lastUpdateTime = input.readInstant();
}

public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
return toXContent(builder, ToXContent.EMPTY_PARAMS);
}

@Override
public void writeTo(StreamOutput output) throws IOException {
output.writeString(detectorId);
output.writeLong(version);
output.writeString(name);
output.writeString(description);
output.writeString(timeField);
output.writeStringCollection(indices);
output.writeList(featureAttributes);
filterQuery.writeTo(output);
detectionInterval.writeTo(output);
windowDelay.writeTo(output);
output.writeInt(shingleSize);
output.writeMap(uiMetadata);
output.writeInt(schemaVersion);
output.writeInstant(lastUpdateTime);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,30 @@
import java.time.Instant;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.CronSchedule;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.IntervalSchedule;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.Schedule;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.ScheduleParser;
import com.google.common.base.Objects;

/**
* Anomaly detector job.
*/
public class AnomalyDetectorJob implements ToXContentObject, ScheduledJobParameter {
public class AnomalyDetectorJob implements Writeable, ToXContentObject, ScheduledJobParameter {
enum ScheduleType {
CRON,
INTERVAL
}

public static final String PARSE_FIELD_NAME = "AnomalyDetectorJob";
public static final NamedXContentRegistry.Entry XCONTENT_REGISTRY = new NamedXContentRegistry.Entry(
Expand Down Expand Up @@ -85,6 +94,21 @@ public AnomalyDetectorJob(
this.lockDurationSeconds = lockDurationSeconds;
}

public AnomalyDetectorJob(StreamInput input) throws IOException {
name = input.readString();
if (input.readEnum(AnomalyDetectorJob.ScheduleType.class) == ScheduleType.CRON) {
schedule = new CronSchedule(input);
} else {
schedule = new IntervalSchedule(input);
}
windowDelay = IntervalTimeConfiguration.readFrom(input);
isEnabled = input.readBoolean();
enabledTime = input.readInstant();
disabledTime = input.readInstant();
lastUpdateTime = input.readInstant();
lockDurationSeconds = input.readLong();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder
Expand All @@ -102,6 +126,22 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return xContentBuilder.endObject();
}

@Override
public void writeTo(StreamOutput output) throws IOException {
output.writeString(name);
if (schedule instanceof CronSchedule) {
output.writeEnum(ScheduleType.CRON);
} else {
output.writeEnum(ScheduleType.INTERVAL);
}
schedule.writeTo(output);
windowDelay.writeTo(output);
output.writeInstant(enabledTime);
output.writeInstant(disabledTime);
output.writeInstant(lastUpdateTime);
output.writeLong(lockDurationSeconds);
}

public static AnomalyDetectorJob parse(XContentParser parser) throws IOException {
String name = null;
Schedule schedule = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;

public class DetectorProfile implements ToXContentObject, Mergeable {
public class DetectorProfile implements Writeable, ToXContentObject, Mergeable {
private DetectorState state;
private String error;
private ModelProfile[] modelProfile;
Expand All @@ -39,6 +42,16 @@ public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
return toXContent(builder, ToXContent.EMPTY_PARAMS);
}

public DetectorProfile(StreamInput in) throws IOException {
this.state = in.readEnum(DetectorState.class);
this.error = in.readString();
this.modelProfile = in.readArray(ModelProfile::new, ModelProfile[]::new);
this.shingleSize = in.readInt();
this.coordinatingNode = in.readString();
this.totalSizeInBytes = in.readLong();
this.initProgress = new InitProgressProfile(in);
}

private DetectorProfile() {}

public static class Builder {
Expand Down Expand Up @@ -101,6 +114,17 @@ public DetectorProfile build() {
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(state);
out.writeString(error);
out.writeArray(modelProfile);
out.writeInt(shingleSize);
out.writeString(coordinatingNode);
out.writeLong(totalSizeInBytes);
initProgress.writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import org.apache.logging.log4j.util.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -33,7 +36,7 @@
/**
* Anomaly detector feature
*/
public class Feature implements ToXContentObject {
public class Feature implements Writeable, ToXContentObject {

private static final String FEATURE_ID_FIELD = "feature_id";
private static final String FEATURE_NAME_FIELD = "feature_name";
Expand Down Expand Up @@ -65,6 +68,21 @@ public Feature(String id, String name, Boolean enabled, AggregationBuilder aggre
this.aggregation = aggregation;
}

public Feature(StreamInput input) throws IOException {
this.id = input.readString();
this.name = input.readString();
this.enabled = input.readBoolean();
this.aggregation = input.readNamedWriteable(AggregationBuilder.class);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.id);
out.writeString(this.name);
out.writeBoolean(this.enabled);
aggregation.writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Locale;
import java.util.Set;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import com.amazon.opendistroforelasticsearch.ad.annotation.Generated;
Expand Down Expand Up @@ -51,6 +53,21 @@ public IntervalTimeConfiguration(long interval, ChronoUnit unit) {
this.unit = unit;
}

public IntervalTimeConfiguration(StreamInput input) throws IOException {
this.interval = input.readLong();
this.unit = input.readEnum(ChronoUnit.class);
}

public static IntervalTimeConfiguration readFrom(StreamInput input) throws IOException {
return new IntervalTimeConfiguration(input);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(this.interval);
out.writeEnum(this.unit);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject().startObject(PERIOD_FIELD).field(INTERVAL_FIELD, interval).field(UNIT_FIELD, unit).endObject().endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import java.time.temporal.ChronoUnit;
import java.util.Locale;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentParser;

/**
* TimeConfiguration represents the time configuration for a job which runs regularly.
*/
public abstract class TimeConfiguration implements ToXContentObject {
public abstract class TimeConfiguration implements Writeable, ToXContentObject {

public static final String PERIOD_FIELD = "period";
public static final String INTERVAL_FIELD = "interval";
Expand Down
Loading