Commit
Bulk index findings and sequentially invoke auto-correlations (opensearch-project#1355)

* Bulk index findings and sequentially invoke auto-correlations

Signed-off-by: Megha Goyal <goyamegh@amazon.com>

* Bulk index findings in batches of 10000 and make it configurable

Signed-off-by: Megha Goyal <goyamegh@amazon.com>

* Addressing review comments

Signed-off-by: Megha Goyal <goyamegh@amazon.com>

* Add integ tests to test bulk index findings

Signed-off-by: Megha Goyal <goyamegh@amazon.com>

* Fix ktlint formatting

Signed-off-by: Megha Goyal <goyamegh@amazon.com>

---------

Signed-off-by: Megha Goyal <goyamegh@amazon.com>
goyamegh authored Feb 6, 2024
1 parent 8d59060 commit b561965
Showing 6 changed files with 159 additions and 55 deletions.
@@ -346,7 +346,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
                 AlertingSettings.FINDING_HISTORY_MAX_DOCS,
                 AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
                 AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
-                AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
+                AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
+                AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
             )
         }
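Note: this hunk adds the new setting to the list the plugin registers, which is what makes it accepted by the cluster settings API. A minimal sketch of that pattern (a hypothetical trimmed-down plugin, not the real AlertingPlugin class):

    import org.opensearch.common.settings.Setting
    import org.opensearch.plugins.Plugin

    class MySettingsPlugin : Plugin() {
        // A setting is only updatable through the cluster settings API once it
        // is returned from getSettings(); unregistered keys are rejected.
        override fun getSettings(): List<Setting<*>> = listOf(
            AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
        )
    }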

@@ -8,12 +8,15 @@ package org.opensearch.alerting
 import org.apache.logging.log4j.LogManager
 import org.opensearch.ExceptionsHelper
 import org.opensearch.OpenSearchStatusException
+import org.opensearch.action.DocWriteRequest
+import org.opensearch.action.admin.indices.refresh.RefreshAction
+import org.opensearch.action.admin.indices.refresh.RefreshRequest
+import org.opensearch.action.bulk.BulkRequest
+import org.opensearch.action.bulk.BulkResponse
 import org.opensearch.action.index.IndexRequest
-import org.opensearch.action.index.IndexResponse
 import org.opensearch.action.search.SearchAction
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.search.SearchResponse
-import org.opensearch.action.support.WriteRequest
 import org.opensearch.alerting.model.DocumentExecutionContext
 import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
 import org.opensearch.alerting.model.InputRunResults
@@ -273,10 +276,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         // If there are no triggers defined, we still want to generate findings
         if (monitor.triggers.isEmpty()) {
             if (dryrun == false && monitor.id != Monitor.NO_ID) {
-                docsToQueries.forEach {
-                    val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
-                    createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
-                }
+                createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
            }
        } else {
            monitor.triggers.forEach {
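Note: instead of calling createFindings once per matching document, the whole docsToQueries map is now handed over in a single call so the findings can be indexed in bulk. For orientation, the shape involved looks roughly like this (illustrative values, not taken from the PR):

    // Keys are "docId|indexName"; values are the ids of the doc-level queries that
    // matched that document. idQueryMap resolves a query id back to its DocLevelQuery.
    val docsToQueries: MutableMap<String, MutableList<String>> = mutableMapOf(
        "1|test-index" to mutableListOf("query-id-a", "query-id-b")
    )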
@@ -365,7 +365,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         trigger: DocumentLevelTrigger,
         monitor: Monitor,
         idQueryMap: Map<String, DocLevelQuery>,
-        docsToQueries: Map<String, List<String>>,
+        docsToQueries: MutableMap<String, MutableList<String>>,
         queryToDocIds: Map<DocLevelQuery, Set<String>>,
         dryrun: Boolean,
         workflowRunContext: WorkflowRunContext?,
@@ -374,35 +374,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
         val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)

-        val findings = mutableListOf<String>()
-        val findingDocPairs = mutableListOf<Pair<String, String>>()
+        val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()

         // TODO: Implement throttling for findings
-        docsToQueries.forEach {
-            val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
-            val findingId = createFindings(
-                monitor,
-                monitorCtx,
-                triggeredQueries,
-                it.key,
-                !dryrun && monitor.id != Monitor.NO_ID,
-                executionId
-            )
-            findings.add(findingId)
+        val findingToDocPairs = createFindings(
+            monitor,
+            monitorCtx,
+            docsToQueries,
+            idQueryMap,
+            !dryrun && monitor.id != Monitor.NO_ID,
+            executionId
+        )

-            if (triggerResult.triggeredDocs.contains(it.key)) {
-                findingDocPairs.add(Pair(findingId, it.key))
+        findingToDocPairs.forEach {
+            // Only pick those entries whose docs have triggers associated with them
+            if (triggerResult.triggeredDocs.contains(it.second)) {
+                triggerFindingDocPairs.add(Pair(it.first, it.second))
             }
         }

         val actionCtx = triggerCtx.copy(
             triggeredDocs = triggerResult.triggeredDocs,
-            relatedFindings = findings,
+            relatedFindings = findingToDocPairs.map { it.first },
             error = monitorResult.error ?: triggerResult.error
         )

         val alerts = mutableListOf<Alert>()
-        findingDocPairs.forEach {
+        triggerFindingDocPairs.forEach {
             val alert = monitorCtx.alertService!!.composeDocLevelAlert(
                 listOf(it.first),
                 listOf(it.second),
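Note: createFindings now returns a finding-to-doc pair for every matching document, and the loop above keeps only the pairs whose document actually triggered. A self-contained illustration of that filter (hypothetical values):

    val findingToDocPairs = listOf("finding-1" to "1|test-index", "finding-2" to "2|test-index")
    val triggeredDocs = setOf("1|test-index")
    // Equivalent to the forEach above: keep pairs whose doc id is in triggeredDocs.
    val triggerFindingDocPairs = findingToDocPairs.filter { it.second in triggeredDocs }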
@@ -461,51 +459,92 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         return triggerResult
     }

+    /**
+     * 1. Bulk index all findings based on shouldCreateFinding flag
+     * 2. invoke publishFinding() to kickstart auto-correlations
+     * 3. Returns a list of pairs for finding id to doc id
+     */
     private suspend fun createFindings(
         monitor: Monitor,
         monitorCtx: MonitorRunnerExecutionContext,
-        docLevelQueries: List<DocLevelQuery>,
-        matchingDocId: String,
+        docsToQueries: MutableMap<String, MutableList<String>>,
+        idQueryMap: Map<String, DocLevelQuery>,
         shouldCreateFinding: Boolean,
         workflowExecutionId: String? = null,
-    ): String {
-        // Before the "|" is the doc id and after the "|" is the index
-        val docIndex = matchingDocId.split("|")
+    ): List<Pair<String, String>> {

-        val finding = Finding(
-            id = UUID.randomUUID().toString(),
-            relatedDocIds = listOf(docIndex[0]),
-            correlatedDocIds = listOf(docIndex[0]),
-            monitorId = monitor.id,
-            monitorName = monitor.name,
-            index = docIndex[1],
-            docLevelQueries = docLevelQueries,
-            timestamp = Instant.now(),
-            executionId = workflowExecutionId
-        )
+        val findingDocPairs = mutableListOf<Pair<String, String>>()
+        val findings = mutableListOf<Finding>()
+        val indexRequests = mutableListOf<IndexRequest>()

-        val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
-        logger.debug("Findings: $findingStr")
+        docsToQueries.forEach {
+            val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }

-        if (shouldCreateFinding) {
-            val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
-                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
-                .source(findingStr, XContentType.JSON)
-                .id(finding.id)
-                .routing(finding.id)
+            // Before the "|" is the doc id and after the "|" is the index
+            val docIndex = it.key.split("|")

-            monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
-                monitorCtx.client!!.index(indexRequest, it)
+            val finding = Finding(
+                id = UUID.randomUUID().toString(),
+                relatedDocIds = listOf(docIndex[0]),
+                correlatedDocIds = listOf(docIndex[0]),
+                monitorId = monitor.id,
+                monitorName = monitor.name,
+                index = docIndex[1],
+                docLevelQueries = triggeredQueries,
+                timestamp = Instant.now(),
+                executionId = workflowExecutionId
+            )
+            findingDocPairs.add(Pair(finding.id, it.key))
+            findings.add(finding)
+
+            val findingStr =
+                finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
+                    .string()
+            logger.debug("Findings: $findingStr")
+
+            if (shouldCreateFinding) {
+                indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
+                    .source(findingStr, XContentType.JSON)
+                    .id(finding.id)
+                    .opType(DocWriteRequest.OpType.CREATE)
             }
         }

+        if (indexRequests.isNotEmpty()) {
+            bulkIndexFindings(monitor, monitorCtx, indexRequests)
+        }
+
         try {
-            publishFinding(monitor, monitorCtx, finding)
+            findings.forEach { finding ->
+                publishFinding(monitor, monitorCtx, finding)
+            }
         } catch (e: Exception) {
             // suppress exception
             logger.error("Optional finding callback failed", e)
         }
-        return finding.id
+        return findingDocPairs
     }

+    private suspend fun bulkIndexFindings(
+        monitor: Monitor,
+        monitorCtx: MonitorRunnerExecutionContext,
+        indexRequests: List<IndexRequest>
+    ) {
+        indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
+            val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
+                bulk(BulkRequest().add(batch), it)
+            }
+            if (bulkResponse.hasFailures()) {
+                bulkResponse.items.forEach { item ->
+                    if (item.isFailed) {
+                        logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
+                    }
+                }
+            } else {
+                logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
+            }
+        }
+        monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex))
+    }
+
     private fun publishFinding(
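Note: bulkIndexFindings above uses the plugin's suspendUntil coroutine helper to bridge the async client callback into a suspending call. Stripped of that helper, the batching pattern it implements looks like this sketch against a plain org.opensearch.client.Client (blocking calls, hypothetical function name):

    import org.opensearch.action.bulk.BulkRequest
    import org.opensearch.action.index.IndexRequest
    import org.opensearch.client.Client

    fun indexInBatches(client: Client, requests: List<IndexRequest>, batchSize: Int) {
        // Split the requests into fixed-size chunks and send one bulk call per chunk,
        // reporting per-item failures instead of aborting the remaining batches.
        requests.chunked(batchSize).forEach { batch ->
            val response = client.bulk(BulkRequest().add(batch)).actionGet()
            response.filter { it.isFailed }.forEach { item ->
                println("bulk item ${item.id} failed: ${item.failureMessage}")
            }
        }
    }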
@@ -629,7 +668,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                         matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
                     }
                 } catch (e: Exception) {
-                    logger.warn("Failed to run for shard $shard. Error: ${e.message}")
+                    logger.error("Failed to run for shard $shard. Error: ${e.message}")
                 }
             }
             return matchingDocs
@@ -47,5 +47,6 @@ data class MonitorRunnerExecutionContext(
     @Volatile var destinationContextFactory: DestinationContextFactory? = null,

     @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
-    @Volatile var indexTimeout: TimeValue? = null
+    @Volatile var indexTimeout: TimeValue? = null,
+    @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE
 )
@@ -22,8 +22,10 @@ import org.opensearch.alerting.model.WorkflowRunResult
 import org.opensearch.alerting.model.destination.DestinationContextFactory
 import org.opensearch.alerting.opensearchapi.retry
 import org.opensearch.alerting.script.TriggerExecutionContext
+import org.opensearch.alerting.settings.AlertingSettings
 import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
 import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
+import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
 import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
 import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
 import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
@@ -169,6 +171,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon

         monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings)

+        monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
+        monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) {
+            monitorCtx.findingsIndexBatchSize = it
+        }
+
         return this
     }

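Note: the batch size is read once at startup and then kept in sync through a settings-update consumer; the backing field is @Volatile (see the MonitorRunnerExecutionContext change above) so monitor runs on other threads observe updates immediately. The pattern in isolation (sketch, assuming a ClusterService and Settings in scope):

    @Volatile var findingsIndexBatchSize: Int = FINDINGS_INDEXING_BATCH_SIZE.get(settings)

    fun registerBatchSizeListener(clusterService: ClusterService) {
        // Fires with the new value whenever the dynamic setting is updated.
        clusterService.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) {
            findingsIndexBatchSize = it
        }
    }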
@@ -17,6 +17,7 @@ class AlertingSettings {

     companion object {
         const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
+        const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000

         val ALERTING_MAX_MONITORS = Setting.intSetting(
             "plugins.alerting.monitor.max_monitors",
@@ -153,5 +154,12 @@ class AlertingSettings {
             -1L,
             Setting.Property.NodeScope, Setting.Property.Dynamic
         )
+
+        val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting(
+            "plugins.alerting.alert_findings_indexing_batch_size",
+            DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
+            1,
+            Setting.Property.NodeScope, Setting.Property.Dynamic
+        )
     }
 }
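Note: the setting is intSetting(key, default = 1000, min = 1) and is both NodeScope and Dynamic, so it can be changed on a live cluster without a restart. In this repo's integration tests that is done through the REST admin-client helper, as the new test below shows; a one-line usage sketch:

    // Lower the bulk batch size at runtime (any value >= 1 is accepted).
    adminClient().updateSettings("plugins.alerting.alert_findings_indexing_batch_size", 500)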
@@ -393,6 +393,54 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
         assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size)
     }

+    fun `test execute monitor for bulk index findings`() {
+        val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}"
+        val testQueryName = "wildcard-test-query"
+        val testIndex = createTestIndex("${testIndexPrefix}1")
+        val testIndex2 = createTestIndex("${testIndexPrefix}2")
+
+        val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
+        val testDoc = """{
+            "message" : "This is an error from IAD region",
+            "test_strict_date_time" : "$testTime",
+            "test_field" : "us-west-2"
+        }"""
+
+        val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName, fields = listOf())
+        val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery))
+
+        val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]"))
+        val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
+        assertNotNull(monitor.id)
+
+        for (i in 0 until 9) {
+            indexDoc(testIndex, i.toString(), testDoc)
+        }
+        indexDoc(testIndex2, "3", testDoc)
+        adminClient().updateSettings("plugins.alerting.alert_findings_indexing_batch_size", 2)
+
+        val response = executeMonitor(monitor.id)
+
+        val output = entityAsMap(response)
+
+        assertEquals(monitor.name, output["monitor_name"])
+        @Suppress("UNCHECKED_CAST")
+        val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Correct search result", 10, matchingDocsToQuery.size)
+        assertTrue("Correct search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "2|$testIndex", "3|$testIndex2")))
+
+        val alerts = searchAlertsWithFilter(monitor)
+        assertEquals("Alert saved for test monitor", 10, alerts.size)
+
+        val findings = searchFindings(monitor)
+        assertEquals("Findings saved for test monitor", 10, findings.size)
+        val foundFindings =
+            findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("2") || it.relatedDocIds.contains("3") }
+        assertEquals("Found findings for all docs", 4, foundFindings.size)
+    }
+
     fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS query operator`() {
         val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}"
