Skip to content

Commit

Permalink
Compact rcf integration (opensearch-project#149)
Browse files Browse the repository at this point in the history
* compact rcf
  • Loading branch information
kaituo authored and ohltyler committed Sep 1, 2021
1 parent 53b13db commit d8a8084
Show file tree
Hide file tree
Showing 72 changed files with 3,179 additions and 1,217 deletions.
35 changes: 26 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ buildscript {
ext {
opensearch_group = "org.opensearch"
opensearch_version = System.getProperty("opensearch.version", "1.0.0")
common_utils_version = '1.0.0.0'
job_scheduler_version = '1.0.0.0'
common_utils_version = System.getProperty("common_utils.version", "1.0.0.0")
job_scheduler_version = System.getProperty("job_scheduler.version", "1.0.0.0")
}

repositories {
Expand Down Expand Up @@ -62,7 +62,7 @@ repositories {
}

ext {
opensearchVersion = '1.0.0'
opensearchVersion = System.getProperty("opensearch.version", "1.0.0")
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
}

Expand Down Expand Up @@ -102,8 +102,6 @@ configurations.all {
if (it.state != Configuration.State.UNRESOLVED) return
resolutionStrategy {
force "joda-time:joda-time:${versions.joda}"
force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
force "commons-codec:commons-codec:${versions.commonscodec}"
Expand Down Expand Up @@ -335,6 +333,7 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.util.BulkUtil',
'org.opensearch.ad.util.ExceptionUtil',
'org.opensearch.ad.ml.EntityModel',
'org.opensearch.ad.ml.ModelPartitioner',

// TODO: unified flow caused coverage drop
'org.opensearch.ad.transport.AnomalyDetectorJobRequest',
Expand All @@ -354,7 +353,6 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.transport.AnomalyDetectorJobTransportAction',
'org.opensearch.ad.transport.CronNodeRequest',
'org.opensearch.ad.transport.DeleteAnomalyResultsTransportAction',
'org.opensearch.ad.transport.EntityResultResponse',
'org.opensearch.ad.transport.GetAnomalyDetectorResponse'
]

Expand Down Expand Up @@ -399,15 +397,34 @@ dependencies {
compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${opensearch_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"
compile "org.opensearch:common-utils:${common_utils_version}"
compile "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
compile group: 'com.google.guava', name: 'guava', version:'29.0-jre'
compile group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.6'
compile group: 'com.yahoo.datasketches', name: 'sketches-core', version: '0.13.4'
compile group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
compile group: 'commons-lang', name: 'commons-lang', version: '2.6'
compile 'software.amazon.randomcutforest:randomcutforest-core:1.0'
compile 'software.amazon.randomcutforest:randomcutforest-serialization-json:1.0'
compile "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
compile group: 'org.apache.commons', name: 'commons-pool2', version: '2.10.0'

// randomcutforest-serialization uses jackson 2.12, but opensearch-scripting-painless-spi uses jackson 2.11.
// compile scope won't work due to conflict.
// resolutionStrategy using 2.11 won't work as
// com.fasterxml.jackson.databind.ObjectMapper depends on com/fasterxml/jackson/core/util/JacksonFeature
// that is created since 2.12. Compile won't fail but there is a runtime ClassNotFoundException
// due to absent JacksonFeature.
// The fix is to put jackson in direct dependency and use implementation scope.
// implementation scope let the dependency in both compiling and running classpath, but
// not leaked through to clients (Opensearch). Here we force the jackson version to whatever
// opensearch uses.
compile 'software.amazon.randomcutforest:randomcutforest-core:2.0-rc2'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:2.0-rc2'
implementation "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
implementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}"
implementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"

// used for serializing/deserializing rcf models.
compile group: 'io.protostuff', name: 'protostuff-core', version: '1.7.4'
compile group: 'io.protostuff', name: 'protostuff-runtime', version: '1.7.4'

compile "org.jacoco:org.jacoco.agent:0.8.5"
compile ("org.jacoco:org.jacoco.ant:0.8.5") {
Expand Down
74 changes: 66 additions & 8 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.SpecialPermission;
Expand Down Expand Up @@ -199,12 +203,18 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import com.amazon.randomcutforest.serialize.RandomCutForestSerDe;
import com.amazon.randomcutforest.serialize.json.v1.V1JsonToV2StateConverter;
import com.amazon.randomcutforest.state.RandomCutForestMapper;
import com.amazon.randomcutforest.state.RandomCutForestState;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import io.protostuff.LinkedBuffer;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

/**
* Entry point of AD plugin.
*/
Expand Down Expand Up @@ -233,6 +243,8 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ADTaskCacheManager adTaskCacheManager;
private ADTaskManager adTaskManager;
private ADBatchTaskRunner adBatchTaskRunner;
// package private for testing
GenericObjectPool<LinkedBuffer> serializeRCFBufferPool;

static {
SpecialPermission.check();
Expand Down Expand Up @@ -349,7 +361,13 @@ public Collection<Object> createComponents(
);

JvmService jvmService = new JvmService(environment.settings());
RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe();
RandomCutForestMapper mapper = new RandomCutForestMapper();
mapper.setSaveExecutorContextEnabled(true);
mapper.setSaveTreeStateEnabled(true);
mapper.setPartialTreeStateEnabled(true);
Schema<RandomCutForestState> schema = AccessController
.doPrivileged((PrivilegedAction<Schema<RandomCutForestState>>) () -> RuntimeSchema.getSchema(RandomCutForestState.class));
V1JsonToV2StateConverter converter = new V1JsonToV2StateConverter();

double modelMaxSizePercent = AnomalyDetectorSettings.MODEL_MAX_SIZE_PERCENTAGE.get(settings);

Expand Down Expand Up @@ -378,7 +396,8 @@ public Collection<Object> createComponents(
clientUtil,
getClock(),
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
modelPartitioner
modelPartitioner,
clusterService
);

FeatureManager featureManager = new FeatureManager(
Expand All @@ -400,15 +419,41 @@ public Collection<Object> createComponents(

long heapSizeBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();

serializeRCFBufferPool = AccessController.doPrivileged(new PrivilegedAction<GenericObjectPool<LinkedBuffer>>() {
@Override
public GenericObjectPool<LinkedBuffer> run() {
return new GenericObjectPool<>(new BasePooledObjectFactory<LinkedBuffer>() {
@Override
public LinkedBuffer create() throws Exception {
return LinkedBuffer.allocate(AnomalyDetectorSettings.SERIALIZATION_BUFFER_BYTES);
}

@Override
public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
return new DefaultPooledObject<>(obj);
}
});
}
});
serializeRCFBufferPool.setMaxTotal(AnomalyDetectorSettings.MAX_TOTAL_RCF_SERIALIZATION_BUFFERS);
serializeRCFBufferPool.setMaxIdle(AnomalyDetectorSettings.MAX_TOTAL_RCF_SERIALIZATION_BUFFERS);
serializeRCFBufferPool.setMinIdle(0);
serializeRCFBufferPool.setBlockWhenExhausted(false);
serializeRCFBufferPool.setTimeBetweenEvictionRuns(AnomalyDetectorSettings.HOURLY_MAINTENANCE);

CheckpointDao checkpoint = new CheckpointDao(
client,
clientUtil,
CommonName.CHECKPOINT_INDEX_NAME,
gson,
rcfSerde,
mapper,
schema,
converter,
HybridThresholdingModel.class,
anomalyDetectionIndices,
AnomalyDetectorSettings.MAX_CHECKPOINT_BYTES
AnomalyDetectorSettings.MAX_CHECKPOINT_BYTES,
serializeRCFBufferPool,
AnomalyDetectorSettings.SERIALIZATION_BUFFER_BYTES
);

Random random = new Random(42);
Expand Down Expand Up @@ -498,9 +543,7 @@ public Collection<Object> createComponents(
);

ModelManager modelManager = new ModelManager(
rcfSerde,
checkpoint,
gson,
getClock(),
AnomalyDetectorSettings.NUM_TREES,
AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE,
Expand All @@ -512,7 +555,6 @@ public Collection<Object> createComponents(
AnomalyDetectorSettings.THRESHOLD_NUM_LOGNORMAL_QUANTILES,
AnomalyDetectorSettings.THRESHOLD_DOWNSAMPLES,
AnomalyDetectorSettings.THRESHOLD_MAX_SAMPLES,
HybridThresholdingModel.class,
AnomalyDetectorSettings.MIN_PREVIEW_SIZE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
Expand Down Expand Up @@ -915,4 +957,20 @@ public ScheduledJobParser getJobParser() {
return AnomalyDetectorJob.parse(parser);
};
}

@Override
public void close() {
if (serializeRCFBufferPool != null) {
try {
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
serializeRCFBufferPool.clear();
serializeRCFBufferPool.close();
return null;
});
serializeRCFBufferPool = null;
} catch (Exception e) {
LOG.error("Failed to shut down object Pool", e);
}
}
}
}
6 changes: 4 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public void executeDetector(
startTime.toEpochMilli(),
endTime.toEpochMilli(),
ActionListener.wrap(features -> {
List<ThresholdingResult> entityResults = modelManager.getPreviewResults(features.getProcessedFeatures());
List<ThresholdingResult> entityResults = modelManager
.getPreviewResults(features.getProcessedFeatures(), detector.getShingleSize());
List<AnomalyResult> sampledEntityResults = sample(
parsePreviewResult(detector, features, entityResults, entity),
maxPreviewResults
Expand All @@ -131,7 +132,8 @@ public void executeDetector(
} else {
featureManager.getPreviewFeatures(detector, startTime.toEpochMilli(), endTime.toEpochMilli(), ActionListener.wrap(features -> {
try {
List<ThresholdingResult> results = modelManager.getPreviewResults(features.getProcessedFeatures());
List<ThresholdingResult> results = modelManager
.getPreviewResults(features.getProcessedFeatures(), detector.getShingleSize());
listener.onResponse(sample(parsePreviewResult(detector, features, results, null), maxPreviewResults));
} catch (Exception e) {
onFailure(e, listener, detector.getDetectorId());
Expand Down
92 changes: 66 additions & 26 deletions src/main/java/org/opensearch/ad/MemoryTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public MemoryTracker(
* @return true if there is enough memory; otherwise throw LimitExceededException.
*/
public synchronized boolean isHostingAllowed(String detectorId, RandomCutForest rcf) {
long requiredBytes = estimateModelSize(rcf);
long requiredBytes = estimateTotalModelSize(rcf);
if (canAllocateReserved(requiredBytes)) {
return true;
} else {
Expand Down Expand Up @@ -181,8 +181,19 @@ private void adjustOriginMemoryRelease(long memoryToConsume, Origin origin, Map<
* @param forest RCF forest object
* @return estimated model size in bytes
*/
public long estimateModelSize(RandomCutForest forest) {
return estimateModelSize(forest.getDimensions(), forest.getNumberOfTrees(), forest.getSampleSize());
public long estimateTotalModelSize(RandomCutForest forest) {
return estimateRCFModelSize(forest.getDimensions(), forest.getNumberOfTrees(), forest.getBoundingBoxCacheFraction())
+ thresholdModelBytes;
}

/**
* Gets the estimated size of a RCF model.
*
* @param forest RCF forest object
* @return estimated model size in bytes
*/
public long estimateRCFModelSize(RandomCutForest forest) {
return estimateRCFModelSize(forest.getDimensions(), forest.getNumberOfTrees(), forest.getBoundingBoxCacheFraction());
}

/**
Expand All @@ -191,40 +202,69 @@ public long estimateModelSize(RandomCutForest forest) {
*
* @param detector detector config object
* @param numberOfTrees the number of trees in a RCF forest
* @param boundingBoxCacheFraction Bounding box cache ratio in RCF
* @return estimated model size in bytes
*/
public long estimateModelSize(AnomalyDetector detector, int numberOfTrees) {
return estimateModelSize(detector.getEnabledFeatureIds().size() * detector.getShingleSize(), numberOfTrees, sampleSize);
public long estimateTotalModelSize(AnomalyDetector detector, int numberOfTrees, double boundingBoxCacheFraction) {
return estimateRCFModelSize(
detector.getEnabledFeatureIds().size() * detector.getShingleSize(),
numberOfTrees,
boundingBoxCacheFraction
) + thresholdModelBytes;
}

/**
* Gets the estimated size of an entity's model.
* RCF size:
* (Num_trees * num_samples * ( (16*dimensions + 84) + (24*dimensions + 48)))
*
* (16*dimensions + 84) is for non-leaf node. 16 are for two doubles for min and max.
* 84 is the meta-data size we observe from jmap data.
* (24*dimensions + 48)) is for leaf node. We find a leaf node has 3 vectors: leaf pointers,
* min, and max arrays from jmap data. That’s why we use 24 ( 3 doubles). 48 is the
* meta-data size we observe from jmap data.
*
* Sampler size:
* Number_of_trees * num_samples * ( 12 (object) + 8 (subsequence) + 8 (weight) + 8 (point reference))
*
* The range of mem usage of RCF model in our test(1feature, 1 shingle) is from ~400K to ~800K.
* Using shingle size 1 and 1 feature (total dimension = 1), one rcf’s size is of 532 K,
* which lies in our range of 400~800 k.
* RCF size:
* Assume the sample size is 256. A compact RCF forest consists of:
* - Random number generator: 56 bytes
* - PointStoreCoordinator: 24 bytes
* - SequentialForestUpdateExecutor: 24 bytes
* - SequentialForestTraversalExecutor: 16 bytes
* - PointStoreFloat
* + IndexManager
* - int array for free indexes: 256 * numberOfTrees * 4, where 4 is the size of an integer
* - two int array for locationList and refCount: 256 * numberOfTrees * 4 bytes * 2
* - a float array for data store: 256 * trees * dimension * 4 bytes
* - ComponentList: an array of size numberOfTrees
* + SamplerPlusTree
* - CompactSampler: 2248
* + CompactRandomCutTreeFloat
* - other fields: 152
* - SmallNodeStore (small node store since our sample size is 256, less than the max of short): 6120
* + BoxCacheFloat
* - other: 104
* - BoundingBoxFloat: (1040 + 255* (dimension * 4 * 2 + 64)) * adjusted bounding box cache usage,
* where if full we have 255 inner node and each box has 80 bytes.
* Plus metadata, we can have in total 21544 bytes.
* {@code adjusted bounding box cache usage = (bounding box cache fraction >= 0.3? 1: bounding box cache fraction)}
* {@code >= 0.3} we will still initialize bounding box cache array of the max size,
* but exclude them using the cache ratio. It is not guaranteed we will only
* use cache ratio in the array. For example, with cache ratio 0.5, we used 150
* out of 255 elements. So can have two float array whose size is the number of
* dimensions; other constants are the metadata size.
* In total, RCF size is
* 56 + # trees * (2248 + 152 + 6120 + 104 + (1040 + 255* (dimension * 4 * 2 + 64)) * adjusted bounding box cache ratio) +
* (256 * # trees * 2 + 256 * # trees * dimension) * 4 bytes * 0.5 + 1064 + 24 + 24 + 16
* = 56 + # trees * (8624 + (1040 + 255 * (dimension * 8 + 64)) * adjusted bounding box cache ratio) + 256 * # trees *
* (2 + dimension) * 4 * 0.5 + 1128
*
* @param dimension The number of feature dimensions in RCF
* @param numberOfTrees The number of trees in RCF
* @param numSamples The number of samples in RCF
* @return estimated model size in bytes
* @param boundingBoxCacheFraction Bounding box cache ratio in RCF
* @return estimated RCF model size
*/
public long estimateModelSize(int dimension, int numberOfTrees, int numSamples) {
long totalSamples = (long) numberOfTrees * (long) numSamples;
long rcfSize = totalSamples * (40 * dimension + 132);
long samplerSize = totalSamples * 36;
return rcfSize + samplerSize + thresholdModelBytes;
public long estimateRCFModelSize(int dimension, int numberOfTrees, double boundingBoxCacheFraction) {
float averagePointStoreUsage = dimension == 1 ? 1 : 0.5f;
float actualBoundingBoxUsage = boundingBoxCacheFraction >= 0.3 ? 1 : (float) boundingBoxCacheFraction;
long compactRcfSize = (long) (56 + numberOfTrees * (8624 + (1040 + 255 * (dimension * 8 + 64)) * actualBoundingBoxUsage) + 256
* numberOfTrees * (2 + dimension) * 4 * averagePointStoreUsage + 1128);
return compactRcfSize;
}

public long estimateTotalModelSize(int dimension, int numberOfTrees, double boundingBoxCacheFraction) {
return estimateRCFModelSize(dimension, numberOfTrees, boundingBoxCacheFraction) + thresholdModelBytes;
}

/**
Expand Down
Loading

0 comments on commit d8a8084

Please sign in to comment.