Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Queries data from the index when insufficient data in buffer to form a full shingle #176

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class FeatureManager {
private static final Logger logger = LogManager.getLogger(FeatureManager.class);

// Each anomaly detector has a queue of data points with timestamps (in epoch milliseconds).
private Map<String, ArrayDeque<Entry<Long, Optional<double[]>>>> detectorIdsToTimeShingles;
private final Map<String, ArrayDeque<Entry<Long, Optional<double[]>>>> detectorIdsToTimeShingles;

private final SearchFeatureDao searchFeatureDao;
private final Interpolator interpolator;
Expand Down Expand Up @@ -175,7 +175,11 @@ private void updateUnprocessedFeatures(
.mapToObj(time -> featuresMap.get(time))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor. this first filter might not be needed since the map should contains results for all interval, present or absent. to be safe in the unlikely case that map is incomplete, the second get can return an empty if the key is absent using getOrDefault.

Copy link
Contributor Author

@LiuJoyceC LiuJoyceC Jul 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on how we want to handle this case: https://github.com/opendistro-for-elasticsearch/anomaly-detection/blob/master/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java#L211-L214. This is the case that could cause some of the times to be missing from featuresMap at this line.

Is aggs == null in the search response equivalent to each queried time range having no data? In other words, do we want to treat Collections.emptyList() equivalent to [Optional.empty(), Optional.empty(), ..., Optional.empty()]? In the case of Collections.emptyList(), do we want to cache the null values for each of the queried time ranges so they will not get re-queried?

If yes, it might be cleaner to handle this logic in the callback passed to searchFeatureDao, so that the logic for determining shingle values based on the search response is located in one function rather than spread across multiple functions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i haven't seen or know that happens in practice. if that ever happens, caching the results is ok. what i was suggesting is just simplify the implementation to one line like stream.maptoobject(time -> map.getOrDefault(time, <time, empty()>)).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I will change the implementation to cache results for the missing ranges when the response is an empty list.

.forEach(e -> shingle.add(e));

getProcessedFeatures(shingle, detector, endTime, listener);
if (featuresMap.containsKey(endTime)) {
getProcessedFeatures(shingle, detector, endTime, listener);
} else {
listener.onResponse(new SinglePointFeatures(Optional.empty(), Optional.empty()));
}
}

private void getProcessedFeatures(
Expand All @@ -184,19 +188,14 @@ private void getProcessedFeatures(
long endTime,
ActionListener<SinglePointFeatures> listener
) {
if (shingle.isEmpty() || shingle.getLast().getKey() < endTime || !shingle.getLast().getValue().isPresent()) {
listener.onResponse(new SinglePointFeatures(Optional.empty(), Optional.empty()));
} else {
double[][] currentPoints = filterAndFill(shingle, endTime, detector);
Optional<double[]> currentPoint = shingle.peekLast().getValue();
listener
.onResponse(
Optional
.ofNullable(currentPoints)
.map(points -> new SinglePointFeatures(currentPoint, Optional.of(batchShingle(points, shingleSize)[0])))
.orElse(new SinglePointFeatures(currentPoint, Optional.empty()))
);
}
Optional<double[]> currentPoint = shingle.peekLast().getValue();
listener
.onResponse(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original code is easier to read. Also, you do "currentPoint.map(point -> filterAndFill(shingle, endTime, detector))" without using currentPoint in filterAndFill, which looks strange.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was made to address the comments above from wnbts to simplify the code here to not use if else to ensure currentPoint is not an empty Optional. The way this is written is such that if currentPoint is an empty Optional, it triggers the orElse line, so that if the currentPoint is empty, we don't end up with a non-empty array of points in the SinglePointFeatures (if that were to happen, it would be an unexpected change in behavior from before).

The reason the original code didn't need to check that was because currentPoint wasn't an Optional before. Now that we are caching empty responses from the search request to ensure we don't re-query missing data, the points in the shingle are wrapped in Optionals.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another unexpected change in behavior that this is avoiding is running filterAndFill on a shingle where the current point is not present. This would result in the shingle filling in the missing current point with older data. In the original code, there were checks in getCurrentFeatures and updateUnprocessedFeature to ensure the current point is not missing. These checks were removed to address comments above about simplifying the logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code was modified to no longer have an unused param point in a function, and now currentPoint.isPresent() is explicitly checked to make the intention clear. Does this address the concern?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, thanks for the change.

currentPoint
.map(point -> filterAndFill(shingle, endTime, detector))
.map(points -> new SinglePointFeatures(currentPoint, Optional.of(batchShingle(points, shingleSize)[0])))
.orElse(new SinglePointFeatures(currentPoint, Optional.empty()))
);
}

private double[][] filterAndFill(Deque<Entry<Long, Optional<double[]>>> shingle, long endTime, AnomalyDetector detector) {
Expand Down