Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.7] Mappings parsing fix #871

Merged
merged 1 commit into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,38 +160,34 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
) {
// If node contains "properties" property then it is internal(non-leaf) node
log.debug("Node in traverse: $node")
if (node.containsKey(PROPERTIES)) {
return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath, processLeafFn, flattenPaths)
} else {
// newNodes will hold list of updated leaf properties
var newNodes = ArrayList<Triple<String, String, Any>>(node.size)
node.entries.forEach {
// Compute full path relative to root
val fullPath = if (currentPath.isEmpty()) it.key
else "$currentPath.${it.key}"
val nodeProps = it.value as MutableMap<String, Any>
// If it has type property and type is not "nested" then this is a leaf
if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) {
// At this point we know full path of node, so we add it to output array
flattenPaths.add(fullPath)
// Calls processLeafFn and gets old node name, new node name and new properties of node.
// This is all information we need to update this node
val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap<String, Any>)
newNodes.add(Triple(oldName, newName, props))
} else {
// Internal(non-leaf) node - visit children
traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap<String, Any>, fullPath, processLeafFn, flattenPaths)
}
// newNodes will hold list of updated leaf properties
var newNodes = ArrayList<Triple<String, String, Any>>(node.size)
node.entries.forEach {
// Compute full path relative to root
val fullPath = if (currentPath.isEmpty()) it.key
else "$currentPath.${it.key}"
val nodeProps = it.value as MutableMap<String, Any>
// If it has type property and type is not "nested" then this is a leaf
if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) {
// At this point we know full path of node, so we add it to output array
flattenPaths.add(fullPath)
// Calls processLeafFn and gets old node name, new node name and new properties of node.
// This is all information we need to update this node
val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap<String, Any>)
newNodes.add(Triple(oldName, newName, props))
} else {
// Internal(non-leaf) node - visit children
traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap<String, Any>, fullPath, processLeafFn, flattenPaths)
}
// Here we can update all processed leaves in tree
newNodes.forEach {
// If we renamed leaf, we have to remove it first
if (it.first != it.second) {
node.remove(it.first)
}
// Put new properties of leaf
node.put(it.second, it.third)
}
// Here we can update all processed leaves in tree
newNodes.forEach {
// If we renamed leaf, we have to remove it first
if (it.first != it.second) {
node.remove(it.first)
}
// Put new properties of leaf
node.put(it.second, it.third)
}
}

Expand Down
154 changes: 143 additions & 11 deletions alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,149 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertEquals(acknowledgeAlertResponse.acknowledged.size, 1)
}

fun `test mappings parsing`() {

val index1 = "index_123"
val index2 = "index_456"
val index3 = "index_789"
val index4 = "index_012"
val q1 = DocLevelQuery(query = "properties:\"abcd\"", name = "1")
val q2 = DocLevelQuery(query = "type.properties:\"abcd\"", name = "2")
val q3 = DocLevelQuery(query = "type.something.properties:\"abcd\"", name = "3")
val q4 = DocLevelQuery(query = "type.something.properties.lastone:\"abcd\"", name = "4")

createIndex(index1, Settings.EMPTY)
createIndex(index2, Settings.EMPTY)
createIndex(index3, Settings.EMPTY)
createIndex(index4, Settings.EMPTY)

val m1 = """{
"properties": {
"properties": {
"type": "keyword"
}
}
}
""".trimIndent()
client().admin().indices().putMapping(PutMappingRequest(index1).source(m1, XContentType.JSON)).get()

val m2 = """{
"properties": {
"type": {
"properties": {
"properties": { "type": "keyword" }
}
}
}
}
""".trimIndent()
client().admin().indices().putMapping(PutMappingRequest(index2).source(m2, XContentType.JSON)).get()

val m3 = """{
"properties": {
"type": {
"properties": {
"something": {
"properties" : {
"properties": { "type": "keyword" }
}
}
}
}
}
}
""".trimIndent()
client().admin().indices().putMapping(PutMappingRequest(index3).source(m3, XContentType.JSON)).get()

val m4 = """{
"properties": {
"type": {
"properties": {
"something": {
"properties" : {
"properties": {
"properties": {
"lastone": { "type": "keyword" }
}
}
}
}
}
}
}
}
""".trimIndent()
client().admin().indices().putMapping(PutMappingRequest(index4).source(m4, XContentType.JSON)).get()

val docLevelInput = DocLevelMonitorInput(
"description",
listOf(index1, index2, index3, index4),
listOf(q1, q2, q3, q4)
)
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "custom_findings_index-1"
val customQueryIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex,
findingsIndex = customFindingsIndex,
findingsIndexPattern = customFindingsIndexPattern
)
)
val monitorResponse = createMonitor(monitor)

val testDoc1 = """{
"properties": "abcd"
}"""
indexDoc(index1, "1", testDoc1)
val testDoc2 = """{
"type.properties": "abcd"
}"""
indexDoc(index2, "1", testDoc2)
val testDoc3 = """{
"type.something.properties": "abcd"
}"""
indexDoc(index3, "1", testDoc3)
val testDoc4 = """{
"type.something.properties.lastone": "abcd"
}"""
indexDoc(index4, "1", testDoc4)

assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 4, findings.size)
}

fun `test execute monitor with custom query index`() {
val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
val docQuery3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5")
val docQuery4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6")
val docQuery5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7")
val docQuery6 = DocLevelQuery(query = "f1.type.f4:\"hello\"", name = "8")
val docQuery7 = DocLevelQuery(query = "f1.type.f2.f3:\"world\"", name = "9")
val docQuery8 = DocLevelQuery(query = "type:\"some type\"", name = "10")
val q1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
val q2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
val q3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5")
val q4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6")
val q5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7")
val q6 = DocLevelQuery(query = "f1.type.f4:\"hello\"", name = "8")
val q7 = DocLevelQuery(query = "f1.type.f2.f3:\"world\"", name = "9")
val q8 = DocLevelQuery(query = "type:\"some type\"", name = "10")
val q9 = DocLevelQuery(query = "properties:123", name = "11")

val docLevelInput = DocLevelMonitorInput(
"description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5, docQuery6, docQuery7, docQuery8)
"description",
listOf(index),
listOf(q1, q2, q3, q4, q5, q6, q7, q8, q9)
)
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
Expand Down Expand Up @@ -180,7 +311,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
"test_field.some_other_field" : "us-west-2",
"f1.type.f2.f3" : "world",
"f1.type.f4" : "hello",
"type" : "some type"
"type" : "some type",
"properties": 123
}"""
indexDoc(index, "1", testDoc)
client().admin().indices().putMapping(
Expand All @@ -207,7 +339,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val findings = searchFindings(id, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertEquals("Didn't match all 8 queries", 8, findings[0].docLevelQueries.size)
assertEquals("Didn't match all 9 queries", 9, findings[0].docLevelQueries.size)
}

fun `test execute monitor with non-flattened json doc as source`() {
Expand Down