This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Queries data from the index when insufficient data in buffer to form a full shingle #176

Merged
Changes from 5 commits
@@ -35,6 +35,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.LongStream;
 import java.util.stream.Stream;

 import org.apache.logging.log4j.LogManager;
@@ -53,7 +54,7 @@ public class FeatureManager {
     private static final Logger logger = LogManager.getLogger(FeatureManager.class);

     // Each anomaly detector has a queue of data points with timestamps (in epoch milliseconds).
-    private final Map<String, ArrayDeque<Entry<Long, double[]>>> detectorIdsToTimeShingles;
+    private final Map<String, ArrayDeque<Entry<Long, Optional<double[]>>>> detectorIdsToTimeShingles;

     private final SearchFeatureDao searchFeatureDao;
     private final Interpolator interpolator;
@@ -121,81 +122,132 @@ public FeatureManager(
      */
     public void getCurrentFeatures(AnomalyDetector detector, long startTime, long endTime, ActionListener<SinglePointFeatures> listener) {
Member (kaituo):

Add comments on your workflow? Is the following understanding correct?

First, you interpolate using nearest points.
Then, query for missing points.
Finally, do another round of interpolation using nearest points.

Contributor Author (LiuJoyceC):
The code isn't actually interpolating before querying for missing points. It is meant to account for a small amount of jitter in time (see wnbts's comment above) so that points that are not actually missing aren't re-queried simply because the millisecond timestamp doesn't exactly match up. The fact that we have to account for random time jitter does make the implementation a bit more complicated than the original code (and my original first version of this feature).

For example, let's say the interval is 60,000 milliseconds, and the detector ends up running at time 60,000, time 120,001, and time 179,999 (simply due to random time jitter). At time 120,001, the function shouldn't re-query historical data just because there doesn't exist a data point stored in the shingle buffer whose timestamp matches up to exactly 60,001. It should just recognize that the point at timestamp 60,000 is the point from 1 interval ago. Similarly, at time 179,999, it shouldn't re-query the previous 2 intervals just because there aren't points whose timestamps are exactly 119,999 and 59,999.

Also, what actually gets stored in the shingle buffer (which persists to intervals in the future) is the actual original timestamp of the data point, so no interpolation is being done. In the example above, what gets stored in the shingle is:
[<60000, x>, <120001, y>, <179999, z>]. So the data point at time 60,000 is not getting interpolated to time 60,001. Instead, time 60,000 is just recognized as the timestamp of the previous interval.

The actual interpolation is done after the query for missing points. Here, missing points will be interpolated with points from neighboring intervals up to the configured maxNeighborDistance (which is currently configured to 2 intervals away). So if the shingle is missing an interval, such as [<60000, x>, <179999, z>], then the data point from time 179,999 will get interpolated to time 119,999 as if that were the actual data point from the previous interval, so the resulting double[] from filterAndFill becomes [x, z, z].
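
For illustration, this tolerance-based matching can be sketched as a standalone snippet (a hypothetical helper, not the PR code; it mirrors the TreeMap floor/ceiling lookup in getNearbyPointsForShingle below, with tolerance = half an interval as in getCurrentFeatures):

    import java.util.TreeMap;

    class JitterToleranceSketch {
        // Return the stored timestamp closest to the expected interval end time,
        // if it lies within the tolerance; otherwise null (point treated as missing).
        static Long nearestWithinTolerance(TreeMap<Long, double[]> points, long expected, long tolerance) {
            Long after = points.ceilingKey(expected);
            Long before = points.floorKey(expected);
            Long best = after;
            if (before != null && (after == null || Math.abs(expected - before) < Math.abs(expected - after))) {
                best = before;
            }
            return (best != null && Math.abs(expected - best) <= tolerance) ? best : null;
        }

        public static void main(String[] args) {
            TreeMap<Long, double[]> points = new TreeMap<>();
            points.put(60_000L, new double[] { 1.0 });   // x
            points.put(120_001L, new double[] { 2.0 });  // y
            points.put(179_999L, new double[] { 3.0 });  // z
            // Expected end times for a 60,000 ms interval with endTime = 180,000:
            for (long expected : new long[] { 60_000L, 120_000L, 180_000L }) {
                System.out.println(expected + " -> " + nearestWithinTolerance(points, expected, 30_000L));
            }
            // Prints 60000 -> 60000, 120000 -> 120001, 180000 -> 179999; nothing is re-queried.
        }
    }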

Member (kaituo):
First of all, there is no jitter, so the logic is unnecessary. (FYI @wnbts)

You called getNearbyPointsForShingle before running the query that actually uses nearby points for interpolation, right? getNearbyPointsForShingle returns a map whose keys are 60,000, 120,000, and 180,000 in your example, because the keys' source is what getFullShingleEndTimes returns, right?

Contributor (wnbts):
@kaituo I am not following this entire thread, just commenting on the statement that "there is no jitter". It looks like the time range comes from the system clock, which has jitter. Also, scheduled job runs are not exactly evenly spaced, since jitter is intentionally added to the scheduler.

Member (kaituo):
@wnbts the scheduled job's jitter is disabled for AD. It's possible that a scheduled job may not run exactly as scheduled (e.g., if the system is under heavy load), but in normal cases the job should run on time.

@LiuJoyceC You can keep your code. That should give some fault tolerance.

Contributor Author (LiuJoyceC):
> You called getNearbyPointsForShingle before running the query that actually uses nearby points for interpolation, right? getNearbyPointsForShingle returns a map whose keys are 60,000, 120,000, and 180,000 in your example, because the keys' source is what getFullShingleEndTimes returns, right?

The key in the map is meant to allow the shingle to match up data points to intervals. The actual timestamp of the data point is stored in the value of the map along with the data point value. In updateUnprocessedFeatures, it is the actual timestamp of the data point that is added to the shingle and persisted to future intervals. So the shingle in the example will contain the timestamps 60,000, 120,001, 179,999. The data points are only associated with their actual timestamps and not imputed to other timestamps at that stage.
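
For instance, with the timestamps above, getNearbyPointsForShingle produces the map {60000: <60000, x>, 120000: <120001, y>, 180000: <179999, z>}. The keys are the expected interval end times, while each value keeps the point's actual timestamp.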

> You can keep your code. That should give some fault tolerance.

👍


-        Deque<Entry<Long, double[]>> shingle = detectorIdsToTimeShingles
-            .computeIfAbsent(detector.getDetectorId(), id -> new ArrayDeque<Entry<Long, double[]>>(shingleSize));
-        if (shingle.isEmpty() || shingle.getLast().getKey() < endTime) {
-            searchFeatureDao
-                .getFeaturesForPeriod(
-                    detector,
-                    startTime,
-                    endTime,
-                    ActionListener
-                        .wrap(point -> updateUnprocessedFeatures(point, shingle, detector, endTime, listener), listener::onFailure)
-                );
+        Deque<Entry<Long, Optional<double[]>>> shingle = detectorIdsToTimeShingles
+            .computeIfAbsent(detector.getDetectorId(), id -> new ArrayDeque<>(shingleSize));
+
+        long maxTimeDifference = getDetectorIntervalInMilliseconds(detector) / 2;
+        Map<Long, Entry<Long, Optional<double[]>>> featuresMap = getNearbyPointsForShingle(detector, shingle, endTime, maxTimeDifference)
+            .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+        List<Entry<Long, Long>> missingRanges = getMissingRangesInShingle(detector, featuresMap, endTime);

+        if (missingRanges.size() > 0) {
+            try {
+                searchFeatureDao.getFeatureSamplesForPeriods(detector, missingRanges, ActionListener.wrap(points -> {
+                    for (int i = 0; i < points.size(); i++) {
+                        Optional<double[]> point = points.get(i);
+                        long rangeEndTime = missingRanges.get(i).getValue();
+                        featuresMap.put(rangeEndTime, new SimpleImmutableEntry<>(rangeEndTime, point));
+                    }
+                    updateUnprocessedFeatures(detector, shingle, featuresMap, endTime, listener);
+                }, listener::onFailure));
+            } catch (IOException e) {
+                listener.onFailure(e);
Contributor (wnbts):

Minor: the error handling can be changed as kaituo suggested.

+            }
         } else {
             getProcessedFeatures(shingle, detector, endTime, listener);
         }
     }

+    private List<Entry<Long, Long>> getMissingRangesInShingle(
+        AnomalyDetector detector,
+        Map<Long, Entry<Long, Optional<double[]>>> featuresMap,
+        long endTime
+    ) {
+        long intervalMilli = getDetectorIntervalInMilliseconds(detector);
+
+        return getFullShingleEndTimes(endTime, intervalMilli)
+            .filter(time -> !featuresMap.containsKey(time))
+            .mapToObj(time -> new SimpleImmutableEntry<>(time - intervalMilli, time))
+            .collect(Collectors.toList());
+    }
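
For example, if featuresMap has entries for the end times 60,000 and 180,000 but not 120,000 (with a 60,000 ms interval), the method returns the single range <60000, 120000>, the start and end of the one interval that must be queried from the index.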

     private void updateUnprocessedFeatures(
-        Optional<double[]> point,
-        Deque<Entry<Long, double[]>> shingle,
         AnomalyDetector detector,
+        Deque<Entry<Long, Optional<double[]>>> shingle,
+        Map<Long, Entry<Long, Optional<double[]>>> featuresMap,
         long endTime,
         ActionListener<SinglePointFeatures> listener
     ) {
-        if (point.isPresent()) {
-            if (shingle.size() == shingleSize) {
-                shingle.remove();
-            }
-            shingle.add(new SimpleImmutableEntry<>(endTime, point.get()));
+        shingle.clear();
+        getFullShingleEndTimes(endTime, getDetectorIntervalInMilliseconds(detector))
+            .filter(time -> featuresMap.containsKey(time))
+            .mapToObj(time -> featuresMap.get(time))
Contributor (wnbts):

Minor: this first filter might not be needed, since the map should contain results for every interval, present or absent. To be safe in the unlikely case that the map is incomplete, the second get can return an empty value when the key is absent, using getOrDefault.

Contributor Author (LiuJoyceC), Jul 10, 2020:
It depends on how we want to handle this case: https://github.com/opendistro-for-elasticsearch/anomaly-detection/blob/master/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java#L211-L214. This is the case that could cause some of the times to be missing from featuresMap at this line.

Is aggs == null in the search response equivalent to each queried time range having no data? In other words, do we want to treat Collections.emptyList() as equivalent to [Optional.empty(), Optional.empty(), ..., Optional.empty()]? In the case of Collections.emptyList(), do we want to cache the null values for each of the queried time ranges so they will not get re-queried?

If yes, it might be cleaner to handle this logic in the callback passed to searchFeatureDao, so that the logic for determining shingle values based on the search response is located in one function rather than spread across multiple functions.

Contributor (wnbts):

I haven't seen or heard of that happening in practice. If it ever happens, caching the results is OK. What I was suggesting is just simplifying the implementation to one line, like stream.mapToObj(time -> map.getOrDefault(time, <time, empty()>)).
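
A minimal sketch of that one-liner, assuming the fields and helper methods from this diff:

    // Hypothetical: default to an absent point for any end time the map doesn't cover.
    getFullShingleEndTimes(endTime, getDetectorIntervalInMilliseconds(detector))
        .mapToObj(time -> featuresMap.getOrDefault(time, new SimpleImmutableEntry<>(time, Optional.<double[]>empty())))
        .forEach(shingle::add);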

Contributor Author (LiuJoyceC):
Alright, I will change the implementation to cache results for the missing ranges when the response is an empty list.

+            .forEach(e -> shingle.add(e));
+
+        if (featuresMap.containsKey(endTime)) {
             getProcessedFeatures(shingle, detector, endTime, listener);
         } else {
             listener.onResponse(new SinglePointFeatures(Optional.empty(), Optional.empty()));
Contributor (wnbts):

getProcessedFeatures should be able to handle both branches.

Contributor Author (LiuJoyceC):
Are you suggesting to pass featuresMap as a parameter into getProcessedFeatures and check this condition in there?

The condition featuresMap.containsKey(endTime) is the simplest and most concise way to ensure the conditions !shingle.isEmpty() and shingle.getLast().getKey() ≈ endTime (where ≈ means within half an interval away, as discussed, which basically means that the current point is present). The reason this check is being done in updateUnprocessedFeatures is that it is only needed if updateUnprocessedFeatures is invoked, and not needed if getProcessedFeatures is directly invoked by getCurrentFeatures (in which case the missingRanges.size() > 0 check already ensures the above conditions, so checking featuresMap is unnecessary).

Contributor (wnbts):
No, featuresMap is not needed. By the end of line 176, the shingle collection should contain exactly shingleSize data points, present or absent, the last one ending around endTime. getProcessedFeatures can output the same results for both the if and else branches.

Contributor Author (LiuJoyceC):
With the current implementation (where results are not added to the shingle if searchFeatureDao returns Collections.emptyList()), the shingle is not guaranteed to contain an entry around endTime, which is why this check is currently being done.

However, per your response on the comment below, if we go ahead and cache all missing ranges when searchFeatureDao returns Collections.emptyList(), then the shingle will be guaranteed to have an entry at or near endTime, and checking the featuresMap will no longer be needed.
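
A sketch of that caching behavior, as a hypothetical addition to the callback in getCurrentFeatures above:

    // Hypothetical: when the search returns an empty list, record an absent point for
    // every missing range so those ranges are cached and never re-queried.
    if (points.isEmpty()) {
        for (Entry<Long, Long> range : missingRanges) {
            long rangeEndTime = range.getValue();
            featuresMap.put(rangeEndTime, new SimpleImmutableEntry<>(rangeEndTime, Optional.empty()));
        }
    }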

         }
     }

     private void getProcessedFeatures(
-        Deque<Entry<Long, double[]>> shingle,
+        Deque<Entry<Long, Optional<double[]>>> shingle,
         AnomalyDetector detector,
         long endTime,
         ActionListener<SinglePointFeatures> listener
     ) {

-        double[][] currentPoints = filterAndFill(shingle, endTime, detector);
-        Optional<double[]> currentPoint = Optional.ofNullable(shingle.peekLast()).map(Entry::getValue);
+        Optional<double[]> currentPoint = shingle.peekLast().getValue();
         listener
             .onResponse(
Member (kaituo):

The original code is easier to read. Also, you do currentPoint.map(point -> filterAndFill(shingle, endTime, detector)) without using currentPoint in filterAndFill, which looks strange.

Contributor Author (LiuJoyceC):

This change was made to address wnbts's comments above about simplifying the code here to not use if/else to ensure currentPoint is not an empty Optional. The way this is written, if currentPoint is an empty Optional, it triggers the orElse line, so that when the current point is empty we don't end up with a non-empty array of points in the SinglePointFeatures (if that were to happen, it would be an unexpected change in behavior from before).

The reason the original code didn't need to check that was that currentPoint wasn't an Optional before. Now that we are caching empty responses from the search request to ensure we don't re-query missing data, the points in the shingle are wrapped in Optionals.

Contributor Author (LiuJoyceC):

Another unexpected change in behavior that this avoids is running filterAndFill on a shingle where the current point is not present. That would result in the shingle filling in the missing current point with older data. In the original code, there were checks in getCurrentFeatures and updateUnprocessedFeatures to ensure the current point is not missing; those checks were removed to address the comments above about simplifying the logic.

Contributor Author (LiuJoyceC):

The code was modified to no longer have an unused point param in the function, and currentPoint.isPresent() is now explicitly checked to make the intention clear. Does this address the concern?
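
A sketch of what that revision might look like (hypothetical; the updated commit is not shown in this diff):

    // Hypothetical revision: check presence explicitly instead of mapping over an unused param.
    Optional<double[]> currentPoint = shingle.peekLast().getValue();
    if (currentPoint.isPresent()) {
        double[][] points = filterAndFill(shingle, endTime, detector);
        listener.onResponse(new SinglePointFeatures(currentPoint, Optional.ofNullable(points).map(p -> batchShingle(p, shingleSize)[0])));
    } else {
        listener.onResponse(new SinglePointFeatures(currentPoint, Optional.empty()));
    }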

Member (kaituo):

Yes, thanks for the change.

-                Optional
-                    .ofNullable(currentPoints)
+                currentPoint
+                    .map(point -> filterAndFill(shingle, endTime, detector))
                     .map(points -> new SinglePointFeatures(currentPoint, Optional.of(batchShingle(points, shingleSize)[0])))
                     .orElse(new SinglePointFeatures(currentPoint, Optional.empty()))
             );
     }

-    private double[][] filterAndFill(Deque<Entry<Long, double[]>> shingle, long endTime, AnomalyDetector detector) {
-        long intervalMilli = ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMillis();
+    private double[][] filterAndFill(Deque<Entry<Long, Optional<double[]>>> shingle, long endTime, AnomalyDetector detector) {
+        Deque<Entry<Long, Optional<double[]>>> filteredShingle = shingle
+            .stream()
+            .filter(e -> e.getValue().isPresent())
+            .collect(Collectors.toCollection(ArrayDeque::new));
         double[][] result = null;
-        if (shingle.size() >= shingleSize - maxMissingPoints) {
-            TreeMap<Long, double[]> search = new TreeMap<>(shingle.stream().collect(Collectors.toMap(Entry::getKey, Entry::getValue)));
-            result = IntStream.rangeClosed(1, shingleSize).mapToLong(i -> endTime - (shingleSize - i) * intervalMilli).mapToObj(t -> {
-                Optional<Entry<Long, double[]>> after = Optional.ofNullable(search.ceilingEntry(t));
-                Optional<Entry<Long, double[]>> before = Optional.ofNullable(search.floorEntry(t));
-                return after
-                    .filter(a -> Math.abs(t - a.getKey()) <= before.map(b -> Math.abs(t - b.getKey())).orElse(Long.MAX_VALUE))
-                    .map(Optional::of)
-                    .orElse(before)
-                    .filter(e -> Math.abs(t - e.getKey()) < intervalMilli * maxNeighborDistance)
-                    .map(Entry::getValue)
-                    .orElse(null);
-            }).filter(d -> d != null).toArray(double[][]::new);
+        if (filteredShingle.size() >= shingleSize - maxMissingPoints) {
+            long maxMillisecondsDifference = maxNeighborDistance * getDetectorIntervalInMilliseconds(detector);
+            result = getNearbyPointsForShingle(detector, filteredShingle, endTime, maxMillisecondsDifference)
+                .map(e -> e.getValue().getValue().orElse(null))
+                .filter(d -> d != null)
+                .toArray(double[][]::new);

             if (result.length < shingleSize) {
                 result = null;
             }
         }
         return result;
     }
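
Continuing the example from the review thread above: with shingleSize = 3 and a shingle of [<60000, x>, <179999, z>], the middle interval is missing but within maxNeighborDistance of the point at 179,999, so filterAndFill fills it from that neighbor and returns [x, z, z].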

+    private Stream<Entry<Long, Entry<Long, Optional<double[]>>>> getNearbyPointsForShingle(
Member (kaituo):

Have some documentation of this method, since the return value is nested and not easy to understand?

+        AnomalyDetector detector,
+        Deque<Entry<Long, Optional<double[]>>> shingle,
+        long endTime,
+        long maxMillisecondsDifference
+    ) {
+        long intervalMilli = getDetectorIntervalInMilliseconds(detector);
+        TreeMap<Long, Optional<double[]>> search = new TreeMap<>(
+            shingle.stream().collect(Collectors.toMap(Entry::getKey, Entry::getValue))
+        );
+        return getFullShingleEndTimes(endTime, intervalMilli).mapToObj(t -> {
+            Optional<Entry<Long, Optional<double[]>>> after = Optional.ofNullable(search.ceilingEntry(t));
+            Optional<Entry<Long, Optional<double[]>>> before = Optional.ofNullable(search.floorEntry(t));
+            return after
+                .filter(a -> Math.abs(t - a.getKey()) <= before.map(b -> Math.abs(t - b.getKey())).orElse(Long.MAX_VALUE))
+                .map(Optional::of)
+                .orElse(before)
+                .filter(e -> Math.abs(t - e.getKey()) < maxMillisecondsDifference)
+                .map(e -> new SimpleImmutableEntry<>(t, e));
+        }).filter(Optional::isPresent).map(Optional::get);
+    }
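
Documentation along the lines requested above could look like this (a sketch, not part of the diff):

    /**
     * Maps each expected shingle end time to the stored data point closest to it in time,
     * provided that point lies within maxMillisecondsDifference of the expected time.
     *
     * Each stream element is <expectedEndTime, <actualTimestamp, Optional<dataPoint>>>, so
     * callers can line points up with intervals while keeping the points' real timestamps.
     * Expected end times with no nearby point are omitted from the stream.
     */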

+    private long getDetectorIntervalInMilliseconds(AnomalyDetector detector) {
+        return ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMillis();
+    }
+
+    private LongStream getFullShingleEndTimes(long endTime, long intervalMilli) {
+        return LongStream.rangeClosed(1, shingleSize).map(i -> endTime - (shingleSize - i) * intervalMilli);
+    }
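
For example, with shingleSize = 3, getFullShingleEndTimes(180000, 60000) returns the stream [60000, 120000, 180000], the expected interval end times used throughout the discussion above.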

     /**
      * Provides data for cold-start training.
      *
@@ -367,7 +419,7 @@ public void getPreviewFeatures(AnomalyDetector detector, long startMilli, long e
     private Entry<List<Entry<Long, Long>>, Integer> getSampleRanges(AnomalyDetector detector, long startMilli, long endMilli) {
         long start = truncateToMinute(startMilli);
         long end = truncateToMinute(endMilli);
-        long bucketSize = ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMillis();
+        long bucketSize = getDetectorIntervalInMilliseconds(detector);
         int numBuckets = (int) Math.floor((end - start) / (double) bucketSize);
         int numSamples = (int) Math.max(Math.min(numBuckets * previewSampleRate, maxPreviewSamples), 1);
         int stride = (int) Math.max(1, Math.floor((double) numBuckets / numSamples));
@@ -455,9 +507,9 @@ private long truncateToMinute(long epochMillis) {
     }

     public int getShingleSize(String detectorId) {
-        Deque<Entry<Long, double[]>> shingle = detectorIdsToTimeShingles.get(detectorId);
+        Deque<Entry<Long, Optional<double[]>>> shingle = detectorIdsToTimeShingles.get(detectorId);
         if (shingle != null) {
-            return shingle.size();
+            return Math.toIntExact(shingle.stream().filter(entry -> entry.getValue().isPresent()).count());
         } else {
             return -1;
         }