From 831d43cea56fe4a8481e08043713a37915067f7e Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Mon, 7 Nov 2022 23:29:49 +0100 Subject: [PATCH 1/2] Added support for "nested" mappings (#645) * example Signed-off-by: Petar Dzepina * fixed updating mappings for queryIndex Signed-off-by: Petar Dzepina Signed-off-by: Petar Dzepina --- .../alerting/util/DocLevelMonitorQueries.kt | 31 ++++++++++++++++--- .../alerting/MonitorDataSourcesIT.kt | 15 ++++++++- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 98693def1..3ca2a9e24 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -32,6 +32,11 @@ private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java) class DocLevelMonitorQueries(private val client: Client, private val clusterService: ClusterService) { companion object { + + val PROPERTIES = "properties" + val NESTED = "nested" + val TYPE = "type" + @JvmStatic fun docLevelQueriesMappings(): String { return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText() @@ -95,6 +100,22 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) } + /** + * From given index mapping node, extracts fieldName -> fieldProperties pair + */ + fun extractField(node: MutableMap, currentPath: String): Pair> { + if (node.containsKey(PROPERTIES)) { + return extractField(node.get(PROPERTIES) as MutableMap, currentPath) + } else if (node.containsKey(NESTED)) { + return extractField(node.get(NESTED) as MutableMap, currentPath) + } else if (node.size == 1 && node.containsKey(TYPE) == false) { + val iter = node.iterator().next() + return extractField(iter.value as MutableMap, currentPath + "." + iter.key) + } else { + return Pair(currentPath, node) + } + } + suspend fun indexDocLevelQueries( monitor: Monitor, monitorId: String, @@ -123,17 +144,19 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ ) val updatedProperties = properties.entries.associate { - val newVal = it.value.toMutableMap() + var (fieldName, fieldProps) = extractField(it.value as MutableMap, it.key) + val newProps = fieldProps if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { val mappingsByType = monitor.dataSources.queryIndexMappingsByType if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) { mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry -> - newVal[iter.key] = iter.value + newProps[iter.key] = iter.value } } } - if (it.value.containsKey("path")) newVal["path"] = "${it.value["path"]}_${indexName}_$monitorId" - "${it.key}_${indexName}_$monitorId" to newVal + + if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId" + "${fieldName}_${indexName}_$monitorId" to newProps } val queryIndex = monitor.dataSources.queryIndex diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index f4575cbbe..fabd162b6 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -8,6 +8,7 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.support.WriteRequest @@ -141,12 +142,24 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) val testDoc = """{ "message" : "This is an error from IAD region", + "source.port": 12345, "test_strict_date_time" : "$testTime", "test_field" : "us-west-2" }""" + indexDoc(index, "1", testDoc) assertFalse(monitorResponse?.id.isNullOrEmpty()) monitor = monitorResponse!!.monitor - indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest( + index + ).source("test_alias.field_a", "type=alias,path=message") + ).get() + client().admin().indices().putMapping( + PutMappingRequest( + index + ).source("test_alias2", "type=alias,path=test_field") + ).get() + val id = monitorResponse.id val executeMonitorResponse = executeMonitor(monitor, id, false) Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) From 9cfeafec47dc1c2664d98d348affbc1f4e584474 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Wed, 9 Nov 2022 22:10:47 +0100 Subject: [PATCH 2/2] mappings traversal bug fix (#669) Signed-off-by: Petar Dzepina --- .../alerting/util/DocLevelMonitorQueries.kt | 95 ++++++++++++---- .../alerting/MonitorDataSourcesIT.kt | 107 ++++++++++++++++-- 2 files changed, 165 insertions(+), 37 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 3ca2a9e24..3b036a382 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -101,18 +101,54 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } /** - * From given index mapping node, extracts fieldName -> fieldProperties pair + * Does a DFS traversal of index mappings tree. + * Calls processLeafFn on every leaf node. + * Populates flattenPaths list with full paths of leaf nodes + * @param node current node which we're visiting + * @param currentPath current node path from root node + * @param processLeafFn leaf processor function which is called on every leaf discovered + * @param flattenPaths list of full paths of all leaf nodes relative to root */ - fun extractField(node: MutableMap, currentPath: String): Pair> { + fun traverseMappingsAndUpdate( + node: MutableMap, + currentPath: String, + processLeafFn: (String, MutableMap) -> Triple>, + flattenPaths: MutableList + ) { + // If node contains "properties" property then it is internal(non-leaf) node if (node.containsKey(PROPERTIES)) { - return extractField(node.get(PROPERTIES) as MutableMap, currentPath) - } else if (node.containsKey(NESTED)) { - return extractField(node.get(NESTED) as MutableMap, currentPath) - } else if (node.size == 1 && node.containsKey(TYPE) == false) { - val iter = node.iterator().next() - return extractField(iter.value as MutableMap, currentPath + "." + iter.key) - } else { - return Pair(currentPath, node) + return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap, currentPath, processLeafFn, flattenPaths) + } else if (node.containsKey(TYPE) == false) { + // If there is no "type" property, this is either internal(non-leaf) node or leaf node + // newNodes will hold list of updated leaf properties + var newNodes = ArrayList>(node.size) + node.entries.forEach { + // Compute full path relative to root + val fullPath = if (currentPath.isEmpty()) it.key + else "$currentPath.${it.key}" + val nodeProps = it.value as MutableMap + // If it has type property and type is not "nested" then this is a leaf + if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) { + // At this point we know full path of node, so we add it to output array + flattenPaths.add(fullPath) + // Calls processLeafFn and gets old node name, new node name and new properties of node. + // This is all information we need to update this node + val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap) + newNodes.add(Triple(oldName, newName, props)) + } else { + // Internal(non-leaf) node - visit children + traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap, fullPath, processLeafFn, flattenPaths) + } + } + // Here we can update all processed leaves in tree + newNodes.forEach { + // If we renamed leaf, we have to remove it first + if (it.first != it.second) { + node.remove(it.first) + } + // Put new properties of leaf + node.put(it.second, it.third) + } } } @@ -134,30 +170,39 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } val indices = getIndexResponse.indices() + // Run through each backing index and apply appropriate mappings to query index indices?.forEach { indexName -> if (clusterState.routingTable.hasIndex(indexName)) { val indexMetadata = clusterState.metadata.index(indexName) if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { val properties = ( (indexMetadata.mapping()?.sourceAsMap?.get("properties")) - as Map> + as MutableMap ) - - val updatedProperties = properties.entries.associate { - var (fieldName, fieldProps) = extractField(it.value as MutableMap, it.key) - val newProps = fieldProps - if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { - val mappingsByType = monitor.dataSources.queryIndexMappingsByType - if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) { - mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry -> - newProps[iter.key] = iter.value + // Node processor function is used to process leaves of index mappings tree + // + val leafNodeProcessor = + fun(fieldName: String, props: MutableMap): Triple> { + val newProps = props.toMutableMap() + if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { + val mappingsByType = monitor.dataSources.queryIndexMappingsByType + if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) { + mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry -> + newProps[iter.key] = iter.value + } } } + if (props.containsKey("path")) { + newProps["path"] = "${props["path"]}_${indexName}_$monitorId" + } + return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps) } + // Traverse and update index mappings here while extracting flatten field paths + val flattenPaths = mutableListOf() + traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) + // Updated mappings ready to be applied on queryIndex + val updatedProperties = properties - if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId" - "${fieldName}_${indexName}_$monitorId" to newProps - } val queryIndex = monitor.dataSources.queryIndex val updateMappingRequest = PutMappingRequest(queryIndex) @@ -170,8 +215,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ val indexRequests = mutableListOf() queries.forEach { var query = it.query - properties.forEach { prop -> - query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:") + flattenPaths.forEach { fieldPath -> + query = query.replace("$fieldPath:", "${fieldPath}_${indexName}_$monitorId:") } val indexRequest = IndexRequest(queryIndex) .id(it.id + "_${indexName}_$monitorId") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index fabd162b6..ac1d271d4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -34,6 +34,7 @@ import org.opensearch.test.OpenSearchTestCase import java.time.ZonedDateTime import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit.MILLIS +import java.util.Map import java.util.concurrent.TimeUnit import java.util.stream.Collectors @@ -129,37 +130,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } fun `test execute monitor with custom query index`() { - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") - val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") + val docQuery3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5") + val docQuery4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6") + val docQuery5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5) + ) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" val customQueryIndex = "custom_alerts_index" var monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), triggers = listOf(trigger), - dataSources = DataSources(queryIndex = customQueryIndex) + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) ) val monitorResponse = createMonitor(monitor) val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + // Trying to test here few different "nesting" situations and "wierd" characters val testDoc = """{ "message" : "This is an error from IAD region", - "source.port": 12345, + "source.ip.v6.v1" : 12345, + "source.ip.v6.v2" : 16645, + "source.ip.v4.v0" : 120, + "test_bad_char" : "\u0000", "test_strict_date_time" : "$testTime", - "test_field" : "us-west-2" + "test_field.some_other_field" : "us-west-2" }""" indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) assertFalse(monitorResponse?.id.isNullOrEmpty()) monitor = monitorResponse!!.monitor + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertEquals("Didn't match all 5 queries", 5, findings[0].docLevelQueries.size) + } + + fun `test execute monitor with custom query index and nested mappings`() { + val docQuery1 = DocLevelQuery(query = "message:\"msg 1 2 3 4\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + + // We are verifying here that index with nested mappings and nested aliases + // won't break query matching + + // Create index mappings + val m: MutableMap = HashMap() + val m1: MutableMap = HashMap() + m1["title"] = Map.of("type", "text") + m1["category"] = Map.of("type", "keyword") + m["rule"] = Map.of("type", "nested", "properties", m1) + val properties = Map.of("properties", m) + client().admin().indices().putMapping( PutMappingRequest( index - ).source("test_alias.field_a", "type=alias,path=message") + ).source(properties) ).get() + + // Put alias for nested fields + val mm: MutableMap = HashMap() + val mm1: MutableMap = HashMap() + mm1["title_alias"] = Map.of("type", "alias", "path", "rule.title") + mm["rule"] = Map.of("type", "nested", "properties", mm1) + val properties1 = Map.of("properties", mm) client().admin().indices().putMapping( PutMappingRequest( index - ).source("test_alias2", "type=alias,path=test_field") + ).source(properties1) ).get() + val testDoc = """{ + "rule": {"title": "some_title"}, + "message": "msg 1 2 3 4" + }""" + indexDoc(index, "2", testDoc) + + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor val id = monitorResponse.id val executeMonitorResponse = executeMonitor(monitor, id, false) Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) @@ -171,11 +255,10 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { .get() Assert.assertTrue(getAlertsResponse != null) Assert.assertTrue(getAlertsResponse.alerts.size == 1) - getAlertsResponse = client() - .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null)) - .get() - Assert.assertTrue(getAlertsResponse != null) - Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2")) + assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size) } fun `test execute monitor with custom query index and custom field mappings`() {