Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/extensions] Implements create components for AnomalyDetectionIndices and ADTaskManager #825

Merged
merged 6 commits into from
Mar 2, 2023
17 changes: 16 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,21 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.transport.AnomalyResultAction',
'org.opensearch.ad.transport.CronNodeResponse',
'org.opensearch.ad.transport.CronResponse',
'org.opensearch.ad.transport.AnomalyResultResponse'
'org.opensearch.ad.transport.AnomalyResultResponse',
'org.opensearch.ad.MemoryTracker',
'org.opensearch.ad.caching.PriorityCache',
'org.opensearch.ad.common.exception.EndRunException',
'org.opensearch.ad.common.exception.LimitExceededException',
'org.opensearch.ad.common.exception.ClientException',
'org.opensearch.ad.task.ADTaskSlotLimit',
'org.opensearch.ad.task.ADHCBatchTaskCache',
'org.opensearch.ad.task.ADTaskCacheManager',
'org.opensearch.ad.task.ADRealtimeTaskCache',
'org.opensearch.ad.task.ADHCBatchTaskRunState:',
'org.opensearch.ad.task.ADBatchTaskCache',
'org.opensearch.ad.caching.DoorKeeper',
'org.opensearch.ad.caching.PriorityCache.1',
'org.opensearch.ad.caching.CacheProvider'
]


Expand Down Expand Up @@ -774,6 +788,7 @@ dependencies {
// implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "org.opensearch:opensearch-job-scheduler:${job_scheduler_version}"
implementation "org.opensearch.sdk:opensearch-sdk-java:2.0.0-SNAPSHOT"
implementation "com.google.inject:guice:5.1.0"
implementation "org.opensearch.client:opensearch-java:${opensearch_version}"
implementation "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
implementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}"
Expand Down
58 changes: 58 additions & 0 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static java.util.Collections.unmodifiableList;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -21,6 +22,8 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.support.TransportAction;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
Expand All @@ -30,18 +33,25 @@
import org.opensearch.ad.rest.RestValidateAnomalyDetectorAction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.ADJobParameterAction;
import org.opensearch.ad.transport.ADJobParameterTransportAction;
import org.opensearch.ad.transport.ADJobRunnerAction;
import org.opensearch.ad.transport.ADJobRunnerTransportAction;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.monitor.jvm.JvmService;
import org.opensearch.sdk.BaseExtension;
import org.opensearch.sdk.ExtensionRestHandler;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.SDKClusterService;
import org.opensearch.sdk.SDKNamedXContentRegistry;
import org.opensearch.threadpool.ThreadPool;

import com.google.common.collect.ImmutableList;

Expand All @@ -65,6 +75,54 @@ public List<ExtensionRestHandler> getExtensionRestHandlers() {
);
}

@Override
public Collection<Object> createComponents(ExtensionsRunner runner) {

SDKRestClient sdkRestClient = getRestClient();
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
SDKClusterService sdkClusterService = runner.getSdkClusterService();
Settings environmentSettings = runner.getEnvironmentSettings();
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
SDKNamedXContentRegistry xContentRegistry = runner.getNamedXContentRegistry();
ThreadPool threadPool = runner.getThreadPool();

JvmService jvmService = new JvmService(environmentSettings);
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

ADCircuitBreakerService adCircuitBreakerService = new ADCircuitBreakerService(jvmService).init();

MemoryTracker memoryTracker = new MemoryTracker(
jvmService,
AnomalyDetectorSettings.MODEL_MAX_SIZE_PERCENTAGE.get(environmentSettings),
AnomalyDetectorSettings.DESIRED_MODEL_SIZE_PERCENTAGE,
sdkClusterService,
adCircuitBreakerService
);

ADTaskCacheManager adTaskCacheManager = new ADTaskCacheManager(environmentSettings, sdkClusterService, memoryTracker);

AnomalyDetectionIndices anomalyDetectionIndices = new AnomalyDetectionIndices(
sdkRestClient,
sdkClusterService,
threadPool,
environmentSettings,
null, // nodeFilter
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);

ADTaskManager adTaskManager = new ADTaskManager(
environmentSettings,
sdkClusterService,
sdkRestClient,
xContentRegistry,
anomalyDetectionIndices,
null, // nodeFilter
null, // hashRing
adTaskCacheManager,
threadPool
);

return ImmutableList
.of(sdkRestClient, anomalyDetectionIndices, jvmService, adCircuitBreakerService, adTaskManager, adTaskCacheManager);
}

@Override
public List<Setting<?>> getSettings() {
// Copied from AnomalyDetectorPlugin getSettings
Expand Down
73 changes: 14 additions & 59 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -30,21 +28,7 @@
import org.opensearch.SpecialPermission;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.cluster.HashRing;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.dataprocessor.IntegerSensitiveSingleFeatureLinearUniformInterpolator;
import org.opensearch.ad.dataprocessor.Interpolator;
import org.opensearch.ad.dataprocessor.LinearUniformInterpolator;
import org.opensearch.ad.dataprocessor.SingleFeatureLinearUniformInterpolator;
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.feature.SearchFeatureDao;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.ml.CheckpointDao;
import org.opensearch.ad.ml.EntityColdStarter;
import org.opensearch.ad.ml.HybridThresholdingModel;
import org.opensearch.ad.ml.ModelManager;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.ad.stats.ADStats;
Expand All @@ -58,39 +42,24 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.monitor.jvm.JvmService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ScriptPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestMapper;
import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestState;
import com.amazon.randomcutforest.serialize.json.v1.V1JsonToV3StateConverter;
import com.amazon.randomcutforest.state.RandomCutForestMapper;
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import io.protostuff.LinkedBuffer;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

/**
* Entry point of AD plugin.
Expand Down Expand Up @@ -206,6 +175,7 @@ private static Void initGson() {
return null;
}

/* @anomalydetection.createcomponents
@Override
public Collection<Object> createComponents(
Client client,
Expand All @@ -221,18 +191,14 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
EnabledSetting.getInstance().init(clusterService);
/* @anomaly-detection.create-detector
NumericSetting.getInstance().init(clusterService);
this.client = client;
this.threadPool = threadPool;
*/
Settings settings = environment.settings();
/* @anomaly-detection.create-detector
Throttler throttler = new Throttler(getClock());
this.clientUtil = new ClientUtil(settings, client, throttler);
this.indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameExpressionResolver);
this.nodeFilter = new DiscoveryNodeFilterer(clusterService);
*/
// AnomalyDetectionIndices is Injected for IndexAnomalyDetectorTrasnportAction constructor
this.anomalyDetectionIndices = new AnomalyDetectionIndices(
null, // client,
Expand All @@ -243,7 +209,7 @@ public Collection<Object> createComponents(
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);
this.clusterService = clusterService;

dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
Expand All @@ -257,26 +223,26 @@ public Collection<Object> createComponents(
null, // ClusterService clusterService,
AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE
);

JvmService jvmService = new JvmService(environment.settings());
RandomCutForestMapper mapper = new RandomCutForestMapper();
mapper.setSaveExecutorContextEnabled(true);
mapper.setSaveTreeStateEnabled(true);
mapper.setPartialTreeStateEnabled(true);
V1JsonToV3StateConverter converter = new V1JsonToV3StateConverter();

double modelMaxSizePercent = AnomalyDetectorSettings.MODEL_MAX_SIZE_PERCENTAGE.get(settings);

ADCircuitBreakerService adCircuitBreakerService = new ADCircuitBreakerService(jvmService).init();

MemoryTracker memoryTracker = new MemoryTracker(
jvmService,
modelMaxSizePercent,
AnomalyDetectorSettings.DESIRED_MODEL_SIZE_PERCENTAGE,
clusterService,
adCircuitBreakerService
);

NodeStateManager stateManager = new NodeStateManager(
client,
xContentRegistry,
Expand All @@ -286,7 +252,7 @@ public Collection<Object> createComponents(
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
clusterService
);

FeatureManager featureManager = new FeatureManager(
searchFeatureDao,
interpolator,
Expand All @@ -304,7 +270,6 @@ public Collection<Object> createComponents(
AD_THREAD_POOL_NAME
);
long heapSizeBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
/* @anomaly-detection.create-detector
serializeRCFBufferPool = AccessController.doPrivileged(new PrivilegedAction<GenericObjectPool<LinkedBuffer>>() {
@Override
public GenericObjectPool<LinkedBuffer> run() {
Expand All @@ -326,7 +291,6 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
serializeRCFBufferPool.setMinIdle(0);
serializeRCFBufferPool.setBlockWhenExhausted(false);
serializeRCFBufferPool.setTimeBetweenEvictionRuns(AnomalyDetectorSettings.HOURLY_MAINTENANCE);
*/
CheckpointDao checkpoint = new CheckpointDao(
client,
clientUtil,
Expand All @@ -347,9 +311,9 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.SERIALIZATION_BUFFER_BYTES,
1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE
);

Random random = new Random(42);

CheckpointWriteWorker checkpointWriteQueue = new CheckpointWriteWorker(
heapSizeBytes,
AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_SIZE_IN_BYTES,
Expand All @@ -371,7 +335,6 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
stateManager,
AnomalyDetectorSettings.HOURLY_MAINTENANCE
);
/* @anomaly-detection.create-detector
EntityCache cache = new PriorityCache(
checkpoint,
AnomalyDetectorSettings.DEDICATED_CACHE_SIZE.get(settings),
Expand All @@ -388,7 +351,6 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
);

CacheProvider cacheProvider = new CacheProvider(cache);
*/
EntityColdStarter entityColdStarter = new EntityColdStarter(
getClock(),
threadPool,
Expand All @@ -408,7 +370,6 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
checkpointWriteQueue,
AnomalyDetectorSettings.MAX_COLD_START_ROUNDS
);
/* @anomaly-detection.create-detector
EntityColdStartWorker coldstartQueue = new EntityColdStartWorker(
heapSizeBytes,
AnomalyDetectorSettings.ENTITY_REQUEST_SIZE_IN_BYTES,
Expand All @@ -428,8 +389,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager
);
*/


ModelManager modelManager = new ModelManager(
checkpoint,
getClock(),
Expand All @@ -445,7 +405,6 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
featureManager,
memoryTracker
);
/* @anomaly-detection.create-detector
MultiEntityResultHandler multiEntityResultHandler = new MultiEntityResultHandler(
client,
settings,
Expand Down Expand Up @@ -521,11 +480,9 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager
);
*/
// @anomaly-detection.create-detector Commented this code until we have support of Job Scheduler for extensibility
// ADDataMigrator dataMigrator = new ADDataMigrator(client, clusterService, xContentRegistry, anomalyDetectionIndices);

ADDataMigrator dataMigrator = new ADDataMigrator(client, clusterService, xContentRegistry, anomalyDetectionIndices);
HashRing hashRing = new HashRing(nodeFilter, getClock(), settings, client, clusterService, modelManager);
/* @anomaly-detection.create-detector
anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager, AnomalyDetectorSettings.MAX_PREVIEW_RESULTS);

Map<String, ADStat<?>> stats = ImmutableMap
Expand Down Expand Up @@ -571,7 +528,6 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
adStats = new ADStats(stats);

adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
*/
adTaskManager = new ADTaskManager(
settings,
clusterService,
Expand All @@ -583,7 +539,6 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
adTaskCacheManager,
threadPool
);
/* @anomaly-detection.create-detector
AnomalyResultBulkIndexHandler anomalyResultBulkIndexHandler = new AnomalyResultBulkIndexHandler(
client,
settings,
Expand Down Expand Up @@ -646,9 +601,9 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
entityColdStarter,
adTaskCacheManager
);
*/
return ImmutableList.of(searchFeatureDao, anomalyDetectionIndices, adTaskManager);
}
*/

/**
* createComponents doesn't work for Clock as ES process cannot start
Expand Down
Loading