This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Fix empty preview result due to insufficient sample #65

Merged
merged 5 commits on Mar 21, 2020
Changes from 2 commits
@@ -229,7 +229,7 @@ public Collection<Object> createComponents(
SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator =
new IntegerSensitiveSingleFeatureLinearUniformInterpolator();
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(client, scriptService, xContentRegistry, interpolator, clientUtil);
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(client, xContentRegistry, interpolator, clientUtil);

JvmService jvmService = new JvmService(environment.settings());
RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe();
@@ -17,6 +17,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.HashMap;
@@ -41,10 +42,11 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.range.InternalDateRange;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
@@ -60,13 +62,11 @@
public class SearchFeatureDao {

protected static final String AGG_NAME_MAX = "max_timefield";
protected static final String FEATURE_SAMPLE_PREFERENCE = "_shards:0";

private static final Logger logger = LogManager.getLogger(SearchFeatureDao.class);

// Dependencies
private final Client client;
private final ScriptService scriptService;
private final NamedXContentRegistry xContent;
private final Interpolator interpolator;
private final ClientUtil clientUtil;
@@ -75,20 +75,12 @@ public class SearchFeatureDao {
* Constructor injection.
*
* @param client ES client for queries
* @param scriptService ES ScriptService
* @param xContent ES XContentRegistry
* @param interpolator interpolator for missing values
* @param clientUtil utility for ES client
*/
public SearchFeatureDao(
Client client,
ScriptService scriptService,
NamedXContentRegistry xContent,
Interpolator interpolator,
ClientUtil clientUtil
) {
public SearchFeatureDao(Client client, NamedXContentRegistry xContent, Interpolator interpolator, ClientUtil clientUtil) {
this.client = client;
this.scriptService = scriptService;
this.xContent = xContent;
this.interpolator = interpolator;
this.clientUtil = clientUtil;
@@ -136,18 +128,10 @@ public Optional<double[]> getFeaturesForPeriod(AnomalyDetector detector, long st
}

private Optional<double[]> parseResponse(SearchResponse response, List<String> featureIds) {
return Optional
.ofNullable(response)
.filter(resp -> response.getHits().getTotalHits().value > 0L)
.map(resp -> resp.getAggregations())
.map(aggs -> aggs.asMap())
.map(
map -> featureIds
.stream()
.mapToDouble(id -> Optional.ofNullable(map.get(id)).map(this::parseAggregation).orElse(Double.NaN))
.toArray()
)
.filter(result -> Arrays.stream(result).noneMatch(d -> Double.isNaN(d) || Double.isInfinite(d)));
return parseAggregations(
Optional.ofNullable(response).filter(resp -> response.getHits().getTotalHits().value > 0L).map(resp -> resp.getAggregations()),
featureIds
);
}

private double parseAggregation(Aggregation aggregation) {
@@ -177,7 +161,7 @@ public List<Optional<double[]>> getFeatureSamplesForPeriods(AnomalyDetector dete
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
ranges
.stream()
.map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.of(FEATURE_SAMPLE_PREFERENCE)))
.map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.empty()))
.forEachOrdered(request -> multiSearchRequest.add(request));

return clientUtil
@@ -207,34 +191,25 @@ public void getFeatureSamplesForPeriods(
List<Entry<Long, Long>> ranges,
ActionListener<List<Optional<double[]>>> listener
) {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
ranges
.stream()
.map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.of(FEATURE_SAMPLE_PREFERENCE)))
.forEachOrdered(request -> multiSearchRequest.add(request));
SearchRequest request = createPreviewSearchRequest(detector, ranges);

client
.multiSearch(
multiSearchRequest,
ActionListener
.wrap(
response -> listener
.onResponse(
Optional
.of(response)
.map(Stream::of)
.orElseGet(Stream::empty)
.flatMap(multiSearchResp -> Arrays.stream(multiSearchResp.getResponses()))
.map(
item -> Optional
.ofNullable(item.getResponse())
.flatMap(r -> parseResponse(r, detector.getEnabledFeatureIds()))
)
.collect(Collectors.toList())
),
listener::onFailure
)
);
client.search(request, ActionListener.wrap(response -> {
Aggregations aggs = response.getAggregations();
if (aggs == null) {
listener.onResponse(Collections.emptyList());
return;
}

listener
.onResponse(
aggs
.asList()
.stream()
.filter(InternalDateRange.class::isInstance)
@ylwu-amzn (Contributor), Mar 18, 2020:
Is it possible that ES returns a different date range class than InternalDateRange in this case?

Member Author:
For 7.4, it is not possible. For other versions, it is possible.

Contributor:
Got it. So for other versions, is InternalDateRange.class still what we want?

Member Author:
Other versions are untested. Will do that when we release new versions.

.flatMap(agg -> ((InternalDateRange) agg).getBuckets().stream())
.map(bucket -> parseBucket(bucket, detector.getEnabledFeatureIds()))
.collect(Collectors.toList())
);
}, listener::onFailure));
}
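
Regarding the review thread above on whether ES could hand back a different date range class: one possible way to make the bucket extraction tolerant of that (a sketch only, not part of this change) is to match on the Range interface from org.elasticsearch.search.aggregations.bucket.range, which InternalDateRange implements, instead of the concrete class:

// Hypothetical helper, sketched against the surrounding class; it reuses parseAggregations
// and assumes an extra import of org.elasticsearch.search.aggregations.bucket.range.Range.
private List<Optional<double[]>> parseRangeBuckets(Aggregations aggs, List<String> featureIds) {
    return aggs
        .asList()
        .stream()
        .filter(Range.class::isInstance)                      // accept any Range implementation
        .flatMap(agg -> ((Range) agg).getBuckets().stream())  // Range.Bucket exposes getAggregations()
        .map(bucket -> parseAggregations(Optional.ofNullable(bucket.getAggregations()), featureIds))
        .collect(Collectors.toList());
}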

/**
@@ -336,4 +311,30 @@ private SearchRequest createFeatureSearchRequest(AnomalyDetector detector, long
throw new IllegalStateException(e);
Contributor:
Same as Yaliang's comment.

Member Author:
This is Lai's code, I guess. IOException is a checked exception, so the method and its callers would have to declare it before the exception is handled; Lai prefers to use an unchecked exception like IllegalStateException.

I prefer checked exceptions, as they serve as documentation of what a method can throw and remind the caller to handle it.

Do you have any preference on when to use checked and unchecked exceptions?


}
}
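
To make the trade-off discussed in the thread above concrete, here is a minimal illustrative sketch (hypothetical helper names, not part of this PR) of the same request construction written in both styles:

// Unchecked style (as in this PR): wrap the checked IOException so callers are not
// forced to declare it; the compiler no longer reminds them to handle the failure.
private SearchRequest buildRequestUnchecked(AnomalyDetector detector, long start, long end) {
    try {
        SearchSourceBuilder source = ParseUtils.generateInternalFeatureQuery(detector, start, end, xContent);
        return new SearchRequest(detector.getIndices().toArray(new String[0]), source);
    } catch (IOException e) {
        throw new IllegalStateException(e); // callers only see an unchecked wrapper
    }
}

// Checked style: declare IOException so the signature documents the failure mode
// and every caller must either handle it or propagate it.
private SearchRequest buildRequestChecked(AnomalyDetector detector, long start, long end) throws IOException {
    SearchSourceBuilder source = ParseUtils.generateInternalFeatureQuery(detector, start, end, xContent);
    return new SearchRequest(detector.getIndices().toArray(new String[0]), source);
}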

private SearchRequest createPreviewSearchRequest(AnomalyDetector detector, List<Entry<Long, Long>> ranges) {
try {
SearchSourceBuilder searchSourceBuilder = ParseUtils.generatePreviewQuery(detector, ranges, xContent);
return new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);
} catch (IOException e) {
logger.warn("Failed to create feature search request for " + detector + " for preview", e);
throw new IllegalStateException(e);
}
}

private Optional<double[]> parseBucket(InternalDateRange.Bucket bucket, List<String> featureIds) {
return parseAggregations(Optional.ofNullable(bucket).map(b -> b.getAggregations()), featureIds);
}

private Optional<double[]> parseAggregations(Optional<Aggregations> aggregations, List<String> featureIds) {
return aggregations
.map(aggs -> aggs.asMap())
.map(
map -> featureIds
.stream()
.mapToDouble(id -> Optional.ofNullable(map.get(id)).map(this::parseAggregation).orElse(Double.NaN))
.toArray()
)
.filter(result -> Arrays.stream(result).noneMatch(d -> Double.isNaN(d) || Double.isInfinite(d)));
}
}
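
One consequence of the NaN filter in parseAggregations above is that a sample is dropped entirely whenever any enabled feature is missing or non-finite, rather than returning a partially filled array. A standalone sketch of that rule on plain values (hypothetical class, for illustration only):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class SampleFilterSketch {

    // Mirrors the parseAggregations rule: map each feature id to a value, defaulting to NaN
    // when the id is absent, then discard the whole sample if any value is NaN or infinite.
    static Optional<double[]> toSample(Map<String, Double> valuesByFeatureId, List<String> featureIds) {
        double[] result = featureIds
            .stream()
            .mapToDouble(id -> Optional.ofNullable(valuesByFeatureId.get(id)).orElse(Double.NaN))
            .toArray();
        return Optional.of(result).filter(r -> Arrays.stream(r).noneMatch(d -> Double.isNaN(d) || Double.isInfinite(d)));
    }

    public static void main(String[] args) {
        List<String> features = Arrays.asList("f1", "f2");

        Map<String, Double> complete = new HashMap<>();
        complete.put("f1", 1.0);
        complete.put("f2", 2.0);
        System.out.println(toSample(complete, features).isPresent()); // true: both features present and finite

        Map<String, Double> partial = new HashMap<>();
        partial.put("f1", 1.0);
        System.out.println(toSample(partial, features).isPresent()); // false: missing f2 drops the whole sample
    }
}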
@@ -29,15 +29,19 @@
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.QUERY_PARAM_PERIOD_END;
import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.QUERY_PARAM_PERIOD_START;
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateRange;
import static org.elasticsearch.search.aggregations.AggregatorFactories.VALID_AGG_NAME;

/**
@@ -270,15 +274,14 @@ public static SearchSourceBuilder generateInternalFeatureQuery(
long endTime,
NamedXContentRegistry xContentRegistry
) throws IOException {
SearchSourceBuilder searchSourceBuilder = detector.generateFeatureQuery();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder(detector.getTimeField())
.from(startTime)
.to(endTime)
.format("epoch_millis")
.includeLower(true)
.includeUpper(false);

BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(searchSourceBuilder.query());
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(detector.getFilterQuery());

SearchSourceBuilder internalSearchSourceBuilder = new SearchSourceBuilder().query(internalFilterQuery);
if (detector.getFeatureAttributes() != null) {
@@ -295,14 +298,38 @@ public static SearchSourceBuilder generateInternalFeatureQuery(
return internalSearchSourceBuilder;
}

public static SearchSourceBuilder generatePreviewQuery(
AnomalyDetector detector,
List<Entry<Long, Long>> ranges,
NamedXContentRegistry xContentRegistry
) throws IOException {

DateRangeAggregationBuilder dateRangeBuilder = dateRange("date_range").field(detector.getTimeField()).format("epoch_millis");
for (Entry<Long, Long> range : ranges) {
dateRangeBuilder.addRange(range.getKey(), range.getValue());
}

if (detector.getFeatureAttributes() != null) {
for (Feature feature : detector.getFeatureAttributes()) {
AggregatorFactories.Builder internalAgg = parseAggregators(
feature.getAggregation().toString(),
xContentRegistry,
feature.getId()
);
dateRangeBuilder.subAggregation(internalAgg.getAggregatorFactories().iterator().next());
}
}

return new SearchSourceBuilder().query(detector.getFilterQuery()).size(0).aggregation(dateRangeBuilder);
}
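
A minimal usage sketch of the new helper (hypothetical method and values; it assumes a configured AnomalyDetector and a NamedXContentRegistry, plus the AbstractMap.SimpleEntry and SearchRequest types used elsewhere in this PR):

// Hypothetical usage: one preview request covering several sample ranges at once,
// instead of one search request per range.
SearchRequest buildPreviewRequest(AnomalyDetector detector, NamedXContentRegistry registry) throws IOException {
    List<Entry<Long, Long>> ranges = Arrays.asList(
        new SimpleEntry<Long, Long>(1584500000000L, 1584500060000L),  // example epoch-millis ranges
        new SimpleEntry<Long, Long>(1584500060000L, 1584500120000L)
    );
    SearchSourceBuilder source = ParseUtils.generatePreviewQuery(detector, ranges, registry);
    // Each requested range comes back as one bucket of the "date_range" aggregation,
    // with the detector's feature aggregations nested inside the bucket.
    return new SearchRequest(detector.getIndices().toArray(new String[0]), source);
}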

public static String generateInternalFeatureQueryTemplate(AnomalyDetector detector, NamedXContentRegistry xContentRegistry)
throws IOException {
SearchSourceBuilder searchSourceBuilder = detector.generateFeatureQuery();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder(detector.getTimeField())
.from("{{" + QUERY_PARAM_PERIOD_START + "}}")
.to("{{" + QUERY_PARAM_PERIOD_END + "}}");

BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(searchSourceBuilder.query());
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(detector.getFilterQuery());

SearchSourceBuilder internalSearchSourceBuilder = new SearchSourceBuilder().query(internalFilterQuery);
if (detector.getFeatureAttributes() != null) {
@@ -148,7 +148,7 @@ public void setup() throws Exception {
PowerMockito.mockStatic(ParseUtils.class);

Interpolator interpolator = new LinearUniformInterpolator(new SingleFeatureLinearUniformInterpolator());
searchFeatureDao = spy(new SearchFeatureDao(client, scriptService, xContent, interpolator, clientUtil));
searchFeatureDao = spy(new SearchFeatureDao(client, xContent, interpolator, clientUtil));

detectionInterval = new IntervalTimeConfiguration(1, ChronoUnit.MINUTES);
when(detector.getTimeField()).thenReturn("testTimeField");
@@ -184,8 +184,7 @@ public void setup() throws Exception {
);

multiSearchRequest = new MultiSearchRequest();
SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0]))
.preference(SearchFeatureDao.FEATURE_SAMPLE_PREFERENCE);
SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0]));
multiSearchRequest.add(request);
doReturn(Optional.of(multiSearchResponse))
.when(clientUtil)