Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

record error and execution start/end time in AD result; handle except… #59

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser;
Expand Down Expand Up @@ -142,7 +143,10 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private static Gson gson;
private AnomalyDetectionIndices anomalyDetectionIndices;
private AnomalyDetectorRunner anomalyDetectorRunner;
private Client client;
private ClusterService clusterService;
private ThreadPool threadPool;
private IndexNameExpressionResolver indexNameExpressionResolver;
private ADStats adStats;

static {
Expand All @@ -164,6 +168,21 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
AnomalyResultHandler anomalyResultHandler = new AnomalyResultHandler(
client,
settings,
clusterService,
indexNameExpressionResolver,
anomalyDetectionIndices,
threadPool
);
AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
jobRunner.setThreadPool(threadPool);
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);

RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(restController);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
Expand Down Expand Up @@ -218,6 +237,8 @@ public Collection<Object> createComponents(
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry
) {
this.client = client;
this.threadPool = threadPool;
Settings settings = environment.settings();
Clock clock = Clock.systemUTC();
Throttler throttler = new Throttler(clock);
Expand Down Expand Up @@ -313,10 +334,6 @@ public Collection<Object> createComponents(
adStats = new ADStats(indexUtils, modelManager, stats);
ADCircuitBreakerService adCircuitBreakerService = new ADCircuitBreakerService(jvmService).init();

AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
jobRunner.setThreadPool(threadPool);

return ImmutableList
.of(
anomalyDetectionIndices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ private List<AnomalyResult> parsePreviewResult(AnomalyDetector detector, Feature
thresholdingResult.getConfidence(),
featureDatas,
Instant.ofEpochMilli(timeRange.getKey()),
Instant.ofEpochMilli(timeRange.getValue())
Instant.ofEpochMilli(timeRange.getValue()),
null,
null,
null
);
} else {
result = new AnomalyResult(
Expand All @@ -124,7 +127,10 @@ private List<AnomalyResult> parsePreviewResult(AnomalyDetector detector, Feature
null,
featureDatas,
Instant.ofEpochMilli(timeRange.getKey()),
Instant.ofEpochMilli(timeRange.getValue())
Instant.ofEpochMilli(timeRange.getValue()),
null,
null,
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private void deleteDetectorResult(
.filter(QueryBuilders.termsQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorID))
.filter(
QueryBuilders
.rangeQuery(AnomalyResult.END_TIME_FIELD)
.rangeQuery(AnomalyResult.DATA_END_TIME_FIELD)
.lte(deleteBeforeEpochMillis)
.format(CommonName.EPOCH_MILLIS_FORMAT)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.client.AdminClient;
Expand All @@ -46,8 +43,6 @@

import java.io.IOException;
import java.net.URL;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_RESULT_HISTORY_INDEX_MAX_AGE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS;
Expand Down Expand Up @@ -85,7 +80,6 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener, Cluster
private volatile Long historyMaxDocs;

private Scheduler.Cancellable scheduledRollover = null;
private AtomicBoolean historyIndexInitialized = new AtomicBoolean(false);

private static final Logger logger = LogManager.getLogger(AnomalyDetectionIndices.class);
private TimeValue lastRolloverTime = null;
Expand Down Expand Up @@ -218,8 +212,8 @@ public void initAnomalyDetectorIndex(ActionListener<CreateIndexResponse> actionL
* @throws IOException IOException from {@link AnomalyDetectionIndices#getAnomalyDetectorMappings}
*/
public void initAnomalyResultIndexIfAbsent(ActionListener<CreateIndexResponse> actionListener) throws IOException {
if (!historyIndexInitialized.get()) {
initAnomalyResultIndex(actionListener);
if (!doesAnomalyResultIndexExist()) {
initAnomalyResultIndexDirectly(actionListener);
}
}

Expand All @@ -229,10 +223,12 @@ public void initAnomalyResultIndexIfAbsent(ActionListener<CreateIndexResponse> a
* @param actionListener action called after create index
* @throws IOException IOException from {@link AnomalyDetectionIndices#getAnomalyDetectorMappings}
*/
public void initAnomalyResultIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
public void initAnomalyResultIndexDirectly(ActionListener<CreateIndexResponse> actionListener) throws IOException {
String mapping = getAnomalyResultMappings();
boolean createIndexResult = createIndex(AD_RESULT_HISTORY_INDEX_PATTERN, AD_RESULT_HISTORY_WRITE_INDEX_ALIAS, mapping);
historyIndexInitialized.compareAndSet(false, createIndexResult);
CreateIndexRequest request = new CreateIndexRequest(AD_RESULT_HISTORY_INDEX_PATTERN)
.mapping(MAPPING_TYPE, mapping, XContentType.JSON)
.alias(new Alias(AD_RESULT_HISTORY_WRITE_INDEX_ALIAS));
adminClient.indices().create(request, actionListener);
}

/**
Expand All @@ -248,27 +244,6 @@ public void initAnomalyDetectorJobIndex(ActionListener<CreateIndexResponse> acti
adminClient.indices().create(request, actionListener);
}

private boolean createIndex(String index, String alias, String mapping) {
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(index).local(true);
// TODO: add appropriate listener
Optional<IndicesExistsResponse> existsResponse = requestUtil
.<IndicesExistsRequest, IndicesExistsResponse>timedRequest(indicesExistsRequest, logger, adminClient.indices()::exists);
if (existsResponse.isPresent() && existsResponse.get().isExists()) {
return true;
}
CreateIndexRequest createIndexRequest = new CreateIndexRequest(index).mapping(MAPPING_TYPE, mapping, XContentType.JSON);
if (alias != null) {
createIndexRequest.alias(new Alias(alias));
}
try {
Optional<CreateIndexResponse> response = requestUtil
.<CreateIndexRequest, CreateIndexResponse>timedRequest(createIndexRequest, logger, adminClient.indices()::create);
return response.isPresent() && response.get().isAcknowledged();
} catch (ResourceAlreadyExistsException e) {
return true;
}
}

@Override
public void onMaster() {
try {
Expand Down Expand Up @@ -297,7 +272,6 @@ public String executorName() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
boolean hasAdResultAlias = event.state().metaData().hasAlias(AD_RESULT_HISTORY_WRITE_INDEX_ALIAS);
historyIndexInitialized.set(hasAdResultAlias);
}

private void rescheduleRollover() {
Expand All @@ -310,7 +284,7 @@ private void rescheduleRollover() {
}

private boolean rolloverHistoryIndex() {
if (!historyIndexInitialized.get()) {
if (!doesAnomalyResultIndexExist()) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ public class AnomalyDetectorJob implements ToXContentObject, ScheduledJobParamet
public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds";

private static final String SCHEDULE_FIELD = "schedule";
private static final String WINDOW_DELAY_FIELD = "window_delay";
private static final String IS_ENABLED_FIELD = "enabled";
private static final String ENABLED_TIME_FIELD = "enabled_time";
private static final String DISABLED_TIME_FIELD = "disabled_time";

private final String name;
private final Schedule schedule;
private final TimeConfiguration windowDelay;
private final Boolean isEnabled;
private final Instant enabledTime;
private final Instant disabledTime;
Expand All @@ -56,6 +58,7 @@ public class AnomalyDetectorJob implements ToXContentObject, ScheduledJobParamet
public AnomalyDetectorJob(
String name,
Schedule schedule,
TimeConfiguration windowDelay,
Boolean isEnabled,
Instant enabledTime,
Instant disabledTime,
Expand All @@ -64,6 +67,7 @@ public AnomalyDetectorJob(
) {
this.name = name;
this.schedule = schedule;
this.windowDelay = windowDelay;
this.isEnabled = isEnabled;
this.enabledTime = enabledTime;
this.disabledTime = disabledTime;
Expand All @@ -77,6 +81,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.startObject()
.field(NAME_FIELD, name)
.field(SCHEDULE_FIELD, schedule)
.field(WINDOW_DELAY_FIELD, windowDelay)
.field(IS_ENABLED_FIELD, isEnabled)
.field(ENABLED_TIME_FIELD, enabledTime.toEpochMilli())
.field(LAST_UPDATE_TIME_FIELD, lastUpdateTime.toEpochMilli())
Expand All @@ -90,6 +95,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public static AnomalyDetectorJob parse(XContentParser parser) throws IOException {
String name = null;
Schedule schedule = null;
TimeConfiguration windowDelay = null;
Boolean isEnabled = null;
Instant enabledTime = null;
Instant disabledTime = null;
Expand All @@ -108,6 +114,9 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException
case SCHEDULE_FIELD:
schedule = ScheduleParser.parse(parser);
break;
case WINDOW_DELAY_FIELD:
windowDelay = TimeConfiguration.parse(parser);
break;
case IS_ENABLED_FIELD:
isEnabled = parser.booleanValue();
break;
Expand All @@ -128,7 +137,16 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException
break;
}
}
return new AnomalyDetectorJob(name, schedule, isEnabled, enabledTime, disabledTime, lastUpdateTime, lockDurationSeconds);
return new AnomalyDetectorJob(
name,
schedule,
windowDelay,
isEnabled,
enabledTime,
disabledTime,
lastUpdateTime,
lockDurationSeconds
);
}

@Override
Expand Down Expand Up @@ -162,6 +180,10 @@ public Schedule getSchedule() {
return schedule;
}

public TimeConfiguration getWindowDelay() {
return windowDelay;
}

@Override
public boolean isEnabled() {
return isEnabled;
Expand Down
Loading