Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for using findings from list of monitors as input data for a monitor in workflow #1112

Merged
merged 4 commits into from
Sep 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,26 @@ class WorkflowService(
* Returns finding doc ids per index for the given workflow execution
* Used for pre-filtering the dataset in the case of creating a workflow with chained findings
*
* @param chainedMonitor Monitor that is previously executed
* @param chainedMonitors Monitors that have previously executed
* @param workflowExecutionId Execution id of the current workflow
*/
suspend fun getFindingDocIdsByExecutionId(chainedMonitor: Monitor, workflowExecutionId: String): Map<String, List<String>> {
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String): Map<String, List<String>> {
if (chainedMonitors.isEmpty())
return emptyMap()
val dataSources = chainedMonitors[0].dataSources
try {
val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil {
exists(IndicesExistsRequest(chainedMonitor.dataSources.findingsIndex).local(true), it)
exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it)
}
if (existsResponse.isExists == false) return emptyMap()
// Search findings index per monitor and workflow execution id
val bqb = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Finding.MONITOR_ID_FIELD, chainedMonitor.id))
// Search findings index to match id of monitors and workflow execution id
val bqb = QueryBuilders.boolQuery()
.filter(
QueryBuilders.termsQuery(
Finding.MONITOR_ID_FIELD,
chainedMonitors.map { it.id }
)
)
.filter(QueryBuilders.termQuery(Finding.EXECUTION_ID_FIELD, workflowExecutionId))
val searchRequest = SearchRequest()
.source(
Expand All @@ -57,7 +66,7 @@ class WorkflowService(
.version(true)
.seqNoAndPrimaryTerm(true)
)
.indices(chainedMonitor.dataSources.findingsIndex)
.indices(dataSources.findingsIndex)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }

// Get the findings docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,27 +528,39 @@ class TransportIndexWorkflowAction @Inject constructor(

val monitorsById = monitorDelegates.associateBy { it.id }
delegates.forEach {

val delegateMonitor = monitorsById[it.monitorId] ?: throw AlertingException.wrap(
IllegalArgumentException("Delegate monitor ${it.monitorId} doesn't exist")
)
if (it.chainedMonitorFindings != null) {
val chainedFindingMonitor =
monitorsById[it.chainedMonitorFindings!!.monitorId] ?: throw AlertingException.wrap(
IllegalArgumentException("Chained finding monitor doesn't exist")
)

if (chainedFindingMonitor.isQueryLevelMonitor()) {
throw AlertingException.wrap(IllegalArgumentException("Query level monitor can't be part of chained findings"))
val chainedMonitorIds: MutableList<String> = mutableListOf()
if (it.chainedMonitorFindings!!.monitorId.isNullOrBlank()) {
chainedMonitorIds.addAll(it.chainedMonitorFindings!!.monitorIds)
} else {
chainedMonitorIds.add(it.chainedMonitorFindings!!.monitorId!!)
}
chainedMonitorIds.forEach { chainedMonitorId ->
val chainedFindingMonitor =
monitorsById[chainedMonitorId] ?: throw AlertingException.wrap(
IllegalArgumentException("Chained finding monitor $chainedMonitorId doesn't exist")
)

if (chainedFindingMonitor.isQueryLevelMonitor()) {
throw AlertingException.wrap(IllegalArgumentException("Query level monitor can't be part of chained findings"))
}

val delegateMonitorIndices = getMonitorIndices(delegateMonitor)
val delegateMonitorIndices = getMonitorIndices(delegateMonitor)

val chainedMonitorIndices = getMonitorIndices(chainedFindingMonitor)
val chainedMonitorIndices = getMonitorIndices(chainedFindingMonitor)

if (!delegateMonitorIndices.equalsIgnoreOrder(chainedMonitorIndices)) {
throw AlertingException.wrap(
IllegalArgumentException("Delegate monitor and it's chained finding monitor must query the same indices")
)
if (!delegateMonitorIndices.containsAll(chainedMonitorIndices)) {
throw AlertingException.wrap(
IllegalArgumentException(
"Delegate monitor indices ${delegateMonitorIndices.joinToString()} " +
"doesn't query all of chained findings monitor's indices ${chainedMonitorIndices.joinToString()}}"
)
)
}
}
}
}
Expand Down Expand Up @@ -581,7 +593,7 @@ class TransportIndexWorkflowAction @Inject constructor(

private fun validateDelegateMonitorsExist(
monitorIds: List<String>,
delegateMonitors: List<Monitor>
delegateMonitors: List<Monitor>,
) {
val reqMonitorIds: MutableList<String> = monitorIds as MutableList<String>
delegateMonitors.forEach {
Expand All @@ -601,7 +613,7 @@ class TransportIndexWorkflowAction @Inject constructor(
request: IndexWorkflowRequest,
user: User?,
client: Client,
actionListener: ActionListener<AcknowledgedResponse>
actionListener: ActionListener<AcknowledgedResponse>,
) {
val compositeInput = request.workflow.inputs[0] as CompositeInput
val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,23 @@ object CompositeWorkflowRunner : WorkflowRunner() {
IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id")
)
if (delegate.chainedMonitorFindings != null) {
val chainedMonitor = monitorsById[delegate.chainedMonitorFindings!!.monitorId]
?: throw AlertingException.wrap(
IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id")
)
val chainedMonitorIds: MutableList<String> = mutableListOf()
if (delegate.chainedMonitorFindings!!.monitorId.isNullOrBlank()) {
chainedMonitorIds.addAll(delegate.chainedMonitorFindings!!.monitorIds)
} else {
chainedMonitorIds.add(delegate.chainedMonitorFindings!!.monitorId!!)
}
val chainedMonitors = mutableListOf<Monitor>()
chainedMonitorIds.forEach {
val chainedMonitor = monitorsById[it]
?: throw AlertingException.wrap(
IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id")
)
chainedMonitors.add(chainedMonitor)
}

try {
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitor, executionId)
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
} catch (e: Exception) {
logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e)
return WorkflowRunResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Delegate
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.DOC_LEVEL_QUERIES_INDEX
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Table
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
Expand Down Expand Up @@ -2698,6 +2700,121 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2"))
}

fun `test execute workflow with multiple monitors in chained monitor findings of single monitor`() {
val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3")
val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex1 = "custom_alerts_index"
val customFindingsIndex1 = "custom_findings_index"
val customFindingsIndexPattern1 = "custom_findings_index-1"
var monitor1 = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput1),
triggers = listOf(trigger1),
enabled = false,
dataSources = DataSources(
alertsIndex = customAlertsIndex1,
findingsIndex = customFindingsIndex1,
findingsIndexPattern = customFindingsIndexPattern1
)
)
val monitorResponse = createMonitor(monitor1)!!

val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2))
val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
var monitor2 = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput2),
triggers = listOf(trigger2),
enabled = false,
dataSources = DataSources(
alertsIndex = customAlertsIndex1,
findingsIndex = customFindingsIndex1,
findingsIndexPattern = customFindingsIndexPattern1
)
)

val monitorResponse2 = createMonitor(monitor2)!!
val docQuery3 = DocLevelQuery(query = "_id:*", name = "5")
val docLevelInput3 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery3))
val trigger3 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)

var monitor3 = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput3),
triggers = listOf(trigger3),
enabled = false,
dataSources = DataSources(
alertsIndex = customAlertsIndex1,
findingsIndex = customFindingsIndex1,
findingsIndexPattern = customFindingsIndexPattern1
)
)

val monitorResponse3 = createMonitor(monitor3)!!
val d1 = Delegate(1, monitorResponse.id)
val d2 = Delegate(2, monitorResponse2.id)
val d3 = Delegate(
3, monitorResponse3.id,
ChainedMonitorFindings(null, listOf(monitorResponse.id, monitorResponse2.id))
)
var workflow = Workflow(
id = "",
name = "test",
enabled = false,
schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
lastUpdateTime = Instant.now(),
enabledTime = null,
workflowType = Workflow.WorkflowType.COMPOSITE,
user = randomUser(),
inputs = listOf(CompositeInput(org.opensearch.commons.alerting.model.Sequence(listOf(d1, d2, d3)))),
version = -1L,
schemaVersion = 0,
triggers = emptyList(),
auditDelegateMonitorAlerts = false

)
val workflowResponse = upsertWorkflow(workflow)!!
val workflowById = searchWorkflow(workflowResponse.id)
assertNotNull(workflowById)

var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
// Matches monitor1
val testDoc1 = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16644,
"test_strict_date_time" : "$testTime",
"test_field_1" : "us-west-2"
}"""
indexDoc(index, "1", testDoc1)

testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
// Matches monitor1 and monitor2
val testDoc2 = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16645,
"test_strict_date_time" : "$testTime",
"test_field_1" : "us-west-2"
}"""
indexDoc(index, "2", testDoc2)

testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
// Matches monitor1 and monitor2
val testDoc3 = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16645,
"test_strict_date_time" : "$testTime",
"test_field_1" : "us-east-1"
}"""
indexDoc(index, "3", testDoc3)

val workflowId = workflowResponse.id
val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!!
val monitorsRunResults = executeWorkflowResponse.workflowRunResult.monitorRunResults
assertEquals(3, monitorsRunResults.size)
assertFindings(monitorResponse.id, customFindingsIndex1, 2, 2, listOf("1", "2"))
assertFindings(monitorResponse2.id, customFindingsIndex1, 2, 2, listOf("2", "3"))
assertFindings(monitorResponse3.id, customFindingsIndex1, 3, 3, listOf("1", "2", "3"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't monitor 3 only execute on document 2 since it takes an input from monitor 1 and 2 and they only both would generate findings from document 2?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would do union of 2 sets here not intersection.

}

fun `test execute workflows with shared doc level monitor delegate`() {
val docQuery = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down Expand Up @@ -5307,7 +5424,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
e.message?.let {
assertTrue(
"Exception not returning IndexWorkflow Action error ",
it.contains("Delegate monitor and it's chained finding monitor must query the same indices")
it.contains("doesn't query all of chained findings monitor's indices")
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
e.message?.let {
assertTrue(
"Exception not returning IndexWorkflow Action error ",
it.contains("Delegate monitor and it's chained finding monitor must query the same indices")
it.contains("doesn't query all of chained findings monitor's indices")
)
}
}
Expand Down
Loading