diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt index ac30c777c..04bd64b8d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt @@ -38,17 +38,26 @@ class WorkflowService( * Returns finding doc ids per index for the given workflow execution * Used for pre-filtering the dataset in the case of creating a workflow with chained findings * - * @param chainedMonitor Monitor that is previously executed + * @param chainedMonitors Monitors that have previously executed * @param workflowExecutionId Execution id of the current workflow */ - suspend fun getFindingDocIdsByExecutionId(chainedMonitor: Monitor, workflowExecutionId: String): Map> { + suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List, workflowExecutionId: String): Map> { + if (chainedMonitors.isEmpty()) + return emptyMap() + val dataSources = chainedMonitors[0].dataSources try { val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil { - exists(IndicesExistsRequest(chainedMonitor.dataSources.findingsIndex).local(true), it) + exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it) } if (existsResponse.isExists == false) return emptyMap() - // Search findings index per monitor and workflow execution id - val bqb = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Finding.MONITOR_ID_FIELD, chainedMonitor.id)) + // Search findings index to match id of monitors and workflow execution id + val bqb = QueryBuilders.boolQuery() + .filter( + QueryBuilders.termsQuery( + Finding.MONITOR_ID_FIELD, + chainedMonitors.map { it.id } + ) + ) .filter(QueryBuilders.termQuery(Finding.EXECUTION_ID_FIELD, workflowExecutionId)) val searchRequest = SearchRequest() .source( @@ -57,7 +66,7 @@ class WorkflowService( .version(true) .seqNoAndPrimaryTerm(true) ) - .indices(chainedMonitor.dataSources.findingsIndex) + .indices(dataSources.findingsIndex) val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } // Get the findings docs diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index 511d937c1..e93fbd392 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -527,27 +527,39 @@ class TransportIndexWorkflowAction @Inject constructor( val monitorsById = monitorDelegates.associateBy { it.id } delegates.forEach { + val delegateMonitor = monitorsById[it.monitorId] ?: throw AlertingException.wrap( IllegalArgumentException("Delegate monitor ${it.monitorId} doesn't exist") ) if (it.chainedMonitorFindings != null) { - val chainedFindingMonitor = - monitorsById[it.chainedMonitorFindings!!.monitorId] ?: throw AlertingException.wrap( - IllegalArgumentException("Chained finding monitor doesn't exist") - ) - - if (chainedFindingMonitor.isQueryLevelMonitor()) { - throw AlertingException.wrap(IllegalArgumentException("Query level monitor can't be part of chained findings")) + val chainedMonitorIds: MutableList = mutableListOf() + if (it.chainedMonitorFindings!!.monitorId.isNullOrBlank()) { + chainedMonitorIds.addAll(it.chainedMonitorFindings!!.monitorIds) + } else { + chainedMonitorIds.add(it.chainedMonitorFindings!!.monitorId!!) } + chainedMonitorIds.forEach { chainedMonitorId -> + val chainedFindingMonitor = + monitorsById[chainedMonitorId] ?: throw AlertingException.wrap( + IllegalArgumentException("Chained finding monitor $chainedMonitorId doesn't exist") + ) + + if (chainedFindingMonitor.isQueryLevelMonitor()) { + throw AlertingException.wrap(IllegalArgumentException("Query level monitor can't be part of chained findings")) + } - val delegateMonitorIndices = getMonitorIndices(delegateMonitor) + val delegateMonitorIndices = getMonitorIndices(delegateMonitor) - val chainedMonitorIndices = getMonitorIndices(chainedFindingMonitor) + val chainedMonitorIndices = getMonitorIndices(chainedFindingMonitor) - if (!delegateMonitorIndices.equalsIgnoreOrder(chainedMonitorIndices)) { - throw AlertingException.wrap( - IllegalArgumentException("Delegate monitor and it's chained finding monitor must query the same indices") - ) + if (!delegateMonitorIndices.containsAll(chainedMonitorIndices)) { + throw AlertingException.wrap( + IllegalArgumentException( + "Delegate monitor indices ${delegateMonitorIndices.joinToString()} " + + "doesn't query all of chained findings monitor's indices ${chainedMonitorIndices.joinToString()}}" + ) + ) + } } } } @@ -580,7 +592,7 @@ class TransportIndexWorkflowAction @Inject constructor( private fun validateDelegateMonitorsExist( monitorIds: List, - delegateMonitors: List + delegateMonitors: List, ) { val reqMonitorIds: MutableList = monitorIds as MutableList delegateMonitors.forEach { @@ -600,7 +612,7 @@ class TransportIndexWorkflowAction @Inject constructor( request: IndexWorkflowRequest, user: User?, client: Client, - actionListener: ActionListener + actionListener: ActionListener, ) { val compositeInput = request.workflow.inputs[0] as CompositeInput val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList()) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 1de6c87e4..71f05cc45 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -98,12 +98,23 @@ object CompositeWorkflowRunner : WorkflowRunner() { IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id") ) if (delegate.chainedMonitorFindings != null) { - val chainedMonitor = monitorsById[delegate.chainedMonitorFindings!!.monitorId] - ?: throw AlertingException.wrap( - IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id") - ) + val chainedMonitorIds: MutableList = mutableListOf() + if (delegate.chainedMonitorFindings!!.monitorId.isNullOrBlank()) { + chainedMonitorIds.addAll(delegate.chainedMonitorFindings!!.monitorIds) + } else { + chainedMonitorIds.add(delegate.chainedMonitorFindings!!.monitorId!!) + } + val chainedMonitors = mutableListOf() + chainedMonitorIds.forEach { + val chainedMonitor = monitorsById[it] + ?: throw AlertingException.wrap( + IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id") + ) + chainedMonitors.add(chainedMonitor) + } + try { - indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitor, executionId) + indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId) } catch (e: Exception) { logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e) return WorkflowRunResult( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 6a8ef2819..7fbca1e00 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -56,12 +56,14 @@ import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.Delegate import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.ScheduledJob.Companion.DOC_LEVEL_QUERIES_INDEX import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.alerting.model.Table +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils @@ -2700,6 +2702,121 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2")) } + fun `test execute workflow with multiple monitors in chained monitor findings of single monitor`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customAlertsIndex1 = "custom_alerts_index" + val customFindingsIndex1 = "custom_findings_index" + val customFindingsIndexPattern1 = "custom_findings_index-1" + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1), + enabled = false, + dataSources = DataSources( + alertsIndex = customAlertsIndex1, + findingsIndex = customFindingsIndex1, + findingsIndexPattern = customFindingsIndexPattern1 + ) + ) + val monitorResponse = createMonitor(monitor1)!! + + val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") + val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2)) + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput2), + triggers = listOf(trigger2), + enabled = false, + dataSources = DataSources( + alertsIndex = customAlertsIndex1, + findingsIndex = customFindingsIndex1, + findingsIndexPattern = customFindingsIndexPattern1 + ) + ) + + val monitorResponse2 = createMonitor(monitor2)!! + val docQuery3 = DocLevelQuery(query = "_id:*", name = "5") + val docLevelInput3 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery3)) + val trigger3 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + var monitor3 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput3), + triggers = listOf(trigger3), + enabled = false, + dataSources = DataSources( + alertsIndex = customAlertsIndex1, + findingsIndex = customFindingsIndex1, + findingsIndexPattern = customFindingsIndexPattern1 + ) + ) + + val monitorResponse3 = createMonitor(monitor3)!! + val d1 = Delegate(1, monitorResponse.id) + val d2 = Delegate(2, monitorResponse2.id) + val d3 = Delegate( + 3, monitorResponse3.id, + ChainedMonitorFindings(null, listOf(monitorResponse.id, monitorResponse2.id)) + ) + var workflow = Workflow( + id = "", + name = "test", + enabled = false, + schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + lastUpdateTime = Instant.now(), + enabledTime = null, + workflowType = Workflow.WorkflowType.COMPOSITE, + user = randomUser(), + inputs = listOf(CompositeInput(org.opensearch.commons.alerting.model.Sequence(listOf(d1, d2, d3)))), + version = -1L, + schemaVersion = 0, + triggers = emptyList(), + auditDelegateMonitorAlerts = false + + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + + testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 and monitor2 + val testDoc2 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16645, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "2", testDoc2) + + testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 and monitor2 + val testDoc3 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16645, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-east-1" + }""" + indexDoc(index, "3", testDoc3) + + val workflowId = workflowResponse.id + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + val monitorsRunResults = executeWorkflowResponse.workflowRunResult.monitorRunResults + assertEquals(3, monitorsRunResults.size) + assertFindings(monitorResponse.id, customFindingsIndex1, 2, 2, listOf("1", "2")) + assertFindings(monitorResponse2.id, customFindingsIndex1, 2, 2, listOf("2", "3")) + assertFindings(monitorResponse3.id, customFindingsIndex1, 3, 3, listOf("1", "2", "3")) + } + fun `test execute workflows with shared doc level monitor delegate`() { val docQuery = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) @@ -5309,7 +5426,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { e.message?.let { assertTrue( "Exception not returning IndexWorkflow Action error ", - it.contains("Delegate monitor and it's chained finding monitor must query the same indices") + it.contains("doesn't query all of chained findings monitor's indices") ) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index 0ad9780c7..33ae93717 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -348,7 +348,7 @@ class WorkflowRestApiIT : AlertingRestTestCase() { e.message?.let { assertTrue( "Exception not returning IndexWorkflow Action error ", - it.contains("Delegate monitor and it's chained finding monitor must query the same indices") + it.contains("doesn't query all of chained findings monitor's indices") ) } }