diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 456ca0e9..5f93e936 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -245,7 +245,7 @@ public Collection createComponents( SingleFeatureLinearUniformInterpolator singleFeatureLinearUniformInterpolator = new IntegerSensitiveSingleFeatureLinearUniformInterpolator(); Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator); - SearchFeatureDao searchFeatureDao = new SearchFeatureDao(client, scriptService, xContentRegistry, interpolator, clientUtil); + SearchFeatureDao searchFeatureDao = new SearchFeatureDao(client, xContentRegistry, interpolator, clientUtil); JvmService jvmService = new JvmService(environment.settings()); RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe(); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRunner.java index 4d728ad1..5998b08f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRunner.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -58,16 +59,8 @@ public AnomalyDetectorRunner(ModelManager modelManager, FeatureManager featureMa * @param endTime detection period end time * @param listener handle anomaly result */ - public void run(AnomalyDetector detector, Instant startTime, Instant endTime, ActionListener> listener) { - executeDetector(detector, startTime, endTime, listener); - } - - private void executeDetector( - AnomalyDetector detector, - Instant startTime, - Instant endTime, - ActionListener> listener - ) { + public void executeDetector(AnomalyDetector detector, Instant startTime, Instant endTime, ActionListener> listener) + throws IOException { featureManager.getPreviewFeatures(detector, startTime.toEpochMilli(), endTime.toEpochMilli(), ActionListener.wrap(features -> { try { List results = modelManager.getPreviewResults(features.getProcessedFeatures()); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java index c1b60bd6..538a4fec 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.ad.feature; +import java.io.IOException; import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -350,8 +351,10 @@ public Features getPreviewFeatures(AnomalyDetector detector, long startMilli, lo * @param listener onResponse is called with time ranges, unprocessed features, * and processed features of the data points from the period * onFailure is called with IllegalArgumentException when there is no data to preview + * @throws IOException if a user gives wrong query input when defining a detector */ - public void getPreviewFeatures(AnomalyDetector detector, long startMilli, long endMilli, ActionListener listener) { + public void getPreviewFeatures(AnomalyDetector detector, long startMilli, long endMilli, ActionListener listener) + throws IOException { Entry>, Integer> sampleRangeResults = getSampleRanges(detector, startMilli, endMilli); List> sampleRanges = sampleRangeResults.getKey(); int stride = sampleRangeResults.getValue(); @@ -418,12 +421,13 @@ private Entry>, double[][]> getSamplesForRanges(AnomalyDe * Gets search results for the sampled time ranges. * * @param listener handle search results map: key is time ranges, value is corresponding search results + * @throws IOException if a user gives wrong query input when defining a detector */ void getSamplesForRanges( AnomalyDetector detector, List> sampleRanges, ActionListener>, double[][]>> listener - ) { + ) throws IOException { searchFeatureDao.getFeatureSamplesForPeriods(detector, sampleRanges, ActionListener.wrap(featureSamples -> { List> ranges = new ArrayList<>(featureSamples.size()); List samples = new ArrayList<>(featureSamples.size()); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java index ce030865..58e82785 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayDeque; import java.util.HashMap; @@ -41,10 +42,11 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.range.InternalDateRange; import org.elasticsearch.search.aggregations.metrics.Max; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue; import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles; @@ -60,13 +62,11 @@ public class SearchFeatureDao { protected static final String AGG_NAME_MAX = "max_timefield"; - protected static final String FEATURE_SAMPLE_PREFERENCE = "_shards:0"; private static final Logger logger = LogManager.getLogger(SearchFeatureDao.class); // Dependencies private final Client client; - private final ScriptService scriptService; private final NamedXContentRegistry xContent; private final Interpolator interpolator; private final ClientUtil clientUtil; @@ -75,20 +75,12 @@ public class SearchFeatureDao { * Constructor injection. * * @param client ES client for queries - * @param scriptService ES ScriptService * @param xContent ES XContentRegistry * @param interpolator interpolator for missing values * @param clientUtil utility for ES client */ - public SearchFeatureDao( - Client client, - ScriptService scriptService, - NamedXContentRegistry xContent, - Interpolator interpolator, - ClientUtil clientUtil - ) { + public SearchFeatureDao(Client client, NamedXContentRegistry xContent, Interpolator interpolator, ClientUtil clientUtil) { this.client = client; - this.scriptService = scriptService; this.xContent = xContent; this.interpolator = interpolator; this.clientUtil = clientUtil; @@ -157,18 +149,10 @@ public void getFeaturesForPeriod(AnomalyDetector detector, long startTime, long } private Optional parseResponse(SearchResponse response, List featureIds) { - return Optional - .ofNullable(response) - .filter(resp -> response.getHits().getTotalHits().value > 0L) - .map(resp -> resp.getAggregations()) - .map(aggs -> aggs.asMap()) - .map( - map -> featureIds - .stream() - .mapToDouble(id -> Optional.ofNullable(map.get(id)).map(this::parseAggregation).orElse(Double.NaN)) - .toArray() - ) - .filter(result -> Arrays.stream(result).noneMatch(d -> Double.isNaN(d) || Double.isInfinite(d))); + return parseAggregations( + Optional.ofNullable(response).filter(resp -> response.getHits().getTotalHits().value > 0L).map(resp -> resp.getAggregations()), + featureIds + ); } private double parseAggregation(Aggregation aggregation) { @@ -198,7 +182,7 @@ public List> getFeatureSamplesForPeriods(AnomalyDetector dete MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); ranges .stream() - .map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.of(FEATURE_SAMPLE_PREFERENCE))) + .map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.empty())) .forEachOrdered(request -> multiSearchRequest.add(request)); return clientUtil @@ -222,40 +206,33 @@ public List> getFeatureSamplesForPeriods(AnomalyDetector dete * @param detector info about the indices, documents, feature query * @param ranges list of time ranges * @param listener handle approximate features for the time ranges + * @throws IOException if a user gives wrong query input when defining a detector */ public void getFeatureSamplesForPeriods( AnomalyDetector detector, List> ranges, ActionListener>> listener - ) { - MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); - ranges - .stream() - .map(range -> createFeatureSearchRequest(detector, range.getKey(), range.getValue(), Optional.of(FEATURE_SAMPLE_PREFERENCE))) - .forEachOrdered(request -> multiSearchRequest.add(request)); + ) throws IOException { + SearchRequest request = createPreviewSearchRequest(detector, ranges); - client - .multiSearch( - multiSearchRequest, - ActionListener - .wrap( - response -> listener - .onResponse( - Optional - .of(response) - .map(Stream::of) - .orElseGet(Stream::empty) - .flatMap(multiSearchResp -> Arrays.stream(multiSearchResp.getResponses())) - .map( - item -> Optional - .ofNullable(item.getResponse()) - .flatMap(r -> parseResponse(r, detector.getEnabledFeatureIds())) - ) - .collect(Collectors.toList()) - ), - listener::onFailure - ) - ); + client.search(request, ActionListener.wrap(response -> { + Aggregations aggs = response.getAggregations(); + if (aggs == null) { + listener.onResponse(Collections.emptyList()); + return; + } + + listener + .onResponse( + aggs + .asList() + .stream() + .filter(InternalDateRange.class::isInstance) + .flatMap(agg -> ((InternalDateRange) agg).getBuckets().stream()) + .map(bucket -> parseBucket(bucket, detector.getEnabledFeatureIds())) + .collect(Collectors.toList()) + ); + }, listener::onFailure)); } /** @@ -353,8 +330,38 @@ private SearchRequest createFeatureSearchRequest(AnomalyDetector detector, long SearchSourceBuilder searchSourceBuilder = ParseUtils.generateInternalFeatureQuery(detector, startTime, endTime, xContent); return new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder).preference(preference.orElse(null)); } catch (IOException e) { - logger.warn("Failed to create feature search request for " + detector + " from " + startTime + " to " + endTime, e); + logger + .warn( + "Failed to create feature search request for " + detector.getDetectorId() + " from " + startTime + " to " + endTime, + e + ); throw new IllegalStateException(e); } } + + private SearchRequest createPreviewSearchRequest(AnomalyDetector detector, List> ranges) throws IOException { + try { + SearchSourceBuilder searchSourceBuilder = ParseUtils.generatePreviewQuery(detector, ranges, xContent); + return new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder); + } catch (IOException e) { + logger.warn("Failed to create feature search request for " + detector.getDetectorId() + " for preview", e); + throw e; + } + } + + private Optional parseBucket(InternalDateRange.Bucket bucket, List featureIds) { + return parseAggregations(Optional.ofNullable(bucket).map(b -> b.getAggregations()), featureIds); + } + + private Optional parseAggregations(Optional aggregations, List featureIds) { + return aggregations + .map(aggs -> aggs.asMap()) + .map( + map -> featureIds + .stream() + .mapToDouble(id -> Optional.ofNullable(map.get(id)).map(this::parseAggregation).orElse(Double.NaN)) + .toArray() + ) + .filter(result -> Arrays.stream(result).noneMatch(d -> Double.isNaN(d) || Double.isInfinite(d))); + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java index 50b1f815..ebbb4a41 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestExecuteAnomalyDetectorAction.java @@ -184,23 +184,24 @@ protected void processResponse(GetResponse response) throws Exception { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion()); - anomalyDetectorRunner.run(detector, input.getPeriodStart(), input.getPeriodEnd(), ActionListener.wrap(anomalyResult -> { - XContentBuilder builder = channel - .newBuilder() - .startObject() - .field(ANOMALY_RESULT, anomalyResult) - .field(ANOMALY_DETECTOR, detector) - .endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); - }, exception -> { - logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception); - try { - XContentBuilder builder = channel.newBuilder().startObject().field(ANOMALY_DETECTOR, detector).endObject(); + anomalyDetectorRunner + .executeDetector(detector, input.getPeriodStart(), input.getPeriodEnd(), ActionListener.wrap(anomalyResult -> { + XContentBuilder builder = channel + .newBuilder() + .startObject() + .field(ANOMALY_RESULT, anomalyResult) + .field(ANOMALY_DETECTOR, detector) + .endObject(); channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); - } catch (IOException e) { - logger.error("Fail to send back exception message" + detector.getDetectorId(), exception); - } - })); + }, exception -> { + logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception); + try { + XContentBuilder builder = channel.newBuilder().startObject().field(ANOMALY_DETECTOR, detector).endObject(); + channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, builder)); + } catch (IOException e) { + logger.error("Fail to send back exception message" + detector.getDetectorId(), exception); + } + })); } }; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ParseUtils.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ParseUtils.java index c63fd1d5..5e2edc11 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ParseUtils.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ParseUtils.java @@ -29,15 +29,19 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.BaseAggregationBuilder; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.time.Instant; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.regex.Matcher; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.QUERY_PARAM_PERIOD_END; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.QUERY_PARAM_PERIOD_START; +import static org.elasticsearch.search.aggregations.AggregationBuilders.dateRange; import static org.elasticsearch.search.aggregations.AggregatorFactories.VALID_AGG_NAME; /** @@ -270,7 +274,6 @@ public static SearchSourceBuilder generateInternalFeatureQuery( long endTime, NamedXContentRegistry xContentRegistry ) throws IOException { - SearchSourceBuilder searchSourceBuilder = detector.generateFeatureQuery(); RangeQueryBuilder rangeQuery = new RangeQueryBuilder(detector.getTimeField()) .from(startTime) .to(endTime) @@ -278,7 +281,7 @@ public static SearchSourceBuilder generateInternalFeatureQuery( .includeLower(true) .includeUpper(false); - BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(searchSourceBuilder.query()); + BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(detector.getFilterQuery()); SearchSourceBuilder internalSearchSourceBuilder = new SearchSourceBuilder().query(internalFilterQuery); if (detector.getFeatureAttributes() != null) { @@ -295,14 +298,38 @@ public static SearchSourceBuilder generateInternalFeatureQuery( return internalSearchSourceBuilder; } + public static SearchSourceBuilder generatePreviewQuery( + AnomalyDetector detector, + List> ranges, + NamedXContentRegistry xContentRegistry + ) throws IOException { + + DateRangeAggregationBuilder dateRangeBuilder = dateRange("date_range").field(detector.getTimeField()).format("epoch_millis"); + for (Entry range : ranges) { + dateRangeBuilder.addRange(range.getKey(), range.getValue()); + } + + if (detector.getFeatureAttributes() != null) { + for (Feature feature : detector.getFeatureAttributes()) { + AggregatorFactories.Builder internalAgg = parseAggregators( + feature.getAggregation().toString(), + xContentRegistry, + feature.getId() + ); + dateRangeBuilder.subAggregation(internalAgg.getAggregatorFactories().iterator().next()); + } + } + + return new SearchSourceBuilder().query(detector.getFilterQuery()).size(0).aggregation(dateRangeBuilder); + } + public static String generateInternalFeatureQueryTemplate(AnomalyDetector detector, NamedXContentRegistry xContentRegistry) throws IOException { - SearchSourceBuilder searchSourceBuilder = detector.generateFeatureQuery(); RangeQueryBuilder rangeQuery = new RangeQueryBuilder(detector.getTimeField()) .from("{{" + QUERY_PARAM_PERIOD_START + "}}") .to("{{" + QUERY_PARAM_PERIOD_END + "}}"); - BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(searchSourceBuilder.query()); + BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().must(rangeQuery).must(detector.getFilterQuery()); SearchSourceBuilder internalSearchSourceBuilder = new SearchSourceBuilder().query(internalFilterQuery); if (detector.getFeatureAttributes() != null) { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java index bc8865cd..b3f7a6a2 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.ad.feature; +import java.io.IOException; import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -409,7 +410,8 @@ public void getPreviewFeatures_returnExpected() { } @SuppressWarnings("unchecked") - private void getPreviewFeaturesTemplate(List> samplesResults, boolean querySuccess, boolean previewSuccess) { + private void getPreviewFeaturesTemplate(List> samplesResults, boolean querySuccess, boolean previewSuccess) + throws IOException { long start = 0L; long end = 240_000L; IntervalTimeConfiguration detectionInterval = new IntervalTimeConfiguration(1, ChronoUnit.MINUTES); @@ -458,17 +460,17 @@ private void getPreviewFeaturesTemplate(List> samplesResults, } @Test - public void getPreviewFeatures_returnExpectedToListener() { + public void getPreviewFeatures_returnExpectedToListener() throws IOException { getPreviewFeaturesTemplate(asList(Optional.of(new double[] { 1 }), Optional.of(new double[] { 3 })), true, true); } @Test - public void getPreviewFeatures_returnExceptionToListener_whenNoDataToPreview() { + public void getPreviewFeatures_returnExceptionToListener_whenNoDataToPreview() throws IOException { getPreviewFeaturesTemplate(asList(), true, false); } @Test - public void getPreviewFeatures_returnExceptionToListener_whenQueryFail() { + public void getPreviewFeatures_returnExceptionToListener_whenQueryFail() throws IOException { getPreviewFeaturesTemplate(asList(Optional.of(new double[] { 1 }), Optional.of(new double[] { 3 })), false, false); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java index 593d290c..a9232b37 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java @@ -153,7 +153,7 @@ public void setup() throws Exception { PowerMockito.mockStatic(ParseUtils.class); Interpolator interpolator = new LinearUniformInterpolator(new SingleFeatureLinearUniformInterpolator()); - searchFeatureDao = spy(new SearchFeatureDao(client, scriptService, xContent, interpolator, clientUtil)); + searchFeatureDao = spy(new SearchFeatureDao(client, xContent, interpolator, clientUtil)); detectionInterval = new IntervalTimeConfiguration(1, ChronoUnit.MINUTES); when(detector.getTimeField()).thenReturn("testTimeField"); @@ -190,8 +190,7 @@ public void setup() throws Exception { ); multiSearchRequest = new MultiSearchRequest(); - SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0])) - .preference(SearchFeatureDao.FEATURE_SAMPLE_PREFERENCE); + SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0])); multiSearchRequest.add(request); doReturn(Optional.of(multiSearchResponse)) .when(clientUtil)