Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

support return AD job when get detector #50

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class AnomalyDetector implements ToXContentObject {
private static final String WINDOW_DELAY_FIELD = "window_delay";
private static final String LAST_UPDATE_TIME_FIELD = "last_update_time";
public static final String UI_METADATA_FIELD = "ui_metadata";
public static final String ENABLED_FIELD = "enabled";

private final String detectorId;
private final Long version;
Expand Down Expand Up @@ -161,9 +160,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(DETECTION_INTERVAL_FIELD, detectionInterval)
.field(WINDOW_DELAY_FIELD, windowDelay)
.field(SCHEMA_VERSION_FIELD, schemaVersion);
if (params.param(ENABLED_FIELD) != null) {
xContentBuilder.field(ENABLED_FIELD, params.paramAsBoolean(ENABLED_FIELD, false));
}

if (uiMetadata != null && !uiMetadata.isEmpty()) {
xContentBuilder.field(UI_METADATA_FIELD, uiMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ public class AnomalyDetectorJob implements ToXContentObject, ScheduledJobParamet
private static final String SCHEDULE_FIELD = "schedule";
private static final String IS_ENABLED_FIELD = "enabled";
private static final String ENABLED_TIME_FIELD = "enabled_time";
private static final String DISABLED_TIME_FIELD = "disabled_time";

private final String name;
private final Schedule schedule;
private final Boolean isEnabled;
private final Instant enabledTime;
private final Instant disabledTime;
private final Instant lastUpdateTime;
private final Long lockDurationSeconds;

Expand All @@ -56,13 +58,15 @@ public AnomalyDetectorJob(
Schedule schedule,
Boolean isEnabled,
Instant enabledTime,
Instant disabledTime,
Instant lastUpdateTime,
Long lockDurationSeconds
) {
this.name = name;
this.schedule = schedule;
this.isEnabled = isEnabled;
this.enabledTime = enabledTime;
this.disabledTime = disabledTime;
this.lastUpdateTime = lastUpdateTime;
this.lockDurationSeconds = lockDurationSeconds;
}
Expand All @@ -77,6 +81,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(ENABLED_TIME_FIELD, enabledTime.toEpochMilli())
.field(LAST_UPDATE_TIME_FIELD, lastUpdateTime.toEpochMilli())
.field(LOCK_DURATION_SECONDS, lockDurationSeconds);
if (disabledTime != null) {
xContentBuilder.field(DISABLED_TIME_FIELD, disabledTime.toEpochMilli());
}
return xContentBuilder.endObject();
}

Expand All @@ -85,6 +92,7 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException
Schedule schedule = null;
Boolean isEnabled = null;
kaituo marked this conversation as resolved.
Show resolved Hide resolved
Instant enabledTime = null;
Instant disabledTime = null;
Instant lastUpdateTime = null;
Long lockDurationSeconds = DEFAULT_AD_JOB_LOC_DURATION_SECONDS;

Expand All @@ -106,6 +114,9 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException
case ENABLED_TIME_FIELD:
enabledTime = ParseUtils.toInstant(parser);
break;
case DISABLED_TIME_FIELD:
disabledTime = ParseUtils.toInstant(parser);
break;
case LAST_UPDATE_TIME_FIELD:
lastUpdateTime = ParseUtils.toInstant(parser);
break;
Expand All @@ -117,7 +128,7 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException
break;
}
}
return new AnomalyDetectorJob(name, schedule, isEnabled, enabledTime, lastUpdateTime, lockDurationSeconds);
return new AnomalyDetectorJob(name, schedule, isEnabled, enabledTime, disabledTime, lastUpdateTime, lockDurationSeconds);
}

@Override
Expand All @@ -131,6 +142,7 @@ public boolean equals(Object o) {
&& Objects.equal(getSchedule(), that.getSchedule())
&& Objects.equal(isEnabled(), that.isEnabled())
&& Objects.equal(getEnabledTime(), that.getEnabledTime())
&& Objects.equal(getDisabledTime(), that.getDisabledTime())
&& Objects.equal(getLastUpdateTime(), that.getLastUpdateTime())
&& Objects.equal(getLockDurationSeconds(), that.getLockDurationSeconds());
}
Expand Down Expand Up @@ -160,6 +172,10 @@ public Instant getEnabledTime() {
return enabledTime;
}

public Instant getDisabledTime() {
return disabledTime;
}

@Override
public Instant getLastUpdateTime() {
return lastUpdateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String rawPath = request.rawPath();

if (rawPath.endsWith(START_JOB)) {
handler.createAnomalyDetectorJob();
handler.startAnomalyDetectorJob();
} else if (rawPath.endsWith(STOP_JOB)) {
handler.deleteAnomalyDetectorJob(detectorId);
handler.stopAnomalyDetectorJob(detectorId);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
package com.amazon.opendistroforelasticsearch.ad.rest;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorActionHandler;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -69,10 +74,40 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return channel -> {
logger.info("Delete anomaly detector {}", detectorId);
handler
.getDetectorJob(clusterService, client, detectorId, channel, () -> deleteAnomalyDetectorDoc(client, detectorId, channel));
.getDetectorJob(
clusterService,
client,
detectorId,
channel,
() -> deleteAnomalyDetectorJobDoc(client, detectorId, channel)
ylwu-amzn marked this conversation as resolved.
Show resolved Hide resolved
);
};
}

private void deleteAnomalyDetectorJobDoc(NodeClient client, String detectorId, RestChannel channel) {
logger.info("Delete anomaly detector job {}", detectorId);
DeleteRequest deleteRequest = new DeleteRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, detectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.wrap(response -> {
if (response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
deleteAnomalyDetectorDoc(client, detectorId, channel);
} else {
logger.error("Fail to delete anomaly detector job {}", detectorId);
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
deleteAnomalyDetectorDoc(client, detectorId, channel);
} else {
logger.error("Failed to delete anomaly detector job", exception);
try {
channel.sendResponse(new BytesRestResponse(channel, exception));
} catch (IOException e) {
logger.error("Failed to send response of delete anomaly detector job exception", e);
}
}
}));
}

private void deleteAnomalyDetectorDoc(NodeClient client, String detectorId, RestChannel channel) {
logger.info("Delete anomaly detector {}", detectorId);
DeleteRequest deleteRequest = new DeleteRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX, detectorId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package com.amazon.opendistroforelasticsearch.ad.rest;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -27,12 +27,8 @@
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
Expand All @@ -47,9 +43,9 @@
import java.util.Locale;

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ENABLED_FIELD;
import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.createXContentParser;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
Expand All @@ -74,26 +70,31 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String detectorId = request.param(DETECTOR_ID);
boolean returnJob = request.paramAsBoolean("job", false);
MultiGetRequest.Item adItem = new MultiGetRequest.Item(ANOMALY_DETECTORS_INDEX, detectorId)
.version(RestActions.parseVersion(request));
MultiGetRequest.Item adJobItem = new MultiGetRequest.Item(ANOMALY_DETECTOR_JOB_INDEX, detectorId)
.version(RestActions.parseVersion(request));
MultiGetRequest multiGetRequest = new MultiGetRequest().add(adItem).add(adJobItem);
return channel -> client.multiGet(multiGetRequest, onMultiGetResponse(channel));
MultiGetRequest multiGetRequest = new MultiGetRequest().add(adItem);
if (returnJob) {
MultiGetRequest.Item adJobItem = new MultiGetRequest.Item(ANOMALY_DETECTOR_JOB_INDEX, detectorId)
.version(RestActions.parseVersion(request));
multiGetRequest.add(adJobItem);
}

return channel -> client.multiGet(multiGetRequest, onMultiGetResponse(channel, returnJob, detectorId));
}

private ActionListener<MultiGetResponse> onMultiGetResponse(RestChannel channel) {
private ActionListener<MultiGetResponse> onMultiGetResponse(RestChannel channel, boolean returnJob, String detectorId) {
return new RestResponseListener<MultiGetResponse>(channel) {
@Override
public RestResponse buildResponse(MultiGetResponse multiGetResponse) throws Exception {
MultiGetItemResponse[] responses = multiGetResponse.getResponses();
AnomalyDetector detector = null;
XContentBuilder builder = null;
Boolean adJobEnabled = false;
AnomalyDetector detector = null;
AnomalyDetectorJob adJob = null;
for (MultiGetItemResponse response : responses) {
if (ANOMALY_DETECTORS_INDEX.equals(response.getIndex())) {
if (!response.getResponse().isExists()) {
return new BytesRestResponse(RestStatus.NOT_FOUND, channel.newBuilder());
if (response.getResponse() == null || !response.getResponse().isExists()) {
return new BytesRestResponse(RestStatus.NOT_FOUND, "Can't find detector with id: " + detectorId);
}
builder = channel
.newBuilder()
Expand All @@ -103,33 +104,44 @@ public RestResponse buildResponse(MultiGetResponse multiGetResponse) throws Exce
.field(RestHandlerUtils._PRIMARY_TERM, response.getResponse().getPrimaryTerm())
.field(RestHandlerUtils._SEQ_NO, response.getResponse().getSeqNo());
if (!response.getResponse().isSourceEmpty()) {
XContentParser parser = XContentHelper
.createParser(
channel.request().getXContentRegistry(),
LoggingDeprecationHandler.INSTANCE,
response.getResponse().getSourceAsBytesRef(),
XContentType.JSON
);
try {
try (
XContentParser parser = RestHandlerUtils
.createXContentParser(channel, response.getResponse().getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
detector = parser.namedObject(AnomalyDetector.class, AnomalyDetector.PARSE_FIELD_NAME, null);
} catch (Throwable t) {
logger.error("Fail to parse detector", t);
throw t;
} finally {
parser.close();
return new BytesRestResponse(
RestStatus.INTERNAL_SERVER_ERROR,
"Failed to parse detector with id: " + detectorId
);
}
}
}

if (ANOMALY_DETECTOR_JOB_INDEX.equals(response.getIndex())) {
if (!response.isFailed() && response.getResponse().isExists()) {
adJobEnabled = true;
if (response.getResponse() != null
&& response.getResponse().isExists()
&& !response.getResponse().isSourceEmpty()) {
try (XContentParser parser = createXContentParser(channel, response.getResponse().getSourceAsBytesRef())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
adJob = AnomalyDetectorJob.parse(parser);
} catch (Throwable t) {
logger.error("Fail to parse detector job ", t);
return new BytesRestResponse(
RestStatus.INTERNAL_SERVER_ERROR,
"Failed to parse detector job with id: " + detectorId
);
}
}
}
ylwu-amzn marked this conversation as resolved.
Show resolved Hide resolved
}
ToXContent.Params params = new ToXContent.MapParams(ImmutableMap.of(ENABLED_FIELD, adJobEnabled.toString()));
builder.field(RestHandlerUtils.ANOMALY_DETECTOR, detector, params);

builder.field(RestHandlerUtils.ANOMALY_DETECTOR, detector);
if (returnJob) {
builder.field(RestHandlerUtils.ANOMALY_DETECTOR_JOB, adJob);
}
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@

package com.amazon.opendistroforelasticsearch.ad.rest.handler;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* Common handler to process AD request.
Expand Down Expand Up @@ -57,7 +61,7 @@ public void getDetectorJob(
if (clusterService.state().getMetaData().indices().containsKey(ANOMALY_DETECTOR_JOB_INDEX)) {
GetRequest request = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);
client.get(request, ActionListener.wrap(response -> onGetAdJobResponseForWrite(response, channel, function), exception -> {
logger.error("Fail to search AD job using detector id" + detectorId, exception);
logger.error("Fail to get anomaly detector job: " + detectorId, exception);
try {
channel.sendResponse(new BytesRestResponse(channel, exception));
} catch (IOException e) {
Expand All @@ -74,8 +78,18 @@ private void onGetAdJobResponseForWrite(GetResponse response, RestChannel channe
String adJobId = response.getId();
if (adJobId != null) {
// check if AD job is running on the detector, if yes, we can't delete the detector
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Detector job is running: " + adJobId));
return;
try (XContentParser parser = RestHandlerUtils.createXContentParser(channel, response.getSourceAsBytesRef())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetectorJob adJob = AnomalyDetectorJob.parse(parser);
if (adJob.isEnabled()) {
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Detector job is running: " + adJobId));
return;
}
} catch (IOException e) {
String message = "Failed to parse anomaly detector job " + adJobId;
logger.error(message, e);
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, message));
}
}
}
function.execute();
Expand Down
Loading