diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 98693def1..3ca2a9e24 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -32,6 +32,11 @@ private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java) class DocLevelMonitorQueries(private val client: Client, private val clusterService: ClusterService) { companion object { + + val PROPERTIES = "properties" + val NESTED = "nested" + val TYPE = "type" + @JvmStatic fun docLevelQueriesMappings(): String { return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText() @@ -95,6 +100,22 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) } + /** + * From given index mapping node, extracts fieldName -> fieldProperties pair + */ + fun extractField(node: MutableMap, currentPath: String): Pair> { + if (node.containsKey(PROPERTIES)) { + return extractField(node.get(PROPERTIES) as MutableMap, currentPath) + } else if (node.containsKey(NESTED)) { + return extractField(node.get(NESTED) as MutableMap, currentPath) + } else if (node.size == 1 && node.containsKey(TYPE) == false) { + val iter = node.iterator().next() + return extractField(iter.value as MutableMap, currentPath + "." + iter.key) + } else { + return Pair(currentPath, node) + } + } + suspend fun indexDocLevelQueries( monitor: Monitor, monitorId: String, @@ -123,17 +144,19 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ ) val updatedProperties = properties.entries.associate { - val newVal = it.value.toMutableMap() + var (fieldName, fieldProps) = extractField(it.value as MutableMap, it.key) + val newProps = fieldProps if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { val mappingsByType = monitor.dataSources.queryIndexMappingsByType if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) { mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry -> - newVal[iter.key] = iter.value + newProps[iter.key] = iter.value } } } - if (it.value.containsKey("path")) newVal["path"] = "${it.value["path"]}_${indexName}_$monitorId" - "${it.key}_${indexName}_$monitorId" to newVal + + if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId" + "${fieldName}_${indexName}_$monitorId" to newProps } val queryIndex = monitor.dataSources.queryIndex diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index f4575cbbe..fabd162b6 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -8,6 +8,7 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.support.WriteRequest @@ -141,12 +142,24 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) val testDoc = """{ "message" : "This is an error from IAD region", + "source.port": 12345, "test_strict_date_time" : "$testTime", "test_field" : "us-west-2" }""" + indexDoc(index, "1", testDoc) assertFalse(monitorResponse?.id.isNullOrEmpty()) monitor = monitorResponse!!.monitor - indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest( + index + ).source("test_alias.field_a", "type=alias,path=message") + ).get() + client().admin().indices().putMapping( + PutMappingRequest( + index + ).source("test_alias2", "type=alias,path=test_field") + ).get() + val id = monitorResponse.id val executeMonitorResponse = executeMonitor(monitor, id, false) Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)