Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Extensions] Migrates AnomalyResultAction, EntityResultAction, RCFResultAction #856

Merged
merged 17 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,12 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.AnomalyDetectorExtension.1.1',
'org.opensearch.ad.EntityProfileRunner',
'org.opensearch.ad.caching.CacheProvider',
'org.opensearch.ad.transport.ADResultBulkTransportAction',
'org.opensearch.ad.transport.ADResultBulkRequest',
'org.opensearch.ad.transport.ADResultBulkAction',
'org.opensearch.ad.ratelimit.ResultWriteRequest',
'org.opensearch.ad.AnomalyDetectorJobRunner.1',
'org.opensearch.ad.AnomalyDetectorJobRunner.2',
Comment on lines +775 to +780
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why these classes are not having test coverage?

Copy link
Member Author

@joshpalis joshpalis Apr 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test classes regarding ADResultBulkTransportAction/ResultWriteWorker have been commented out for this PR temporarily, I will handle them in this PR to enable HCAD real time analysis, as the ADResultBulkTransportAction is used to index multi entity Anomaly Results. This current PR includes some preparational work for the HCAD workflow

'org.opensearch.ad.util.RestHandlerUtils'
]

Expand Down
119 changes: 117 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.ratelimit.CheckpointReadWorker;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.ratelimit.ColdEntityWorker;
import org.opensearch.ad.ratelimit.EntityColdStartWorker;
import org.opensearch.ad.ratelimit.ResultWriteWorker;
import org.opensearch.ad.rest.RestAnomalyDetectorJobAction;
import org.opensearch.ad.rest.RestDeleteAnomalyDetectorAction;
import org.opensearch.ad.rest.RestGetAnomalyDetectorAction;
Expand Down Expand Up @@ -80,12 +84,20 @@
import org.opensearch.ad.transport.ADStatsNodesTransportAction;
import org.opensearch.ad.transport.AnomalyDetectorJobAction;
import org.opensearch.ad.transport.AnomalyDetectorJobTransportAction;
import org.opensearch.ad.transport.AnomalyResultAction;
import org.opensearch.ad.transport.AnomalyResultTransportAction;
import org.opensearch.ad.transport.DeleteAnomalyDetectorAction;
import org.opensearch.ad.transport.DeleteAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.EntityResultAction;
import org.opensearch.ad.transport.EntityResultTransportAction;
import org.opensearch.ad.transport.GetAnomalyDetectorAction;
import org.opensearch.ad.transport.GetAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.IndexAnomalyDetectorAction;
import org.opensearch.ad.transport.IndexAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.ProfileAction;
import org.opensearch.ad.transport.ProfileTransportAction;
import org.opensearch.ad.transport.RCFResultAction;
import org.opensearch.ad.transport.RCFResultTransportAction;
import org.opensearch.ad.transport.SearchADTasksAction;
import org.opensearch.ad.transport.SearchADTasksTransportAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorAction;
Expand All @@ -98,6 +110,7 @@
import org.opensearch.ad.transport.ValidateAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.handler.ADSearchHandler;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.transport.handler.MultiEntityResultHandler;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.IndexUtils;
Expand Down Expand Up @@ -370,6 +383,26 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
checkpointWriteQueue,
AnomalyDetectorSettings.MAX_COLD_START_ROUNDS
);
EntityColdStartWorker coldstartQueue = new EntityColdStartWorker(
heapSizeBytes,
AnomalyDetectorSettings.ENTITY_REQUEST_SIZE_IN_BYTES,
AnomalyDetectorSettings.ENTITY_COLD_START_QUEUE_MAX_HEAP_PERCENT,
sdkClusterService,
random,
adCircuitBreakerService,
threadPool,
environmentSettings,
AnomalyDetectorSettings.MAX_QUEUED_TASKS_RATIO,
getClock(),
AnomalyDetectorSettings.MEDIUM_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.LOW_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
entityColdStarter,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager
);

ModelManager modelManager = new ModelManager(
checkpoint,
getClock(),
Expand All @@ -385,6 +418,81 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
featureManager,
memoryTracker
);
MultiEntityResultHandler multiEntityResultHandler = new MultiEntityResultHandler(
sdkRestClient,
environmentSettings,
threadPool,
anomalyDetectionIndices,
clientUtil,
indexUtils,
sdkClusterService
);

ResultWriteWorker resultWriteQueue = new ResultWriteWorker(
heapSizeBytes,
AnomalyDetectorSettings.RESULT_WRITE_QUEUE_SIZE_IN_BYTES,
AnomalyDetectorSettings.RESULT_WRITE_QUEUE_MAX_HEAP_PERCENT,
sdkClusterService,
random,
adCircuitBreakerService,
threadPool,
environmentSettings,
AnomalyDetectorSettings.MAX_QUEUED_TASKS_RATIO,
getClock(),
AnomalyDetectorSettings.MEDIUM_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.LOW_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
multiEntityResultHandler,
xContentRegistry,
stateManager,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
);

CheckpointReadWorker checkpointReadQueue = new CheckpointReadWorker(
heapSizeBytes,
AnomalyDetectorSettings.ENTITY_FEATURE_REQUEST_SIZE_IN_BYTES,
AnomalyDetectorSettings.CHECKPOINT_READ_QUEUE_MAX_HEAP_PERCENT,
sdkClusterService,
random,
adCircuitBreakerService,
threadPool,
environmentSettings,
AnomalyDetectorSettings.MAX_QUEUED_TASKS_RATIO,
getClock(),
AnomalyDetectorSettings.MEDIUM_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.LOW_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
modelManager,
checkpoint,
coldstartQueue,
resultWriteQueue,
stateManager,
anomalyDetectionIndices,
cacheProvider,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
checkpointWriteQueue
);

ColdEntityWorker coldEntityQueue = new ColdEntityWorker(
heapSizeBytes,
AnomalyDetectorSettings.ENTITY_FEATURE_REQUEST_SIZE_IN_BYTES,
AnomalyDetectorSettings.COLD_ENTITY_QUEUE_MAX_HEAP_PERCENT,
sdkClusterService,
random,
adCircuitBreakerService,
threadPool,
environmentSettings,
AnomalyDetectorSettings.MAX_QUEUED_TASKS_RATIO,
getClock(),
AnomalyDetectorSettings.MEDIUM_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.LOW_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
checkpointReadQueue,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager
);

Map<String, ADStat<?>> stats = ImmutableMap
.<String, ADStat<?>>builder()
Expand Down Expand Up @@ -480,11 +588,16 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
adCircuitBreakerService,
adStats,
nodeFilter,
multiEntityResultHandler,
checkpoint,
cacheProvider,
adTaskManager,
coldstartQueue,
resultWriteQueue,
checkpointReadQueue,
adSearchHandler,
checkpointWriteQueue,
coldEntityQueue,
entityColdStarter,
adTaskCacheManager
);
Expand Down Expand Up @@ -611,11 +724,13 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
new ActionHandler<>(ADJobRunnerAction.INSTANCE, ADJobRunnerTransportAction.class),
new ActionHandler<>(ADJobParameterAction.INSTANCE, ADJobParameterTransportAction.class),
new ActionHandler<>(AnomalyDetectorJobAction.INSTANCE, AnomalyDetectorJobTransportAction.class),
new ActionHandler<>(AnomalyResultAction.INSTANCE, AnomalyResultTransportAction.class),
new ActionHandler<>(RCFResultAction.INSTANCE, RCFResultTransportAction.class),
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class),
// TODO : Register AnomalyResultAction/TransportAction here :
// https://github.com/opensearch-project/opensearch-sdk-java/issues/626
new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class),
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class),
new ActionHandler<>(SearchADTasksAction.INSTANCE, SearchADTasksTransportAction.class)
Expand Down
62 changes: 44 additions & 18 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -51,6 +53,8 @@
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultAction;
import org.opensearch.ad.transport.AnomalyResultRequest;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.AnomalyResultTransportAction;
import org.opensearch.ad.transport.ProfileAction;
Expand All @@ -59,6 +63,7 @@
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -229,8 +234,7 @@ protected void runAdJob(AnomalyDetectorJob jobParameter, LockModel lock, Instant
}
String resultIndex = jobParameter.getResultIndex();
if (resultIndex == null) {
// TODO : https://github.com/opensearch-project/opensearch-sdk-java/issues/626
// runAnomalyDetectionJob(jobParameter, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
runAnomalyDetectionJob(jobParameter, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
return;
}
ActionListener<Boolean> listener = ActionListener.wrap(r -> { log.debug("Custom index is valid"); }, e -> {
Expand All @@ -239,12 +243,10 @@ protected void runAdJob(AnomalyDetectorJob jobParameter, LockModel lock, Instant
});
anomalyDetectionIndices.validateCustomIndexForBackendJob(resultIndex, detectorId, user, roles, () -> {
listener.onResponse(true);
// TODO https://github.com/opensearch-project/opensearch-sdk-java/issues/626
// runAnomalyDetectionJob(jobParameter, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
runAnomalyDetectionJob(jobParameter, lock, detectionStartTime, executionStartTime, detectorId, user, roles);
}, listener);
}

/* @anomaly.detection - will be handled by https://github.com/opensearch-project/opensearch-sdk-java/issues/626
private void runAnomalyDetectionJob(
AnomalyDetectorJob jobParameter,
LockModel lock,
Expand Down Expand Up @@ -275,7 +277,6 @@ private void runAnomalyDetectionJob(
log.error("Failed to execute AD job " + detectorId, e);
}
}
*/

/**
* Handle exception from anomaly result action.
Expand Down Expand Up @@ -632,7 +633,6 @@ private void updateLatestRealtimeTask(
}

private LockModel acquireLock(JobExecutionContext context, Long lockDurationSeconds) throws Exception {

// Build request body
AcquireLockRequest acquireLockRequestBody = new AcquireLockRequest(
context.getJobId(),
Expand All @@ -645,30 +645,56 @@ private LockModel acquireLock(JobExecutionContext context, Long lockDurationSeco
acquireLockRequest
.setJsonEntity(Strings.toString(acquireLockRequestBody.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS)));

// Parse response map fields for lock model
Response acquireLockResponse = client.performRequest(acquireLockRequest);
CompletableFuture<Response> acquireLockResponse = new CompletableFuture<>();
client.performRequestAsync(acquireLockRequest, new ResponseListener() {

@Override
public void onSuccess(Response response) {
acquireLockResponse.complete(response);
}

@Override
public void onFailure(Exception exception) {
acquireLockResponse.completeExceptionally(exception);
}

});
Response response = acquireLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();

log.info("Acquired lock for AD job {}", context.getJobId());

XContentParser parser = XContentType.JSON
.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, acquireLockResponse.getEntity().getContent());

.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getEntity().getContent());
AcquireLockResponse acquireLockResponseBody = AcquireLockResponse.parse(parser);

return acquireLockResponseBody.getLock();
}

private void releaseLock(AnomalyDetectorJob jobParameter, LockModel lock) {

Request releaseLockRequest = new Request(
"PUT",
String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_release_lock")
String.format(Locale.ROOT, "%s/%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_release_lock", lock.getLockId())
);
releaseLockRequest.addParameter(LockModel.LOCK_ID, lock.getLockId());

try {
Response releaseLockResponse = client.performRequest(releaseLockRequest);
boolean lockIsReleased = RestStatus.fromCode(releaseLockResponse.getStatusLine().getStatusCode()) == RestStatus.OK
? true
: false;
CompletableFuture<Response> releaseLockResponse = new CompletableFuture<>();
client.performRequestAsync(releaseLockRequest, new ResponseListener() {

@Override
public void onSuccess(Response response) {
releaseLockResponse.complete(response);
}

@Override
public void onFailure(Exception exception) {
releaseLockResponse.completeExceptionally(exception);
}

});
Response response = releaseLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();

boolean lockIsReleased = RestStatus.fromCode(response.getStatusLine().getStatusCode()) == RestStatus.OK ? true : false;
if (lockIsReleased) {
log.info("Released lock for AD job {}", jobParameter.getName());
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// @anomaly-detection.create-detector Commented this code until we have support of Job Scheduler for extensibility
/*
* SPDX-License-Identifier: Apache-2.0
*
Expand Down
23 changes: 10 additions & 13 deletions src/main/java/org/opensearch/ad/NodeState.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Optional;

import org.opensearch.ad.model.AnomalyDetector;
//import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyDetectorJob;

/**
* Storing intermediate state during the execution of transport action
Expand All @@ -43,8 +43,7 @@ public class NodeState implements ExpiringState {
// cold start running flag to prevent concurrent cold start
private boolean coldStartRunning;
// detector job
// @anomaly-detection.create-detector Commented this code until we have support of Job Scheduler for extensibility
// private AnomalyDetectorJob detectorJob;
private AnomalyDetectorJob detectorJob;

public NodeState(String detectorId, Clock clock) {
this.detectorId = detectorId;
Expand Down Expand Up @@ -171,25 +170,23 @@ public void setColdStartRunning(boolean coldStartRunning) {
refreshLastUpdateTime();
}

// @anomaly-detection.create-detector Commented this code until we have support of Job Scheduler for extensibility
/**
*
* @return Detector configuration object
*/
// public AnomalyDetectorJob getDetectorJob() {
// refreshLastUpdateTime();
// return detectorJob;
// }
public AnomalyDetectorJob getDetectorJob() {
refreshLastUpdateTime();
return detectorJob;
}

// @anomaly-detection.create-detector Commented this code until we have support of Job Scheduler for extensibility
/**
*
* @param detectorJob Detector job
*/
// public void setDetectorJob(AnomalyDetectorJob detectorJob) {
// this.detectorJob = detectorJob;
// refreshLastUpdateTime();
// }
public void setDetectorJob(AnomalyDetectorJob detectorJob) {
this.detectorJob = detectorJob;
refreshLastUpdateTime();
}

/**
* refresh last access time.
Expand Down
Loading