diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 98693def1..3ca2a9e24 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -32,6 +32,11 @@ private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java) class DocLevelMonitorQueries(private val client: Client, private val clusterService: ClusterService) { companion object { + + val PROPERTIES = "properties" + val NESTED = "nested" + val TYPE = "type" + @JvmStatic fun docLevelQueriesMappings(): String { return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText() @@ -95,6 +100,22 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) } + /** + * From given index mapping node, extracts fieldName -> fieldProperties pair + */ + fun extractField(node: MutableMap, currentPath: String): Pair> { + if (node.containsKey(PROPERTIES)) { + return extractField(node.get(PROPERTIES) as MutableMap, currentPath) + } else if (node.containsKey(NESTED)) { + return extractField(node.get(NESTED) as MutableMap, currentPath) + } else if (node.size == 1 && node.containsKey(TYPE) == false) { + val iter = node.iterator().next() + return extractField(iter.value as MutableMap, currentPath + "." + iter.key) + } else { + return Pair(currentPath, node) + } + } + suspend fun indexDocLevelQueries( monitor: Monitor, monitorId: String, @@ -123,17 +144,19 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ ) val updatedProperties = properties.entries.associate { - val newVal = it.value.toMutableMap() + var (fieldName, fieldProps) = extractField(it.value as MutableMap, it.key) + val newProps = fieldProps if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { val mappingsByType = monitor.dataSources.queryIndexMappingsByType if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) { mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry -> - newVal[iter.key] = iter.value + newProps[iter.key] = iter.value } } } - if (it.value.containsKey("path")) newVal["path"] = "${it.value["path"]}_${indexName}_$monitorId" - "${it.key}_${indexName}_$monitorId" to newVal + + if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId" + "${fieldName}_${indexName}_$monitorId" to newProps } val queryIndex = monitor.dataSources.queryIndex diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 43c9dff3e..bf6a2549d 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -8,6 +8,8 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.action.SearchMonitorAction @@ -17,6 +19,7 @@ import org.opensearch.alerting.transport.AlertingSingleNodeTestCase import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources @@ -25,6 +28,8 @@ import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.commons.alerting.model.Table import org.opensearch.index.query.MatchQueryBuilder +import org.opensearch.index.query.QueryBuilders +import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.test.OpenSearchTestCase import java.time.ZonedDateTime import java.time.format.DateTimeFormatter @@ -51,6 +56,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { }""" assertFalse(monitorResponse?.id.isNullOrEmpty()) monitor = monitorResponse!!.monitor + Assert.assertEquals(monitor.owner, "alerting") indexDoc(index, "1", testDoc) val id = monitorResponse.id val executeMonitorResponse = executeMonitor(monitor, id, true) @@ -137,12 +143,24 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) val testDoc = """{ "message" : "This is an error from IAD region", + "source.port": 12345, "test_strict_date_time" : "$testTime", "test_field" : "us-west-2" }""" + indexDoc(index, "1", testDoc) assertFalse(monitorResponse?.id.isNullOrEmpty()) monitor = monitorResponse!!.monitor - indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest( + index + ).source("test_alias.field_a", "type=alias,path=message") + ).get() + client().admin().indices().putMapping( + PutMappingRequest( + index + ).source("test_alias2", "type=alias,path=test_field") + ).get() + val id = monitorResponse.id val executeMonitorResponse = executeMonitor(monitor, id, false) Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) @@ -195,17 +213,67 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true) } - fun `test execute monitor with custom findings index`() { + fun `test delete monitor deletes all queries and metadata too`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customQueryIndex = "custom_alerts_index" + val analyzer = "whitespace" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))), + ) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + indexDoc(index, "1", testDoc) + val monitorId = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, monitorId, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(monitorId) + val clusterStateResponse = client().admin().cluster().state(ClusterStateRequest().indices(customQueryIndex).metadata(true)).get() + val mapping = clusterStateResponse.state.metadata.index(customQueryIndex).mapping() + Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true) + // Verify queries exist + var searchResponse = client().search( + SearchRequest(customQueryIndex).source(SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) + ).get() + assertNotEquals(0, searchResponse.hits.hits.size) + client().execute( + AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorId, WriteRequest.RefreshPolicy.IMMEDIATE) + ).get() + client().admin().indices().refresh(RefreshRequest(customQueryIndex)).get() + // Verify queries are deleted + searchResponse = client().search( + SearchRequest(customQueryIndex).source(SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) + ).get() + assertEquals(0, searchResponse.hits.hits.size) + } + + fun `test execute monitor with custom findings index and pattern`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "" var monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), triggers = listOf(trigger), - dataSources = DataSources(findingsIndex = customFindingsIndex) + dataSources = DataSources(findingsIndex = customFindingsIndex, findingsIndexPattern = customFindingsIndexPattern) ) val monitorResponse = createMonitor(monitor) + client().admin().indices().refresh(RefreshRequest("*")) val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) val testDoc = """{ "message" : "This is an error from IAD region", @@ -216,24 +284,25 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { monitor = monitorResponse!!.monitor indexDoc(index, "1", testDoc) val id = monitorResponse.id - val executeMonitorResponse = executeMonitor(monitor, id, false) + var executeMonitorResponse = executeMonitor(monitor, id, false) Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) - searchAlerts(id) - val findings = searchFindings(id, customFindingsIndex) + + var findings = searchFindings(id, "custom_findings_index*", true) assertEquals("Findings saved for test monitor", 1, findings.size) assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - val table = Table("asc", "id", null, 1, 0, "") - var getAlertsResponse = client() - .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) - .get() - Assert.assertTrue(getAlertsResponse != null) - Assert.assertTrue(getAlertsResponse.alerts.size == 1) - getAlertsResponse = client() - .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null)) - .get() - Assert.assertTrue(getAlertsResponse != null) - Assert.assertTrue(getAlertsResponse.alerts.size == 1) + + indexDoc(index, "2", testDoc) + executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 2, findings.size) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("2")) + + val indices = getAllIndicesFromPattern("custom_findings_index*") + Assert.assertTrue(indices.isNotEmpty()) } fun `test execute pre-existing monitorand update`() { @@ -318,7 +387,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val customAlertsIndex = "custom_alerts_index" val customQueryIndex = "custom_query_index" - Assert.assertFalse(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex)) val customFindingsIndex = "custom_findings_index" val updateMonitorResponse = updateMonitor( monitor.copy( @@ -335,8 +403,9 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertNotNull(updateMonitorResponse) Assert.assertEquals(updateMonitorResponse!!.monitor.owner, "security_analytics_plugin") indexDoc(index, "2", testDoc) - executeMonitorResponse = executeMonitor(updateMonitorResponse!!.monitor, monitorId, false) - Assert.assertTrue(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex)) + if (updateMonitorResponse != null) { + executeMonitorResponse = executeMonitor(updateMonitorResponse.monitor, monitorId, false) + } val findings = searchFindings(monitorId, customFindingsIndex) assertEquals("Findings saved for test monitor", 1, findings.size) assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))