-
Notifications
You must be signed in to change notification settings - Fork 36
Use callbacks and bug fix #83
Use callbacks and bug fix #83
Conversation
This PR includes the following changes: 1. remove classes that are not needed in jacocoExclusions since we have enough coverage for those classes. 2. Use ClientUtil instead of Elasticsearch’s client in AD job runner 3. Use one function to get the number of partitioned forests. Previously, we have redundant code in both ModelManager and ADStateManager. 4. Change ADStateManager.getAnomalyDetector to use callback. 5. Change AnomalyResultTransportAction to use callback to get features. 6. Add in AnomalyResultTransportAction to handle the case where all features have been disabled, and users' index does not exist. 7. Change get RCF and threshold result methods to use callback and add exception handling of IndexNotFoundException due to the change. Previously, getting RCF and threshold result methods won’t throw IndexNotFoundException. 8. Remove unused fields in StopDetectorTransportAction and AnomalyResultTransportAction 9. Unwrap EsRejectedExecutionException as it can be nested inside RemoteTransportException. Previously, we would not recognize EsRejectedExecutionException and thus miss anomaly results write retrying. 10. Add error in anomaly result schema.11. Fix broken tests due to my changes. Testing done: 1. unit/integration tests pass 2. do end-to-end testing and make sure my fix achieves the purpose * timeout issue is gone * when all features have been disabled or index does not exist, we will retry a few more times and disable AD jobs.
if (!detector.isPresent()) { | ||
listener.onFailure(new EndRunException(adID, "AnomalyDetector is not available.", true)); | ||
listener.onFailure(new EndRunException(adID, "AnomalyDetector is not available.", false)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not end run immediately if we can't find detector?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed it back to end run immediately.
} else if (exception instanceof IllegalArgumentException) { | ||
listener | ||
.onFailure( | ||
new EndRunException(adID, "Having trouble querying data. Maybe all of your features have been disabled.", false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we check feature list and give user a definite answer? If that change takes time, please add some todo here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestion. Done.
}, exception -> { | ||
LOG.warn(exception); | ||
if (exception instanceof IndexNotFoundException) { | ||
listener.onFailure(new EndRunException(adID, "Having trouble querying data: " + exception.getMessage(), false)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we change to "Can't find index XXX"? So user can know clearly the trouble is missing index, rather than others like wrong query or network latency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exception.getMessage would return such information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be good to separate changes unrelated to callbacks into other prs to speed up delivery
* @return a pair of number of partitions and size of a parition (number of trees) | ||
* @throws LimitExceededException when there is no sufficient resouce available | ||
*/ | ||
public Entry<Integer, Integer> getPartitionedForestSizes(String detectorId, int rcfNumFeatures) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we are going to refactor this method, I suggest the new api just takes a detector object, which contains all the needed info and simpler to use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only use use detector id as part of error message. Don't need other detector information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if model manager takes a detector, it can compute the feature dimensions and partitioning so that will be only input needed and that will save client the work to provide a second rcfNumFeatures input. that's why i suggest doing that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sense. done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please take a look at the recent commit: d8ea9cf
* @return the number of RCF model's partition number for adID | ||
* @throws InterruptedException when we cannot get anomaly detector object for adID before timeout | ||
* @throws LimitExceededException when there is no sufficient resource available | ||
*/ | ||
public int getPartitionNumber(String adID) throws InterruptedException { | ||
public int getPartitionNumber(String adID, Optional<AnomalyDetector> detector) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor. Why not validate detector first and just pass a detector afterwards? saving all the repetitive and unlikely handling of a non-existent detector.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point. Done.
@@ -45,6 +45,9 @@ | |||
"execution_end_time": { | |||
"type": "date", | |||
"format": "strict_date_time||epoch_millis" | |||
}, | |||
"error": { | |||
"type": "text" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question. does error message need to be searched for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the error message is not standardized, that means it could be any string, better to make it searchable for easy operation.
… to null (opendistro-for-elasticsearch#77) 1. Change the default value of lastUpdateTime from the current timestamp to null. Before the change, creating a detector returns one lastUpdateTime, while getting a detector returns a different lastUpdateTime. The difference is confusing to the user, and they may wonder what has happened between the creating and getting detector calls. After the change, creating a detector returns no last update time, while getting a detector returns a last update time. 2. Replace the mocked threadpool in 2 tests with a real threadpool object. Testing done: 1. verified lastUpdateTime change in a cluster 2. gradle build
* @return the number of RCF model's partition number for adID | ||
* @throws InterruptedException when we cannot get anomaly detector object for adID before timeout | ||
* @throws LimitExceededException when there is no sufficient resource available | ||
*/ | ||
public int getPartitionNumber(String adID) throws InterruptedException { | ||
public int getPartitionNumber(String adID, AnomalyDetector detector) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor. Is this exception still possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, it is impossible now. Will fix.
* onFailure is called IllegalArgumentException when training data is invalid | ||
* onFailure is called LimitExceededException when a limit for training is exceeded | ||
*/ | ||
public void trainModel(AnomalyDetector anomalyDetector, double[][] dataPoints, ActionListener<Void> listener) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion. it's clearer to rebase and force push to avoid showing pulled changes that are checked in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, will try next time
* Use callbacks and bug fix This PR includes the following changes: 1. remove classes that are not needed in jacocoExclusions since we have enough coverage for those classes. 2. Use ClientUtil instead of Elasticsearch’s client in AD job runner 3. Use one function to get the number of partitioned forests. Previously, we have redundant code in both ModelManager and ADStateManager. 4. Change ADStateManager.getAnomalyDetector to use callback. 5. Change AnomalyResultTransportAction to use callback to get features. 6. Add in AnomalyResultTransportAction to handle the case where all features have been disabled, and users' index does not exist. 7. Change get RCF and threshold result methods to use callback and add exception handling of IndexNotFoundException due to the change. Previously, getting RCF and threshold result methods won’t throw IndexNotFoundException. 8. Remove unused fields in StopDetectorTransportAction and AnomalyResultTransportAction 9. Unwrap EsRejectedExecutionException as it can be nested inside RemoteTransportException. Previously, we would not recognize EsRejectedExecutionException and thus miss anomaly results write retrying. 10. Add error in anomaly result schema.11. Fix broken tests due to my changes. Testing done: 1. unit/integration tests pass 2. do end-to-end testing and make sure my fix achieves the purpose * timeout issue is gone * when all features have been disabled or index does not exist, we will retry a few more times and disable AD jobs.
Author: Kaituo Li <kaituo@amazon.com> Date: Wed Apr 15 15:45:13 2020 -0700 Add state and error to profile API (opendistro-for-elasticsearch#84) * Add state and error to profile API We want to make it easy for customers and oncalls to identify a detector’s state and error if any. This PR adds such information to our new profile API. We expect three kinds of states: -Disabled: if get ad job api says the job is disabled; -Init: if anomaly score after the last update time of the detector is larger than 0 -Running: if neither of the above applies and no exceptions. Error is populated if error of the latest anomaly result is not empty. Testing done: -manual testing during a detector’s life cycle: not created, created but not started, started, during initialization, after initialization, stopped, restarted -added unit tests to cover above scenario commit 0c33050 Author: Kaituo Li <kaituo@amazon.com> Date: Tue Apr 14 11:52:20 2020 -0700 Use callbacks and bug fix (opendistro-for-elasticsearch#83) * Use callbacks and bug fix This PR includes the following changes: 1. remove classes that are not needed in jacocoExclusions since we have enough coverage for those classes. 2. Use ClientUtil instead of Elasticsearch’s client in AD job runner 3. Use one function to get the number of partitioned forests. Previously, we have redundant code in both ModelManager and ADStateManager. 4. Change ADStateManager.getAnomalyDetector to use callback. 5. Change AnomalyResultTransportAction to use callback to get features. 6. Add in AnomalyResultTransportAction to handle the case where all features have been disabled, and users' index does not exist. 7. Change get RCF and threshold result methods to use callback and add exception handling of IndexNotFoundException due to the change. Previously, getting RCF and threshold result methods won’t throw IndexNotFoundException. 8. Remove unused fields in StopDetectorTransportAction and AnomalyResultTransportAction 9. Unwrap EsRejectedExecutionException as it can be nested inside RemoteTransportException. Previously, we would not recognize EsRejectedExecutionException and thus miss anomaly results write retrying. 10. Add error in anomaly result schema.11. Fix broken tests due to my changes. Testing done: 1. unit/integration tests pass 2. do end-to-end testing and make sure my fix achieves the purpose * timeout issue is gone * when all features have been disabled or index does not exist, we will retry a few more times and disable AD jobs.
Issue #, if available:
#78
Description of changes:
This PR includes the following changes:
Testing done:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.