Skip to content

Commit

Permalink
[Extensions] Migrates AnomalyResultAction, EntityResultAction, RCFRes…
Browse files Browse the repository at this point in the history
…ultAction (#856)

* Initial AnomalyResultAction commit. Implements the required created components for ADResultAction, EntityResultAction, RCFResultAction. Uncomments JS dependent code

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* uncomments runAnomalyDetectionJob

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Fixing release lock request, changed from lock ID to job ID

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Replacing indexNotFoundException with OpenSearchStatusException

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Makes all JS rest requests async calls, fixes release lock requests

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Updating action extension import due to changes in the SDK

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Enables AnomalyResults for single-entity real time analysis

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* fixing affected test classes

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Registers ProfileTransportAction, needed for indexing anomaly results for HCAD. Preparational work for HCAD results, invokes EntityResultAction in pagelistener

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Addressing PR comments, removing remoteNodeID, checking for index not found message rather than catching OpenSearchStatusException

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Addressing PR comments

Signed-off-by: Joshua Palis <jpalis@amazon.com>

---------

Signed-off-by: Joshua Palis <jpalis@amazon.com>
  • Loading branch information
joshpalis authored Apr 21, 2023
1 parent 86b1084 commit 9c0a308
Show file tree
Hide file tree
Showing 21 changed files with 515 additions and 472 deletions.
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,12 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.AnomalyDetectorExtension.1.1',
'org.opensearch.ad.EntityProfileRunner',
'org.opensearch.ad.caching.CacheProvider',
'org.opensearch.ad.transport.ADResultBulkTransportAction',
'org.opensearch.ad.transport.ADResultBulkRequest',
'org.opensearch.ad.transport.ADResultBulkAction',
'org.opensearch.ad.ratelimit.ResultWriteRequest',
'org.opensearch.ad.AnomalyDetectorJobRunner.1',
'org.opensearch.ad.AnomalyDetectorJobRunner.2',
'org.opensearch.ad.util.RestHandlerUtils'
]

Expand Down
119 changes: 117 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.ratelimit.CheckpointReadWorker;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.ratelimit.ColdEntityWorker;
import org.opensearch.ad.ratelimit.EntityColdStartWorker;
import org.opensearch.ad.ratelimit.ResultWriteWorker;
import org.opensearch.ad.rest.RestAnomalyDetectorJobAction;
import org.opensearch.ad.rest.RestDeleteAnomalyDetectorAction;
import org.opensearch.ad.rest.RestGetAnomalyDetectorAction;
Expand Down Expand Up @@ -80,12 +84,20 @@
import org.opensearch.ad.transport.ADStatsNodesTransportAction;
import org.opensearch.ad.transport.AnomalyDetectorJobAction;
import org.opensearch.ad.transport.AnomalyDetectorJobTransportAction;
import org.opensearch.ad.transport.AnomalyResultAction;
import org.opensearch.ad.transport.AnomalyResultTransportAction;
import org.opensearch.ad.transport.DeleteAnomalyDetectorAction;
import org.opensearch.ad.transport.DeleteAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.EntityResultAction;
import org.opensearch.ad.transport.EntityResultTransportAction;
import org.opensearch.ad.transport.GetAnomalyDetectorAction;
import org.opensearch.ad.transport.GetAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.IndexAnomalyDetectorAction;
import org.opensearch.ad.transport.IndexAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.ProfileAction;
import org.opensearch.ad.transport.ProfileTransportAction;
import org.opensearch.ad.transport.RCFResultAction;
import org.opensearch.ad.transport.RCFResultTransportAction;
import org.opensearch.ad.transport.SearchADTasksAction;
import org.opensearch.ad.transport.SearchADTasksTransportAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorAction;
Expand All @@ -98,6 +110,7 @@
import org.opensearch.ad.transport.ValidateAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.handler.ADSearchHandler;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.transport.handler.MultiEntityResultHandler;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.IndexUtils;
Expand Down Expand Up @@ -370,6 +383,26 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
checkpointWriteQueue,
AnomalyDetectorSettings.MAX_COLD_START_ROUNDS
);
EntityColdStartWorker coldstartQueue = new EntityColdStartWorker(
heapSizeBytes,
AnomalyDetectorSettings.ENTITY_REQUEST_SIZE_IN_BYTES,
AnomalyDetectorSettings.ENTITY_COLD_START_QUEUE_MAX_HEAP_PERCENT,
sdkClusterService,
random,
adCircuitBreakerService,
threadPool,
environmentSettings,
AnomalyDetectorSettings.MAX_QUEUED_TASKS_RATIO,
getClock(),
AnomalyDetectorSettings.MEDIUM_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.LOW_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
entityColdStarter,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager
);

ModelManager modelManager = new ModelManager(
checkpoint,
getClock(),
Expand All @@ -385,6 +418,81 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
featureManager,
memoryTracker
);
MultiEntityResultHandler multiEntityResultHandler = new MultiEntityResultHandler(
sdkRestClient,
environmentSettings,
threadPool,
anomalyDetectionIndices,
clientUtil,
indexUtils,
sdkClusterService
);

ResultWriteWorker resultWriteQueue = new ResultWriteWorker(
heapSizeBytes,
AnomalyDetectorSettings.RESULT_WRITE_QUEUE_SIZE_IN_BYTES,
AnomalyDetectorSettings.RESULT_WRITE_QUEUE_MAX_HEAP_PERCENT,
sdkClusterService,
random,
adCircuitBreakerService,
threadPool,
environmentSettings,
AnomalyDetectorSettings.MAX_QUEUED_TASKS_RATIO,
getClock(),
AnomalyDetectorSettings.MEDIUM_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.LOW_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
multiEntityResultHandler,
xContentRegistry,
stateManager,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
);

CheckpointReadWorker checkpointReadQueue = new CheckpointReadWorker(
heapSizeBytes,
AnomalyDetectorSettings.ENTITY_FEATURE_REQUEST_SIZE_IN_BYTES,
AnomalyDetectorSettings.CHECKPOINT_READ_QUEUE_MAX_HEAP_PERCENT,
sdkClusterService,
random,
adCircuitBreakerService,
threadPool,
environmentSettings,
AnomalyDetectorSettings.MAX_QUEUED_TASKS_RATIO,
getClock(),
AnomalyDetectorSettings.MEDIUM_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.LOW_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
modelManager,
checkpoint,
coldstartQueue,
resultWriteQueue,
stateManager,
anomalyDetectionIndices,
cacheProvider,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
checkpointWriteQueue
);

ColdEntityWorker coldEntityQueue = new ColdEntityWorker(
heapSizeBytes,
AnomalyDetectorSettings.ENTITY_FEATURE_REQUEST_SIZE_IN_BYTES,
AnomalyDetectorSettings.COLD_ENTITY_QUEUE_MAX_HEAP_PERCENT,
sdkClusterService,
random,
adCircuitBreakerService,
threadPool,
environmentSettings,
AnomalyDetectorSettings.MAX_QUEUED_TASKS_RATIO,
getClock(),
AnomalyDetectorSettings.MEDIUM_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.LOW_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
checkpointReadQueue,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager
);

Map<String, ADStat<?>> stats = ImmutableMap
.<String, ADStat<?>>builder()
Expand Down Expand Up @@ -480,11 +588,16 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
adCircuitBreakerService,
adStats,
nodeFilter,
multiEntityResultHandler,
checkpoint,
cacheProvider,
adTaskManager,
coldstartQueue,
resultWriteQueue,
checkpointReadQueue,
adSearchHandler,
checkpointWriteQueue,
coldEntityQueue,
entityColdStarter,
adTaskCacheManager
);
Expand Down Expand Up @@ -611,11 +724,13 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
new ActionHandler<>(ADJobRunnerAction.INSTANCE, ADJobRunnerTransportAction.class),
new ActionHandler<>(ADJobParameterAction.INSTANCE, ADJobParameterTransportAction.class),
new ActionHandler<>(AnomalyDetectorJobAction.INSTANCE, AnomalyDetectorJobTransportAction.class),
new ActionHandler<>(AnomalyResultAction.INSTANCE, AnomalyResultTransportAction.class),
new ActionHandler<>(RCFResultAction.INSTANCE, RCFResultTransportAction.class),
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class),
// TODO : Register AnomalyResultAction/TransportAction here :
// https://github.com/opensearch-project/opensearch-sdk-java/issues/626
new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class),
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class),
new ActionHandler<>(SearchADTasksAction.INSTANCE, SearchADTasksTransportAction.class)
Expand Down
62 changes: 44 additions & 18 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -51,6 +53,8 @@
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultAction;
import org.opensearch.ad.transport.AnomalyResultRequest;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.AnomalyResultTransportAction;
import org.opensearch.ad.transport.ProfileAction;
Expand All @@ -59,6 +63,7 @@
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -229,8 +234,7 @@ protected void runAdJob(AnomalyDetectorJob jobParameter, LockModel lock, Instant
}
String resultIndex = jobParameter.getResultIndex();
if (resultIndex == null) {
// TODO : https://github.com/opensearch-project/opensearch-sdk-java/issues/626
// runAnomalyDetectionJob(jobParameter, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
runAnomalyDetectionJob(jobParameter, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
return;
}
ActionListener<Boolean> listener = ActionListener.wrap(r -> { log.debug("Custom index is valid"); }, e -> {
Expand All @@ -239,12 +243,10 @@ protected void runAdJob(AnomalyDetectorJob jobParameter, LockModel lock, Instant
});
anomalyDetectionIndices.validateCustomIndexForBackendJob(resultIndex, detectorId, user, roles, () -> {
listener.onResponse(true);
// TODO https://github.com/opensearch-project/opensearch-sdk-java/issues/626
// runAnomalyDetectionJob(jobParameter, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
runAnomalyDetectionJob(jobParameter, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
}, listener);
}

/* @anomaly.detection - will be handled by https://github.com/opensearch-project/opensearch-sdk-java/issues/626
private void runAnomalyDetectionJob(
AnomalyDetectorJob jobParameter,
LockModel lock,
Expand Down Expand Up @@ -275,7 +277,6 @@ private void runAnomalyDetectionJob(
log.error("Failed to execute AD job " + detectorId, e);
}
}
*/

/**
* Handle exception from anomaly result action.
Expand Down Expand Up @@ -632,7 +633,6 @@ private void updateLatestRealtimeTask(
}

private LockModel acquireLock(JobExecutionContext context, Long lockDurationSeconds) throws Exception {

// Build request body
AcquireLockRequest acquireLockRequestBody = new AcquireLockRequest(
context.getJobId(),
Expand All @@ -645,30 +645,56 @@ private LockModel acquireLock(JobExecutionContext context, Long lockDurationSeco
acquireLockRequest
.setJsonEntity(Strings.toString(acquireLockRequestBody.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS)));

// Parse response map fields for lock model
Response acquireLockResponse = client.performRequest(acquireLockRequest);
CompletableFuture<Response> acquireLockResponse = new CompletableFuture<>();
client.performRequestAsync(acquireLockRequest, new ResponseListener() {

@Override
public void onSuccess(Response response) {
acquireLockResponse.complete(response);
}

@Override
public void onFailure(Exception exception) {
acquireLockResponse.completeExceptionally(exception);
}

});
Response response = acquireLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();

log.info("Acquired lock for AD job {}", context.getJobId());

XContentParser parser = XContentType.JSON
.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, acquireLockResponse.getEntity().getContent());

.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getEntity().getContent());
AcquireLockResponse acquireLockResponseBody = AcquireLockResponse.parse(parser);

return acquireLockResponseBody.getLock();
}

private void releaseLock(AnomalyDetectorJob jobParameter, LockModel lock) {

Request releaseLockRequest = new Request(
"PUT",
String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_release_lock")
String.format(Locale.ROOT, "%s/%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_release_lock", lock.getLockId())
);
releaseLockRequest.addParameter(LockModel.LOCK_ID, lock.getLockId());

try {
Response releaseLockResponse = client.performRequest(releaseLockRequest);
boolean lockIsReleased = RestStatus.fromCode(releaseLockResponse.getStatusLine().getStatusCode()) == RestStatus.OK
? true
: false;
CompletableFuture<Response> releaseLockResponse = new CompletableFuture<>();
client.performRequestAsync(releaseLockRequest, new ResponseListener() {

@Override
public void onSuccess(Response response) {
releaseLockResponse.complete(response);
}

@Override
public void onFailure(Exception exception) {
releaseLockResponse.completeExceptionally(exception);
}

});
Response response = releaseLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();

boolean lockIsReleased = RestStatus.fromCode(response.getStatusLine().getStatusCode()) == RestStatus.OK ? true : false;
if (lockIsReleased) {
log.info("Released lock for AD job {}", jobParameter.getName());
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// @anomaly-detection.create-detector Commented this code until we have support of Job Scheduler for extensibility
/*
* SPDX-License-Identifier: Apache-2.0
*
Expand Down
23 changes: 10 additions & 13 deletions src/main/java/org/opensearch/ad/NodeState.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Optional;

import org.opensearch.ad.model.AnomalyDetector;
//import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyDetectorJob;

/**
* Storing intermediate state during the execution of transport action
Expand All @@ -43,8 +43,7 @@ public class NodeState implements ExpiringState {
// cold start running flag to prevent concurrent cold start
private boolean coldStartRunning;
// detector job
// @anomaly-detection.create-detector Commented this code until we have support of Job Scheduler for extensibility
// private AnomalyDetectorJob detectorJob;
private AnomalyDetectorJob detectorJob;

public NodeState(String detectorId, Clock clock) {
this.detectorId = detectorId;
Expand Down Expand Up @@ -171,25 +170,23 @@ public void setColdStartRunning(boolean coldStartRunning) {
refreshLastUpdateTime();
}

// @anomaly-detection.create-detector Commented this code until we have support of Job Scheduler for extensibility
/**
*
* @return Detector configuration object
*/
// public AnomalyDetectorJob getDetectorJob() {
// refreshLastUpdateTime();
// return detectorJob;
// }
public AnomalyDetectorJob getDetectorJob() {
refreshLastUpdateTime();
return detectorJob;
}

// @anomaly-detection.create-detector Commented this code until we have support of Job Scheduler for extensibility
/**
*
* @param detectorJob Detector job
*/
// public void setDetectorJob(AnomalyDetectorJob detectorJob) {
// this.detectorJob = detectorJob;
// refreshLastUpdateTime();
// }
public void setDetectorJob(AnomalyDetectorJob detectorJob) {
this.detectorJob = detectorJob;
refreshLastUpdateTime();
}

/**
* refresh last access time.
Expand Down
Loading

0 comments on commit 9c0a308

Please sign in to comment.