Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Adding RestActions support for Start/Stop Detector API (#244)
Browse files Browse the repository at this point in the history
* Adding RestActions support for Start/Stop Detector API
  • Loading branch information
saratvemulapalli authored Oct 13, 2020
1 parent 5f8b7fa commit aaba77d
Show file tree
Hide file tree
Showing 9 changed files with 484 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.CronAction;
Expand Down Expand Up @@ -230,11 +232,7 @@ public List<RestHandler> getRestHandlers(
anomalyDetectorRunner
);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(
settings,
clusterService,
anomalyDetectionIndices
);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(settings, clusterService);

return ImmutableList
.of(
Expand Down Expand Up @@ -474,7 +472,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class),
new ActionHandler<>(IndexAnomalyDetectorAction.INSTANCE, IndexAnomalyDetectorTransportAction.class)
new ActionHandler<>(IndexAnomalyDetectorAction.INSTANCE, IndexAnomalyDetectorTransportAction.class),
new ActionHandler<>(AnomalyDetectorJobAction.INSTANCE, AnomalyDetectorJobTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobRequest;
import com.google.common.collect.ImmutableList;

/**
Expand All @@ -48,10 +49,8 @@ public class RestAnomalyDetectorJobAction extends BaseRestHandler {

public static final String AD_JOB_ACTION = "anomaly_detector_job_action";
private volatile TimeValue requestTimeout;
private final AnomalyDetectionIndices anomalyDetectionIndices;

public RestAnomalyDetectorJobAction(Settings settings, ClusterService clusterService, AnomalyDetectionIndices anomalyDetectionIndices) {
this.anomalyDetectionIndices = anomalyDetectionIndices;
public RestAnomalyDetectorJobAction(Settings settings, ClusterService clusterService) {
this.requestTimeout = REQUEST_TIMEOUT.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(REQUEST_TIMEOUT, it -> requestTimeout = it);
}
Expand All @@ -68,29 +67,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
}

String detectorId = request.param(DETECTOR_ID);
long seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO);
long primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
String rawPath = request.rawPath();

return channel -> {
long seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO);
long primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
AnomalyDetectorJobRequest anomalyDetectorJobRequest = new AnomalyDetectorJobRequest(detectorId, seqNo, primaryTerm, rawPath);

IndexAnomalyDetectorJobActionHandler handler = new IndexAnomalyDetectorJobActionHandler(
client,
channel,
anomalyDetectionIndices,
detectorId,
seqNo,
primaryTerm,
requestTimeout
);

String rawPath = request.rawPath();

if (rawPath.endsWith(START_JOB)) {
handler.startAnomalyDetectorJob();
} else if (rawPath.endsWith(STOP_JOB)) {
handler.stopAnomalyDetectorJob(detectorId);
}
};
return channel -> client
.execute(AnomalyDetectorJobAction.INSTANCE, anomalyDetectorJobRequest, new RestToXContentListener<>(channel));
}

@Override
Expand Down

This file was deleted.

Loading

0 comments on commit aaba77d

Please sign in to comment.