Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Adds findings in bucket level monitor #651

Merged
merged 1 commit into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
Expand Down Expand Up @@ -236,7 +237,8 @@ class AlertService(
monitor: Monitor,
trigger: BucketLevelTrigger,
currentAlerts: MutableMap<String, Alert>,
aggResultBuckets: List<AggregationResultBucket>
aggResultBuckets: List<AggregationResultBucket>,
findings: List<String>
): Map<AlertCategory, List<Alert>> {
val dedupedAlerts = mutableListOf<Alert>()
val newAlerts = mutableListOf<Alert>()
Expand All @@ -256,7 +258,8 @@ class AlertService(
monitor = monitor, trigger = trigger, startTime = currentTime,
lastNotificationTime = null, state = Alert.State.ACTIVE, errorMessage = null,
errorHistory = mutableListOf(), actionExecutionResults = mutableListOf(),
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket
schemaVersion = IndexUtils.alertIndexSchemaVersion, aggregationResultBucket = aggAlertBucket,
findingIds = findings
)
newAlerts.add(newAlert)
}
Expand Down Expand Up @@ -381,7 +384,7 @@ class AlertService(
// If the index request is to be retried, the Alert is saved separately as well so that its relative ordering is maintained in
// relation to index request in the retried bulk request for when it eventually succeeds.
retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry)
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
// TODO: This is only used to retrieve the retryCause, could instead fetch it from the bulkResponse iteration below
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,49 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.util.UUID

object BucketLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
Expand Down Expand Up @@ -116,11 +141,20 @@ object BucketLevelMonitorRunner : MonitorRunner() {
* existing Alerts in a way the user can easily view them since they will have all been moved to the history index.
*/
if (triggerResults[trigger.id]?.error != null) continue

val findings =
if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) createFindings(
triggerResult,
monitor,
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID
)
else emptyList()
// TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
// be refactored to use a map instead
val categorizedAlerts = monitorCtx.alertService!!.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList()
monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList(), findings
).toMutableMap()
val dedupedAlerts = categorizedAlerts.getOrDefault(AlertCategory.DEDUPED, emptyList())
var newAlerts = categorizedAlerts.getOrDefault(AlertCategory.NEW, emptyList())
Expand Down Expand Up @@ -287,6 +321,117 @@ object BucketLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults)
}

/**
 * Computes findings for a bucket-level trigger result.
 *
 * For the first [SearchInput] of [monitor] whose aggregations group by exactly one composite source
 * field, the input's query is rendered as a template with `period_start` / `period_end`, re-run
 * against the input's indices with an extra terms filter on that field restricted to the bucket keys
 * in [triggerResult], and the hits are handed to [createFindingPerIndex].
 *
 * Findings are not calculated (an empty list is returned) when:
 *  - the input has no aggregations, or a top-level aggregation is not a composite aggregation;
 *  - more than one group-by source is present across the composite aggregations;
 *  - no input yields a usable group-by field.
 *
 * @param triggerResult trigger run result whose bucket keys restrict the follow-up search
 * @param monitor monitor whose inputs are inspected
 * @param monitorCtx execution context supplying the script service, xContent registry and client
 * @param periodStart start of the execution period, exposed to the query template as `period_start`
 * @param periodEnd end of the execution period, exposed to the query template as `period_end`
 * @param shouldCreateFinding forwarded to [createFindingPerIndex]; controls whether findings are indexed
 * @return the finding ids returned by [createFindingPerIndex], or an empty list
 */
private suspend fun createFindings(
    triggerResult: BucketLevelTriggerRunResult,
    monitor: Monitor,
    monitorCtx: MonitorRunnerExecutionContext,
    periodStart: Instant,
    periodEnd: Instant,
    shouldCreateFinding: Boolean
): List<String> {
    for (input in monitor.inputs) {
        if (input !is SearchInput) continue

        val bucketValues: Set<String> = triggerResult.aggregationResultBuckets.keys
        val query = input.query
        // A safe cast keeps a query without aggregations from aborting the monitor run.
        val aggregations = query.aggregations() as? AggregatorFactories.Builder ?: return emptyList()

        var fieldName = ""
        var groupByFieldCount = 0 // if number of fields used to group by > 1 we won't calculate findings
        for (aggFactory in aggregations.aggregatorFactories) {
            // Only composite aggregations are understood here; anything else means no findings
            // rather than a ClassCastException.
            val compositeAgg = aggFactory as? CompositeAggregationBuilder ?: return emptyList()
            for (source in compositeAgg.sources()) {
                if (groupByFieldCount > 0) {
                    return emptyList()
                }
                groupByFieldCount++
                // A source without a field name leaves fieldName empty, so this input is skipped below.
                fieldName = source.field().orEmpty()
            }
        }
        if (fieldName.isEmpty()) continue

        val searchParams = mapOf(
            "period_start" to periodStart.toEpochMilli(),
            "period_end" to periodEnd.toEpochMilli()
        )
        // Render the stored query as a template so the period placeholders are resolved.
        val searchSource = monitorCtx.scriptService!!.compile(
            Script(
                ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
                query.toString(), searchParams
            ),
            TemplateScript.CONTEXT
        )
            .newInstance(searchParams)
            .execute()
        val sr = SearchRequest(*input.indices.toTypedArray())
        XContentType.JSON.xContent()
            .createParser(monitorCtx.xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource)
            .use { parser ->
                val source = SearchSourceBuilder.fromXContent(parser)
                val queryBuilder = if (input.query.query() == null) BoolQueryBuilder()
                else QueryBuilders.boolQuery().must(source.query())
                // Restrict the documents to those belonging to the buckets of this trigger result.
                queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
                // NOTE(review): only the query is set on the request source; size is left untouched, so
                // presumably only the default number of hits feeds into the findings — confirm.
                sr.source().query(queryBuilder)
            }
        val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
        return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding)
    }
    return emptyList()
}

/**
 * Builds one [Finding] per index present in [searchResponse] and, when requested, indexes them in bulk.
 *
 * Hits are grouped by index name; each group becomes a finding with a random UUID id whose related
 * document ids are the hit ids of that index. When [shouldCreateFinding] is true, every finding is
 * written to the monitor's findings index through a bulk request with an immediate refresh policy.
 *
 * @param searchResponse search result whose hits are turned into findings
 * @param monitor monitor the findings are attributed to; also supplies the findings index name
 * @param monitorCtx execution context supplying the client and the retry policy
 * @param shouldCreateFinding whether the findings are persisted
 * @return the ids of the created findings; an empty list when nothing was queued for indexing
 *   (no hits, or [shouldCreateFinding] is false)
 */
private suspend fun createFindingPerIndex(
    searchResponse: SearchResponse,
    monitor: Monitor,
    monitorCtx: MonitorRunnerExecutionContext,
    shouldCreateFinding: Boolean
): List<String> {
    val docIdsByIndexName: MutableMap<String, MutableList<String>> = mutableMapOf()
    for (hit in searchResponse.hits.hits) {
        docIdsByIndexName.getOrPut(hit.index) { mutableListOf() }.add(hit.id)
    }

    val findings = mutableListOf<String>()
    var requestsToRetry: MutableList<IndexRequest> = mutableListOf()
    for ((indexName, docIds) in docIdsByIndexName) {
        val finding = Finding(
            id = UUID.randomUUID().toString(),
            relatedDocIds = docIds,
            monitorId = monitor.id,
            monitorName = monitor.name,
            index = indexName,
            timestamp = Instant.now(),
            docLevelQueries = listOf()
        )

        val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
        logger.debug("Findings: $findingStr")
        if (shouldCreateFinding) {
            val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
                .source(findingStr, XContentType.JSON)
                .id(finding.id)
                .routing(finding.id)
            requestsToRetry.add(indexRequest)
        }
        findings.add(finding.id)
    }

    // Nothing was queued for indexing, so no finding ids are reported to the caller.
    if (requestsToRetry.isEmpty()) return listOf()

    monitorCtx.retryPolicy!!.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
        val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
        val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.bulk(bulkRequest, it) }
        // Start from a clean slate: only requests rejected with 429 in this attempt are kept for retry.
        requestsToRetry = mutableListOf()
        bulkResponse.items.forEach { item ->
            if (item.isFailed && item.status() == RestStatus.TOO_MANY_REQUESTS) {
                requestsToRetry.add(bulkRequest.requests()[item.itemId] as IndexRequest)
            }
        }
        // NOTE(review): nothing in this block throws when requests remain in requestsToRetry, so it
        // looks like the retry policy would not re-run the block — confirm against the `retry` helper.
    }
    return findings
}

private fun getActionContextForAlertCategory(
alertCategory: AlertCategory,
alert: Alert,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ class AlertServiceTests : OpenSearchTestCase() {
)
)

val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
)
// Completed Alerts are what remains in currentAlerts after categorization
val completedAlerts = currentAlerts.values.toList()
assertEquals(listOf<Alert>(), categorizedAlerts[AlertCategory.DEDUPED])
Expand Down Expand Up @@ -115,7 +117,9 @@ class AlertServiceTests : OpenSearchTestCase() {
)
)

val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
)
// Completed Alerts are what remains in currentAlerts after categorization
val completedAlerts = currentAlerts.values.toList()
assertAlertsExistForBucketKeys(
Expand All @@ -142,7 +146,9 @@ class AlertServiceTests : OpenSearchTestCase() {
)
val aggResultBuckets = listOf<AggregationResultBucket>()

val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
)
// Completed Alerts are what remains in currentAlerts after categorization
val completedAlerts = currentAlerts.values.toList()
assertEquals(listOf<Alert>(), categorizedAlerts[AlertCategory.DEDUPED])
Expand Down Expand Up @@ -174,7 +180,9 @@ class AlertServiceTests : OpenSearchTestCase() {
)
)

val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
)
// Completed Alerts are what remains in currentAlerts after categorization
val completedAlerts = currentAlerts.values.toList()
assertAlertsExistForBucketKeys(listOf(listOf("b")), categorizedAlerts[AlertCategory.DEDUPED] ?: error("Deduped alerts not found"))
Expand All @@ -198,7 +206,9 @@ class AlertServiceTests : OpenSearchTestCase() {
)
)

val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(monitor, trigger, currentAlerts, aggResultBuckets)
val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlerts, aggResultBuckets, emptyList()
)
// Completed Alerts are what remains in currentAlerts after categorization
val completedAlerts = currentAlerts.values.toList()
assertAlertsExistForBucketKeys(listOf(listOf("a")), categorizedAlerts[AlertCategory.DEDUPED] ?: error("Deduped alerts not found"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
"""
"properties" : {
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
"test_field" : { "type" : "keyword" }
"test_field" : { "type" : "keyword" },
"number" : { "type" : "keyword" }
}
""".trimIndent()
)
Expand Down Expand Up @@ -866,7 +867,8 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val testDoc = """
{
"test_strict_date_time": "$testTime",
"test_field": "$value"
"test_field": "$value",
"number": "$i"
}
""".trimIndent()
// Indexing documents with deterministic doc id to allow for easy selected deletion during testing
Expand Down
Loading