Skip to content

Commit

Permalink
Mappings parsing fix (#851)
Browse files Browse the repository at this point in the history
* fixed mappings parsing when field name named "properties" exists in mappings

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* message typo fix

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

---------

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
  • Loading branch information
petardz authored Apr 17, 2023
1 parent e0b7a5a commit 6a5c041
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,38 +160,34 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
) {
// If node contains "properties" property then it is internal(non-leaf) node
log.debug("Node in traverse: $node")
if (node.containsKey(PROPERTIES)) {
return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath, processLeafFn, flattenPaths)
} else {
// newNodes will hold list of updated leaf properties
var newNodes = ArrayList<Triple<String, String, Any>>(node.size)
node.entries.forEach {
// Compute full path relative to root
val fullPath = if (currentPath.isEmpty()) it.key
else "$currentPath.${it.key}"
val nodeProps = it.value as MutableMap<String, Any>
// If it has type property and type is not "nested" then this is a leaf
if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) {
// At this point we know full path of node, so we add it to output array
flattenPaths.add(fullPath)
// Calls processLeafFn and gets old node name, new node name and new properties of node.
// This is all information we need to update this node
val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap<String, Any>)
newNodes.add(Triple(oldName, newName, props))
} else {
// Internal(non-leaf) node - visit children
traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap<String, Any>, fullPath, processLeafFn, flattenPaths)
}
// newNodes will hold list of updated leaf properties
var newNodes = ArrayList<Triple<String, String, Any>>(node.size)
node.entries.forEach {
// Compute full path relative to root
val fullPath = if (currentPath.isEmpty()) it.key
else "$currentPath.${it.key}"
val nodeProps = it.value as MutableMap<String, Any>
// If it has type property and type is not "nested" then this is a leaf
if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) {
// At this point we know full path of node, so we add it to output array
flattenPaths.add(fullPath)
// Calls processLeafFn and gets old node name, new node name and new properties of node.
// This is all information we need to update this node
val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap<String, Any>)
newNodes.add(Triple(oldName, newName, props))
} else {
// Internal(non-leaf) node - visit children
traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap<String, Any>, fullPath, processLeafFn, flattenPaths)
}
// Here we can update all processed leaves in tree
newNodes.forEach {
// If we renamed leaf, we have to remove it first
if (it.first != it.second) {
node.remove(it.first)
}
// Put new properties of leaf
node.put(it.second, it.third)
}
// Here we can update all processed leaves in tree
newNodes.forEach {
// If we renamed leaf, we have to remove it first
if (it.first != it.second) {
node.remove(it.first)
}
// Put new properties of leaf
node.put(it.second, it.third)
}
}

Expand Down
154 changes: 143 additions & 11 deletions alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,149 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertEquals(acknowledgeAlertResponse.acknowledged.size, 1)
}

fun `test mappings parsing`() {

val index1 = "index_123"
val index2 = "index_456"
val index3 = "index_789"
val index4 = "index_012"
val q1 = DocLevelQuery(query = "properties:\"abcd\"", name = "1")
val q2 = DocLevelQuery(query = "type.properties:\"abcd\"", name = "2")
val q3 = DocLevelQuery(query = "type.something.properties:\"abcd\"", name = "3")
val q4 = DocLevelQuery(query = "type.something.properties.lastone:\"abcd\"", name = "4")

createIndex(index1, Settings.EMPTY)
createIndex(index2, Settings.EMPTY)
createIndex(index3, Settings.EMPTY)
createIndex(index4, Settings.EMPTY)

val m1 = """{
"properties": {
"properties": {
"type": "keyword"
}
}
}
""".trimIndent()
client().admin().indices().putMapping(PutMappingRequest(index1).source(m1, XContentType.JSON)).get()

val m2 = """{
"properties": {
"type": {
"properties": {
"properties": { "type": "keyword" }
}
}
}
}
""".trimIndent()
client().admin().indices().putMapping(PutMappingRequest(index2).source(m2, XContentType.JSON)).get()

val m3 = """{
"properties": {
"type": {
"properties": {
"something": {
"properties" : {
"properties": { "type": "keyword" }
}
}
}
}
}
}
""".trimIndent()
client().admin().indices().putMapping(PutMappingRequest(index3).source(m3, XContentType.JSON)).get()

val m4 = """{
"properties": {
"type": {
"properties": {
"something": {
"properties" : {
"properties": {
"properties": {
"lastone": { "type": "keyword" }
}
}
}
}
}
}
}
}
""".trimIndent()
client().admin().indices().putMapping(PutMappingRequest(index4).source(m4, XContentType.JSON)).get()

val docLevelInput = DocLevelMonitorInput(
"description",
listOf(index1, index2, index3, index4),
listOf(q1, q2, q3, q4)
)
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "custom_findings_index-1"
val customQueryIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex,
findingsIndex = customFindingsIndex,
findingsIndexPattern = customFindingsIndexPattern
)
)
val monitorResponse = createMonitor(monitor)

val testDoc1 = """{
"properties": "abcd"
}"""
indexDoc(index1, "1", testDoc1)
val testDoc2 = """{
"type.properties": "abcd"
}"""
indexDoc(index2, "1", testDoc2)
val testDoc3 = """{
"type.something.properties": "abcd"
}"""
indexDoc(index3, "1", testDoc3)
val testDoc4 = """{
"type.something.properties.lastone": "abcd"
}"""
indexDoc(index4, "1", testDoc4)

assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 4, findings.size)
}

fun `test execute monitor with custom query index`() {
val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
val docQuery3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5")
val docQuery4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6")
val docQuery5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7")
val docQuery6 = DocLevelQuery(query = "f1.type.f4:\"hello\"", name = "8")
val docQuery7 = DocLevelQuery(query = "f1.type.f2.f3:\"world\"", name = "9")
val docQuery8 = DocLevelQuery(query = "type:\"some type\"", name = "10")
val q1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
val q2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
val q3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5")
val q4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6")
val q5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7")
val q6 = DocLevelQuery(query = "f1.type.f4:\"hello\"", name = "8")
val q7 = DocLevelQuery(query = "f1.type.f2.f3:\"world\"", name = "9")
val q8 = DocLevelQuery(query = "type:\"some type\"", name = "10")
val q9 = DocLevelQuery(query = "properties:123", name = "11")

val docLevelInput = DocLevelMonitorInput(
"description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5, docQuery6, docQuery7, docQuery8)
"description",
listOf(index),
listOf(q1, q2, q3, q4, q5, q6, q7, q8, q9)
)
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
Expand Down Expand Up @@ -180,7 +311,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
"test_field.some_other_field" : "us-west-2",
"f1.type.f2.f3" : "world",
"f1.type.f4" : "hello",
"type" : "some type"
"type" : "some type",
"properties": 123
}"""
indexDoc(index, "1", testDoc)
client().admin().indices().putMapping(
Expand All @@ -207,7 +339,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertEquals("Didn't match all 8 queries", 8, findings[0].docLevelQueries.size)
assertEquals("Didn't match all 9 queries", 9, findings[0].docLevelQueries.size)
}

fun `test execute monitor with non-flattened json doc as source`() {
Expand Down

0 comments on commit 6a5c041

Please sign in to comment.