-
Notifications
You must be signed in to change notification settings - Fork 108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds support for using findings from list of monitors as input data for a monitor in workflow #1112
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,17 +38,26 @@ class WorkflowService( | |
* Returns finding doc ids per index for the given workflow execution | ||
* Used for pre-filtering the dataset in the case of creating a workflow with chained findings | ||
* | ||
* @param chainedMonitor Monitor that is previously executed | ||
* @param chainedMonitors Monitor that is previously executed | ||
* @param workflowExecutionId Execution id of the current workflow | ||
*/ | ||
suspend fun getFindingDocIdsByExecutionId(chainedMonitor: Monitor, workflowExecutionId: String): Map<String, List<String>> { | ||
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String): Map<String, List<String>> { | ||
if (chainedMonitors.isEmpty()) | ||
return emptyMap() | ||
val dataSources = chainedMonitors[0].dataSources | ||
try { | ||
val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil { | ||
exists(IndicesExistsRequest(chainedMonitor.dataSources.findingsIndex).local(true), it) | ||
exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it) | ||
} | ||
if (existsResponse.isExists == false) return emptyMap() | ||
// Search findings index per monitor and workflow execution id | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick: update description since this is not for a single monitor anymore. |
||
val bqb = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Finding.MONITOR_ID_FIELD, chainedMonitor.id)) | ||
val bqb = QueryBuilders.boolQuery() | ||
.filter( | ||
QueryBuilders.termsQuery( | ||
Finding.MONITOR_ID_FIELD, | ||
chainedMonitors.map { it.id } | ||
) | ||
) | ||
.filter(QueryBuilders.termQuery(Finding.EXECUTION_ID_FIELD, workflowExecutionId)) | ||
val searchRequest = SearchRequest() | ||
.source( | ||
|
@@ -57,7 +66,7 @@ class WorkflowService( | |
.version(true) | ||
.seqNoAndPrimaryTerm(true) | ||
) | ||
.indices(chainedMonitor.dataSources.findingsIndex) | ||
.indices(dataSources.findingsIndex) | ||
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } | ||
|
||
// Get the findings docs | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,12 +56,14 @@ import org.opensearch.commons.alerting.model.DataSources | |
import org.opensearch.commons.alerting.model.Delegate | ||
import org.opensearch.commons.alerting.model.DocLevelMonitorInput | ||
import org.opensearch.commons.alerting.model.DocLevelQuery | ||
import org.opensearch.commons.alerting.model.IntervalSchedule | ||
import org.opensearch.commons.alerting.model.Monitor | ||
import org.opensearch.commons.alerting.model.ScheduledJob | ||
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.DOC_LEVEL_QUERIES_INDEX | ||
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX | ||
import org.opensearch.commons.alerting.model.SearchInput | ||
import org.opensearch.commons.alerting.model.Table | ||
import org.opensearch.commons.alerting.model.Workflow | ||
import org.opensearch.core.rest.RestStatus | ||
import org.opensearch.core.xcontent.XContentParser | ||
import org.opensearch.core.xcontent.XContentParserUtils | ||
|
@@ -2698,6 +2700,121 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { | |
assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2")) | ||
} | ||
|
||
fun `test execute workflow with multiple monitors in chained monitor findings of single monitor`() { | ||
val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") | ||
val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) | ||
val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) | ||
val customAlertsIndex1 = "custom_alerts_index" | ||
val customFindingsIndex1 = "custom_findings_index" | ||
val customFindingsIndexPattern1 = "custom_findings_index-1" | ||
var monitor1 = randomDocumentLevelMonitor( | ||
inputs = listOf(docLevelInput1), | ||
triggers = listOf(trigger1), | ||
enabled = false, | ||
dataSources = DataSources( | ||
alertsIndex = customAlertsIndex1, | ||
findingsIndex = customFindingsIndex1, | ||
findingsIndexPattern = customFindingsIndexPattern1 | ||
) | ||
) | ||
val monitorResponse = createMonitor(monitor1)!! | ||
|
||
val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") | ||
val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2)) | ||
val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) | ||
var monitor2 = randomDocumentLevelMonitor( | ||
inputs = listOf(docLevelInput2), | ||
triggers = listOf(trigger2), | ||
enabled = false, | ||
dataSources = DataSources( | ||
alertsIndex = customAlertsIndex1, | ||
findingsIndex = customFindingsIndex1, | ||
findingsIndexPattern = customFindingsIndexPattern1 | ||
) | ||
) | ||
|
||
val monitorResponse2 = createMonitor(monitor2)!! | ||
val docQuery3 = DocLevelQuery(query = "_id:*", name = "5") | ||
val docLevelInput3 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery3)) | ||
val trigger3 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) | ||
|
||
var monitor3 = randomDocumentLevelMonitor( | ||
inputs = listOf(docLevelInput3), | ||
triggers = listOf(trigger3), | ||
enabled = false, | ||
dataSources = DataSources( | ||
alertsIndex = customAlertsIndex1, | ||
findingsIndex = customFindingsIndex1, | ||
findingsIndexPattern = customFindingsIndexPattern1 | ||
) | ||
) | ||
|
||
val monitorResponse3 = createMonitor(monitor3)!! | ||
val d1 = Delegate(1, monitorResponse.id) | ||
val d2 = Delegate(2, monitorResponse2.id) | ||
val d3 = Delegate( | ||
3, monitorResponse3.id, | ||
ChainedMonitorFindings(null, listOf(monitorResponse.id, monitorResponse2.id)) | ||
) | ||
var workflow = Workflow( | ||
id = "", | ||
name = "test", | ||
enabled = false, | ||
schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), | ||
lastUpdateTime = Instant.now(), | ||
enabledTime = null, | ||
workflowType = Workflow.WorkflowType.COMPOSITE, | ||
user = randomUser(), | ||
inputs = listOf(CompositeInput(org.opensearch.commons.alerting.model.Sequence(listOf(d1, d2, d3)))), | ||
version = -1L, | ||
schemaVersion = 0, | ||
triggers = emptyList(), | ||
auditDelegateMonitorAlerts = false | ||
|
||
) | ||
val workflowResponse = upsertWorkflow(workflow)!! | ||
val workflowById = searchWorkflow(workflowResponse.id) | ||
assertNotNull(workflowById) | ||
|
||
var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) | ||
// Matches monitor1 | ||
val testDoc1 = """{ | ||
"message" : "This is an error from IAD region", | ||
"source.ip.v6.v2" : 16644, | ||
"test_strict_date_time" : "$testTime", | ||
"test_field_1" : "us-west-2" | ||
}""" | ||
indexDoc(index, "1", testDoc1) | ||
|
||
testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) | ||
// Matches monitor1 and monitor2 | ||
val testDoc2 = """{ | ||
"message" : "This is an error from IAD region", | ||
"source.ip.v6.v2" : 16645, | ||
"test_strict_date_time" : "$testTime", | ||
"test_field_1" : "us-west-2" | ||
}""" | ||
indexDoc(index, "2", testDoc2) | ||
|
||
testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) | ||
// Doesn't match | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this would still match monitor 2, right? |
||
val testDoc3 = """{ | ||
"message" : "This is an error from IAD region", | ||
"source.ip.v6.v2" : 16645, | ||
"test_strict_date_time" : "$testTime", | ||
"test_field_1" : "us-east-1" | ||
}""" | ||
indexDoc(index, "3", testDoc3) | ||
|
||
val workflowId = workflowResponse.id | ||
val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! | ||
val monitorsRunResults = executeWorkflowResponse.workflowRunResult.monitorRunResults | ||
assertEquals(3, monitorsRunResults.size) | ||
assertFindings(monitorResponse.id, customFindingsIndex1, 2, 2, listOf("1", "2")) | ||
assertFindings(monitorResponse2.id, customFindingsIndex1, 2, 2, listOf("2", "3")) | ||
assertFindings(monitorResponse3.id, customFindingsIndex1, 3, 3, listOf("1", "2", "3")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't monitor 3 only execute on document 2 since it takes an input from monitor 1 and 2 and they only both would generate findings from document 2? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We would do union of 2 sets here not intersection. |
||
} | ||
|
||
fun `test execute workflows with shared doc level monitor delegate`() { | ||
val docQuery = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") | ||
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) | ||
|
@@ -5307,7 +5424,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { | |
e.message?.let { | ||
assertTrue( | ||
"Exception not returning IndexWorkflow Action error ", | ||
it.contains("Delegate monitor and it's chained finding monitor must query the same indices") | ||
it.contains("doesn't query all of chained findings monitor's indices") | ||
) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: update comment to indicate
Monitors that have previously executed
.