diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 91458864..7f5053d7 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -101,6 +101,8 @@ import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier; import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction; import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesTransportAction; +import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobAction; +import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction; import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.CronAction; @@ -230,11 +232,7 @@ public List getRestHandlers( anomalyDetectorRunner ); RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter); - RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction( - settings, - clusterService, - anomalyDetectionIndices - ); + RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(settings, clusterService); return ImmutableList .of( @@ -474,7 +472,8 @@ public List getNamedXContent() { new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class), new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class), new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class), - new ActionHandler<>(IndexAnomalyDetectorAction.INSTANCE, IndexAnomalyDetectorTransportAction.class) + new ActionHandler<>(IndexAnomalyDetectorAction.INSTANCE, IndexAnomalyDetectorTransportAction.class), + new ActionHandler<>(AnomalyDetectorJobAction.INSTANCE, AnomalyDetectorJobTransportAction.class) ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java index a418a81f..262c6803 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java @@ -33,12 +33,13 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; -import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; -import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler; import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting; +import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobAction; +import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobRequest; import com.google.common.collect.ImmutableList; /** @@ -48,10 +49,8 @@ public class RestAnomalyDetectorJobAction extends BaseRestHandler { public static final String AD_JOB_ACTION = "anomaly_detector_job_action"; private volatile TimeValue requestTimeout; - private final AnomalyDetectionIndices anomalyDetectionIndices; - public RestAnomalyDetectorJobAction(Settings settings, ClusterService clusterService, AnomalyDetectionIndices anomalyDetectionIndices) { - this.anomalyDetectionIndices = anomalyDetectionIndices; + public RestAnomalyDetectorJobAction(Settings settings, ClusterService clusterService) { this.requestTimeout = REQUEST_TIMEOUT.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(REQUEST_TIMEOUT, it -> requestTimeout = it); } @@ -68,29 +67,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli } String detectorId = request.param(DETECTOR_ID); + long seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO); + long primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + String rawPath = request.rawPath(); - return channel -> { - long seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO); - long primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + AnomalyDetectorJobRequest anomalyDetectorJobRequest = new AnomalyDetectorJobRequest(detectorId, seqNo, primaryTerm, rawPath); - IndexAnomalyDetectorJobActionHandler handler = new IndexAnomalyDetectorJobActionHandler( - client, - channel, - anomalyDetectionIndices, - detectorId, - seqNo, - primaryTerm, - requestTimeout - ); - - String rawPath = request.rawPath(); - - if (rawPath.endsWith(START_JOB)) { - handler.startAnomalyDetectorJob(); - } else if (rawPath.endsWith(STOP_JOB)) { - handler.stopAnomalyDetectorJob(detectorId); - } - }; + return channel -> client + .execute(AnomalyDetectorJobAction.INSTANCE, anomalyDetectorJobRequest, new RestToXContentListener<>(channel)); } @Override diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AbstractActionHandler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AbstractActionHandler.java deleted file mode 100644 index e8ccf40f..00000000 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AbstractActionHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.ad.rest.handler; - -import java.io.IOException; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestChannel; - -/** - * Action handler to process REST request and handle failures. - */ -public abstract class AbstractActionHandler { - - protected final NodeClient client; - protected final RestChannel channel; - - private final Logger logger = LogManager.getLogger(AbstractActionHandler.class); - - /** - * Constructor function. - * - * @param client ES node client that executes actions on the local node - * @param channel ES channel used to construct bytes / builder based outputs, and send responses - */ - public AbstractActionHandler(NodeClient client, RestChannel channel) { - this.client = client; - this.channel = channel; - } - - /** - * Send failure message via channel. - * - * @param e exception - */ - public void onFailure(Exception e) { - if (e != null) { - try { - channel.sendResponse(new BytesRestResponse(channel, e)); - } catch (IOException e1) { - logger.warn("Fail to send out failure message of exception", e); - } - } - } -} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index 4cf0894c..9e09aadb 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -16,11 +16,8 @@ package com.amazon.opendistroforelasticsearch.ad.rest.handler; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; -import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE; -import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.createXContentParser; import static org.elasticsearch.action.DocWriteResponse.Result.CREATED; import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED; -import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import java.io.IOException; @@ -29,6 +26,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.get.GetRequest; @@ -36,18 +34,19 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestStatus; import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobResponse; import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction; import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorRequest; import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorResponse; @@ -58,12 +57,15 @@ /** * Anomaly detector job REST action handler to process POST/PUT request. */ -public class IndexAnomalyDetectorJobActionHandler extends AbstractActionHandler { +public class IndexAnomalyDetectorJobActionHandler { private final AnomalyDetectionIndices anomalyDetectionIndices; private final String detectorId; private final Long seqNo; private final Long primaryTerm; + private final Client client; + private final ActionListener listener; + private final NamedXContentRegistry xContentRegistry; private final Logger logger = LogManager.getLogger(IndexAnomalyDetectorJobActionHandler.class); private final TimeValue requestTimeout; @@ -72,28 +74,32 @@ public class IndexAnomalyDetectorJobActionHandler extends AbstractActionHandler * Constructor function. * * @param client ES node client that executes actions on the local node - * @param channel ES channel used to construct bytes / builder based outputs, and send responses + * @param listener Listener to send responses * @param anomalyDetectionIndices anomaly detector index manager * @param detectorId detector identifier * @param seqNo sequence number of last modification * @param primaryTerm primary term of last modification * @param requestTimeout request time out configuration + * @param xContentRegistry Registry which is used for XContentParser */ public IndexAnomalyDetectorJobActionHandler( - NodeClient client, - RestChannel channel, + Client client, + ActionListener listener, AnomalyDetectionIndices anomalyDetectionIndices, String detectorId, Long seqNo, Long primaryTerm, - TimeValue requestTimeout + TimeValue requestTimeout, + NamedXContentRegistry xContentRegistry ) { - super(client, channel); + this.client = client; + this.listener = listener; this.anomalyDetectionIndices = anomalyDetectionIndices; this.detectorId = detectorId; this.seqNo = seqNo; this.primaryTerm = primaryTerm; this.requestTimeout = requestTimeout; + this.xContentRegistry = xContentRegistry; } /** @@ -107,7 +113,7 @@ public void startAnomalyDetectorJob() throws IOException { if (!anomalyDetectionIndices.doesAnomalyDetectorJobIndexExist()) { anomalyDetectionIndices .initAnomalyDetectorJobIndex( - ActionListener.wrap(response -> onCreateMappingsResponse(response), exception -> onFailure(exception)) + ActionListener.wrap(response -> onCreateMappingsResponse(response), exception -> listener.onFailure(exception)) ); } else { prepareAnomalyDetectorJobIndexing(); @@ -120,34 +126,40 @@ private void onCreateMappingsResponse(CreateIndexResponse response) throws IOExc prepareAnomalyDetectorJobIndexing(); } else { logger.warn("Created {} with mappings call not acknowledged.", ANOMALY_DETECTORS_INDEX); - channel - .sendResponse( - new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS)) + listener + .onFailure( + new ElasticsearchStatusException( + "Created " + ANOMALY_DETECTORS_INDEX + " with mappings call not acknowledged.", + RestStatus.INTERNAL_SERVER_ERROR + ) ); } } private void prepareAnomalyDetectorJobIndexing() { GetRequest getRequest = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX).id(detectorId); - client.get(getRequest, ActionListener.wrap(response -> onGetAnomalyDetectorResponse(response), exception -> onFailure(exception))); + client + .get( + getRequest, + ActionListener.wrap(response -> onGetAnomalyDetectorResponse(response), exception -> listener.onFailure(exception)) + ); } private void onGetAnomalyDetectorResponse(GetResponse response) throws IOException { if (!response.isExists()) { - XContentBuilder builder = channel - .newErrorBuilder() - .startObject() - .field("Message", "AnomalyDetector is not found with id: " + detectorId) - .endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, response.toXContent(builder, EMPTY_PARAMS))); + listener + .onFailure(new ElasticsearchStatusException("AnomalyDetector is not found with id: " + detectorId, RestStatus.NOT_FOUND)); return; } - try (XContentParser parser = RestHandlerUtils.createXContentParser(channel, response.getSourceAsBytesRef())) { + try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion()); if (detector.getFeatureAttributes().size() == 0) { - channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Can't start detector job as no features configured")); + listener + .onFailure( + new ElasticsearchStatusException("Can't start detector job as no features configured", RestStatus.BAD_REQUEST) + ); return; } @@ -170,7 +182,7 @@ private void onGetAnomalyDetectorResponse(GetResponse response) throws IOExcepti } catch (IOException e) { String message = "Failed to parse anomaly detector job " + detectorId; logger.error(message, e); - channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, message)); + listener.onFailure(new ElasticsearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)); } } @@ -180,17 +192,22 @@ private void getAnomalyDetectorJobForWrite(AnomalyDetectorJob job) { client .get( getRequest, - ActionListener.wrap(response -> onGetAnomalyDetectorJobForWrite(response, job), exception -> onFailure(exception)) + ActionListener.wrap(response -> onGetAnomalyDetectorJobForWrite(response, job), exception -> listener.onFailure(exception)) ); } private void onGetAnomalyDetectorJobForWrite(GetResponse response, AnomalyDetectorJob job) throws IOException { if (response.isExists()) { - try (XContentParser parser = createXContentParser(channel, response.getSourceAsBytesRef())) { + try ( + XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef()) + ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); AnomalyDetectorJob currentAdJob = AnomalyDetectorJob.parse(parser); if (currentAdJob.isEnabled()) { - channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Anomaly detector job is already running: " + detectorId)); + listener + .onFailure( + new ElasticsearchStatusException("Anomaly detector job is already running: " + detectorId, RestStatus.OK) + ); return; } else { AnomalyDetectorJob newJob = new AnomalyDetectorJob( @@ -208,7 +225,7 @@ private void onGetAnomalyDetectorJobForWrite(GetResponse response, AnomalyDetect } catch (IOException e) { String message = "Failed to parse anomaly detector job " + job.getName(); logger.error(message, e); - channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, message)); + listener.onFailure(new ElasticsearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)); } } else { indexAnomalyDetectorJob(job, null); @@ -218,7 +235,7 @@ private void onGetAnomalyDetectorJobForWrite(GetResponse response, AnomalyDetect private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunction function) throws IOException { IndexRequest indexRequest = new IndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(job.toXContent(channel.newBuilder(), XCONTENT_WITH_TYPE)) + .source(job.toXContent(XContentFactory.jsonBuilder(), RestHandlerUtils.XCONTENT_WITH_TYPE)) .setIfSeqNo(seqNo) .setIfPrimaryTerm(primaryTerm) .timeout(requestTimeout) @@ -226,27 +243,28 @@ private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunc client .index( indexRequest, - ActionListener.wrap(response -> onIndexAnomalyDetectorJobResponse(response, function), exception -> onFailure(exception)) + ActionListener + .wrap(response -> onIndexAnomalyDetectorJobResponse(response, function), exception -> listener.onFailure(exception)) ); } private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) throws IOException { if (response == null || (response.getResult() != CREATED && response.getResult() != UPDATED)) { - channel.sendResponse(new BytesRestResponse(response.status(), response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS))); + String errorMsg = checkShardsFailure(response); + listener.onFailure(new ElasticsearchStatusException(errorMsg, response.status())); return; } if (function != null) { function.execute(); } else { - XContentBuilder builder = channel - .newBuilder() - .startObject() - .field(RestHandlerUtils._ID, response.getId()) - .field(RestHandlerUtils._VERSION, response.getVersion()) - .field(RestHandlerUtils._SEQ_NO, response.getSeqNo()) - .field(RestHandlerUtils._PRIMARY_TERM, response.getPrimaryTerm()) - .endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + AnomalyDetectorJobResponse anomalyDetectorJobResponse = new AnomalyDetectorJobResponse( + response.getId(), + response.getVersion(), + response.getSeqNo(), + response.getPrimaryTerm(), + RestStatus.OK + ); + listener.onResponse(anomalyDetectorJobResponse); } } @@ -262,12 +280,17 @@ public void stopAnomalyDetectorJob(String detectorId) { client.get(getRequest, ActionListener.wrap(response -> { if (response.isExists()) { - try (XContentParser parser = createXContentParser(channel, response.getSourceAsBytesRef())) { + try ( + XContentParser parser = RestHandlerUtils + .createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef()) + ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser); if (!job.isEnabled()) { - channel - .sendResponse(new BytesRestResponse(RestStatus.OK, "Anomaly detector job is already stopped: " + detectorId)); + listener + .onFailure( + new ElasticsearchStatusException("Anomaly detector job is already stopped: " + detectorId, RestStatus.OK) + ); return; } else { AnomalyDetectorJob newJob = new AnomalyDetectorJob( @@ -286,40 +309,56 @@ public void stopAnomalyDetectorJob(String detectorId) { .execute( StopDetectorAction.INSTANCE, new StopDetectorRequest(detectorId), - stopAdDetectorListener(channel, detectorId) + stopAdDetectorListener(detectorId) ) ); } } catch (IOException e) { String message = "Failed to parse anomaly detector job " + detectorId; logger.error(message, e); - channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, message)); + listener.onFailure(new ElasticsearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)); } } else { - channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Anomaly detector job not exist: " + detectorId)); + listener + .onFailure(new ElasticsearchStatusException("Anomaly detector job not exist: " + detectorId, RestStatus.BAD_REQUEST)); } - }, exception -> onFailure(exception))); + }, exception -> listener.onFailure(exception))); } - private ActionListener stopAdDetectorListener(RestChannel channel, String detectorId) { + private ActionListener stopAdDetectorListener(String detectorId) { return new ActionListener() { @Override public void onResponse(StopDetectorResponse stopDetectorResponse) { if (stopDetectorResponse.success()) { logger.info("AD model deleted successfully for detector {}", detectorId); - channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Stopped detector: " + detectorId)); + AnomalyDetectorJobResponse anomalyDetectorJobResponse = new AnomalyDetectorJobResponse(null, 0, 0, 0, RestStatus.OK); + listener.onResponse(anomalyDetectorJobResponse); } else { logger.error("Failed to delete AD model for detector {}", detectorId); - channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, "Failed to delete AD model")); + listener.onFailure(new ElasticsearchStatusException("Failed to delete AD model", RestStatus.INTERNAL_SERVER_ERROR)); } } @Override public void onFailure(Exception e) { logger.error("Failed to delete AD model for detector " + detectorId, e); - channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, "Failed to execute stop detector action")); + listener + .onFailure( + new ElasticsearchStatusException("Failed to execute stop detector action", RestStatus.INTERNAL_SERVER_ERROR) + ); } }; } + private String checkShardsFailure(IndexResponse response) { + StringBuilder failureReasons = new StringBuilder(); + if (response.getShardInfo().getFailed() > 0) { + for (ReplicationResponse.ShardInfo.Failure failure : response.getShardInfo().getFailures()) { + failureReasons.append(failure); + } + return failureReasons.toString(); + } + return null; + } + } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobAction.java new file mode 100644 index 00000000..5cf658e0 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobAction.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import org.elasticsearch.action.ActionType; + +public class AnomalyDetectorJobAction extends ActionType { + public static final AnomalyDetectorJobAction INSTANCE = new AnomalyDetectorJobAction(); + public static final String NAME = "cluster:admin/opendistro/ad/detector/jobmanagement"; + + private AnomalyDetectorJobAction() { + super(NAME, AnomalyDetectorJobResponse::new); + } + +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobRequest.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobRequest.java new file mode 100644 index 00000000..586a538e --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobRequest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import java.io.IOException; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +public class AnomalyDetectorJobRequest extends ActionRequest { + + private String detectorID; + private long seqNo; + private long primaryTerm; + private String rawPath; + + public AnomalyDetectorJobRequest(StreamInput in) throws IOException { + super(in); + detectorID = in.readString(); + seqNo = in.readLong(); + primaryTerm = in.readLong(); + rawPath = in.readString(); + } + + public AnomalyDetectorJobRequest(String detectorID, long seqNo, long primaryTerm, String rawPath) { + super(); + this.detectorID = detectorID; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + this.rawPath = rawPath; + } + + public String getDetectorID() { + return detectorID; + } + + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + public String getRawPath() { + return rawPath; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(detectorID); + out.writeLong(seqNo); + out.writeLong(primaryTerm); + out.writeString(rawPath); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobResponse.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobResponse.java new file mode 100644 index 00000000..99a4a73a --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobResponse.java @@ -0,0 +1,76 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import java.io.IOException; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; + +import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; + +public class AnomalyDetectorJobResponse extends ActionResponse implements ToXContentObject { + private final String id; + private final long version; + private final long seqNo; + private final long primaryTerm; + private final RestStatus restStatus; + + public AnomalyDetectorJobResponse(StreamInput in) throws IOException { + super(in); + id = in.readString(); + version = in.readLong(); + seqNo = in.readLong(); + primaryTerm = in.readLong(); + restStatus = in.readEnum(RestStatus.class); + } + + public AnomalyDetectorJobResponse(String id, long version, long seqNo, long primaryTerm, RestStatus restStatus) { + this.id = id; + this.version = version; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + this.restStatus = restStatus; + } + + public String getId() { + return id; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeLong(version); + out.writeLong(seqNo); + out.writeLong(primaryTerm); + out.writeEnum(restStatus); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder + .startObject() + .field(RestHandlerUtils._ID, id) + .field(RestHandlerUtils._VERSION, version) + .field(RestHandlerUtils._SEQ_NO, seqNo) + .field(RestHandlerUtils._PRIMARY_TERM, primaryTerm) + .endObject(); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportAction.java new file mode 100644 index 00000000..7f94c6d9 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportAction.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT; + +import java.io.IOException; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; +import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler; +import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; + +public class AnomalyDetectorJobTransportAction extends HandledTransportAction { + + private final Client client; + private final Settings settings; + private final AnomalyDetectionIndices anomalyDetectionIndices; + private final NamedXContentRegistry xContentRegistry; + + @Inject + public AnomalyDetectorJobTransportAction( + TransportService transportService, + ActionFilters actionFilters, + Client client, + Settings settings, + AnomalyDetectionIndices anomalyDetectionIndices, + NamedXContentRegistry xContentRegistry + ) { + super(AnomalyDetectorJobAction.NAME, transportService, actionFilters, AnomalyDetectorJobRequest::new); + this.client = client; + this.settings = settings; + this.anomalyDetectionIndices = anomalyDetectionIndices; + this.xContentRegistry = xContentRegistry; + } + + @Override + protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener listener) { + String detectorId = request.getDetectorID(); + long seqNo = request.getSeqNo(); + long primaryTerm = request.getPrimaryTerm(); + String rawPath = request.getRawPath(); + TimeValue requestTimeout = REQUEST_TIMEOUT.get(settings); + + IndexAnomalyDetectorJobActionHandler handler = new IndexAnomalyDetectorJobActionHandler( + client, + listener, + anomalyDetectionIndices, + detectorId, + seqNo, + primaryTerm, + requestTimeout, + xContentRegistry + ); + try { + if (rawPath.endsWith(RestHandlerUtils.START_JOB)) { + handler.startAnomalyDetectorJob(); + } else if (rawPath.endsWith(RestHandlerUtils.STOP_JOB)) { + handler.stopAnomalyDetectorJob(detectorId); + } + } catch (IOException e) { + logger.error(e); + } + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobActionTests.java new file mode 100644 index 00000000..26ac5859 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobActionTests.java @@ -0,0 +1,107 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static org.mockito.Mockito.mock; + +import java.io.IOException; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.TransportService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; + +public class AnomalyDetectorJobActionTests extends ESIntegTestCase { + private AnomalyDetectorJobTransportAction action; + private Task task; + private AnomalyDetectorJobRequest request; + private ActionListener response; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + action = new AnomalyDetectorJobTransportAction( + mock(TransportService.class), + mock(ActionFilters.class), + client(), + indexSettings(), + mock(AnomalyDetectionIndices.class), + xContentRegistry() + ); + task = mock(Task.class); + request = new AnomalyDetectorJobRequest("1234", 4567, 7890, "_start"); + response = new ActionListener() { + @Override + public void onResponse(AnomalyDetectorJobResponse adResponse) { + // Will not be called as there is no detector + Assert.assertTrue(true); + } + + @Override + public void onFailure(Exception e) { + // Will not be called as there is no detector + Assert.assertTrue(true); + } + }; + } + + @Test + public void testStartAdJobTransportAction() { + action.doExecute(task, request, response); + } + + @Test + public void testStopAdJobTransportAction() { + AnomalyDetectorJobRequest stopRequest = new AnomalyDetectorJobRequest("1234", 4567, 7890, "_stop"); + action.doExecute(task, stopRequest, response); + } + + @Test + public void testAdJobAction() { + Assert.assertNotNull(AnomalyDetectorJobAction.INSTANCE.name()); + Assert.assertEquals(AnomalyDetectorJobAction.INSTANCE.name(), AnomalyDetectorJobAction.NAME); + } + + @Test + public void testAdJobRequest() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput input = out.bytes().streamInput(); + AnomalyDetectorJobRequest newRequest = new AnomalyDetectorJobRequest(input); + Assert.assertEquals(request.getDetectorID(), newRequest.getDetectorID()); + } + + @Test + public void testAdJobResponse() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + AnomalyDetectorJobResponse response = new AnomalyDetectorJobResponse("1234", 45, 67, 890, RestStatus.OK); + response.writeTo(out); + StreamInput input = out.bytes().streamInput(); + AnomalyDetectorJobResponse newResponse = new AnomalyDetectorJobResponse(input); + Assert.assertEquals(response.getId(), newResponse.getId()); + } +}