Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mappings traversal bug fix (#669) backport to 2.4 #674

Merged
merged 1 commit into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)

class DocLevelMonitorQueries(private val client: Client, private val clusterService: ClusterService) {
companion object {

val PROPERTIES = "properties"
val NESTED = "nested"
val TYPE = "type"

@JvmStatic
fun docLevelQueriesMappings(): String {
return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText()
Expand Down Expand Up @@ -95,6 +100,58 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
}

/**
* Does a DFS traversal of index mappings tree.
* Calls processLeafFn on every leaf node.
* Populates flattenPaths list with full paths of leaf nodes
* @param node current node which we're visiting
* @param currentPath current node path from root node
* @param processLeafFn leaf processor function which is called on every leaf discovered
* @param flattenPaths list of full paths of all leaf nodes relative to root
*/
fun traverseMappingsAndUpdate(
node: MutableMap<String, Any>,
currentPath: String,
processLeafFn: (String, MutableMap<String, Any>) -> Triple<String, String, MutableMap<String, Any>>,
flattenPaths: MutableList<String>
) {
// If node contains "properties" property then it is internal(non-leaf) node
if (node.containsKey(PROPERTIES)) {
return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath, processLeafFn, flattenPaths)
} else if (node.containsKey(TYPE) == false) {
// If there is no "type" property, this is either internal(non-leaf) node or leaf node
// newNodes will hold list of updated leaf properties
var newNodes = ArrayList<Triple<String, String, Any>>(node.size)
node.entries.forEach {
// Compute full path relative to root
val fullPath = if (currentPath.isEmpty()) it.key
else "$currentPath.${it.key}"
val nodeProps = it.value as MutableMap<String, Any>
// If it has type property and type is not "nested" then this is a leaf
if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) {
// At this point we know full path of node, so we add it to output array
flattenPaths.add(fullPath)
// Calls processLeafFn and gets old node name, new node name and new properties of node.
// This is all information we need to update this node
val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap<String, Any>)
newNodes.add(Triple(oldName, newName, props))
} else {
// Internal(non-leaf) node - visit children
traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap<String, Any>, fullPath, processLeafFn, flattenPaths)
}
}
// Here we can update all processed leaves in tree
newNodes.forEach {
// If we renamed leaf, we have to remove it first
if (it.first != it.second) {
node.remove(it.first)
}
// Put new properties of leaf
node.put(it.second, it.third)
}
}
}

suspend fun indexDocLevelQueries(
monitor: Monitor,
monitorId: String,
Expand All @@ -113,28 +170,39 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
}
val indices = getIndexResponse.indices()

// Run through each backing index and apply appropriate mappings to query index
indices?.forEach { indexName ->
if (clusterState.routingTable.hasIndex(indexName)) {
val indexMetadata = clusterState.metadata.index(indexName)
if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) {
val properties = (
(indexMetadata.mapping()?.sourceAsMap?.get("properties"))
as Map<String, Map<String, Any>>
as MutableMap<String, Any>
)

val updatedProperties = properties.entries.associate {
val newVal = it.value.toMutableMap()
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) {
mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
newVal[iter.key] = iter.value
// Node processor function is used to process leaves of index mappings tree
//
val leafNodeProcessor =
fun(fieldName: String, props: MutableMap<String, Any>): Triple<String, String, MutableMap<String, Any>> {
val newProps = props.toMutableMap()
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) {
mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
newProps[iter.key] = iter.value
}
}
}
if (props.containsKey("path")) {
newProps["path"] = "${props["path"]}_${indexName}_$monitorId"
}
return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps)
}
if (it.value.containsKey("path")) newVal["path"] = "${it.value["path"]}_${indexName}_$monitorId"
"${it.key}_${indexName}_$monitorId" to newVal
}
// Traverse and update index mappings here while extracting flatten field paths
val flattenPaths = mutableListOf<String>()
traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths)
// Updated mappings ready to be applied on queryIndex
val updatedProperties = properties

val queryIndex = monitor.dataSources.queryIndex

val updateMappingRequest = PutMappingRequest(queryIndex)
Expand All @@ -147,8 +215,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
val indexRequests = mutableListOf<IndexRequest>()
queries.forEach {
var query = it.query
properties.forEach { prop ->
query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:")
flattenPaths.forEach { fieldPath ->
query = query.replace("$fieldPath:", "${fieldPath}_${indexName}_$monitorId:")
}
val indexRequest = IndexRequest(queryIndex)
.id(it.id + "_${indexName}_$monitorId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.support.WriteRequest
Expand All @@ -33,6 +34,7 @@ import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit.MILLIS
import java.util.Map
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors

Expand Down Expand Up @@ -128,25 +130,45 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}

fun `test execute monitor with custom query index`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
val docQuery3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5")
val docQuery4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6")
val docQuery5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7")
val docLevelInput = DocLevelMonitorInput(
"description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5)
)
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "custom_findings_index-1"
val customQueryIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(queryIndex = customQueryIndex)
dataSources = DataSources(
queryIndex = customQueryIndex,
findingsIndex = customFindingsIndex,
findingsIndexPattern = customFindingsIndexPattern
)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
// Trying to test here few different "nesting" situations and "wierd" characters
val testDoc = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v1" : 12345,
"source.ip.v6.v2" : 16645,
"source.ip.v4.v0" : 120,
"test_bad_char" : "\u0000",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
"test_field.some_other_field" : "us-west-2"
}"""
indexDoc(index, "1", testDoc)
client().admin().indices().putMapping(
PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
)
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Expand All @@ -158,11 +180,85 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null))
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertEquals("Didn't match all 5 queries", 5, findings[0].docLevelQueries.size)
}

fun `test execute monitor with custom query index and nested mappings`() {
val docQuery1 = DocLevelQuery(query = "message:\"msg 1 2 3 4\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "custom_findings_index-1"
val customQueryIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex,
findingsIndex = customFindingsIndex,
findingsIndexPattern = customFindingsIndexPattern
)
)
val monitorResponse = createMonitor(monitor)

// We are verifying here that index with nested mappings and nested aliases
// won't break query matching

// Create index mappings
val m: MutableMap<String, Any> = HashMap()
val m1: MutableMap<String, Any> = HashMap()
m1["title"] = Map.of("type", "text")
m1["category"] = Map.of("type", "keyword")
m["rule"] = Map.of("type", "nested", "properties", m1)
val properties = Map.of<String, Any>("properties", m)

client().admin().indices().putMapping(
PutMappingRequest(
index
).source(properties)
).get()

// Put alias for nested fields
val mm: MutableMap<String, Any> = HashMap()
val mm1: MutableMap<String, Any> = HashMap()
mm1["title_alias"] = Map.of("type", "alias", "path", "rule.title")
mm["rule"] = Map.of("type", "nested", "properties", mm1)
val properties1 = Map.of<String, Any>("properties", mm)
client().admin().indices().putMapping(
PutMappingRequest(
index
).source(properties1)
).get()

val testDoc = """{
"rule": {"title": "some_title"},
"message": "msg 1 2 3 4"
}"""
indexDoc(index, "2", testDoc)

client().admin().indices().putMapping(
PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
)
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
}

fun `test execute monitor with custom query index and custom field mappings`() {
Expand Down Expand Up @@ -373,7 +469,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {

val customAlertsIndex = "custom_alerts_index"
val customQueryIndex = "custom_query_index"
Assert.assertFalse(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex))
val customFindingsIndex = "custom_findings_index"
val updateMonitorResponse = updateMonitor(
monitor.copy(
Expand Down