diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java index 6aef9e4b..13e1422c 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.LongStream; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; @@ -56,7 +57,7 @@ public class FeatureManager { private static final Logger logger = LogManager.getLogger(FeatureManager.class); // Each anomaly detector has a queue of data points with timestamps (in epoch milliseconds). - private final Map>> detectorIdsToTimeShingles; + private final Map>>> detectorIdsToTimeShingles; private final SearchFeatureDao searchFeatureDao; private final Interpolator interpolator; @@ -124,6 +125,13 @@ public FeatureManager( /** * Returns to listener unprocessed features and processed features (such as shingle) for the current data point. + * The listener's onFailure is called with EndRunException on feature query creation errors. + * + * This method sends a single query for historical data for data points (including the current point) that are missing + * from the shingle, and updates the shingle which is persisted to future calls to this method for subsequent time + * intervals. To allow for time variations/delays, an interval is considered missing from the shingle if no data point + * is found within half an interval away. See doc for updateUnprocessedFeatures for details on how the shingle is + * updated. * * @param detector anomaly detector for which the features are returned * @param startTime start time of the data point in epoch milliseconds @@ -132,74 +140,113 @@ public FeatureManager( */ public void getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime, ActionListener listener) { - Deque> shingle = detectorIdsToTimeShingles - .computeIfAbsent(detector.getDetectorId(), id -> new ArrayDeque>(shingleSize)); - if (shingle.isEmpty() || shingle.getLast().getKey() < endTime) { - searchFeatureDao - .getFeaturesForPeriod( - detector, - startTime, - endTime, - ActionListener - .wrap(point -> updateUnprocessedFeatures(point, shingle, detector, endTime, listener), listener::onFailure) - ); + Deque>> shingle = detectorIdsToTimeShingles + .computeIfAbsent(detector.getDetectorId(), id -> new ArrayDeque<>(shingleSize)); + + // To allow for small time variations/delays in running the detector. 
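+ // A cached point counts for a shingle interval only when its timestamp is within half an interval of that interval's end time; anything farther is treated as missing and re-queried.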
+ long maxTimeDifference = getDetectorIntervalInMilliseconds(detector) / 2; + Map>> featuresMap = getNearbyPointsForShingle(detector, shingle, endTime, maxTimeDifference) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + + List> missingRanges = getMissingRangesInShingle(detector, featuresMap, endTime); + + if (missingRanges.size() > 0) { + try { + searchFeatureDao.getFeatureSamplesForPeriods(detector, missingRanges, ActionListener.wrap(points -> { + for (int i = 0; i < points.size(); i++) { + Optional point = points.get(i); + long rangeEndTime = missingRanges.get(i).getValue(); + featuresMap.put(rangeEndTime, new SimpleImmutableEntry<>(rangeEndTime, point)); + } + updateUnprocessedFeatures(detector, shingle, featuresMap, endTime, listener); + }, listener::onFailure)); + } catch (IOException e) { + listener.onFailure(new EndRunException(detector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, true)); + } } else { getProcessedFeatures(shingle, detector, endTime, listener); } } + private List> getMissingRangesInShingle( + AnomalyDetector detector, + Map>> featuresMap, + long endTime + ) { + long intervalMilli = getDetectorIntervalInMilliseconds(detector); + + return getFullShingleEndTimes(endTime, intervalMilli) + .filter(time -> !featuresMap.containsKey(time)) + .mapToObj(time -> new SimpleImmutableEntry<>(time - intervalMilli, time)) + .collect(Collectors.toList()); + } + + /** + * Updates the shingle to contain one Optional data point for each of shingleSize consecutive time intervals, ending + * with the current interval. Each entry in the shingle contains the timestamp of the data point as the key, and the + * data point wrapped in an Optional. If the data point is missing (even after querying, since this method is invoked + * after querying), an entry with an empty Optional value is stored in the shingle to prevent subsequent calls to + * getCurrentFeatures from re-querying the missing data point again. + * + * Note that in the presence of time variations/delays up to half an interval, the shingle stores the actual original + * end times of the data points, not the computed end times that were calculated based on the current endTime. + * Ex: if data points are queried at times 100, 201, 299, then the shingle will contain [100: x, 201: y, 299: z]. + * + * @param detector anomaly detector for which the features are returned. + * @param shingle buffer which persists the past shingleSize data points to subsequent calls of getCurrentFeature. + * Each entry contains the timestamp of the data point and an optional data point value. + * @param featuresMap A map where the keys are the computed millisecond timestamps associated with intervals in the + * shingle, and the values are entries that contain the actual timestamp of the data point and + * an optional data point value. + * @param listener onResponse is called with unprocessed features and processed features for the current data point. 
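+ * @param endTime end time of the current interval in epoch milliseconds, used to compute the end times of the shingleSize intervals covered by the shingle.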
+ */ private void updateUnprocessedFeatures( - Optional point, - Deque> shingle, AnomalyDetector detector, + Deque>> shingle, + Map>> featuresMap, long endTime, ActionListener listener ) { - if (point.isPresent()) { - if (shingle.size() == shingleSize) { - shingle.remove(); - } - shingle.add(new SimpleImmutableEntry<>(endTime, point.get())); - getProcessedFeatures(shingle, detector, endTime, listener); - } else { - listener.onResponse(new SinglePointFeatures(Optional.empty(), Optional.empty())); - } + shingle.clear(); + getFullShingleEndTimes(endTime, getDetectorIntervalInMilliseconds(detector)) + .mapToObj(time -> featuresMap.getOrDefault(time, new SimpleImmutableEntry<>(time, Optional.empty()))) + .forEach(e -> shingle.add(e)); + + getProcessedFeatures(shingle, detector, endTime, listener); } private void getProcessedFeatures( - Deque> shingle, + Deque>> shingle, AnomalyDetector detector, long endTime, ActionListener listener ) { - - double[][] currentPoints = filterAndFill(shingle, endTime, detector); - Optional currentPoint = Optional.ofNullable(shingle.peekLast()).map(Entry::getValue); + Optional currentPoint = shingle.peekLast().getValue(); listener .onResponse( - Optional - .ofNullable(currentPoints) - .map(points -> new SinglePointFeatures(currentPoint, Optional.of(batchShingle(points, shingleSize)[0]))) - .orElse(new SinglePointFeatures(currentPoint, Optional.empty())) + new SinglePointFeatures( + currentPoint, + Optional + .ofNullable(currentPoint.isPresent() ? filterAndFill(shingle, endTime, detector) : null) + .map(points -> batchShingle(points, shingleSize)[0]) + ) ); } - private double[][] filterAndFill(Deque> shingle, long endTime, AnomalyDetector detector) { - long intervalMilli = ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMillis(); + private double[][] filterAndFill(Deque>> shingle, long endTime, AnomalyDetector detector) { + Deque>> filteredShingle = shingle + .stream() + .filter(e -> e.getValue().isPresent()) + .collect(Collectors.toCollection(ArrayDeque::new)); double[][] result = null; - if (shingle.size() >= shingleSize - maxMissingPoints) { - TreeMap search = new TreeMap<>(shingle.stream().collect(Collectors.toMap(Entry::getKey, Entry::getValue))); - result = IntStream.rangeClosed(1, shingleSize).mapToLong(i -> endTime - (shingleSize - i) * intervalMilli).mapToObj(t -> { - Optional> after = Optional.ofNullable(search.ceilingEntry(t)); - Optional> before = Optional.ofNullable(search.floorEntry(t)); - return after - .filter(a -> Math.abs(t - a.getKey()) <= before.map(b -> Math.abs(t - b.getKey())).orElse(Long.MAX_VALUE)) - .map(Optional::of) - .orElse(before) - .filter(e -> Math.abs(t - e.getKey()) < intervalMilli * maxNeighborDistance) - .map(Entry::getValue) - .orElse(null); - }).filter(d -> d != null).toArray(double[][]::new); + if (filteredShingle.size() >= shingleSize - maxMissingPoints) { + // Imputes missing data points with the values of neighboring data points. 
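+ // A neighboring point may stand in for a missing interval only if it lies within maxNeighborDistance intervals of that interval's end time; otherwise the shingle stays incomplete and no processed features are returned.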
+ long maxMillisecondsDifference = maxNeighborDistance * getDetectorIntervalInMilliseconds(detector); + result = getNearbyPointsForShingle(detector, filteredShingle, endTime, maxMillisecondsDifference) + .map(e -> e.getValue().getValue().orElse(null)) + .filter(d -> d != null) + .toArray(double[][]::new); + if (result.length < shingleSize) { result = null; } @@ -207,6 +254,46 @@ private double[][] filterAndFill(Deque> shingle, long endT return result; } + /** + * Helper method that associates data points (along with their actual timestamps) to the intervals of a full shingle. + * + * Depending on the timestamp tolerance (maxMillisecondsDifference), this can be used to allow for small time + * variations/delays in running the detector, or used for imputing missing points in the shingle with neighboring points. + * + * @return A stream of entries, where the key is the computed millisecond timestamp associated with an interval in + * the shingle, and the value is an entry that contains the actual timestamp of the data point and an optional data + * point value. + */ + private Stream>>> getNearbyPointsForShingle( + AnomalyDetector detector, + Deque>> shingle, + long endTime, + long maxMillisecondsDifference + ) { + long intervalMilli = getDetectorIntervalInMilliseconds(detector); + TreeMap> search = new TreeMap<>( + shingle.stream().collect(Collectors.toMap(Entry::getKey, Entry::getValue)) + ); + return getFullShingleEndTimes(endTime, intervalMilli).mapToObj(t -> { + Optional>> after = Optional.ofNullable(search.ceilingEntry(t)); + Optional>> before = Optional.ofNullable(search.floorEntry(t)); + return after + .filter(a -> Math.abs(t - a.getKey()) <= before.map(b -> Math.abs(t - b.getKey())).orElse(Long.MAX_VALUE)) + .map(Optional::of) + .orElse(before) + .filter(e -> Math.abs(t - e.getKey()) < maxMillisecondsDifference) + .map(e -> new SimpleImmutableEntry<>(t, e)); + }).filter(Optional::isPresent).map(Optional::get); + } + + private long getDetectorIntervalInMilliseconds(AnomalyDetector detector) { + return ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMillis(); + } + + private LongStream getFullShingleEndTimes(long endTime, long intervalMilli) { + return LongStream.rangeClosed(1, shingleSize).map(i -> endTime - (shingleSize - i) * intervalMilli); + } + /** * Provides data for cold-start training. 
* @@ -423,7 +510,7 @@ public void getPreviewFeatures(AnomalyDetector detector, long startMilli, long e private Entry>, Integer> getSampleRanges(AnomalyDetector detector, long startMilli, long endMilli) { long start = truncateToMinute(startMilli); long end = truncateToMinute(endMilli); - long bucketSize = ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMillis(); + long bucketSize = getDetectorIntervalInMilliseconds(detector); int numBuckets = (int) Math.floor((end - start) / (double) bucketSize); int numSamples = (int) Math.max(Math.min(numBuckets * previewSampleRate, maxPreviewSamples), 1); int stride = (int) Math.max(1, Math.floor((double) numBuckets / numSamples)); @@ -511,9 +598,9 @@ private long truncateToMinute(long epochMillis) { } public int getShingleSize(String detectorId) { - Deque> shingle = detectorIdsToTimeShingles.get(detectorId); + Deque>> shingle = detectorIdsToTimeShingles.get(detectorId); if (shingle != null) { - return shingle.size(); + return Math.toIntExact(shingle.stream().filter(entry -> entry.getValue().isPresent()).count()); } else { return -1; } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java index e7533a16..d3f83fc9 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java @@ -18,6 +18,7 @@ import static java.util.Arrays.asList; import static java.util.Optional.empty; import static java.util.Optional.ofNullable; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -28,6 +29,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,6 +43,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import junitparams.JUnitParamsRunner; import junitparams.Parameters; @@ -77,6 +80,7 @@ public class FeatureManagerTests { private double previewSampleRate; private int maxPreviewSamples; private Duration featureBufferTtl; + private long intervalInMilliseconds; @Mock private AnomalyDetector detector; @@ -111,7 +115,9 @@ public void setup() { featureBufferTtl = Duration.ofMillis(1_000L); when(detector.getDetectorId()).thenReturn("id"); - when(detector.getDetectionInterval()).thenReturn(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES)); + IntervalTimeConfiguration detectorIntervalTimeConfig = new IntervalTimeConfiguration(1, ChronoUnit.MINUTES); + when(detector.getDetectionInterval()).thenReturn(detectorIntervalTimeConfig); + intervalInMilliseconds = detectorIntervalTimeConfig.toDuration().toMillis(); Interpolator interpolator = new LinearUniformInterpolator(new SingleFeatureLinearUniformInterpolator()); this.featureManager = spy( @@ -305,20 +311,25 @@ public void batchShingle_throwExpected_forInvalidInput(double[][] points, int sh } @Test - public void clear_deleteFeatures() { - long start = 0; - long end = 0; - for (int i = 1; i <= shingleSize; i++) { - start = i * 10; - end = (i + 1) * 10; + public void clear_deleteFeatures() throws IOException { + long start = shingleSize * 
intervalInMilliseconds; + long end = (shingleSize + 1) * intervalInMilliseconds; + + AtomicBoolean firstQuery = new AtomicBoolean(true); + + doAnswer(invocation -> { + ActionListener>> daoListener = invocation.getArgument(2); + if (firstQuery.get()) { + firstQuery.set(false); + daoListener + .onResponse(asList(Optional.of(new double[] { 3 }), Optional.of(new double[] { 2 }), Optional.of(new double[] { 1 }))); + } else { + daoListener.onResponse(asList(Optional.ofNullable(null), Optional.ofNullable(null), Optional.of(new double[] { 1 }))); + } + return null; + }).when(searchFeatureDao).getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + featureManager.getCurrentFeatures(detector, start, end, mock(ActionListener.class)); - doAnswer(invocation -> { - ActionListener> daoListener = invocation.getArgument(3); - daoListener.onResponse(Optional.of(new double[] { 1 })); - return null; - }).when(searchFeatureDao).getFeaturesForPeriod(eq(detector), eq(start), eq(end), any(ActionListener.class)); - featureManager.getCurrentFeatures(detector, start, end, mock(ActionListener.class)); - } SinglePointFeatures beforeMaintenance = getCurrentFeatures(detector, start, end); assertTrue(beforeMaintenance.getUnprocessedFeatures().isPresent()); assertTrue(beforeMaintenance.getProcessedFeatures().isPresent()); @@ -330,7 +341,7 @@ public void clear_deleteFeatures() { assertFalse(afterMaintenance.getProcessedFeatures().isPresent()); } - private SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long start, long end) { + private SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long start, long end) throws IOException { ActionListener listener = mock(ActionListener.class); ArgumentCaptor captor = ArgumentCaptor.forClass(SinglePointFeatures.class); featureManager.getCurrentFeatures(detector, start, end, listener); @@ -339,20 +350,25 @@ private SinglePointFeatures getCurrentFeatures(AnomalyDetector detector, long st } @Test - public void maintenance_removeStaleData() { - long start = 0; - long end = 0; - for (int i = 1; i <= shingleSize; i++) { - start = i * 10; - end = (i + 1) * 10; + public void maintenance_removeStaleData() throws IOException { + long start = shingleSize * intervalInMilliseconds; + long end = (shingleSize + 1) * intervalInMilliseconds; + + AtomicBoolean firstQuery = new AtomicBoolean(true); + + doAnswer(invocation -> { + ActionListener>> daoListener = invocation.getArgument(2); + if (firstQuery.get()) { + firstQuery.set(false); + daoListener + .onResponse(asList(Optional.of(new double[] { 3 }), Optional.of(new double[] { 2 }), Optional.of(new double[] { 1 }))); + } else { + daoListener.onResponse(asList(Optional.ofNullable(null), Optional.ofNullable(null), Optional.of(new double[] { 1 }))); + } + return null; + }).when(searchFeatureDao).getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + featureManager.getCurrentFeatures(detector, start, end, mock(ActionListener.class)); - doAnswer(invocation -> { - ActionListener> daoListener = invocation.getArgument(3); - daoListener.onResponse(Optional.of(new double[] { 1 })); - return null; - }).when(searchFeatureDao).getFeaturesForPeriod(eq(detector), eq(start), eq(end), any(ActionListener.class)); - featureManager.getCurrentFeatures(detector, start, end, mock(ActionListener.class)); - } SinglePointFeatures beforeMaintenance = getCurrentFeatures(detector, start, end); assertTrue(beforeMaintenance.getUnprocessedFeatures().isPresent()); 
assertTrue(beforeMaintenance.getProcessedFeatures().isPresent()); @@ -366,21 +382,25 @@ public void maintenance_removeStaleData() { } @Test - public void maintenance_keepRecentData() { - long start = 0; - long end = 0; - for (int i = 1; i <= shingleSize; i++) { - start = i * 10; - end = (i + 1) * 10; + public void maintenance_keepRecentData() throws IOException { + long start = shingleSize * intervalInMilliseconds; + long end = (shingleSize + 1) * intervalInMilliseconds; + + AtomicBoolean firstQuery = new AtomicBoolean(true); + + doAnswer(invocation -> { + ActionListener>> daoListener = invocation.getArgument(2); + if (firstQuery.get()) { + firstQuery.set(false); + daoListener + .onResponse(asList(Optional.of(new double[] { 3 }), Optional.of(new double[] { 2 }), Optional.of(new double[] { 1 }))); + } else { + daoListener.onResponse(asList(Optional.ofNullable(null), Optional.ofNullable(null), Optional.of(new double[] { 1 }))); + } + return null; + }).when(searchFeatureDao).getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + featureManager.getCurrentFeatures(detector, start, end, mock(ActionListener.class)); - doAnswer(invocation -> { - ActionListener> daoListener = invocation.getArgument(3); - daoListener.onResponse(Optional.of(new double[] { 1 })); - return null; - }).when(searchFeatureDao).getFeaturesForPeriod(eq(detector), eq(start), eq(end), any(ActionListener.class)); - when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i })); - featureManager.getCurrentFeatures(detector, start, end, mock(ActionListener.class)); - } SinglePointFeatures beforeMaintenance = getCurrentFeatures(detector, start, end); assertTrue(beforeMaintenance.getUnprocessedFeatures().isPresent()); assertTrue(beforeMaintenance.getProcessedFeatures().isPresent()); @@ -465,6 +485,405 @@ public void getPreviewFeatures_returnExceptionToListener_whenQueryFail() throws getPreviewFeaturesTemplate(asList(Optional.of(new double[] { 1 }), Optional.of(new double[] { 3 })), false, false); } + private void setupSearchFeatureDaoForGetCurrentFeatures( + List> preQueryResponse, + Optional>> testQueryResponse + ) throws IOException { + AtomicBoolean isPreQuery = new AtomicBoolean(true); + + doAnswer(invocation -> { + ActionListener>> daoListener = invocation.getArgument(2); + if (isPreQuery.get()) { + isPreQuery.set(false); + daoListener.onResponse(preQueryResponse); + } else { + if (testQueryResponse.isPresent()) { + daoListener.onResponse(testQueryResponse.get()); + } else { + daoListener.onFailure(new IOException()); + } + } + return null; + }).when(searchFeatureDao).getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + } + + private Object[] getCurrentFeaturesTestData_whenAfterQueryResultsFormFullShingle() { + return new Object[] { + new Object[] { + asList(Optional.empty(), Optional.empty(), Optional.empty()), + 3, + Optional.of(asList(Optional.of(new double[] { 1 }), Optional.of(new double[] { 2 }), Optional.of(new double[] { 3 }))), + new double[] { 1, 2, 3 } }, + new Object[] { + asList(Optional.empty(), Optional.of(new double[] { 1 }), Optional.of(new double[] { 5 })), + 1, + Optional.of(asList(Optional.of(new double[] { 3 }))), + new double[] { 1, 5, 3 } }, + new Object[] { + asList(Optional.empty(), Optional.empty(), Optional.of(new double[] { 1, 2 })), + 2, + Optional.of(asList(Optional.of(new double[] { 3, 4 }), Optional.of(new double[] { 5, 6 }))), + new double[] { 1, 2, 3, 4, 5, 6 } }, }; + } + + 
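+ // Each case lists: the points cached by the earlier query, the test query's offset in intervals from that query, the response returned for the still-missing ranges, and the expected shingled features.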
@Test + @Parameters(method = "getCurrentFeaturesTestData_whenAfterQueryResultsFormFullShingle") + public void getCurrentFeatures_returnExpectedProcessedFeatures_whenAfterQueryResultsFormFullShingle( + List> preQueryResponse, + long intervalOffsetFromPreviousQuery, + Optional>> testQueryResponse, + double[] expectedProcessedFeatures + ) throws IOException { + int expectedNumQueriesToSearchFeatureDao = 2; + long previousStartTime = shingleSize * intervalInMilliseconds; + long previousEndTime = previousStartTime + intervalInMilliseconds; + long testStartTime = previousStartTime + intervalOffsetFromPreviousQuery * intervalInMilliseconds; + long testEndTime = testStartTime + intervalInMilliseconds; + + // Set up + setupSearchFeatureDaoForGetCurrentFeatures(preQueryResponse, testQueryResponse); + featureManager.getCurrentFeatures(detector, previousStartTime, previousEndTime, mock(ActionListener.class)); + + // Start test + SinglePointFeatures listenerResponse = getCurrentFeatures(detector, testStartTime, testEndTime); + verify(searchFeatureDao, times(expectedNumQueriesToSearchFeatureDao)) + .getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + assertTrue(listenerResponse.getUnprocessedFeatures().isPresent()); + assertTrue(listenerResponse.getProcessedFeatures().isPresent()); + + double[] actualProcessedFeatures = listenerResponse.getProcessedFeatures().get(); + for (int i = 0; i < expectedProcessedFeatures.length; i++) { + assertEquals(expectedProcessedFeatures[i], actualProcessedFeatures[i], 0); + } + } + + private Object[] getCurrentFeaturesTestData_whenNoQueryNeededToFormFullShingle() { + return new Object[] { + new Object[] { + asList(Optional.of(new double[] { 1 }), Optional.of(new double[] { 2 }), Optional.of(new double[] { 3 })), + new double[] { 1, 2, 3 } }, + new Object[] { + asList(Optional.of(new double[] { 1, 2 }), Optional.of(new double[] { 3, 4 }), Optional.of(new double[] { 5, 6 })), + new double[] { 1, 2, 3, 4, 5, 6 } } }; + } + + @Test + @Parameters(method = "getCurrentFeaturesTestData_whenNoQueryNeededToFormFullShingle") + public void getCurrentFeatures_returnExpectedProcessedFeatures_whenNoQueryNeededToFormFullShingle( + List> preQueryResponse, + double[] expectedProcessedFeatures + ) throws IOException { + int expectedNumQueriesToSearchFeatureDao = 1; + long start = shingleSize * intervalInMilliseconds; + long end = start + intervalInMilliseconds; + + // Set up + setupSearchFeatureDaoForGetCurrentFeatures(preQueryResponse, Optional.empty()); + featureManager.getCurrentFeatures(detector, start, end, mock(ActionListener.class)); + + // Start test + SinglePointFeatures listenerResponse = getCurrentFeatures(detector, start, end); + verify(searchFeatureDao, times(expectedNumQueriesToSearchFeatureDao)) + .getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + assertTrue(listenerResponse.getUnprocessedFeatures().isPresent()); + assertTrue(listenerResponse.getProcessedFeatures().isPresent()); + + double[] actualProcessedFeatures = listenerResponse.getProcessedFeatures().get(); + for (int i = 0; i < expectedProcessedFeatures.length; i++) { + assertEquals(expectedProcessedFeatures[i], actualProcessedFeatures[i], 0); + } + } + + private Object[] getCurrentFeaturesTestData_whenAfterQueryResultsAllowImputedShingle() { + return new Object[] { + new Object[] { + asList(Optional.empty(), Optional.empty(), Optional.empty()), + 3, + Optional.of(asList(Optional.of(new double[] { 1 }), Optional.empty(), Optional.of(new 
double[] { 3 }))), + new double[] { 1, 3, 3 } }, + new Object[] { + asList(Optional.of(new double[] { 1 }), Optional.empty(), Optional.of(new double[] { 5 })), + 1, + Optional.of(asList(Optional.of(new double[] { 3 }))), + new double[] { 5, 5, 3 } }, + new Object[] { + asList(Optional.empty(), Optional.of(new double[] { 1 }), Optional.empty()), + 1, + Optional.of(asList(Optional.of(new double[] { 2 }))), + new double[] { 1, 2, 2 } }, + new Object[] { + asList(Optional.empty(), Optional.empty(), Optional.of(new double[] { 1 })), + 2, + Optional.of(asList(Optional.empty(), Optional.of(new double[] { 2 }))), + new double[] { 1, 2, 2 } }, + new Object[] { + asList(Optional.of(new double[] { 5, 6 }), Optional.empty(), Optional.empty()), + 2, + Optional.of(asList(Optional.of(new double[] { 3, 4 }), Optional.of(new double[] { 1, 2 }))), + new double[] { 3, 4, 3, 4, 1, 2 } }, }; + } + + @Test + @Parameters(method = "getCurrentFeaturesTestData_whenAfterQueryResultsAllowImputedShingle") + public void getCurrentFeatures_returnExpectedProcessedFeatures_whenAfterQueryResultsAllowImputedShingle( + List> preQueryResponse, + long intervalOffsetFromPreviousQuery, + Optional>> testQueryResponse, + double[] expectedProcessedFeatures + ) throws IOException { + int expectedNumQueriesToSearchFeatureDao = 2; + long previousStartTime = (shingleSize + 1) * intervalInMilliseconds; + long previousEndTime = previousStartTime + intervalInMilliseconds; + long testStartTime = previousStartTime + (intervalOffsetFromPreviousQuery * intervalInMilliseconds); + long testEndTime = testStartTime + intervalInMilliseconds; + + // Set up + setupSearchFeatureDaoForGetCurrentFeatures(preQueryResponse, testQueryResponse); + featureManager.getCurrentFeatures(detector, previousStartTime, previousEndTime, mock(ActionListener.class)); + + // Start test + SinglePointFeatures listenerResponse = getCurrentFeatures(detector, testStartTime, testEndTime); + verify(searchFeatureDao, times(expectedNumQueriesToSearchFeatureDao)) + .getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + assertTrue(listenerResponse.getUnprocessedFeatures().isPresent()); + assertTrue(listenerResponse.getProcessedFeatures().isPresent()); + + double[] actualProcessedFeatures = listenerResponse.getProcessedFeatures().get(); + for (int i = 0; i < expectedProcessedFeatures.length; i++) { + assertEquals(expectedProcessedFeatures[i], actualProcessedFeatures[i], 0); + } + } + + private Object[] getCurrentFeaturesTestData_whenMissingCurrentDataPoint() { + return new Object[] { + new Object[] { + asList(Optional.empty(), Optional.empty(), Optional.empty()), + 3, + Optional.of(asList(Optional.of(new double[] { 1 }), Optional.of(new double[] { 3 }), Optional.empty())), }, + new Object[] { + asList(Optional.of(new double[] { 1 }), Optional.of(new double[] { 1 }), Optional.empty()), + 1, + Optional.of(asList(Optional.empty())), }, + new Object[] { + asList(Optional.empty(), Optional.empty(), Optional.of(new double[] { 1, 2, 3 })), + 2, + Optional.of(asList(Optional.of(new double[] { 4, 5, 6 }), Optional.empty())), } }; + } + + @Test + @Parameters(method = "getCurrentFeaturesTestData_whenMissingCurrentDataPoint") + public void getCurrentFeatures_returnNoProcessedOrUnprocessedFeatures_whenMissingCurrentDataPoint( + List> preQueryResponse, + long intervalOffsetFromPreviousQuery, + Optional>> testQueryResponse + ) throws IOException { + int expectedNumQueriesToSearchFeatureDao = 2; + long previousStartTime = shingleSize * intervalInMilliseconds; + 
long previousEndTime = previousStartTime + intervalInMilliseconds; + long testStartTime = previousStartTime + intervalOffsetFromPreviousQuery * intervalInMilliseconds; + long testEndTime = testStartTime + intervalInMilliseconds; + + // Set up + setupSearchFeatureDaoForGetCurrentFeatures(preQueryResponse, testQueryResponse); + featureManager.getCurrentFeatures(detector, previousStartTime, previousEndTime, mock(ActionListener.class)); + + // Start test + SinglePointFeatures listenerResponse = getCurrentFeatures(detector, testStartTime, testEndTime); + verify(searchFeatureDao, times(expectedNumQueriesToSearchFeatureDao)) + .getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + assertFalse(listenerResponse.getUnprocessedFeatures().isPresent()); + assertFalse(listenerResponse.getProcessedFeatures().isPresent()); + } + + private Object[] getCurrentFeaturesTestData_whenAfterQueryResultsCannotBeShingled() { + return new Object[] { + new Object[] { + asList(Optional.of(new double[] { 1 }), Optional.of(new double[] { 2 }), Optional.of(new double[] { 3 })), + 3, + Optional.of(asList(Optional.empty(), Optional.empty(), Optional.of(new double[] { 4 }))), }, + new Object[] { + asList(Optional.of(new double[] { 1, 2 }), Optional.empty(), Optional.empty()), + 1, + Optional.of(asList(Optional.of(new double[] { 3, 4 }))), } }; + } + + @Test + @Parameters(method = "getCurrentFeaturesTestData_whenAfterQueryResultsCannotBeShingled") + public void getCurrentFeatures_returnNoProcessedFeatures_whenAfterQueryResultsCannotBeShingled( + List> preQueryResponse, + long intervalOffsetFromPreviousQuery, + Optional>> testQueryResponse + ) throws IOException { + int expectedNumQueriesToSearchFeatureDao = 2; + long previousStartTime = shingleSize * intervalInMilliseconds; + long previousEndTime = previousStartTime + intervalInMilliseconds; + long testStartTime = previousStartTime + intervalOffsetFromPreviousQuery * intervalInMilliseconds; + long testEndTime = testStartTime + intervalInMilliseconds; + + // Set up + setupSearchFeatureDaoForGetCurrentFeatures(preQueryResponse, testQueryResponse); + featureManager.getCurrentFeatures(detector, previousStartTime, previousEndTime, mock(ActionListener.class)); + + // Start test + SinglePointFeatures listenerResponse = getCurrentFeatures(detector, testStartTime, testEndTime); + verify(searchFeatureDao, times(expectedNumQueriesToSearchFeatureDao)) + .getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + assertTrue(listenerResponse.getUnprocessedFeatures().isPresent()); + assertFalse(listenerResponse.getProcessedFeatures().isPresent()); + } + + private Object[] getCurrentFeaturesTestData_whenQueryThrowsIOException() { + return new Object[] { + new Object[] { asList(Optional.empty(), Optional.empty(), Optional.empty()), 3 }, + new Object[] { asList(Optional.empty(), Optional.of(new double[] { 1, 2 }), Optional.of(new double[] { 3, 4 })), 1 } }; + } + + @Test + @Parameters(method = "getCurrentFeaturesTestData_whenQueryThrowsIOException") + public void getCurrentFeatures_returnExceptionToListener_whenQueryThrowsIOException( + List> preQueryResponse, + long intervalOffsetFromPreviousQuery + ) throws IOException { + int expectedNumQueriesToSearchFeatureDao = 2; + long previousStartTime = shingleSize * intervalInMilliseconds; + long previousEndTime = previousStartTime + intervalInMilliseconds; + long testStartTime = previousStartTime + intervalOffsetFromPreviousQuery * intervalInMilliseconds; + long testEndTime = 
testStartTime + intervalInMilliseconds; + + // Set up + setupSearchFeatureDaoForGetCurrentFeatures(preQueryResponse, Optional.empty()); + featureManager.getCurrentFeatures(detector, previousStartTime, previousEndTime, mock(ActionListener.class)); + + // Start test + ActionListener listener = mock(ActionListener.class); + featureManager.getCurrentFeatures(detector, testStartTime, testEndTime, listener); + verify(searchFeatureDao, times(expectedNumQueriesToSearchFeatureDao)) + .getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + verify(listener).onFailure(any(IOException.class)); + } + + private Object[] getCurrentFeaturesTestData_cacheMissingData() { + return new Object[] { + new Object[] { + asList(Optional.empty(), Optional.empty(), Optional.empty()), + Optional.of(asList(Optional.of(new double[] { 1 }))), + Optional.empty() }, + new Object[] { + asList(Optional.of(new double[] { 1, 2 }), Optional.empty(), Optional.of(new double[] { 3, 4 })), + Optional.of(asList(Optional.of(new double[] { 5, 6 }))), + Optional.of(new double[] { 3, 4, 3, 4, 5, 6 }) } }; + } + + @Test + @Parameters(method = "getCurrentFeaturesTestData_cacheMissingData") + public void getCurrentFeatures_returnExpectedFeatures_cacheMissingData( + List> firstQueryResponseToBeCached, + Optional>> secondQueryResponse, + Optional expectedProcessedFeaturesOptional + ) throws IOException { + long firstStartTime = shingleSize * intervalInMilliseconds; + long firstEndTime = firstStartTime + intervalInMilliseconds; + long secondStartTime = firstEndTime; + long secondEndTime = secondStartTime + intervalInMilliseconds; + + setupSearchFeatureDaoForGetCurrentFeatures(firstQueryResponseToBeCached, secondQueryResponse); + + // first call to cache missing points + featureManager.getCurrentFeatures(detector, firstStartTime, firstEndTime, mock(ActionListener.class)); + verify(searchFeatureDao, times(1)) + .getFeatureSamplesForPeriods(eq(detector), argThat(list -> list.size() == shingleSize), any(ActionListener.class)); + + // second call should only fetch current point even if previous points missing + SinglePointFeatures listenerResponse = getCurrentFeatures(detector, secondStartTime, secondEndTime); + verify(searchFeatureDao, times(1)) + .getFeatureSamplesForPeriods(eq(detector), argThat(list -> list.size() == 1), any(ActionListener.class)); + + assertTrue(listenerResponse.getUnprocessedFeatures().isPresent()); + if (expectedProcessedFeaturesOptional.isPresent()) { + assertTrue(listenerResponse.getProcessedFeatures().isPresent()); + double[] expectedProcessedFeatures = expectedProcessedFeaturesOptional.get(); + double[] actualProcessedFeatures = listenerResponse.getProcessedFeatures().get(); + for (int i = 0; i < expectedProcessedFeatures.length; i++) { + assertEquals(expectedProcessedFeatures[i], actualProcessedFeatures[i], 0); + } + } else { + assertFalse(listenerResponse.getProcessedFeatures().isPresent()); + } + } + + private Object[] getCurrentFeaturesTestData_withTimeJitterUpToHalfInterval() { + return new Object[] { + new Object[] { + asList(Optional.empty(), Optional.empty(), Optional.of(new double[] { 1 })), + 2.1, + Optional.of(asList(Optional.of(new double[] { 2 }), Optional.of(new double[] { 3 }))), + new double[] { 1, 2, 3 } }, + new Object[] { + asList(Optional.of(new double[] { 1 }), Optional.empty(), Optional.of(new double[] { 5 })), + 0.8, + Optional.of(asList(Optional.of(new double[] { 3 }))), + new double[] { 5, 5, 3 } }, + new Object[] { + asList(Optional.empty(), Optional.empty(), 
Optional.of(new double[] { 1 })), + 1.49, + Optional.of(asList(Optional.of(new double[] { 2 }))), + new double[] { 1, 1, 2 } }, + new Object[] { + asList(Optional.empty(), Optional.empty(), Optional.of(new double[] { 1 })), + 1.51, + Optional.of(asList(Optional.empty(), Optional.of(new double[] { 2 }))), + new double[] { 1, 1, 2 } }, + new Object[] { + asList(Optional.empty(), Optional.empty(), Optional.of(new double[] { 1 })), + 2.49, + Optional.of(asList(Optional.empty(), Optional.of(new double[] { 2 }))), + new double[] { 1, 2, 2 } }, + new Object[] { + asList(Optional.of(new double[] { 1, 2 }), Optional.of(new double[] { 3, 4 }), Optional.of(new double[] { 5, 6 })), + 2.5, + Optional + .of( + asList( + Optional.of(new double[] { 7, 8 }), + Optional.of(new double[] { 9, 10 }), + Optional.of(new double[] { 11, 12 }) + ) + ), + new double[] { 7, 8, 9, 10, 11, 12 } }, }; + } + + @Test + @Parameters(method = "getCurrentFeaturesTestData_withTimeJitterUpToHalfInterval") + public void getCurrentFeatures_returnExpectedFeatures_withTimeJitterUpToHalfInterval( + List> preQueryResponse, + double intervalOffsetFromPreviousQuery, + Optional>> testQueryResponse, + double[] expectedProcessedFeatures + ) throws IOException { + int expectedNumQueriesToSearchFeatureDao = 2; + long previousStartTime = (shingleSize + 1) * intervalInMilliseconds; + long previousEndTime = previousStartTime + intervalInMilliseconds; + double millisecondsOffset = intervalOffsetFromPreviousQuery * intervalInMilliseconds; + long testStartTime = previousStartTime + (long) millisecondsOffset; + long testEndTime = testStartTime + intervalInMilliseconds; + + // Set up + setupSearchFeatureDaoForGetCurrentFeatures(preQueryResponse, testQueryResponse); + featureManager.getCurrentFeatures(detector, previousStartTime, previousEndTime, mock(ActionListener.class)); + + // Start test + SinglePointFeatures listenerResponse = getCurrentFeatures(detector, testStartTime, testEndTime); + verify(searchFeatureDao, times(expectedNumQueriesToSearchFeatureDao)) + .getFeatureSamplesForPeriods(eq(detector), any(List.class), any(ActionListener.class)); + assertTrue(listenerResponse.getUnprocessedFeatures().isPresent()); + assertTrue(listenerResponse.getProcessedFeatures().isPresent()); + + double[] actualProcessedFeatures = listenerResponse.getProcessedFeatures().get(); + for (int i = 0; i < expectedProcessedFeatures.length; i++) { + assertEquals(expectedProcessedFeatures[i], actualProcessedFeatures[i], 0); + } + } + private Entry entry(K key, V value) { return new SimpleEntry<>(key, value); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java index faa552b3..454f1b0e 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java @@ -949,7 +949,7 @@ enum FeatureTestMode { } @SuppressWarnings("unchecked") - public void featureTestTemplate(FeatureTestMode mode) { + public void featureTestTemplate(FeatureTestMode mode) throws IOException { if (mode == FeatureTestMode.FEATURE_NOT_AVAILABLE) { doAnswer(invocation -> { ActionListener listener = invocation.getArgument(3); @@ -996,15 +996,15 @@ public void featureTestTemplate(FeatureTestMode mode) { } } - public void testFeatureNotAvailable() { + public void testFeatureNotAvailable() throws IOException { 
featureTestTemplate(FeatureTestMode.FEATURE_NOT_AVAILABLE); } - public void testFeatureIllegalState() { + public void testFeatureIllegalState() throws IOException { featureTestTemplate(FeatureTestMode.ILLEGAL_STATE); } - public void testFeatureAnomalyException() { + public void testFeatureAnomalyException() throws IOException { featureTestTemplate(FeatureTestMode.AD_EXCEPTION); } @@ -1115,8 +1115,7 @@ public void testNullRCFResult() { } @SuppressWarnings("unchecked") - public void testAllFeaturesDisabled() { - + public void testAllFeaturesDisabled() throws IOException { doAnswer(invocation -> { Object[] args = invocation.getArguments(); ActionListener listener = (ActionListener) args[3];
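For orientation, a minimal standalone sketch of the nearest-timestamp matching that getNearbyPointsForShingle performs: each expected interval end time is paired with the closest observed point (the later one wins ties), and the match is dropped when it falls outside the allowed tolerance. The class name, timestamps, and values below are hypothetical, and the real method works on Entry<Long, Optional<double[]>> shingle entries rather than plain longs.

import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.LongStream;

public class NearestPointMatchSketch {
    public static void main(String[] args) {
        long intervalMillis = 60_000L;
        int shingleSize = 3;
        long endTime = 300_000L;
        // Same half-interval tolerance that getCurrentFeatures uses when matching cached points.
        long tolerance = intervalMillis / 2;

        // Observed points, keyed by their actual (slightly jittered) timestamps.
        TreeMap<Long, double[]> observed = new TreeMap<>();
        observed.put(181_000L, new double[] { 1 }); // ~1s late for the 180s slot
        observed.put(299_000L, new double[] { 3 }); // ~1s early for the 300s slot

        // Expected end times of the shingleSize consecutive intervals: 180s, 240s, 300s.
        LongStream.rangeClosed(1, shingleSize).map(i -> endTime - (shingleSize - i) * intervalMillis).forEach(t -> {
            Long after = observed.ceilingKey(t);
            Long before = observed.floorKey(t);
            // Prefer the later point on ties, mirroring the ceiling/floor comparison in the diff.
            Long nearest = (after != null && (before == null || Math.abs(t - after) <= Math.abs(t - before))) ? after : before;
            Optional<double[]> match = Optional.ofNullable(nearest).filter(k -> Math.abs(t - k) < tolerance).map(observed::get);
            // Prints: 180000 matched 181000, 240000 missing (would be re-queried), 300000 matched 299000.
            System.out.println(t + (match.isPresent() ? " matched " + nearest : " missing"));
        });
    }
}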