This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

record error and execution start/end time in AD result; handle except… #59

Merged
Changes from 2 commits
@@ -56,6 +56,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

import static com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin.AD_THREAD_POOL_NAME;
@@ -69,10 +70,15 @@
*/
public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private static final Logger log = LogManager.getLogger(AnomalyDetectorJobRunner.class);

private static final long JOB_RUN_BUFFER_IN_MILLISECONDS = 10 * 1000;
private static AnomalyDetectorJobRunner INSTANCE;
private Settings settings;
private BackoffPolicy backoffPolicy;
private int maxRetryForEndRunException;
private Client client;
private ThreadPool threadPool;
private AnomalyResultHandler anomalyResultHandler;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;

public static AnomalyDetectorJobRunner getJobRunnerInstance() {
if (INSTANCE != null) {
@@ -87,11 +93,6 @@ public static AnomalyDetectorJobRunner getJobRunnerInstance() {
}
}

private Client client;
private ThreadPool threadPool;
private AnomalyResultHandler anomalyResultHandler;
private static final long JOB_RUN_BUFFER_IN_MILLISECONDS = 10 * 1000;

private AnomalyDetectorJobRunner() {
// Singleton class, use getJobRunnerInstance method instead of constructor
}
@@ -108,13 +109,18 @@ public void setAnomalyResultHandler(AnomalyResultHandler anomalyResultHandler) {
this.anomalyResultHandler = anomalyResultHandler;
}

public void setDetectorEndRunExceptionCount(ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount) {
this.detectorEndRunExceptionCount = detectorEndRunExceptionCount;
}

public void setSettings(Settings settings) {
this.settings = settings;
this.backoffPolicy = BackoffPolicy
.exponentialBackoff(
AnomalyDetectorSettings.BACKOFF_INITIAL_DELAY.get(settings),
AnomalyDetectorSettings.MAX_RETRY_FOR_BACKOFF.get(settings)
);
this.maxRetryForEndRunException = AnomalyDetectorSettings.MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings);
}

public BackoffPolicy getBackoffPolicy() {
@@ -132,8 +138,7 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte

Instant executionStartTime = Instant.now();
IntervalSchedule schedule = (IntervalSchedule) jobParameter.getSchedule();
Instant endTime = Instant.now();
Instant startTime = endTime.minus(schedule.getInterval(), schedule.getUnit());
Instant startTime = executionStartTime.minus(schedule.getInterval(), schedule.getUnit());

final LockService lockService = context.getLockService();

@@ -150,12 +155,12 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte
lockService,
lock,
startTime,
endTime,
executionStartTime,
backoffPolicy.iterator()
backoffPolicy.iterator(),
0
),
exception -> {
indexAnomalyResultExceptionSilently(jobParameter, startTime, endTime, executionStartTime, exception);
indexAnomalyResultException(jobParameter, startTime, executionStartTime, exception);
throw new IllegalStateException("Failed to acquire lock for AD job: " + jobParameter.getName());
}
)
@@ -174,16 +179,15 @@ protected void runAdJob(
LockService lockService,
LockModel lock,
Instant startTime,
Instant endTime,
Instant executionStartTime,
Iterator<TimeValue> backoff
Iterator<TimeValue> backoff,
int retryTimes
) {

if (lock == null) {
indexAnomalyResultExceptionSilently(
indexAnomalyResultException(
jobParameter,
startTime,
endTime,
executionStartTime,
new AnomalyDetectionException(jobParameter.getName(), "Can't run AD job due to null lock")
);
@@ -194,43 +198,19 @@ protected void runAdJob(
AnomalyResultRequest request = new AnomalyResultRequest(
jobParameter.getName(),
startTime.toEpochMilli(),
endTime.toEpochMilli()
executionStartTime.toEpochMilli()
);
client
.execute(
AnomalyResultAction.INSTANCE,
request,
ActionListener
.wrap(
response -> {
indexAnomalyResultSilently(
jobParameter,
lockService,
lock,
startTime,
endTime,
executionStartTime,
response
);
},
exception -> {
handleAdException(
jobParameter,
lockService,
lock,
startTime,
endTime,
executionStartTime,
backoff,
exception
);
}
)
);
client.execute(AnomalyResultAction.INSTANCE, request, ActionListener.wrap(response -> {
indexAnomalyResult(jobParameter, lockService, lock, startTime, executionStartTime, response);
detectorEndRunExceptionCount.remove(jobParameter.getName());
},
exception -> {
handleAdException(jobParameter, lockService, lock, startTime, executionStartTime, backoff, retryTimes, exception);
}
));
} catch (Exception e) {
indexAnomalyResultExceptionSilently(jobParameter, startTime, endTime, executionStartTime, e);
indexAnomalyResultException(jobParameter, lockService, lock, startTime, executionStartTime, e, true);
Member:
Would we have a double lock release, since your code in the try block can also release the lock? How about adding an isLockReleasedOrExpired check before the release?

Contributor Author (@ylwu-amzn), Mar 13, 2020:
Rechecked and found no double lock release. LockService already handles the exception, and the lock expiry time equals the detection interval, so the lock will not expire during the job run.
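For illustration, a minimal sketch of the guard the reviewer suggests; it assumes (hypothetically) that LockModel exposes isReleased() and isExpired() accessors, and it is not part of the merged change:

```java
// Illustrative sketch only, not the merged code. Assumes LockModel exposes
// isReleased() and isExpired(); skip the release if the lock is already gone.
private void releaseLockIfStillHeld(ScheduledJobParameter jobParameter, LockService lockService, LockModel lock) {
    if (lock == null || lock.isReleased() || lock.isExpired()) {
        // Already released in the try block, or expired on its own: nothing to do.
        return;
    }
    releaseLock(jobParameter, lockService, lock);
}
```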

log.error("Failed to execute AD job " + jobParameter.getName(), e);
releaseLock(jobParameter, lockService, lock);
}
}

@@ -239,65 +219,76 @@ protected void handleAdException(
LockService lockService,
LockModel lock,
Instant startTime,
Instant endTime,
Instant executionStartTime,
Iterator<TimeValue> backoff,
int retryTimes,
Exception exception
) {
String detectorId = jobParameter.getName();
if (exception instanceof EndRunException) {
log.error("EndRunException happened when executed anomaly result action for " + jobParameter.getName(), exception);
log.error("EndRunException happened when executed anomaly result action for " + detectorId, exception);

if (((EndRunException) exception).isEndNow()) {
detectorEndRunExceptionCount.remove(detectorId);
// Stop AD job if EndRunException shows we should end job now.
stopAdJobSilently(jobParameter.getName());
indexAnomalyResultExceptionSilently(
stopAdJob(detectorId);
indexAnomalyResultException(
jobParameter,
lockService,
lock,
startTime,
endTime,
executionStartTime,
"Stopped detector: " + exception.getMessage()
"Stopped detector: " + exception.getMessage(),
true
);
releaseLock(jobParameter, lockService, lock);
} else {
// retry AD job, if all retries failed or have no enough time to retry, will stop AD job.
detectorEndRunExceptionCount.compute(detectorId, (k, v) -> {
if (v == null) {
return 1;
} else if (retryTimes == 0) {
Member:
I was confused at first when looking at this line, wondering when retryTimes is 0. Then I realized that every new AD job run sets retryTimes back to 0, so retryTimes is more a signal to increment the count by 1 than the real retry count of the detector. Is my understanding correct?

I suggest the following to simplify:
First, remove the detectorEndRunExceptionCount entry for a detector id whenever we have a successful run or an exception that is not an EndRunException.
Second, every time an EndRunException is caught, increment the count (inserting the mapping if the detector id is not present), then check whether the count has reached the threshold and stop the job if it has.

Contributor Author (@ylwu-amzn):
> First, remove the detectorEndRunExceptionCount entry for a detector id whenever we have a successful run or an exception that is not an EndRunException.

Already removed from the map on a successful run. Will also remove it for the non-EndRunException case; this comment seems to duplicate another one.

> Second, every time an EndRunException is caught, increment the count, then check whether the count has reached the threshold and stop the job if it has.

We can't do this while we have the backoff retry: for the same AD job run, a backoff retry that fires multiple times would keep increasing detectorEndRunExceptionCount and could terminate the current job as soon as the count reaches the limit.
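For reference, a minimal sketch of the reviewer's proposed simplification (illustrative only, not the merged code); it assumes the count entry is removed elsewhere on a successful run or on any non-EndRunException:

```java
// Sketch of the reviewer's suggestion: bump the per-detector count on every
// EndRunException and stop the job once the limit is exceeded.
int count = detectorEndRunExceptionCount.merge(detectorId, 1, Integer::sum);
if (count > maxRetryForEndRunException) {
    stopAdJobForEndRunException(jobParameter, lockService, lock, startTime, executionStartTime, (EndRunException) exception);
    return;
}
```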

return v + 1;
} else {
return v;
}
});
// if AD job failed consecutively due to EndRunException and failed times exceeds upper limit, will stop AD job
if (detectorEndRunExceptionCount.get(detectorId) > maxRetryForEndRunException) {
Member:
After much thought, I am still inclined to remove the backoff retry: we don't have a clear use case where a quick retry is needed, and the retry adds complications. These are the three cases where EndRunException is thrown with endNow being false:

  • training data for cold start is not available
  • cold start cannot succeed
  • unknown prediction error

Two of the causes are related to cold start. Case by case:

  • Training data for cold start not available: the situation won't improve within seconds.
  • Cold start cannot succeed, or unknown prediction error: this indicates bugs on our side or heavy system load; a quick retry does not help.

Some complications of retrying quickly:

  • Cold start is expensive: it runs 24 queries, initializes models, and saves checkpoints. Retrying cold start quickly can add performance pressure.
  • The 10-second buffer might not be enough to prevent multiple cold starts (one from the backoff retry and one from the next AD job run) happening at the same time, since cold start cannot finish within a few seconds. Lai once said it takes about 20-30 seconds, and it can take longer if customers' feature queries are complex or there are more features. Hanguang's cancel only applies while prediction queries for the current window are running; it won't fire once those queries finish, and we would still be running 24 cold-start queries, initializing models, and saving checkpoints.
  • We use our own threadpool, which is limited to 1/4 of the cores. Retries use that threadpool and may slow down other jobs.

Contributor Author (@ylwu-amzn):
Thanks for the analysis. The backoff retry here is meant to resolve transient exceptions.

Missing cold-start training data is not a transient exception. We need finer-grained exceptions later to distinguish retryable from non-retryable failures. If we can't tell which exceptions are transient and retryable in AnomalyResultTransportAction, I'm OK with removing the backoff retry now to avoid the performance issue. But that's a tradeoff: without retrying, a transient exception will fail the current job run, and if there is an anomaly in that interval the user will miss it and never get an alerting notification. Sometimes missing an anomaly and its notification is not acceptable. For example, with a one-hour detection interval, a transient exception could fail the current AD job, so no anomaly is found and the user never knows; the next hour may contain no anomaly, so the user never learns something went wrong. In short, this is a tradeoff between protecting performance, user experience, and what we can do right now.

So, can you confirm whether we can tell which exceptions are retryable in AnomalyResultTransportAction? If we can't, I will remove the backoff retry.

Contributor Author (@ylwu-amzn):
As we discussed offline, we can define some exceptions, such as failing to get RCF/threshold model results, as retryable. Such exceptions are transient and may be resolved by backoff retry. Will add a TODO for now and we can change it later.
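A rough sketch of the finer-grained exception idea behind that TODO; the class name below is hypothetical and does not exist in the plugin:

```java
// Hypothetical sketch only: a marker type for transient failures (e.g. failing to
// fetch RCF/threshold model results) so the job runner can tell which exceptions
// are worth a backoff retry.
public class RetryableADException extends AnomalyDetectionException {
    public RetryableADException(String detectorId, String message) {
        super(detectorId, message);
    }
}
```

The runner could then retry only when the caught exception is a RetryableADException and the backoff iterator still has attempts left.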

stopAdJobForEndRunException(
jobParameter,
lockService,
lock,
startTime,
executionStartTime,
(EndRunException) exception
);
return;
}
// retry AD job, if all retries failed or have no enough time to retry, will record exception in AD result.
Schedule schedule = jobParameter.getSchedule();
long nextExecutionTime = jobParameter.getSchedule().getNextExecutionTime(executionStartTime).toEpochMilli();
long now = Instant.now().toEpochMilli();
long leftTime = nextExecutionTime - now - JOB_RUN_BUFFER_IN_MILLISECONDS;
long usedTime = now - executionStartTime.toEpochMilli();

if (!backoff.hasNext()) {
stopAdJobForEndRunException(
TimeValue nextRunDelay = backoff.hasNext() ? backoff.next() : null;

if (nextRunDelay != null && leftTime > (usedTime + nextRunDelay.getMillis())) {
threadPool
.schedule(
() -> runAdJob(jobParameter, lockService, lock, startTime, executionStartTime, backoff, retryTimes + 1),
nextRunDelay,
ThreadPool.Names.SAME
);
} else {
indexAnomalyResultException(
jobParameter,
lockService,
lock,
startTime,
endTime,
executionStartTime,
(EndRunException) exception
exception.getMessage(),
true
);
} else {
if (backoff.hasNext()) {
TimeValue nextRunDelay = backoff.next();
if (leftTime > (usedTime + nextRunDelay.getMillis())) {
threadPool
.schedule(
() -> runAdJob(jobParameter, lockService, lock, startTime, endTime, executionStartTime, backoff),
nextRunDelay,
ThreadPool.Names.SAME
);
} else {
stopAdJobForEndRunException(
jobParameter,
lockService,
lock,
startTime,
endTime,
executionStartTime,
(EndRunException) exception
);
}
}
}
}
} else {
@@ -307,8 +298,7 @@ protected void handleAdException(
} else {
log.error("Failed to execute anomaly result action for " + jobParameter.getName(), exception);
}
indexAnomalyResultExceptionSilently(jobParameter, startTime, endTime, executionStartTime, exception);
releaseLock(jobParameter, lockService, lock);
indexAnomalyResultException(jobParameter, lockService, lock, startTime, executionStartTime, exception, true);
}
}

@@ -317,22 +307,23 @@ private void stopAdJobForEndRunException(
LockService lockService,
LockModel lock,
Instant startTime,
Instant endTime,
Instant executionStartTime,
EndRunException exception
) {
stopAdJobSilently(jobParameter.getName());
indexAnomalyResultExceptionSilently(
detectorEndRunExceptionCount.remove(jobParameter.getName());
stopAdJob(jobParameter.getName());
indexAnomalyResultException(
jobParameter,
lockService,
lock,
startTime,
endTime,
executionStartTime,
"Stopped detector: " + exception.getMessage()
"Stopped detector: " + exception.getMessage(),
true
);
releaseLock(jobParameter, lockService, lock);
}

private void stopAdJobSilently(String detectorId) {
private void stopAdJob(String detectorId) {
try {
GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);

@@ -380,19 +371,18 @@ private void stopAdJobSilently(String detectorId) {
}
}

private void indexAnomalyResultSilently(
private void indexAnomalyResult(
ScheduledJobParameter jobParameter,
LockService lockService,
LockModel lock,
Instant startTime,
Instant endTime,
Instant executionStartTime,
AnomalyResultResponse response
) {
try {
IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) ((AnomalyDetectorJob) jobParameter).getWindowDelay();
Instant dataStartTime = startTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
Instant dataEndTime = endTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());

if (response.getError() != null) {
log.info("Anomaly result action run successfully for {} with error {}", jobParameter.getName(), response.getError());
@@ -417,34 +407,47 @@ private void indexAnomalyResultSilently(
}
}

private void indexAnomalyResultExceptionSilently(
private void indexAnomalyResultException(
ScheduledJobParameter jobParameter,
Instant startTime,
Instant endTime,
Instant executionStartTime,
Exception exception
) {
indexAnomalyResultException(jobParameter, null, null, startTime, executionStartTime, exception, false);
}

private void indexAnomalyResultException(
ScheduledJobParameter jobParameter,
LockService lockService,
LockModel lock,
Instant startTime,
Instant executionStartTime,
Exception exception,
boolean releaseLock
) {
try {
String errorMessage = exception instanceof AnomalyDetectionException
? exception.getMessage()
: Throwables.getStackTraceAsString(exception);
indexAnomalyResultExceptionSilently(jobParameter, startTime, endTime, executionStartTime, errorMessage);
indexAnomalyResultException(jobParameter, lockService, lock, startTime, executionStartTime, errorMessage, releaseLock);
} catch (Exception e) {
log.error("Failed to index anomaly result for " + jobParameter.getName(), e);
}
}

private void indexAnomalyResultExceptionSilently(
private void indexAnomalyResultException(
ScheduledJobParameter jobParameter,
LockService lockService,
LockModel lock,
Instant startTime,
Instant endTime,
Instant executionStartTime,
String errorMessage
String errorMessage,
boolean releaseLock
) {
try {
IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) ((AnomalyDetectorJob) jobParameter).getWindowDelay();
Instant dataStartTime = startTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
Instant dataEndTime = endTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());

AnomalyResult anomalyResult = new AnomalyResult(
jobParameter.getName(),
Expand All @@ -461,6 +464,10 @@ private void indexAnomalyResultExceptionSilently(
anomalyResultHandler.indexAnomalyResult(anomalyResult);
} catch (Exception e) {
log.error("Failed to index anomaly result for " + jobParameter.getName(), e);
} finally {
if (releaseLock) {
releaseLock(jobParameter, lockService, lock);
}
}
}
