Skip to content

Commit

Permalink
Added support for "nested" mappings (opensearch-project#645)
Browse files Browse the repository at this point in the history
* example

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* fixed updating mappings for queryIndex

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
(cherry picked from commit 42abf4d)
  • Loading branch information
petardz committed Nov 8, 2022
1 parent 2684f67 commit 427f918
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)

class DocLevelMonitorQueries(private val client: Client, private val clusterService: ClusterService) {
companion object {

val PROPERTIES = "properties"
val NESTED = "nested"
val TYPE = "type"

@JvmStatic
fun docLevelQueriesMappings(): String {
return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText()
Expand Down Expand Up @@ -95,6 +100,22 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
}

/**
* From given index mapping node, extracts fieldName -> fieldProperties pair
*/
fun extractField(node: MutableMap<String, Any>, currentPath: String): Pair<String, MutableMap<String, Any>> {
if (node.containsKey(PROPERTIES)) {
return extractField(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath)
} else if (node.containsKey(NESTED)) {
return extractField(node.get(NESTED) as MutableMap<String, Any>, currentPath)
} else if (node.size == 1 && node.containsKey(TYPE) == false) {
val iter = node.iterator().next()
return extractField(iter.value as MutableMap<String, Any>, currentPath + "." + iter.key)
} else {
return Pair(currentPath, node)
}
}

suspend fun indexDocLevelQueries(
monitor: Monitor,
monitorId: String,
Expand Down Expand Up @@ -123,17 +144,19 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
)

val updatedProperties = properties.entries.associate {
val newVal = it.value.toMutableMap()
var (fieldName, fieldProps) = extractField(it.value as MutableMap<String, Any>, it.key)
val newProps = fieldProps
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) {
mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
newVal[iter.key] = iter.value
newProps[iter.key] = iter.value
}
}
}
if (it.value.containsKey("path")) newVal["path"] = "${it.value["path"]}_${indexName}_$monitorId"
"${it.key}_${indexName}_$monitorId" to newVal

if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId"
"${fieldName}_${indexName}_$monitorId" to newProps
}
val queryIndex = monitor.dataSources.queryIndex

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package org.opensearch.alerting
import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.action.SearchMonitorAction
Expand All @@ -17,6 +19,7 @@ import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DataSources
Expand All @@ -25,6 +28,8 @@ import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.Table
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
Expand All @@ -51,6 +56,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
Assert.assertEquals(monitor.owner, "alerting")
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, true)
Expand Down Expand Up @@ -137,12 +143,24 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"source.port": 12345,
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
indexDoc(index, "1", testDoc)
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
client().admin().indices().putMapping(
PutMappingRequest(
index
).source("test_alias.field_a", "type=alias,path=message")
).get()
client().admin().indices().putMapping(
PutMappingRequest(
index
).source("test_alias2", "type=alias,path=test_field")
).get()

val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Expand Down Expand Up @@ -195,17 +213,67 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true)
}

fun `test execute monitor with custom findings index`() {
fun `test delete monitor deletes all queries and metadata too`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customQueryIndex = "custom_alerts_index"
val analyzer = "whitespace"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex,
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(monitorId)
val clusterStateResponse = client().admin().cluster().state(ClusterStateRequest().indices(customQueryIndex).metadata(true)).get()
val mapping = clusterStateResponse.state.metadata.index(customQueryIndex).mapping()
Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true)
// Verify queries exist
var searchResponse = client().search(
SearchRequest(customQueryIndex).source(SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))
).get()
assertNotEquals(0, searchResponse.hits.hits.size)
client().execute(
AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorId, WriteRequest.RefreshPolicy.IMMEDIATE)
).get()
client().admin().indices().refresh(RefreshRequest(customQueryIndex)).get()
// Verify queries are deleted
searchResponse = client().search(
SearchRequest(customQueryIndex).source(SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))
).get()
assertEquals(0, searchResponse.hits.hits.size)
}

fun `test execute monitor with custom findings index and pattern`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "<custom_findings_index-{now/d}-1>"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(findingsIndex = customFindingsIndex)
dataSources = DataSources(findingsIndex = customFindingsIndex, findingsIndexPattern = customFindingsIndexPattern)
)
val monitorResponse = createMonitor(monitor)
client().admin().indices().refresh(RefreshRequest("*"))
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
Expand All @@ -216,24 +284,25 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
var executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
val findings = searchFindings(id, customFindingsIndex)

var findings = searchFindings(id, "custom_findings_index*", true)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)

indexDoc(index, "2", testDoc)
executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
findings = searchFindings(id, "custom_findings_index*", true)
assertEquals("Findings saved for test monitor", 2, findings.size)
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("2"))

val indices = getAllIndicesFromPattern("custom_findings_index*")
Assert.assertTrue(indices.isNotEmpty())
}

fun `test execute pre-existing monitorand update`() {
Expand Down Expand Up @@ -318,7 +387,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {

val customAlertsIndex = "custom_alerts_index"
val customQueryIndex = "custom_query_index"
Assert.assertFalse(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex))
val customFindingsIndex = "custom_findings_index"
val updateMonitorResponse = updateMonitor(
monitor.copy(
Expand All @@ -335,8 +403,9 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertNotNull(updateMonitorResponse)
Assert.assertEquals(updateMonitorResponse!!.monitor.owner, "security_analytics_plugin")
indexDoc(index, "2", testDoc)
executeMonitorResponse = executeMonitor(updateMonitorResponse!!.monitor, monitorId, false)
Assert.assertTrue(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex))
if (updateMonitorResponse != null) {
executeMonitorResponse = executeMonitor(updateMonitorResponse.monitor, monitorId, false)
}
val findings = searchFindings(monitorId, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
Expand Down

0 comments on commit 427f918

Please sign in to comment.