Skip to content

Commit

Permalink
Skip execution of Actions on ACKNOWLEDGED Alerts for Bucket-Level Monitors (#158)
Browse files Browse the repository at this point in the history

Signed-off-by: Mohammad Qureshi <qreshi@amazon.com>
  • Loading branch information
qreshi authored Aug 30, 2021
1 parent 2d60ede commit 8a1dc1f
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 10 deletions.
23 changes: 18 additions & 5 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -233,16 +233,15 @@ class AlertService(
val currentAlert = currentAlerts[aggAlertBucket.getBucketKeysHash()]
if (currentAlert != null) {
// De-duped Alert
dedupedAlerts.add(currentAlert.copy(lastNotificationTime = currentTime, aggregationResultBucket = aggAlertBucket))
dedupedAlerts.add(currentAlert.copy(aggregationResultBucket = aggAlertBucket))

// Remove de-duped Alert from currentAlerts since it is no longer a candidate for a potentially completed Alert
currentAlerts.remove(aggAlertBucket.getBucketKeysHash())
} else {
// New Alert
// TODO: Setting lastNotificationTime is deceiving since the actions haven't run yet, maybe it should be null here
val newAlert = Alert(
monitor = monitor, trigger = trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ACTIVE, errorMessage = null,
lastNotificationTime = null, state = Alert.State.ACTIVE, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket
)
Expand All @@ -266,7 +265,7 @@ class AlertService(
} ?: listOf()
}

suspend fun saveAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy) {
suspend fun saveAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy, allowUpdatingAcknowledgedAlert: Boolean = false) {
var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
Expand All @@ -281,7 +280,21 @@ class AlertService(
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
}
Alert.State.ACKNOWLEDGED, Alert.State.DELETED -> {
Alert.State.ACKNOWLEDGED -> {
// Allow ACKNOWLEDGED Alerts to be updated for Bucket-Level Monitors since de-duped Alerts can be ACKNOWLEDGED
// and updated by the MonitorRunner
if (allowUpdatingAcknowledgedAlert) {
listOf<DocWriteRequest<*>>(
IndexRequest(AlertIndices.ALERT_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
)
} else {
throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert")
}
}
Alert.State.DELETED -> {
throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert")
}
Alert.State.COMPLETED -> {
Expand Down
20 changes: 15 additions & 5 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
* Note: Index operations can fail for various reasons (such as write blocks on cluster), in such a case, the Actions
* will still execute with the Alert information in the ctx but the Alerts may not be visible.
*/
alertService.saveAlerts(dedupedAlerts, retryPolicy)
alertService.saveAlerts(dedupedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = true)
newAlerts = alertService.saveNewAlerts(newAlerts, retryPolicy)

// Store deduped and new Alerts to accumulate across pages
Expand All @@ -439,7 +439,11 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

for (trigger in monitor.triggers) {
val alertsToUpdate = mutableSetOf<Alert>()
val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED) ?: mutableListOf()
// Filter ACKNOWLEDGED Alerts from the deduped list so they do not have Actions executed for them.
// New Alerts are ignored since they cannot be acknowledged yet.
val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED)
?.filterNot { it.state == Alert.State.ACKNOWLEDGED }
?: mutableListOf()
val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf()
val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf()

Expand All @@ -454,7 +458,12 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
if (action.getActionScope() == ActionExecutionScope.Type.PER_ALERT && monitorOrTriggerError == null) {
val perAlertActionFrequency = action.actionExecutionPolicy.actionExecutionScope as PerAlertActionScope
for (alertCategory in perAlertActionFrequency.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
var alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
// Filter out ACKNOWLEDGED Alerts from the deduped Alerts
if (alertCategory == AlertCategory.DEDUPED) {
alertsToExecuteActionsFor = alertsToExecuteActionsFor.filterNot { it.state == Alert.State.ACKNOWLEDGED }
.toMutableList()
}
for (alert in alertsToExecuteActionsFor) {
if (isBucketLevelTriggerActionThrottled(action, alert)) continue

Expand Down Expand Up @@ -507,15 +516,16 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
val bucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
val actionResults = triggerResult.actionResultsMap.getOrDefault(bucketKeysHash, emptyMap<String, ActionRunResult>())
alertService.updateActionResultsForBucketLevelAlert(
alert,
alert.copy(lastNotificationTime = currentTime()),
actionResults,
// TODO: Update BucketLevelTriggerRunResult.alertError() to retrieve error based on the first failed Action
monitorResult.alertError() ?: triggerResult.alertError()
)
}

// Update Alerts with action execution results
alertService.saveAlerts(updatedAlerts, retryPolicy)
// ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them
alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false)
}

return monitorResult.copy(triggerResults = triggerResults)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.model.destination.email.Email
import org.opensearch.alerting.model.destination.email.Recipient
import org.opensearch.alerting.util.DestinationType
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.client.ResponseException
import org.opensearch.client.WarningFailureException
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -1086,6 +1087,95 @@ class MonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Incorrect number of completed alerts", 1, completedAlerts.size)
}

/**
 * Runs a bucket-level monitor three times against two buckets ("test_value_1" and "test_value_2"):
 *
 * 1. The first run must create one ACTIVE alert per bucket.
 * 2. After acknowledging the "test_value_1" alert, the second run must leave that alert's
 *    lastNotificationTime unchanged while advancing it on the still-ACTIVE alert.
 * 3. After the underlying documents are deleted, the third run must move both alerts to COMPLETED,
 *    and both must carry a later lastNotificationTime than they had after the second run.
 */
fun `test bucket-level monitor with acknowledged alert`() {
    val testIndex = createTestIndex()
    insertSampleTimeSerializedData(
        testIndex,
        listOf(
            "test_value_1",
            "test_value_2"
        )
    )

    val query = QueryBuilders.rangeQuery("test_strict_date_time")
        .gt("{{period_end}}||-10d")
        .lte("{{period_end}}")
        .format("epoch_millis")
    val compositeSources = listOf(
        TermsValuesSourceBuilder("test_field").field("test_field")
    )
    val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
    val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
    val triggerScript = """
        params.docCount > 0
    """.trimIndent()

    // The bucket selector is named after the trigger id, so the random trigger has to exist before it is customised.
    val baseTrigger = randomBucketLevelTrigger()
    val trigger = baseTrigger.copy(
        bucketSelector = BucketSelectorExtAggregationBuilder(
            name = baseTrigger.id,
            bucketsPathsMap = mapOf("docCount" to "_count"),
            script = Script(triggerScript),
            parentBucketPath = "composite_agg",
            filter = null
        )
    )
    val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))

    // Picks the single alert whose bucket-keys hash equals the given value; `single` fails the test on zero or many matches.
    fun List<Alert>.singleForBucket(bucketKeysHash: String): Alert =
        single { it.aggregationResultBucket?.getBucketKeysHash() == bucketKeysHash }

    executeMonitor(monitor.id, params = DRYRUN_MONITOR)

    // Check created Alerts
    var currentAlerts = searchAlerts(monitor)
    assertEquals("Alerts not saved", 2, currentAlerts.size)
    currentAlerts.forEach {
        verifyAlert(it, monitor, ACTIVE)
    }

    // Acknowledge one of the Alerts
    val alertToAcknowledge = currentAlerts.singleForBucket("test_value_1")
    acknowledgeAlerts(monitor, alertToAcknowledge)
    currentAlerts = searchAlerts(monitor)
    val acknowledgedAlert = currentAlerts.single { it.state == ACKNOWLEDGED }
    val activeAlert = currentAlerts.single { it.state == ACTIVE }

    // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
    // let lastNotificationTime change. Without this sleep the timestamp comparison below can fail spuriously.
    Thread.sleep(200)
    executeMonitor(monitor.id, params = DRYRUN_MONITOR)

    // Check that the lastNotificationTime of the acknowledged Alert wasn't updated and the active Alert's was
    currentAlerts = searchAlerts(monitor)
    val currentAcknowledgedAlert = currentAlerts.single { it.state == ACKNOWLEDGED }
    val currentActiveAlert = currentAlerts.single { it.state == ACTIVE }
    assertEquals("Acknowledged alert was updated", acknowledgedAlert.lastNotificationTime, currentAcknowledgedAlert.lastNotificationTime)
    // `!!` is intentional: a missing lastNotificationTime on the freshly notified alert should fail the test loudly.
    assertTrue("Active alert was not updated", currentActiveAlert.lastNotificationTime!! > activeAlert.lastNotificationTime)

    // Remove data so that both Alerts are moved into completed
    deleteDataWithDocIds(
        testIndex,
        listOf(
            "1", // test_value_1
            "2" // test_value_2
        )
    )

    // Execute Monitor and check that both Alerts were updated
    Thread.sleep(200)
    executeMonitor(monitor.id, params = DRYRUN_MONITOR)
    currentAlerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN)
    val completedAlerts = currentAlerts.filter { it.state == COMPLETED }
    assertEquals("Incorrect number of completed alerts", 2, completedAlerts.size)
    val previouslyAcknowledgedAlert = completedAlerts.singleForBucket("test_value_1")
    val previouslyActiveAlert = completedAlerts.singleForBucket("test_value_2")
    assertTrue(
        "Previously acknowledged alert's lastNotificationTime was not updated when it moved to completed",
        previouslyAcknowledgedAlert.lastNotificationTime!! > currentAcknowledgedAlert.lastNotificationTime
    )
    assertTrue(
        "Previously active alert's lastNotificationTime was not updated when it moved to completed",
        previouslyActiveAlert.lastNotificationTime!! > currentActiveAlert.lastNotificationTime
    )
}

@Suppress("UNCHECKED_CAST")
fun `test bucket-level monitor with one good action and one bad action`() {
val testIndex = createTestIndex()
Expand Down

0 comments on commit 8a1dc1f

Please sign in to comment.