Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Fix empty preview result due to insufficient sample #65

Merged
merged 5 commits into from
Mar 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public Collection<Object> createComponents(
SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(client, scriptService, xContentRegistry, interpolator, clientUtil);
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(client, xContentRegistry, interpolator, clientUtil);

JvmService jvmService = new JvmService(environment.settings());
RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -58,16 +59,8 @@ public AnomalyDetectorRunner(ModelManager modelManager, FeatureManager featureMa
* @param endTime detection period end time
* @param listener handle anomaly result
*/
public void run(AnomalyDetector detector, Instant startTime, Instant endTime, ActionListener<List<AnomalyResult>> listener) {
executeDetector(detector, startTime, endTime, listener);
}

private void executeDetector(
AnomalyDetector detector,
Instant startTime,
Instant endTime,
ActionListener<List<AnomalyResult>> listener
) {
public void executeDetector(AnomalyDetector detector, Instant startTime, Instant endTime, ActionListener<List<AnomalyResult>> listener)
throws IOException {
featureManager.getPreviewFeatures(detector, startTime.toEpochMilli(), endTime.toEpochMilli(), ActionListener.wrap(features -> {
try {
List<ThresholdingResult> results = modelManager.getPreviewResults(features.getProcessedFeatures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.ad.feature;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -288,8 +289,10 @@ public Features getPreviewFeatures(AnomalyDetector detector, long startMilli, lo
* @param listener onResponse is called with time ranges, unprocessed features,
* and processed features of the data points from the period
* onFailure is called with IllegalArgumentException when there is no data to preview
* @throws IOException if a user gives wrong query input when defining a detector
*/
public void getPreviewFeatures(AnomalyDetector detector, long startMilli, long endMilli, ActionListener<Features> listener) {
public void getPreviewFeatures(AnomalyDetector detector, long startMilli, long endMilli, ActionListener<Features> listener)
throws IOException {
Entry<List<Entry<Long, Long>>, Integer> sampleRangeResults = getSampleRanges(detector, startMilli, endMilli);
List<Entry<Long, Long>> sampleRanges = sampleRangeResults.getKey();
int stride = sampleRangeResults.getValue();
Expand Down Expand Up @@ -356,12 +359,13 @@ private Entry<List<Entry<Long, Long>>, double[][]> getSamplesForRanges(AnomalyDe
* Gets search results for the sampled time ranges.
*
* @param listener handle search results map: key is time ranges, value is corresponding search results
* @throws IOException if a user gives wrong query input when defining a detector
*/
void getSamplesForRanges(
AnomalyDetector detector,
List<Entry<Long, Long>> sampleRanges,
ActionListener<Entry<List<Entry<Long, Long>>, double[][]>> listener
) {
) throws IOException {
searchFeatureDao.getFeatureSamplesForPeriods(detector, sampleRanges, ActionListener.wrap(featureSamples -> {
List<Entry<Long, Long>> ranges = new ArrayList<>(featureSamples.size());
List<double[]> samples = new ArrayList<>(featureSamples.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.HashMap;
Expand All @@ -41,10 +42,11 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.range.InternalDateRange;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
Expand All @@ -60,13 +62,11 @@
public class SearchFeatureDao {

protected static final String AGG_NAME_MAX = "max_timefield";
protected static final String FEATURE_SAMPLE_PREFERENCE = "_shards:0";

private static final Logger logger = LogManager.getLogger(SearchFeatureDao.class);

// Dependencies
private final Client client;
private final ScriptService scriptService;
private final NamedXContentRegistry xContent;
private final Interpolator interpolator;
private final ClientUtil clientUtil;
Expand All @@ -75,20 +75,12 @@ public class SearchFeatureDao {
* Constructor injection.
*
* @param client ES client for queries
* @param scriptService ES ScriptService
* @param xContent ES XContentRegistry
* @param interpolator interpolator for missing values
* @param clientUtil utility for ES client
*/
public SearchFeatureDao(
Client client,
ScriptService scriptService,
NamedXContentRegistry xContent,
Interpolator interpolator,
ClientUtil clientUtil
) {
public SearchFeatureDao(Client client, NamedXContentRegistry xContent, Interpolator interpolator, ClientUtil clientUtil) {
this.client = client;
this.scriptService = scriptService;
this.xContent = xContent;
this.interpolator = interpolator;
this.clientUtil = clientUtil;
Expand Down Expand Up @@ -136,18 +128,10 @@ public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long st
}

private Optional<double[]> parseResponse(SearchResponse response, List<String> featureIds) {
return Optional
.ofNullable(response)
.filter(resp -> response.getHits().getTotalHits().value > 0L)
.map(resp -> resp.getAggregations())
.map(aggs -> aggs.asMap())
.map(
map -> featureIds
.stream()
.mapToDouble(id -> Optional.ofNullable(map.get(id)).map(this::parseAggregation).orElse(Double.NaN))
.toArray()
)
.filter(result -> Arrays.stream(result).noneMatch(d -> Double.isNaN(d) || Double.isInfinite(d)));
return parseAggregations(
Optional.ofNullable(response).filter(resp -> response.getHits().getTotalHits().value > 0L).map(resp -> resp.getAggregations()),
featureIds
);
}

private double parseAggregation(Aggregation aggregation) {
Expand Down Expand Up @@ -177,7 +161,7 @@ public List<Optional<double[]>> getFeatureSamplesForPeriods(AnomalyDetector dete
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
ranges
.stream()
.map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.of(FEATURE_SAMPLE_PREFERENCE)))
.map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.empty()))
.forEachOrdered(request -> multiSearchRequest.add(request));

return clientUtil
Expand All @@ -201,40 +185,33 @@ public List<Optional<double[]>> getFeatureSamplesForPeriods(AnomalyDetector dete
* @param detector info about the indices, documents, feature query
* @param ranges list of time ranges
* @param listener handle approximate features for the time ranges
* @throws IOException if a user gives wrong query input when defining a detector
*/
public void getFeatureSamplesForPeriods(
AnomalyDetector detector,
List<Entry<Long, Long>> ranges,
ActionListener<List<Optional<double[]>>> listener
) {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
ranges
.stream()
.map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.of(FEATURE_SAMPLE_PREFERENCE)))
kaituo marked this conversation as resolved.
Show resolved Hide resolved
.forEachOrdered(request -> multiSearchRequest.add(request));
) throws IOException {
SearchRequest request = createPreviewSearchRequest(detector, ranges);

client.search(request, ActionListener.wrap(response -> {
Aggregations aggs = response.getAggregations();
if (aggs == null) {
listener.onResponse(Collections.emptyList());
kaituo marked this conversation as resolved.
Show resolved Hide resolved
return;
}

client
.multiSearch(
multiSearchRequest,
ActionListener
.wrap(
response -> listener
.onResponse(
Optional
.of(response)
.map(Stream::of)
.orElseGet(Stream::empty)
.flatMap(multiSearchResp -> Arrays.stream(multiSearchResp.getResponses()))
.map(
item -> Optional
.ofNullable(item.getResponse())
.flatMap(r -> parseResponse(r, detector.getEnabledFeatureIds()))
)
.collect(Collectors.toList())
),
listener::onFailure
)
);
listener
.onResponse(
aggs
.asList()
.stream()
.filter(InternalDateRange.class::isInstance)
Copy link
Contributor

@ylwu-amzn ylwu-amzn Mar 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that ES returns a date range class other than InternalDateRange in this case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for 7.4, it is not possible. For other versions, it is possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. So for other versions, is InternalDateRange.class still what we want?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other versions are untested. We will test them when we release new versions.

.flatMap(agg -> ((InternalDateRange) agg).getBuckets().stream())
.map(bucket -> parseBucket(bucket, detector.getEnabledFeatureIds()))
.collect(Collectors.toList())
);
}, listener::onFailure));
}

/**
Expand Down Expand Up @@ -332,8 +309,38 @@ private SearchRequest createFeatureSearchRequest(AnomalyDetector detector, long
SearchSourceBuilder searchSourceBuilder = ParseUtils.generateInternalFeatureQuery(detector, startTime, endTime, xContent);
return new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder).preference(preference.orElse(null));
} catch (IOException e) {
logger.warn("Failed to create feature search request for " + detector + " from " + startTime + " to " + endTime, e);
logger
.warn(
"Failed to create feature search request for " + detector.getDetectorId() + " from " + startTime + " to " + endTime,
e
);
throw new IllegalStateException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as Yaliang's comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is Lai's code, I guess. IOException is a checked exception, so the method and its callers have to declare the exception until it is handled. Lai prefers to use unchecked exceptions like IllegalStateException.

I prefer checked exceptions as they serve as documentation on what a method can throw and remind the caller to handle it.

Do you have any preference on when to use checked and unchecked exceptions?

}
}

/**
 * Creates the search request used to fetch preview features.
 *
 * The search source comes from {@code ParseUtils.generatePreviewQuery} and the
 * request targets the indices configured on the detector.
 *
 * @param detector detector whose indices and preview query are used
 * @param ranges time ranges handed to the preview query generator
 * @return search request over the detector's indices
 * @throws IOException rethrown, after a warning is logged, when building the request fails
 */
private SearchRequest createPreviewSearchRequest(AnomalyDetector detector, List<Entry<Long, Long>> ranges) throws IOException {
    try {
        SearchSourceBuilder previewSource = ParseUtils.generatePreviewQuery(detector, ranges, xContent);
        String[] targetIndices = detector.getIndices().toArray(new String[0]);
        return new SearchRequest(targetIndices, previewSource);
    } catch (IOException ioException) {
        // Log with the detector id for context, then let the caller decide how to react.
        logger.warn("Failed to create feature search request for " + detector.getDetectorId() + " for preview", ioException);
        throw ioException;
    }
}

private Optional<double[]> parseBucket(InternalDateRange.Bucket bucket, List<String> featureIds) {
kaituo marked this conversation as resolved.
Show resolved Hide resolved
return parseAggregations(Optional.ofNullable(bucket).map(b -> b.getAggregations()), featureIds);
}

/**
 * Reads one numeric value per feature id out of the given aggregations.
 *
 * Values are produced in the order of {@code featureIds}. A feature id with no
 * matching aggregation contributes {@code NaN}.
 *
 * @param aggregations aggregations to read from, possibly absent
 * @param featureIds ids of the feature aggregations to look up
 * @return the feature values, or empty when the aggregations are absent or
 *         any value is NaN or infinite
 */
private Optional<double[]> parseAggregations(Optional<Aggregations> aggregations, List<String> featureIds) {
    return aggregations.map(Aggregations::asMap).map(byName -> {
        double[] values = new double[featureIds.size()];
        int next = 0;
        for (String featureId : featureIds) {
            Aggregation featureAgg = byName.get(featureId);
            values[next++] = featureAgg == null ? Double.NaN : parseAggregation(featureAgg);
        }
        return values;
    })
        // Keep the result only when every value is a finite number (neither NaN nor infinite).
        .filter(values -> Arrays.stream(values).allMatch(Double::isFinite));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,23 +184,24 @@ protected void processResponse(GetResponse response) throws Exception {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion());

anomalyDetectorRunner.run(detector, input.getPeriodStart(), input.getPeriodEnd(), ActionListener.wrap(anomalyResult -> {
XContentBuilder builder = channel
.newBuilder()
.startObject()
.field(ANOMALY_RESULT, anomalyResult)
.field(ANOMALY_DETECTOR, detector)
.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception);
try {
XContentBuilder builder = channel.newBuilder().startObject().field(ANOMALY_DETECTOR, detector).endObject();
anomalyDetectorRunner
.executeDetector(detector, input.getPeriodStart(), input.getPeriodEnd(), ActionListener.wrap(anomalyResult -> {
XContentBuilder builder = channel
.newBuilder()
.startObject()
.field(ANOMALY_RESULT, anomalyResult)
.field(ANOMALY_DETECTOR, detector)
.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
} catch (IOException e) {
logger.error("Fail to send back exception message" + detector.getDetectorId(), exception);
}
}));
}, exception -> {
logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception);
try {
XContentBuilder builder = channel.newBuilder().startObject().field(ANOMALY_DETECTOR, detector).endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, builder));
} catch (IOException e) {
logger.error("Fail to send back exception message" + detector.getDetectorId(), exception);
}
}));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.QUERY_PARAM_PERIOD_END;
import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.QUERY_PARAM_PERIOD_START;
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateRange;
import static org.elasticsearch.search.aggregations.AggregatorFactories.VALID_AGG_NAME;

/**
Expand Down Expand Up @@ -270,15 +274,14 @@ public static SearchSourceBuilder generateInternalFeatureQuery(
long endTime,
NamedXContentRegistry xContentRegistry
) throws IOException {
SearchSourceBuilder searchSourceBuilder = detector.generateFeatureQuery();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder(detector.getTimeField())
.from(startTime)
.to(endTime)
.format("epoch_millis")
.includeLower(true)
.includeUpper(false);

BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(searchSourceBuilder.query());
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(detector.getFilterQuery());

SearchSourceBuilder internalSearchSourceBuilder = new SearchSourceBuilder().query(internalFilterQuery);
if (detector.getFeatureAttributes() != null) {
Expand All @@ -295,14 +298,38 @@ public static SearchSourceBuilder generateInternalFeatureQuery(
return internalSearchSourceBuilder;
}

/**
 * Builds the search source for previewing a detector over several time ranges at once.
 *
 * The source uses the detector's filter query, returns no hits (size 0), and carries a
 * single date range aggregation named "date_range" on the detector's time field. Each
 * entry of {@code ranges} becomes one range (key as lower bound, value as upper bound),
 * and the first aggregation parsed from each feature is attached as a sub-aggregation
 * under the feature's id.
 *
 * @param detector detector providing the time field, filter query and features
 * @param ranges time ranges in epoch milliseconds
 * @param xContentRegistry registry used to parse the feature aggregations
 * @return search source holding the preview query
 * @throws IOException if parsing a feature aggregation fails
 */
public static SearchSourceBuilder generatePreviewQuery(
    AnomalyDetector detector,
    List<Entry<Long, Long>> ranges,
    NamedXContentRegistry xContentRegistry
) throws IOException {
    final DateRangeAggregationBuilder rangesAgg = dateRange("date_range").field(detector.getTimeField()).format("epoch_millis");
    ranges.forEach(window -> rangesAgg.addRange(window.getKey(), window.getValue()));

    if (detector.getFeatureAttributes() != null) {
        for (Feature feature : detector.getFeatureAttributes()) {
            String aggregationJson = feature.getAggregation().toString();
            AggregatorFactories.Builder parsed = parseAggregators(aggregationJson, xContentRegistry, feature.getId());
            // Only the first parsed aggregation of a feature is attached.
            rangesAgg.subAggregation(parsed.getAggregatorFactories().iterator().next());
        }
    }

    SearchSourceBuilder previewSource = new SearchSourceBuilder();
    previewSource.query(detector.getFilterQuery());
    previewSource.size(0);
    previewSource.aggregation(rangesAgg);
    return previewSource;
}

public static String generateInternalFeatureQueryTemplate(AnomalyDetector detector, NamedXContentRegistry xContentRegistry)
throws IOException {
SearchSourceBuilder searchSourceBuilder = detector.generateFeatureQuery();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder(detector.getTimeField())
.from("{{" + QUERY_PARAM_PERIOD_START + "}}")
.to("{{" + QUERY_PARAM_PERIOD_END + "}}");

BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(searchSourceBuilder.query());
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(detector.getFilterQuery());

SearchSourceBuilder internalSearchSourceBuilder = new SearchSourceBuilder().query(internalFilterQuery);
if (detector.getFeatureAttributes() != null) {
Expand Down
Loading