From e7ba6d7eb2e0d8c4afd4212d612b5cf3c7fb85dc Mon Sep 17 00:00:00 2001 From: Megha Goyal Date: Tue, 26 Dec 2023 17:44:56 -0800 Subject: [PATCH 1/5] Bulk index findings and sequentially invoke auto-correlations Signed-off-by: Megha Goyal --- .../alerting/DocumentLevelMonitorRunner.kt | 131 +++++++++++------- 1 file changed, 80 insertions(+), 51 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 3b8e4dee7..5226b3d43 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -8,8 +8,10 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException +import org.opensearch.action.DocWriteRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest -import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse @@ -273,10 +275,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If there are no triggers defined, we still want to generate findings if (monitor.triggers.isEmpty()) { if (dryrun == false && monitor.id != Monitor.NO_ID) { - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) - } + createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) } } else { monitor.triggers.forEach { @@ -365,7 +364,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { trigger: DocumentLevelTrigger, monitor: Monitor, idQueryMap: Map, - docsToQueries: Map>, + docsToQueries: MutableMap>, queryToDocIds: Map>, dryrun: Boolean, workflowRunContext: WorkflowRunContext?, @@ -374,35 +373,34 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) - val findings = mutableListOf() - val findingDocPairs = mutableListOf>() + val triggerFindingDocPairs = mutableListOf>() // TODO: Implement throttling for findings - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! 
} - val findingId = createFindings( - monitor, - monitorCtx, - triggeredQueries, - it.key, - !dryrun && monitor.id != Monitor.NO_ID, - executionId - ) - findings.add(findingId) + val findingToDocPairs = createFindings( + monitor, + monitorCtx, + docsToQueries, + idQueryMap, + !dryrun && monitor.id != Monitor.NO_ID, + executionId + ) - if (triggerResult.triggeredDocs.contains(it.key)) { - findingDocPairs.add(Pair(findingId, it.key)) + findingToDocPairs.forEach { + // Only pick those entries whose docs have triggers associated with them + if (triggerResult.triggeredDocs.contains(it.second)) { + triggerFindingDocPairs.add(Pair(it.first, it.second)) } } val actionCtx = triggerCtx.copy( triggeredDocs = triggerResult.triggeredDocs, - relatedFindings = findings, + // confirm if this is right or only trigger-able findings should be present in this list + relatedFindings = findingToDocPairs.map { it.first }, error = monitorResult.error ?: triggerResult.error ) val alerts = mutableListOf() - findingDocPairs.forEach { + triggerFindingDocPairs.forEach { val alert = monitorCtx.alertService!!.composeDocLevelAlert( listOf(it.first), listOf(it.second), @@ -461,51 +459,82 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return triggerResult } + /** + * 1. Bulk index all findings based on shouldCreateFinding flag + * 2. invoke publishFinding() to kickstart auto-correlations + * 3. Returns a list of pairs for finding id to doc id + */ private suspend fun createFindings( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - docLevelQueries: List, - matchingDocId: String, + docsToQueries: MutableMap>, + idQueryMap: Map, shouldCreateFinding: Boolean, workflowExecutionId: String? = null, - ): String { - // Before the "|" is the doc id and after the "|" is the index - val docIndex = matchingDocId.split("|") + ): List> { - val finding = Finding( - id = UUID.randomUUID().toString(), - relatedDocIds = listOf(docIndex[0]), - correlatedDocIds = listOf(docIndex[0]), - monitorId = monitor.id, - monitorName = monitor.name, - index = docIndex[1], - docLevelQueries = docLevelQueries, - timestamp = Instant.now(), - executionId = workflowExecutionId - ) + val findingDocPairs = mutableListOf>() + val findings = mutableListOf() + val indexRequests = mutableListOf() - val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - logger.debug("Findings: $findingStr") + docsToQueries.forEach { + val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! 
} + + // Before the "|" is the doc id and after the "|" is the index + val docIndex = it.key.split("|") - if (shouldCreateFinding) { - val indexRequest = IndexRequest(monitor.dataSources.findingsIndex) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(findingStr, XContentType.JSON) - .id(finding.id) - .routing(finding.id) + val finding = Finding( + id = UUID.randomUUID().toString(), + relatedDocIds = listOf(docIndex[0]), + correlatedDocIds = listOf(docIndex[0]), + monitorId = monitor.id, + monitorName = monitor.name, + index = docIndex[1], + docLevelQueries = triggeredQueries, + timestamp = Instant.now(), + executionId = workflowExecutionId + ) + findingDocPairs.add(Pair(finding.id, it.key)) + findings.add(finding) + + val findingStr = + finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) + .string() + logger.debug("Findings: $findingStr") + + if (shouldCreateFinding) { + indexRequests += IndexRequest(monitor.dataSources.findingsIndex) + .source(findingStr, XContentType.JSON) + .id(finding.id) + .routing(finding.id) + .opType(DocWriteRequest.OpType.INDEX) + } + } - monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.index(indexRequest, it) + if (indexRequests.isNotEmpty()) { + val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { + bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) + } + if (bulkResponse.hasFailures()) { + bulkResponse.items.forEach { item -> + if (item.isFailed) { + logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + } + } + } else { + logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") } } try { - publishFinding(monitor, monitorCtx, finding) + findings.forEach { finding -> + publishFinding(monitor, monitorCtx, finding) + } } catch (e: Exception) { // suppress exception logger.error("Optional finding callback failed", e) } - return finding.id + return findingDocPairs } private fun publishFinding( From 03b86ae033719a4f880801b8069d8d4ba3a312eb Mon Sep 17 00:00:00 2001 From: Megha Goyal Date: Thu, 1 Feb 2024 11:18:47 -0800 Subject: [PATCH 2/5] Bulk index findings in batches of 10000 and make it configurable Signed-off-by: Megha Goyal --- .../org/opensearch/alerting/AlertingPlugin.kt | 3 +- .../alerting/DocumentLevelMonitorRunner.kt | 57 +++++++++++++------ .../alerting/MonitorRunnerExecutionContext.kt | 3 +- .../alerting/settings/AlertingSettings.kt | 8 +++ 4 files changed, 51 insertions(+), 20 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 0a80f33ae..e90d2193d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -346,7 +346,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_MAX_DOCS, AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE, AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, - AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD + AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, + AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 5226b3d43..1e6b3f4f2 100644 --- 
a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -24,6 +24,7 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -476,6 +477,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val findingDocPairs = mutableListOf>() val findings = mutableListOf() val indexRequests = mutableListOf() + monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) { + monitorCtx.findingsIndexBatchSize = it + } docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } @@ -502,15 +507,41 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .string() logger.debug("Findings: $findingStr") - if (shouldCreateFinding) { - indexRequests += IndexRequest(monitor.dataSources.findingsIndex) - .source(findingStr, XContentType.JSON) - .id(finding.id) - .routing(finding.id) - .opType(DocWriteRequest.OpType.INDEX) + if (indexRequests.size > monitorCtx.findingsIndexBatchSize) { + // make bulk indexing call here and flush the indexRequest object + bulkIndexFindings(monitor, monitorCtx, indexRequests) + indexRequests.clear() + } else { + if (shouldCreateFinding) { + indexRequests += IndexRequest(monitor.dataSources.findingsIndex) + .source(findingStr, XContentType.JSON) + .id(finding.id) + .routing(finding.id) + .opType(DocWriteRequest.OpType.INDEX) + } } } + if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) { + bulkIndexFindings(monitor, monitorCtx, indexRequests) + } + + try { + findings.forEach { finding -> + publishFinding(monitor, monitorCtx, finding) + } + } catch (e: Exception) { + // suppress exception + logger.error("Optional finding callback failed", e) + } + return findingDocPairs + } + + private suspend fun bulkIndexFindings( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + indexRequests: List + ) { if (indexRequests.isNotEmpty()) { val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) @@ -518,23 +549,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (bulkResponse.hasFailures()) { bulkResponse.items.forEach { item -> if (item.isFailed) { - logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") } } } else { logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") } } - - try { - findings.forEach { finding -> - publishFinding(monitor, monitorCtx, finding) - } - } catch (e: Exception) { - // suppress exception - logger.error("Optional finding callback failed", e) - } - return findingDocPairs } private fun publishFinding( @@ -658,7 +679,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) } } catch 
(e: Exception) { - logger.warn("Failed to run for shard $shard. Error: ${e.message}") + logger.error("Failed to run for shard $shard. Error: ${e.message}") } } return matchingDocs diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 41a26bb79..2c98495de 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -47,5 +47,6 @@ data class MonitorRunnerExecutionContext( @Volatile var destinationContextFactory: DestinationContextFactory? = null, @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, - @Volatile var indexTimeout: TimeValue? = null + @Volatile var indexTimeout: TimeValue? = null, + @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index e23d44c5b..37f6d9468 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -17,6 +17,7 @@ class AlertingSettings { companion object { const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L + const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -153,5 +154,12 @@ class AlertingSettings { -1L, Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting( + "plugins.alerting.alert_findings_indexing_batch_size", + DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, + 0, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } From ae32748934c1989ab90c969228e16a7ddaae1046 Mon Sep 17 00:00:00 2001 From: Megha Goyal Date: Sun, 4 Feb 2024 23:54:48 -0800 Subject: [PATCH 3/5] Addressing review comments Signed-off-by: Megha Goyal --- .../alerting/DocumentLevelMonitorRunner.kt | 34 +++++++------------ .../alerting/MonitorRunnerService.kt | 5 +++ .../alerting/settings/AlertingSettings.kt | 4 +-- 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 1e6b3f4f2..84aa2308b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -9,13 +9,14 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException import org.opensearch.action.DocWriteRequest +import org.opensearch.action.admin.indices.refresh.RefreshAction +import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse -import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.model.DocumentExecutionContext import 
org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults @@ -395,7 +396,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val actionCtx = triggerCtx.copy( triggeredDocs = triggerResult.triggeredDocs, - // confirm if this is right or only trigger-able findings should be present in this list relatedFindings = findingToDocPairs.map { it.first }, error = monitorResult.error ?: triggerResult.error ) @@ -477,10 +477,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val findingDocPairs = mutableListOf>() val findings = mutableListOf() val indexRequests = mutableListOf() - monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) - monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) { - monitorCtx.findingsIndexBatchSize = it - } docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } @@ -507,22 +503,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .string() logger.debug("Findings: $findingStr") - if (indexRequests.size > monitorCtx.findingsIndexBatchSize) { - // make bulk indexing call here and flush the indexRequest object - bulkIndexFindings(monitor, monitorCtx, indexRequests) - indexRequests.clear() - } else { - if (shouldCreateFinding) { - indexRequests += IndexRequest(monitor.dataSources.findingsIndex) - .source(findingStr, XContentType.JSON) - .id(finding.id) - .routing(finding.id) - .opType(DocWriteRequest.OpType.INDEX) - } + if (shouldCreateFinding) { + indexRequests += IndexRequest(monitor.dataSources.findingsIndex) + .source(findingStr, XContentType.JSON) + .id(finding.id) + .opType(DocWriteRequest.OpType.CREATE) } } - if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) { + if (indexRequests.isNotEmpty()) { bulkIndexFindings(monitor, monitorCtx, indexRequests) } @@ -542,9 +531,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, indexRequests: List ) { - if (indexRequests.isNotEmpty()) { + monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) + + indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch -> val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { - bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) + bulk(BulkRequest().add(indexRequests), it) } if (bulkResponse.hasFailures()) { bulkResponse.items.forEach { item -> @@ -556,6 +547,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") } } + monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex)) } private fun publishFinding( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index ca223f7a0..a8c9cfb60 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -22,6 +22,7 @@ import org.opensearch.alerting.model.WorkflowRunResult import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.opensearchapi.retry import org.opensearch.alerting.script.TriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings import 
org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT @@ -169,6 +170,10 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) { + monitorCtx.findingsIndexBatchSize = it + } + return this } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 37f6d9468..e8048e5b2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -17,7 +17,7 @@ class AlertingSettings { companion object { const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L - const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000 + const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -158,7 +158,7 @@ class AlertingSettings { val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting( "plugins.alerting.alert_findings_indexing_batch_size", DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, - 0, + 1, Setting.Property.NodeScope, Setting.Property.Dynamic ) } From 1833d6dbda6c5ebaf5c728f1cb3b8ceeabe4fa7d Mon Sep 17 00:00:00 2001 From: Megha Goyal Date: Tue, 6 Feb 2024 07:19:33 -0800 Subject: [PATCH 4/5] Add integ tests to test bulk index findings Signed-off-by: Megha Goyal --- .../alerting/DocumentLevelMonitorRunner.kt | 5 +- .../alerting/MonitorRunnerService.kt | 2 + .../alerting/DocumentMonitorRunnerIT.kt | 47 +++++++++++++++++++ 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 84aa2308b..0e88b5cb3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -25,7 +25,6 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext -import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -531,11 +530,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, indexRequests: List ) { - monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) - indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch -> val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { - bulk(BulkRequest().add(indexRequests), it) + bulk(BulkRequest().add(batch), it) } if (bulkResponse.hasFailures()) { bulkResponse.items.forEach { item -> diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt 
b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index a8c9cfb60..103da2230 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -25,6 +25,7 @@ import org.opensearch.alerting.script.TriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT @@ -170,6 +171,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings) + monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) { monitorCtx.findingsIndexBatchSize = it } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 943ad11a6..7739d4deb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -393,6 +393,53 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size) } + fun `test execute monitor for bulk index findings`() { + val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" + val testQueryName = "wildcard-test-query" + val testIndex = createTestIndex("${testIndexPrefix}1") + val testIndex2 = createTestIndex("${testIndexPrefix}2") + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName, fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]")) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + for (i in 0 until 9) { + indexDoc(testIndex, i.toString(), testDoc) + } + indexDoc(testIndex2, "3", testDoc) + adminClient().updateSettings("plugins.alerting.alert_findings_indexing_batch_size", 2) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Correct search result", 10, matchingDocsToQuery.size) + assertTrue("Correct search result", 
matchingDocsToQuery.containsAll(listOf("1|$testIndex", "2|$testIndex", "3|$testIndex2"))) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 10, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 10, findings.size) + val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("2") || it.relatedDocIds.contains("3") } + assertEquals("Found findings for all docs", 4, foundFindings.size) + } + fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS query operator`() { val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" val testQueryName = "wildcard-test-query" From 3bd7888bd71a67d871403964537cf591aa44d5db Mon Sep 17 00:00:00 2001 From: Megha Goyal Date: Tue, 6 Feb 2024 11:15:28 -0800 Subject: [PATCH 5/5] Fix ktlint formatting Signed-off-by: Megha Goyal --- .../kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 7739d4deb..479e29dca 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -436,7 +436,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 10, findings.size) - val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("2") || it.relatedDocIds.contains("3") } + val foundFindings = + findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("2") || it.relatedDocIds.contains("3") } assertEquals("Found findings for all docs", 4, foundFindings.size) }
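
Note: the following is a minimal standalone sketch, not part of the patch series above. It illustrates the batching shape that bulkIndexFindings() settles on after PATCH 3 and PATCH 4: the finding index requests are split with Kotlin's chunked() and each batch is sent as one bulk call, with the dynamic setting plugins.alerting.alert_findings_indexing_batch_size (minimum 1, default 1000) controlling the chunk width. The helper and names below are illustrative only; the actual implementation dispatches each batch through monitorCtx.client!!.suspendUntil { bulk(BulkRequest().add(batch), it) } against monitor.dataSources.findingsIndex.

    // Illustrative sketch only -- not code from the patch.
    // Partitions a list of finding index requests into batches before bulk indexing.
    fun <T> sendInBatches(requests: List<T>, batchSize: Int, sendBulk: (List<T>) -> Unit) {
        // Mirrors the setting's lower bound of 1 introduced in PATCH 3.
        require(batchSize >= 1) { "batch size must be at least 1" }
        requests.chunked(batchSize).forEach { batch -> sendBulk(batch) }
    }

    fun main() {
        // With the integ test's batch size of 2 and its 10 generated findings,
        // this yields 5 bulk calls of 2 requests each.
        val findingIds = (1..10).map { "finding-$it" }
        sendInBatches(findingIds, batchSize = 2) { batch ->
            println("bulk indexing ${batch.size} findings: $batch")
        }
    }

After all batches are sent, PATCH 3 also drops the per-request WriteRequest.RefreshPolicy.IMMEDIATE in favor of a single RefreshRequest against monitor.dataSources.findingsIndex, so the findings index is refreshed once per monitor run rather than once per bulk request.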