Skip to content

Commit

Permalink
Added support for "nested" mappings (opensearch-project#645)
Browse files Browse the repository at this point in the history
* example

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* fixed updating mappings for queryIndex

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
  • Loading branch information
petardz authored and eirsep committed Nov 9, 2022
1 parent bf63b34 commit 51712a2
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)

class DocLevelMonitorQueries(private val client: Client, private val clusterService: ClusterService) {
companion object {

val PROPERTIES = "properties"
val NESTED = "nested"
val TYPE = "type"

@JvmStatic
fun docLevelQueriesMappings(): String {
return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText()
Expand Down Expand Up @@ -95,6 +100,22 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
}

/**
* From given index mapping node, extracts fieldName -> fieldProperties pair
*/
fun extractField(node: MutableMap<String, Any>, currentPath: String): Pair<String, MutableMap<String, Any>> {
if (node.containsKey(PROPERTIES)) {
return extractField(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath)
} else if (node.containsKey(NESTED)) {
return extractField(node.get(NESTED) as MutableMap<String, Any>, currentPath)
} else if (node.size == 1 && node.containsKey(TYPE) == false) {
val iter = node.iterator().next()
return extractField(iter.value as MutableMap<String, Any>, currentPath + "." + iter.key)
} else {
return Pair(currentPath, node)
}
}

suspend fun indexDocLevelQueries(
monitor: Monitor,
monitorId: String,
Expand Down Expand Up @@ -123,17 +144,19 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
)

val updatedProperties = properties.entries.associate {
val newVal = it.value.toMutableMap()
var (fieldName, fieldProps) = extractField(it.value as MutableMap<String, Any>, it.key)
val newProps = fieldProps
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) {
mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
newVal[iter.key] = iter.value
newProps[iter.key] = iter.value
}
}
}
if (it.value.containsKey("path")) newVal["path"] = "${it.value["path"]}_${indexName}_$monitorId"
"${it.key}_${indexName}_$monitorId" to newVal

if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId"
"${fieldName}_${indexName}_$monitorId" to newProps
}
val queryIndex = monitor.dataSources.queryIndex

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.support.WriteRequest
Expand Down Expand Up @@ -141,12 +142,24 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"source.port": 12345,
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
indexDoc(index, "1", testDoc)
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
client().admin().indices().putMapping(
PutMappingRequest(
index
).source("test_alias.field_a", "type=alias,path=message")
).get()
client().admin().indices().putMapping(
PutMappingRequest(
index
).source("test_alias2", "type=alias,path=test_field")
).get()

val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Expand Down

0 comments on commit 51712a2

Please sign in to comment.