This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Queries data from the index when insufficient data in buffer to form a full shingle #176

Conversation

LiuJoyceC (Contributor) commented Jun 25, 2020

Issue #, if available:

Description of changes:
When running an AD job on an interval, data points from past consecutive intervals are required in the shingling process. Currently, the AD only checks data points from a buffer that contains data points for intervals that were previously run. This means that for the first several intervals the AD runs on, or if intervals were previously missed (for example, due to restarts), the AD is unable to form a shingle and outputs no results, even if the data points for the missed intervals are in the data index.

With this change, instead of issuing a query which only retrieves the current data point, a single query to the index will be issued for the current data point and any missing data points from the buffer that are needed to form a full shingle. The results of the query are cached so that if the data point is missing from the index itself, it will not be re-queried later.
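The buffering-and-caching idea above can be sketched as follows. This is a simplified, hypothetical model, not the actual FeatureManager code: the real implementation keeps one shingle buffer per detector and stores feature vectors from the query response.

```java
import java.util.ArrayDeque;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.Optional;

// Hypothetical toy model of the caching described above: the buffer stores an
// Optional per interval end time, so an interval known to be absent from the
// index is cached as Optional.empty() and is never re-queried.
public class ShingleBuffer {
    private final ArrayDeque<Entry<Long, Optional<double[]>>> shingle = new ArrayDeque<>();

    // record a query result, present or absent, for an interval end time
    void cache(long endTime, Optional<double[]> point) {
        shingle.addLast(new SimpleEntry<>(endTime, point));
    }

    // an interval needs a query only if no entry, present or absent, is cached
    boolean needsQuery(long endTime) {
        return shingle.stream().noneMatch(e -> e.getKey() == endTime);
    }
}
```

With this model, an interval whose data point is missing from the index is cached as `Optional.empty()` the first time it is queried, so later runs skip it.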

Testing:
A comprehensive set of unit tests has been added (8 new tests covering 25 test cases) to test the FeatureManager getCurrentFeatures method under various initial states and query responses.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

wnbts (Contributor) left a comment

It seems the unit test was lost in the repo migration and further cleanup. Given the importance of this method, all branches should be covered by unit tests.

https://code.amazon.com/reviews/CR-15233815/revisions/2#/diff

9af0080#diff-d5d992d03a1e15cffb9cfc0b32239c89

@yizheliu-amazon yizheliu-amazon added the enhancement New feature or request label Jun 25, 2020
@LiuJoyceC LiuJoyceC force-pushed the fill-shingle-with-history branch from e8b0d2a to 84d4a20 Compare June 30, 2020 19:14
@LiuJoyceC LiuJoyceC force-pushed the fill-shingle-with-history branch from 84d4a20 to f87ee3d Compare June 30, 2020 19:18
* @param listener onResponse is called with unprocessed features and processed features for the current data point
* @throws IOException if a user gives wrong query input when defining a detector
*/
public void getCurrentFeatures(
wnbts (Contributor):

This method should be private to the class.

LiuJoyceC (Contributor Author):
The reason this is made public is to allow the unit tests to set the initial state of the shingle without caching missing data points during the setup (since the caching would affect the desired initial state of the shingle to be tested).

I can try to find alternative ways to set the initial state of the shingle, in which case I'll just remove the cacheMissingDataPoints parameter (and just always cache missing data points) and remove the method above.

* @param endTime end time of the data point in epoch milliseconds
* @param cacheMissingDataPoints if set to true, missing data points are remembered and not re-queried later
* @param listener onResponse is called with unprocessed features and processed features for the current data point
* @throws IOException if a user gives wrong query input when defining a detector
wnbts (Contributor):

Minor: the query error can come from both the input and the system. If that happens, there is no recovery for the caller. Usually such exceptions are caught and wrapped (abstracted) in unchecked exceptions for higher-level callers, as in many other programming languages.

LiuJoyceC (Contributor Author):

Ok, I'll add a try/catch block around searchFeatureDao.getFeatureSamplesForPeriods() and pass the error to listener.onFailure() instead, since that's where other failures from searchFeatureDao.getFeatureSamplesForPeriods() are passed to.
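The change described here can be sketched with minimal, hypothetical stand-ins for the real ActionListener and searchFeatureDao types (the actual Elasticsearch interfaces differ):

```java
import java.io.IOException;

// Hypothetical sketch: the checked IOException from building or running the
// query is routed to listener.onFailure instead of being thrown, the same
// path other search failures already take.
public class FailureRouting {
    interface ActionListener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    interface FeatureQuery {
        double[] run() throws IOException; // stands in for getFeatureSamplesForPeriods
    }

    static void getFeatures(FeatureQuery query, ActionListener<double[]> listener) {
        try {
            listener.onResponse(query.run());
        } catch (IOException e) {
            listener.onFailure(e); // deliver the error, don't throw
        }
    }
}
```

The benefit is that callers handle exactly one failure channel, regardless of whether the error originated in query construction or in the search itself.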

kaituo (Member):

Please make sure AnomalyResultTransportAction.onFeatureResponse can handle the exception you throw. The IOException sounds like something you should rethrow as an EndRunException in AnomalyResultTransportAction to let the user know.

LiuJoyceC added 2 commits July 6, 2020 11:49
1) Allows time jitter up to half an interval.
2) getCurrentFeatures returns all errors to the listener instead of throwing when searchFeatureDao throws.
3) Modified unit tests to not require a public getCurrentFeatures interface that allows for not caching.
Comment on lines 56 to 57
private final Map<String, ArrayDeque<Entry<Long, double[]>>> detectorIdsToTimeShingles;
private Map<String, ArrayDeque<Entry<Long, Optional<double[]>>>> detectorIdsToTimeShingles;
wnbts (Contributor):

Minor: this field should still be final.

LiuJoyceC (Contributor Author):
Good catch. I forgot to add back the final when I was originally experimenting with mocking private variables for unit tests.

.map(points -> new SinglePointFeatures(currentPoint, Optional.of(batchShingle(points, shingleSize)[0])))
.orElse(new SinglePointFeatures(currentPoint, Optional.empty()))
);
if (shingle.isEmpty() || shingle.getLast().getKey() < endTime || !shingle.getLast().getValue().isPresent()) {
wnbts (Contributor):

Minor: the second condition shingle.getLast().getKey() < endTime might need some relaxation. If the existing buffer is [1min, 2min, ..., 8min] and endTime is 8min1sec, the last data point would still be valid for the endTime.

Also, this if branch could be merged with the else branch, starting with an optional current point and, if the current point is present, an optional current shingle.
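The suggested relaxation might look like this hedged sketch; the method name and the half-interval threshold are assumptions drawn from the discussion, not the actual FeatureManager code:

```java
// Hypothetical sketch of the relaxed check: the last buffered point counts as
// the current point when its timestamp is within half an interval of endTime,
// so small clock jitter does not trigger a re-query.
public class JitterTolerance {
    static boolean isCurrent(long lastPointTime, long endTime, long intervalMillis) {
        return Math.abs(endTime - lastPointTime) <= intervalMillis / 2;
    }
}
```

In the buffer example above, a last point at 8min is accepted for an endTime of 8min1sec, while a point a full interval older is not.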

Comment on lines 144 to 145
} catch (IOException e) {
listener.onFailure(e);
wnbts (Contributor):

Minor: error handling can be changed as kaituo suggested.

Comment on lines 178 to 181
if (featuresMap.containsKey(endTime)) {
getProcessedFeatures(shingle, detector, endTime, listener);
} else {
listener.onResponse(new SinglePointFeatures(Optional.empty(), Optional.empty()));
wnbts (Contributor):
getProcessedFeatures should be able to handle both branches.

LiuJoyceC (Contributor Author):
Are you suggesting to pass featuresMap as a parameter into getProcessedFeatures and check this condition in there?

The condition featuresMap.containsKey(endTime) is the simplest/most concise way to ensure the conditions !shingle.isEmpty() and shingle.getLast().getKey() ≈ endTime (where ≈ means within half an interval away, as discussed, which basically means that the current point is present). The reason this check is done in updateUnprocessedFeatures is that it is only needed if updateUnprocessedFeatures is invoked, and not if getProcessedFeatures is directly invoked by getCurrentFeatures (in which case missingRanges.size() > 0 already ensures the above conditions, so checking featuresMap is unnecessary).

wnbts (Contributor):

No, featuresMap is not needed. By the end of line 176, the collection shingle should contain exactly shingle-size data points, present or absent, the last one ending around endTime. getProcessedFeatures can output the same results for both the if and else branches.

LiuJoyceC (Contributor Author):
With the current implementation (where results are not added to the shingle if searchFeatureDao returns Collections.emptyList()), the shingle is not guaranteed to contain an entry around endTime, which is why this check is currently being done.

However, per your response on the comment below, if we go ahead and cache all missing ranges when searchFeatureDao returns Collections.emptyList(), then the shingle will be guaranteed to have the entry at or near endTime, and then checking the featureMap will no longer be needed.

shingle.clear();
getFullShingleEndTimes(endTime, getDetectorIntervalInMilliseconds(detector))
.filter(time -> featuresMap.containsKey(time))
.mapToObj(time -> featuresMap.get(time))
wnbts (Contributor):

Minor: the first filter might not be needed, since the map should contain results for all intervals, present or absent. To be safe in the unlikely case that the map is incomplete, the second get can return an empty value when the key is absent, using getOrDefault.

LiuJoyceC (Contributor Author) Jul 10, 2020:
It depends on how we want to handle this case: https://github.com/opendistro-for-elasticsearch/anomaly-detection/blob/master/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java#L211-L214. This is the case that could cause some of the times to be missing from featuresMap at this line.

Is aggs == null in the search response equivalent to each queried time range having no data? In other words, do we want to treat Collections.emptyList() as equivalent to [Optional.empty(), Optional.empty(), ..., Optional.empty()]? In the case of Collections.emptyList(), do we want to cache the null values for each of the queried time ranges so they will not get re-queried?

If yes, it might be cleaner to handle this logic in the callback passed to searchFeatureDao, so that the logic for determining shingle values based on the search response is located in one function rather than spread across multiple functions.

wnbts (Contributor):

I haven't seen or known that to happen in practice. If it ever happens, caching the results is ok. What I was suggesting is just to simplify the implementation to one line, like stream.mapToObj(time -> map.getOrDefault(time, <time, empty()>)).
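The suggested one-liner could look like this sketch, with hypothetical types; the real code streams the detector's full-shingle end times through its featuresMap:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical sketch of the suggested one-liner: map every shingle end time
// through the query-results map, defaulting an absent time to an empty entry,
// so the rebuilt shingle always has one entry per interval.
public class ShingleFill {
    static List<Entry<Long, Optional<double[]>>> fill(
        LongStream endTimes,
        Map<Long, Entry<Long, Optional<double[]>>> featuresMap
    ) {
        return endTimes
            .mapToObj(t -> featuresMap.getOrDefault(t, new SimpleEntry<>(t, Optional.<double[]>empty())))
            .collect(Collectors.toList());
    }
}
```

This removes the need for the preceding containsKey filter: an interval missing from the map simply becomes an absent (empty) entry.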

LiuJoyceC (Contributor Author):
Alright, I will change the implementation to cache results for the missing ranges when the response is an empty list.

}
Optional<double[]> currentPoint = shingle.peekLast().getValue();
listener
.onResponse(
kaituo (Member):
The original code is easier to read. Also, you do "currentPoint.map(point -> filterAndFill(shingle, endTime, detector))" without using currentPoint in filterAndFill, which looks strange.

LiuJoyceC (Contributor Author):

This change was made to address wnbts's comments above about simplifying the code to not use if/else to ensure currentPoint is not an empty Optional. As written, if currentPoint is an empty Optional, the orElse line is triggered, so we don't end up with a non-empty array of points in the SinglePointFeatures (if that were to happen, it would be an unexpected change in behavior from before).

The reason the original code didn't need to check that was because currentPoint wasn't an Optional before. Now that we are caching empty responses from the search request to ensure we don't re-query missing data, the points in the shingle are wrapped in Optionals.
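The map/orElse pattern under discussion can be illustrated with a simplified, hypothetical stand-in for SinglePointFeatures (the real class differs):

```java
import java.util.Optional;

// Hypothetical stand-in for SinglePointFeatures: when currentPoint is an empty
// Optional, the orElse branch produces a response with no processed features,
// so a missing current point never yields a non-empty processed array.
public class ResponseBuild {
    record Response(Optional<double[]> current, Optional<double[]> processed) {}

    static Response build(Optional<double[]> currentPoint, double[] processedShingle) {
        return currentPoint
            .map(p -> new Response(currentPoint, Optional.of(processedShingle)))
            .orElse(new Response(currentPoint, Optional.empty()));
    }
}
```

The key property is that the processed features are attached only when the current point is actually present.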

LiuJoyceC (Contributor Author):
Another unexpected change in behavior that this is avoiding is running filterAndFill on a shingle where the current point is not present. This would result in the shingle filling in the missing current point with older data. In the original code, there were checks in getCurrentFeatures and updateUnprocessedFeature to ensure the current point is not missing. These checks were removed to address comments above about simplifying the logic.

LiuJoyceC (Contributor Author):
The code was modified to no longer have an unused param point in a function, and now currentPoint.isPresent() is explicitly checked to make the intention clear. Does this address the concern?

kaituo (Member):
yes, thanks for the change.

updateUnprocessedFeatures(detector, shingle, featuresMap, endTime, listener);
}, listener::onFailure));
} catch (IOException e) {
listener.onFailure(new EndRunException(detector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, true));
kaituo (Member):

Add a function-level comment (e.g., "* onFailure is called with EndRunException on feature query creation errors") to say you are going to throw this exception when a certain condition is true?

LiuJoyceC (Contributor Author):
Sure.

if (result.length < shingleSize) {
result = null;
}
}
return result;
}

private Stream<Entry<Long, Entry<Long, Optional<double[]>>>> getNearbyPointsForShingle(
kaituo (Member):
Have some documentation of this method since the return value is nested and not easy to understand?

@@ -132,81 +133,129 @@ public FeatureManager(
*/
public void getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime, ActionListener<SinglePointFeatures> listener) {
kaituo (Member):
Add comments on your workflow? Is the following understanding correct?

First, you interpolate using nearest points.
Then, query for missing points.
Finally, do another round of interpolation using nearest points.

LiuJoyceC (Contributor Author):
The code isn't actually interpolating before querying for missing points. It is meant to account for a small amount of jitter in time (see wnbts's comment above) so that points that are not actually missing aren't re-queried simply because the millisecond timestamp doesn't exactly match up. The fact that we have to account for random time jitter does make the implementation a bit more complicated than the original code (and my original first version of this feature).

For example, let's say the interval is 60,000 milliseconds, and the detector ends up running at time 60,000, time 120,001, and time 179,999 (simply due to random time jitter). At time 120,001, the function shouldn't re-query historical data just because there doesn't exist a data point stored in the shingle buffer whose timestamp matches up to exactly 60,001. It should just recognize that the point at timestamp 60,000 is the point from 1 interval ago. Similarly, at time 179,999, it shouldn't re-query the previous 2 intervals just because there aren't points whose timestamps are exactly 119,999 and 59,999.

Also, what actually gets stored in the shingle buffer (which persists to intervals in the future) is the actual original timestamp of the data point, so no interpolation is being done. In the example above, what gets stored in the shingle is:
[<60000, x>, <120001, y>, <179999, z>]. So the data point at time 60,000 is not getting interpolated to time 60,001. Instead, time 60,000 is just recognized as the timestamp of the previous interval.

The actual interpolation is done after the query for missing points. Here, missing points will be interpolated with points from neighboring intervals up to the configured maxNeighborDistance (which is currently configured to 2 intervals away). So if the shingle is missing an interval, such as [<60000, x>, <179999, z>], then the data point from time 179,999 will get interpolated to time 119,999 as if that were the actual data point from the previous interval, so the resulting double[] from filterAndFill becomes [x, z, z].
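The worked example above can be sketched as follows. This is a hypothetical, simplified filterAndFill; the real implementation differs (for instance, in which neighboring interval it prefers when filling a gap, so this sketch produces [x, x, z] rather than [x, z, z] for the example):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// A hypothetical, simplified filterAndFill: each shingle slot is matched to
// the nearest stored timestamp, and a missing slot is filled from a neighbor
// up to maxNeighborDistance intervals away; otherwise the shingle is rejected.
public class ShingleExample {
    static double[] filterAndFill(NavigableMap<Long, Double> shingle, long endTime,
                                  long interval, int shingleSize, long maxNeighborDistance) {
        double[] result = new double[shingleSize];
        for (int i = 0; i < shingleSize; i++) {
            long slotEnd = endTime - (shingleSize - 1 - i) * interval;
            Long lo = shingle.floorKey(slotEnd);
            Long hi = shingle.ceilingKey(slotEnd);
            // pick the nearest actual timestamp to this slot's end time
            Long nearest = (lo != null && (hi == null || slotEnd - lo <= hi - slotEnd)) ? lo : hi;
            if (nearest == null || Math.abs(nearest - slotEnd) > maxNeighborDistance * interval) {
                return null; // slot cannot be filled from any near-enough neighbor
            }
            result[i] = shingle.get(nearest);
        }
        return result;
    }
}
```

With the buffer [<60000, x>, <179999, z>], an interval of 60,000 ms, and endTime 179,999, each of the three slots resolves to an actual stored timestamp within two intervals, so a full shingle is produced despite the jittered timestamps.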

kaituo (Member):
First of all, there is no jitter. So the logic is unnecessary. (FYI @wnbts)

You called getNearbyPointsForShingle before running query that actually use nearby points for interpolation, right? getNearbyPointsForShingle returns a map whose key is 60,000, 120,000, 180,000 in your example because the key's source is what getFullShingleEndTimes returns, right?

wnbts (Contributor):

@kaituo I am not following this entire thread, just commenting on the statement that "there is no jitter". It looks like the time range comes from the system clock, which has jitter. Also, scheduled job runs are not exactly evenly spaced, since jitters are intentionally added to the scheduler.

kaituo (Member):
@wnbts scheduled job's jitter is disabled for AD. It's possible that a scheduled job may not happen exactly as it is scheduled (e.g., system is under heavy load). But in normal cases, the job should run on time.

@LiuJoyceC You can keep your code. That should give some fault tolerance.

LiuJoyceC (Contributor Author):

> You called getNearbyPointsForShingle before running query that actually use nearby points for interpolation, right? getNearbyPointsForShingle returns a map whose key is 60,000, 120,000, 180,000 in your example because the key's source is what getFullShingleEndTimes returns, right?

The key in the map is meant to allow the shingle to match up data points to intervals. The actual timestamp of the data point is stored in the value of the map along with the data point value. In updateUnprocessedFeatures, it is the actual timestamp of the data point that is added to the shingle and persisted to future intervals. So the shingle in the example will contain the timestamps 60,000, 120,001, 179,999. The data points are only associated with their actual timestamps and not imputed to other timestamps at that stage.

> You can keep your code. That should give some fault tolerance.

👍

@LiuJoyceC LiuJoyceC force-pushed the fill-shingle-with-history branch from 484df68 to 636f293 Compare July 15, 2020 01:15
@LiuJoyceC LiuJoyceC merged commit 44a8bcd into opendistro-for-elasticsearch:master Jul 16, 2020
yizheliu-amazon pushed a commit that referenced this pull request Aug 28, 2020
…a full shingle (#176)

Adds feature to query data from the index when insufficient data in buffer to form a full shingle. Query results are cached.