Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Fix empty preview result due to insufficient sample (#65)
Browse files Browse the repository at this point in the history
* Fix empty preview result due to insufficient sample

Preview does not use all data in the given time range as it is costly. Previously, we sample data by issuing multiple queries on shard 0 data. The purpose of the shard 0 query restriction is to reduce system costs. The nab_art_daily_jumpsup data set has one doc in each interval, and the doc is spread out in 5 shards. Even though we issue 360 queries, we only get 70~80 samples back by querying shard 0. Together with interpolated data points, the preview run misses significant portions of data required to train models (400 is the minimum) and thus returns empty preview results. This PR fixes the issue by removing the shard 0 search restriction.

Previously, the preview API issues multiple queries encapsulated in a multisearch request (the request can contain 360 search queries at most). The same result could be obtained via a date range query with multiple range buckets. We show a date range query is 2~10 times faster than a multisearch request (#63). This PR replaces the multisearch request with a date range query.

This PR also
- removes unused field scriptService in SearchFeatureDao.
- fixes rest status during exception
- fixes a bug in query generation. We generate aggregation query twice: once with filter query, once separately.

Testing done:
- Previous preview unit tests pass.
- Manually verified date range queries results are correctly processed by cross checking intermediate logs.
- Manually verified preview results with multisearch and date range implementation are the same.
- Manually verified preview don't show empty results with the nab_art_daily_jumpsup data set with the fix
  • Loading branch information
kaituo authored Mar 21, 2020
1 parent 44a74c9 commit 091c367
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public Collection<Object> createComponents(
SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(client, scriptService, xContentRegistry, interpolator, clientUtil);
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(client, xContentRegistry, interpolator, clientUtil);

JvmService jvmService = new JvmService(environment.settings());
RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -58,16 +59,8 @@ public AnomalyDetectorRunner(ModelManager modelManager, FeatureManager featureMa
* @param endTime detection period end time
* @param listener handle anomaly result
*/
public void run(AnomalyDetector detector, Instant startTime, Instant endTime, ActionListener<List<AnomalyResult>> listener) {
executeDetector(detector, startTime, endTime, listener);
}

private void executeDetector(
AnomalyDetector detector,
Instant startTime,
Instant endTime,
ActionListener<List<AnomalyResult>> listener
) {
public void executeDetector(AnomalyDetector detector, Instant startTime, Instant endTime, ActionListener<List<AnomalyResult>> listener)
throws IOException {
featureManager.getPreviewFeatures(detector, startTime.toEpochMilli(), endTime.toEpochMilli(), ActionListener.wrap(features -> {
try {
List<ThresholdingResult> results = modelManager.getPreviewResults(features.getProcessedFeatures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.ad.feature;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -350,8 +351,10 @@ public Features getPreviewFeatures(AnomalyDetector detector, long startMilli, lo
* @param listener onResponse is called with time ranges, unprocessed features,
* and processed features of the data points from the period
* onFailure is called with IllegalArgumentException when there is no data to preview
* @throws IOException if a user gives wrong query input when defining a detector
*/
public void getPreviewFeatures(AnomalyDetector detector, long startMilli, long endMilli, ActionListener<Features> listener) {
public void getPreviewFeatures(AnomalyDetector detector, long startMilli, long endMilli, ActionListener<Features> listener)
throws IOException {
Entry<List<Entry<Long, Long>>, Integer> sampleRangeResults = getSampleRanges(detector, startMilli, endMilli);
List<Entry<Long, Long>> sampleRanges = sampleRangeResults.getKey();
int stride = sampleRangeResults.getValue();
Expand Down Expand Up @@ -418,12 +421,13 @@ private Entry<List<Entry<Long, Long>>, double[][]> getSamplesForRanges(AnomalyDe
* Gets search results for the sampled time ranges.
*
* @param listener handle search results map: key is time ranges, value is corresponding search results
* @throws IOException if a user gives wrong query input when defining a detector
*/
void getSamplesForRanges(
AnomalyDetector detector,
List<Entry<Long, Long>> sampleRanges,
ActionListener<Entry<List<Entry<Long, Long>>, double[][]>> listener
) {
) throws IOException {
searchFeatureDao.getFeatureSamplesForPeriods(detector, sampleRanges, ActionListener.wrap(featureSamples -> {
List<Entry<Long, Long>> ranges = new ArrayList<>(featureSamples.size());
List<double[]> samples = new ArrayList<>(featureSamples.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.HashMap;
Expand All @@ -41,10 +42,11 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.range.InternalDateRange;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
Expand All @@ -60,13 +62,11 @@
public class SearchFeatureDao {

protected static final String AGG_NAME_MAX = "max_timefield";
protected static final String FEATURE_SAMPLE_PREFERENCE = "_shards:0";

private static final Logger logger = LogManager.getLogger(SearchFeatureDao.class);

// Dependencies
private final Client client;
private final ScriptService scriptService;
private final NamedXContentRegistry xContent;
private final Interpolator interpolator;
private final ClientUtil clientUtil;
Expand All @@ -75,20 +75,12 @@ public class SearchFeatureDao {
* Constructor injection.
*
* @param client ES client for queries
* @param scriptService ES ScriptService
* @param xContent ES XContentRegistry
* @param interpolator interpolator for missing values
* @param clientUtil utility for ES client
*/
public SearchFeatureDao(
Client client,
ScriptService scriptService,
NamedXContentRegistry xContent,
Interpolator interpolator,
ClientUtil clientUtil
) {
public SearchFeatureDao(Client client, NamedXContentRegistry xContent, Interpolator interpolator, ClientUtil clientUtil) {
this.client = client;
this.scriptService = scriptService;
this.xContent = xContent;
this.interpolator = interpolator;
this.clientUtil = clientUtil;
Expand Down Expand Up @@ -157,18 +149,10 @@ public void getFeaturesForPeriod(AnomalyDetector detector, long startTime, long
}

private Optional<double[]> parseResponse(SearchResponse response, List<String> featureIds) {
return Optional
.ofNullable(response)
.filter(resp -> response.getHits().getTotalHits().value > 0L)
.map(resp -> resp.getAggregations())
.map(aggs -> aggs.asMap())
.map(
map -> featureIds
.stream()
.mapToDouble(id -> Optional.ofNullable(map.get(id)).map(this::parseAggregation).orElse(Double.NaN))
.toArray()
)
.filter(result -> Arrays.stream(result).noneMatch(d -> Double.isNaN(d) || Double.isInfinite(d)));
return parseAggregations(
Optional.ofNullable(response).filter(resp -> response.getHits().getTotalHits().value > 0L).map(resp -> resp.getAggregations()),
featureIds
);
}

private double parseAggregation(Aggregation aggregation) {
Expand Down Expand Up @@ -198,7 +182,7 @@ public List<Optional<double[]>> getFeatureSamplesForPeriods(AnomalyDetector dete
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
ranges
.stream()
.map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.of(FEATURE_SAMPLE_PREFERENCE)))
.map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.empty()))
.forEachOrdered(request -> multiSearchRequest.add(request));

return clientUtil
Expand All @@ -222,40 +206,33 @@ public List<Optional<double[]>> getFeatureSamplesForPeriods(AnomalyDetector dete
* @param detector info about the indices, documents, feature query
* @param ranges list of time ranges
* @param listener handle approximate features for the time ranges
* @throws IOException if a user gives wrong query input when defining a detector
*/
public void getFeatureSamplesForPeriods(
AnomalyDetector detector,
List<Entry<Long, Long>> ranges,
ActionListener<List<Optional<double[]>>> listener
) {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
ranges
.stream()
.map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.of(FEATURE_SAMPLE_PREFERENCE)))
.forEachOrdered(request -> multiSearchRequest.add(request));
) throws IOException {
SearchRequest request = createPreviewSearchRequest(detector, ranges);

client
.multiSearch(
multiSearchRequest,
ActionListener
.wrap(
response -> listener
.onResponse(
Optional
.of(response)
.map(Stream::of)
.orElseGet(Stream::empty)
.flatMap(multiSearchResp -> Arrays.stream(multiSearchResp.getResponses()))
.map(
item -> Optional
.ofNullable(item.getResponse())
.flatMap(r -> parseResponse(r, detector.getEnabledFeatureIds()))
)
.collect(Collectors.toList())
),
listener::onFailure
)
);
client.search(request, ActionListener.wrap(response -> {
Aggregations aggs = response.getAggregations();
if (aggs == null) {
listener.onResponse(Collections.emptyList());
return;
}

listener
.onResponse(
aggs
.asList()
.stream()
.filter(InternalDateRange.class::isInstance)
.flatMap(agg -> ((InternalDateRange) agg).getBuckets().stream())
.map(bucket -> parseBucket(bucket, detector.getEnabledFeatureIds()))
.collect(Collectors.toList())
);
}, listener::onFailure));
}

/**
Expand Down Expand Up @@ -353,8 +330,38 @@ private SearchRequest createFeatureSearchRequest(AnomalyDetector detector, long
SearchSourceBuilder searchSourceBuilder = ParseUtils.generateInternalFeatureQuery(detector, startTime, endTime, xContent);
return new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder).preference(preference.orElse(null));
} catch (IOException e) {
logger.warn("Failed to create feature search request for " + detector + " from " + startTime + " to " + endTime, e);
logger
.warn(
"Failed to create feature search request for " + detector.getDetectorId() + " from " + startTime + " to " + endTime,
e
);
throw new IllegalStateException(e);
}
}

private SearchRequest createPreviewSearchRequest(AnomalyDetector detector, List<Entry<Long, Long>> ranges) throws IOException {
try {
SearchSourceBuilder searchSourceBuilder = ParseUtils.generatePreviewQuery(detector, ranges, xContent);
return new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);
} catch (IOException e) {
logger.warn("Failed to create feature search request for " + detector.getDetectorId() + " for preview", e);
throw e;
}
}

private Optional<double[]> parseBucket(InternalDateRange.Bucket bucket, List<String> featureIds) {
return parseAggregations(Optional.ofNullable(bucket).map(b -> b.getAggregations()), featureIds);
}

private Optional<double[]> parseAggregations(Optional<Aggregations> aggregations, List<String> featureIds) {
return aggregations
.map(aggs -> aggs.asMap())
.map(
map -> featureIds
.stream()
.mapToDouble(id -> Optional.ofNullable(map.get(id)).map(this::parseAggregation).orElse(Double.NaN))
.toArray()
)
.filter(result -> Arrays.stream(result).noneMatch(d -> Double.isNaN(d) || Double.isInfinite(d)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,23 +184,24 @@ protected void processResponse(GetResponse response) throws Exception {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion());

anomalyDetectorRunner.run(detector, input.getPeriodStart(), input.getPeriodEnd(), ActionListener.wrap(anomalyResult -> {
XContentBuilder builder = channel
.newBuilder()
.startObject()
.field(ANOMALY_RESULT, anomalyResult)
.field(ANOMALY_DETECTOR, detector)
.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception);
try {
XContentBuilder builder = channel.newBuilder().startObject().field(ANOMALY_DETECTOR, detector).endObject();
anomalyDetectorRunner
.executeDetector(detector, input.getPeriodStart(), input.getPeriodEnd(), ActionListener.wrap(anomalyResult -> {
XContentBuilder builder = channel
.newBuilder()
.startObject()
.field(ANOMALY_RESULT, anomalyResult)
.field(ANOMALY_DETECTOR, detector)
.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
} catch (IOException e) {
logger.error("Fail to send back exception message" + detector.getDetectorId(), exception);
}
}));
}, exception -> {
logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception);
try {
XContentBuilder builder = channel.newBuilder().startObject().field(ANOMALY_DETECTOR, detector).endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, builder));
} catch (IOException e) {
logger.error("Fail to send back exception message" + detector.getDetectorId(), exception);
}
}));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.QUERY_PARAM_PERIOD_END;
import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.QUERY_PARAM_PERIOD_START;
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateRange;
import static org.elasticsearch.search.aggregations.AggregatorFactories.VALID_AGG_NAME;

/**
Expand Down Expand Up @@ -270,15 +274,14 @@ public static SearchSourceBuilder generateInternalFeatureQuery(
long endTime,
NamedXContentRegistry xContentRegistry
) throws IOException {
SearchSourceBuilder searchSourceBuilder = detector.generateFeatureQuery();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder(detector.getTimeField())
.from(startTime)
.to(endTime)
.format("epoch_millis")
.includeLower(true)
.includeUpper(false);

BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(searchSourceBuilder.query());
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(detector.getFilterQuery());

SearchSourceBuilder internalSearchSourceBuilder = new SearchSourceBuilder().query(internalFilterQuery);
if (detector.getFeatureAttributes() != null) {
Expand All @@ -295,14 +298,38 @@ public static SearchSourceBuilder generateInternalFeatureQuery(
return internalSearchSourceBuilder;
}

public static SearchSourceBuilder generatePreviewQuery(
AnomalyDetector detector,
List<Entry<Long, Long>> ranges,
NamedXContentRegistry xContentRegistry
) throws IOException {

DateRangeAggregationBuilder dateRangeBuilder = dateRange("date_range").field(detector.getTimeField()).format("epoch_millis");
for (Entry<Long, Long> range : ranges) {
dateRangeBuilder.addRange(range.getKey(), range.getValue());
}

if (detector.getFeatureAttributes() != null) {
for (Feature feature : detector.getFeatureAttributes()) {
AggregatorFactories.Builder internalAgg = parseAggregators(
feature.getAggregation().toString(),
xContentRegistry,
feature.getId()
);
dateRangeBuilder.subAggregation(internalAgg.getAggregatorFactories().iterator().next());
}
}

return new SearchSourceBuilder().query(detector.getFilterQuery()).size(0).aggregation(dateRangeBuilder);
}

public static String generateInternalFeatureQueryTemplate(AnomalyDetector detector, NamedXContentRegistry xContentRegistry)
throws IOException {
SearchSourceBuilder searchSourceBuilder = detector.generateFeatureQuery();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder(detector.getTimeField())
.from("{{" + QUERY_PARAM_PERIOD_START + "}}")
.to("{{" + QUERY_PARAM_PERIOD_END + "}}");

BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(searchSourceBuilder.query());
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(detector.getFilterQuery());

SearchSourceBuilder internalSearchSourceBuilder = new SearchSourceBuilder().query(internalFilterQuery);
if (detector.getFeatureAttributes() != null) {
Expand Down
Loading

0 comments on commit 091c367

Please sign in to comment.