From bb6f87d3c0d8f0039af43a9c230a5c31d763100a Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Tue, 29 Mar 2022 08:43:49 -0700 Subject: [PATCH 01/11] Implemented support for defining doc level monitors using aliases. Signed-off-by: AWSHurneyt --- .../DocumentReturningMonitorRunner.kt | 218 ++++++++++++++---- .../org/opensearch/alerting/model/Monitor.kt | 8 +- .../alerting/AlertingRestTestCase.kt | 54 +++++ .../alerting/MonitorRunnerServiceIT.kt | 188 +++++++++++++++ .../org/opensearch/alerting/TestHelpers.kt | 1 + .../alerting/resthandler/MonitorRestApiIT.kt | 64 ++--- 6 files changed, 448 insertions(+), 85 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt index e430d8f67..397e453ff 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest @@ -83,64 +84,162 @@ object DocumentReturningMonitorRunner : MonitorRunner() { val queries: List = docLevelMonitorInput.queries val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID - var lastRunContext = monitor.lastRunContext.toMutableMap() - try { - if (lastRunContext.isNullOrEmpty()) { - lastRunContext = createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, index).toMutableMap() - } - } catch (e: Exception) { - logger.info("Failed to start Document-level-monitor $index. Error: ${e.message}") - return monitorResult.copy(error = e) - } - - val count: Int = lastRunContext["shards_count"] as Int + val lastRunContext = if (monitor.lastRunContext.isNullOrEmpty()) mutableMapOf() + else monitor.lastRunContext.toMutableMap() as MutableMap> +// var lastRunContext = monitor.lastRunContext.toMutableMap() +// try { +// if (lastRunContext.isNullOrEmpty()) { +// lastRunContext = createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, index).toMutableMap() +// } +// } catch (e: Exception) { +// logger.info("Failed to start Document-level-monitor $index. 
Error: ${e.message}") +// return monitorResult.copy(error = e) +// } + +// val count: Int = lastRunContext["shards_count"] as Int val updatedLastRunContext = lastRunContext.toMutableMap() - for (i: Int in 0 until count) { - val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) - updatedLastRunContext[shard] = maxSeqNo - - // update lastRunContext if its a temp monitor as we only want to view the last bit of data then - // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data - if (isTempMonitor) { - lastRunContext[shard] = max(-1, maxSeqNo - 1) - } - } +// for (i: Int in 0 until count) { +// val shard = i.toString() +// val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) +// updatedLastRunContext[shard] = maxSeqNo +// +// // update lastRunContext if its a temp monitor as we only want to view the last bit of data then +// // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data +// if (isTempMonitor) { +// lastRunContext[shard] = max(-1, maxSeqNo - 1) +// } +// } val queryToDocIds = mutableMapOf>() val docsToQueries = mutableMapOf>() val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext) val idQueryMap = mutableMapOf() - val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun) + try { + val getAliasesRequest = GetAliasesRequest(index) + val getAliasesResponse = monitorCtx.client!!.admin().indices().getAliases(getAliasesRequest).actionGet() + val aliasIndices = getAliasesResponse.aliases.keys().map { it.value } - if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) + // TODO: Add log message if index is/is not alias? 
+ val isAlias = aliasIndices.isNotEmpty() - matchedQueriesForDocs.forEach { hit -> - val (id, query) = Pair( - hit.id.replace("_${monitor.id}", ""), - ((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] - ) - val docLevelQuery = DocLevelQuery(id, id, query.toString()) - - val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } - docIndices.forEach { idx -> - if (queryToDocIds.containsKey(docLevelQuery)) { - queryToDocIds[docLevelQuery]?.add(matchingDocs[idx].first) - } else { - queryToDocIds[docLevelQuery] = mutableSetOf(matchingDocs[idx].first) - } + // If the input index is an alias, creating a list of all indices associated with that alias; + // else creating a list containing the single index input + val indices = if (isAlias) getAliasesResponse.aliases.keys().map { it.value } else listOf(index) + + indices.forEach { indexName -> + // Prepare lastRunContext for each index + val indexLastRunContext = lastRunContext.getOrPut(indexName) { + createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, indexName) + } - if (docsToQueries.containsKey(matchingDocs[idx].first)) { - docsToQueries[matchingDocs[idx].first]?.add(id) - } else { - docsToQueries[matchingDocs[idx].first] = mutableListOf(id) + // Prepare updatedLastRunContext for each index + val indexUpdatedRunContext = updateLastRunContext( + indexLastRunContext.toMutableMap(), + monitorCtx, + indexName + ) as MutableMap + updatedLastRunContext[indexName] = indexUpdatedRunContext + + // Prepare DocumentExecutionContext for each index + val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) + + val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun) + + if (matchingDocs.isNotEmpty()) { + val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) + + matchedQueriesForDocs.forEach { hit -> + val (id, query) = Pair( + hit.id.replace("_${monitor.id}", ""), + ((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] + ) + val docLevelQuery = DocLevelQuery(id, id, query.toString()) + + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${matchingDocs[idx].first}|$indexName" + if (queryToDocIds.containsKey(docLevelQuery)) { + queryToDocIds[docLevelQuery]?.add(docIndex) + } else { + queryToDocIds[docLevelQuery] = mutableSetOf(docIndex) + } + + if (docsToQueries.containsKey(docIndex)) { + docsToQueries[docIndex]?.add(id) + } else { + docsToQueries[docIndex] = mutableListOf(id) + } + } } } + +// val queryResults = getDocLevelQueryResults( +// monitorCtx, +// docExecutionContext, +// indexName, +// queries +// ) + +// // Only need to compile the query results together when executing the monitor for more than 1 index +// if (indices.size > 1) { +// val newQueryToDocIds = mutableMapOf>() +// queryResults["queryDocIds"]?.forEach { +// val docLevelQuery = it.key +// val matchingDocIds = it.value +// val existingQueryToDocIds = queryToDocIds.getOrDefault(docLevelQuery, setOf()).toMutableSet() +// existingQueryToDocIds.addAll(matchingDocIds) +// newQueryToDocIds[it.key as DocLevelQuery] = existingQueryToDocIds +// } +// queryToDocIds.putAll(newQueryToDocIds) +// +// val newDocsToQueries = mutableMapOf>() +// queryResults["docsToQueries"]?.forEach { +// val docId = it.key +// val queryIds = it.value +// val existingQueryIds = 
docsToQueries.getOrDefault(docId, mutableListOf()) +// existingQueryIds.addAll(queryIds) +// newDocsToQueries[docId as String] = existingQueryIds +// } +// docsToQueries.putAll(newDocsToQueries) +// } else { +// queryToDocIds.putAll(queryResults["queryDocIds"] as MutableMap>) +// docsToQueries.putAll(queryResults["docsToQueries"] as MutableMap>) +// } } + } catch (e: Exception) { + logger.info("Failed to start Document-level-monitor $index. Error: ${e.message}") } +// val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun) +// +// if (matchingDocs.isNotEmpty()) { +// val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) +// +// matchedQueriesForDocs.forEach { hit -> +// val (id, query) = Pair( +// hit.id.replace("_${monitor.id}", ""), +// ((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] +// ) +// val docLevelQuery = DocLevelQuery(id, id, query.toString()) +// +// val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } +// docIndices.forEach { idx -> +// if (queryToDocIds.containsKey(docLevelQuery)) { +// queryToDocIds[docLevelQuery]?.add(matchingDocs[idx].first) +// } else { +// queryToDocIds[docLevelQuery] = mutableSetOf(matchingDocs[idx].first) +// } +// +// if (docsToQueries.containsKey(matchingDocs[idx].first)) { +// docsToQueries[matchingDocs[idx].first]?.add(id) +// } else { +// docsToQueries[matchingDocs[idx].first] = mutableListOf(id) +// } +// } +// } +// } + val queryInputResults = queryToDocIds.mapKeys { it.key.id } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(queryInputResults))) val queryIds = queries.map { @@ -193,8 +292,6 @@ object DocumentReturningMonitorRunner : MonitorRunner() { logger.info("trigger results") logger.info(triggerResult.triggeredDocs.toString()) - val index = (monitor.inputs[0] as DocLevelMonitorInput).indices[0] - // TODO: modify findings such that there is a finding per document val findings = mutableListOf() val findingDocPairs = mutableListOf>() @@ -203,7 +300,7 @@ object DocumentReturningMonitorRunner : MonitorRunner() { if (!dryrun && monitor.id != Monitor.NO_ID) { docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! 
} - val findingId = createFindings(monitor, monitorCtx, index, triggeredQueries, listOf(it.key)) + val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key) findings.add(findingId) if (triggerResult.triggeredDocs.contains(it.key)) { @@ -244,16 +341,18 @@ object DocumentReturningMonitorRunner : MonitorRunner() { private fun createFindings( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - index: String, docLevelQueries: List, - matchingDocIds: List + matchingDocId: String ): String { + // Before the "|" is the doc id and after the "|" is the index + val docIndex = matchingDocId.split("|") + val finding = Finding( id = UUID.randomUUID().toString(), - relatedDocIds = matchingDocIds, + relatedDocIds = listOf(docIndex[0]), monitorId = monitor.id, monitorName = monitor.name, - index = index, + index = docIndex[1], docLevelQueries = docLevelQueries, timestamp = Instant.now() ) @@ -273,6 +372,25 @@ object DocumentReturningMonitorRunner : MonitorRunner() { return finding.id } + private fun updateLastRunContext( + lastRunContext: Map, + monitorCtx: MonitorRunnerExecutionContext, + index: String + ): Map { + // TODO DRAFT: Is it better to get shards_count by calling getShardsCount + // as opposed to retrieving the count from lastRunContext? The number of shards + // could changing between monitor executions. +// val count: Int = lastRunContext["shards_count"] as Int + val count: Int = getShardsCount(monitorCtx.clusterService!!, index) + val updatedLastRunContext = lastRunContext.toMutableMap() + for (i: Int in 0 until count) { + val shard = i.toString() + val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) + updatedLastRunContext[shard] = maxSeqNo.toString() + } + return updatedLastRunContext + } + private fun validate(monitor: Monitor) { if (monitor.inputs.size > 1) { throw IOException("Only one input is supported with document-level-monitor.") diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt index 787e5d778..cc11e92f8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt @@ -197,9 +197,11 @@ data class Monitor( // Outputting type with each Trigger so that the generic Trigger.readFrom() can read it out.writeVInt(triggers.size) triggers.forEach { - if (it is QueryLevelTrigger) out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER) - else if (it is DocumentLevelTrigger) out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER) - else out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER) + when (it) { + is BucketLevelTrigger -> out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER) + is DocumentLevelTrigger -> out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER) + else -> out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER) + } it.writeTo(out) } out.writeMap(lastRunContext) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index a5cd3738a..b821f9917 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -18,6 +18,7 @@ import org.opensearch.alerting.AlertingPlugin.Companion.EMAIL_ACCOUNT_BASE_URI import org.opensearch.alerting.AlertingPlugin.Companion.EMAIL_GROUP_BASE_URI import org.opensearch.alerting.action.GetFindingsResponse import 
org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.SearchInput @@ -88,6 +89,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { mutableListOf( Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, + DocLevelMonitorInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY, DocumentLevelTrigger.XCONTENT_REGISTRY @@ -782,6 +784,58 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return index } + protected fun createTestAlias( + alias: String = randomAlphaOfLength(10).toLowerCase(Locale.ROOT), + numOfAliasIndices: Int = randomIntBetween(1, 10), + includeWriteIndex: Boolean = randomBoolean() + ): MutableMap> { + return createTestAlias(alias = alias, indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex)) + } + + protected fun createTestAlias( + alias: String = randomAlphaOfLength(10).toLowerCase(Locale.ROOT), + indices: Map = randomAliasIndices( + alias = alias, + num = randomIntBetween(1, 10), + includeWriteIndex = randomBoolean() + ) + ): MutableMap> { + val indicesMap = mutableMapOf() + val indicesJson = jsonBuilder().startObject().startArray("actions") + indices.keys.map { + val indexName = createTestIndex(index = it.toLowerCase(Locale.ROOT), mapping = "") + val isWriteIndex = indices.getOrDefault(indexName, false) + indicesMap[indexName] = isWriteIndex + val indexMap = mapOf( + "add" to mapOf( + "index" to indexName, + "alias" to alias, + "is_write_index" to isWriteIndex + ) + ) + indicesJson.value(indexMap) + } + val requestBody = indicesJson.endArray().endObject().string() + client().makeRequest("POST", "/_aliases", emptyMap(), StringEntity(requestBody, APPLICATION_JSON)) + return mutableMapOf(alias to indicesMap) + } + + protected fun randomAliasIndices( + alias: String, + num: Int = randomIntBetween(1, 10), + includeWriteIndex: Boolean = true + ): Map { + val indices = mutableMapOf() + val writeIndex = randomIntBetween(0, num) + for (i: Int in 0 until num) { + var indexName = randomAlphaOfLength(10) + while (indexName.equals(alias) || indices.containsKey(indexName)) + indexName = randomAlphaOfLength(10) + indices[indexName] = includeWriteIndex && i == writeIndex + } + return indices + } + protected fun insertSampleTimeSerializedData(index: String, data: List) { data.forEachIndexed { i, value -> val twoMinsAgo = ZonedDateTime.now().minus(2, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MILLIS) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 540fd166c..0b7c6c3cb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -29,6 +29,7 @@ import org.opensearch.alerting.model.destination.email.Email import org.opensearch.alerting.model.destination.email.Recipient import org.opensearch.alerting.util.DestinationType import org.opensearch.alerting.util.getBucketKeysHash +import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.WarningFailureException import org.opensearch.common.settings.Settings @@ -1657,6 +1658,193 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { } } + fun `test document-level monitor when 
alias only has write index with 0 docs`() { + // Monitor should execute, but create 0 findings. + val alias = createTestAlias(includeWriteIndex = false) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + + val response: Response + try { + response = executeMonitor(monitor.id) + } catch (e: ResponseException) { + assertNotNull("Expected an error message: $e", e.message) + e.message?.let { + assertTrue("Unexpected exception: $e", it.contains("""reason":"no such index [.opensearch-alerting-findings]""")) + } + assertEquals(404, e.response.statusLine.statusCode) + return + } + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = if (searchResult == null) listOf() + else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.forEach { + val findingQueryId = it["queryId"] + assertNotEquals("No findings should exist with queryId ${query.id}, but found: $it", query.id, findingQueryId) + } + } + + fun `test document-level monitor when docs exist prior to monitor creation`() { + // FIXME: Consider renaming this test case + // Only new docs should create findings. + val alias = createTestAlias(includeWriteIndex = false) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + + val preExistingDocIds = mutableSetOf() + indices.forEach { index -> + val docId = index.hashCode().toString() + val doc = """{ "message" : "${query.query}" }""" + preExistingDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + assertEquals(indices.size, preExistingDocIds.size) + + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = if (searchResult == null) listOf() + else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.forEach { + val findingDocId = it["id"] as String + assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) + } + } + + fun `test document-level monitor when alias indices only contain docs that match query`() { + // Only new docs should create findings. 
+ val alias = createTestAlias(includeWriteIndex = false) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + + val preExistingDocIds = mutableSetOf() + indices.forEach { index -> + val docId = index.hashCode().toString() + val doc = """{ "message" : "${query.query}" }""" + preExistingDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + assertEquals(indices.size, preExistingDocIds.size) + + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + executeMonitor(monitor.id) + + val newDocIds = mutableSetOf() + indices.forEach { index -> + (1..5).map { + val docId = "${index.hashCode()}$it" + val doc = """{ "message" : "${query.query}" }""" + newDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + } + assertEquals(indices.size * 5, newDocIds.size) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = if (searchResult == null) listOf() + else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.forEach { + val findingDocId = it["id"] as String + assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) + assertTrue("Found an unexpected finding $it", newDocIds.contains(findingDocId)) + } + } + + fun `test document-level monitor when alias indices contain docs that do and do not match query`() { + // Only matching docs should create findings. 
+ val alias = createTestAlias(includeWriteIndex = false) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + + val preExistingDocIds = mutableSetOf() + indices.forEach { index -> + val docId = index.hashCode().toString() + val doc = """{ "message" : "${query.query}" }""" + preExistingDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + assertEquals(indices.size, preExistingDocIds.size) + + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + executeMonitor(monitor.id) + + val matchingDocIds = mutableSetOf() + val nonMatchingDocIds = mutableSetOf() + indices.forEach { index -> + (1..5).map { + val matchingDocId = "${index.hashCode()}$it" + val matchingDoc = """{ "message" : "${query.query}" }""" + indexDoc(index = index, id = matchingDocId, doc = matchingDoc) + matchingDocIds.add(matchingDocId) + + val nonMatchingDocId = "${index.hashCode()}${it}2" + var nonMatchingDoc = StringBuilder(query.query).insert(2, "difference").toString() + nonMatchingDoc = """{ "message" : "$nonMatchingDoc" }""" + indexDoc(index = index, id = nonMatchingDocId, doc = nonMatchingDoc) + nonMatchingDocIds.add(nonMatchingDocId) + } + } + assertEquals(indices.size * 5, matchingDocIds.size) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = if (searchResult == null) listOf() + else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.forEach { + val findingDocId = it["id"] as String + assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) + assertFalse("Found doc that doesn't match query: $it", nonMatchingDocIds.contains(findingDocId)) + assertTrue("Found an unexpected finding $it", matchingDocIds.contains(findingDocId)) + } + } + private fun prepareTestAnomalyResult(detectorId: String, user: User) { val adResultIndex = ".opendistro-anomaly-results-history-2020.10.17" try { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 9f4e8717e..d6d3191bb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -645,6 +645,7 @@ fun xContentRegistry(): NamedXContentRegistry { return NamedXContentRegistry( listOf( SearchInput.XCONTENT_REGISTRY, + DocLevelMonitorInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY, DocumentLevelTrigger.XCONTENT_REGISTRY diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt index 6e00302c2..773316a86 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt +++ 
b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt @@ -1082,38 +1082,6 @@ class MonitorRestApiIT : AlertingRestTestCase() { } } - private fun validateAlertingStatsNodeResponse(nodesResponse: Map) { - assertEquals("Incorrect number of nodes", numberOfNodes, nodesResponse["total"]) - assertEquals("Failed nodes found during monitor stats call", 0, nodesResponse["failed"]) - assertEquals("More than $numberOfNodes successful node", numberOfNodes, nodesResponse["successful"]) - } - - private fun isMonitorScheduled(monitorId: String, alertingStatsResponse: Map): Boolean { - val nodesInfo = alertingStatsResponse["nodes"] as Map - for (nodeId in nodesInfo.keys) { - val nodeInfo = nodesInfo[nodeId] as Map - val jobsInfo = nodeInfo["jobs_info"] as Map - if (jobsInfo.keys.contains(monitorId)) { - return true - } - } - - return false - } - - private fun assertAlertingStatsSweeperEnabled(alertingStatsResponse: Map, expected: Boolean) { - assertEquals( - "Legacy scheduled job enabled field is not set to $expected", - expected, - alertingStatsResponse[statsResponseOpendistroSweeperEnabledField] - ) - assertEquals( - "Scheduled job is not ${if (expected) "enabled" else "disabled"}", - expected, - alertingStatsResponse[statsResponseOpenSearchSweeperEnabledField] - ) - } - @Throws(Exception::class) fun `test creating a document monitor`() { val testIndex = createTestIndex() @@ -1227,4 +1195,36 @@ class MonitorRestApiIT : AlertingRestTestCase() { ) } } + + private fun validateAlertingStatsNodeResponse(nodesResponse: Map) { + assertEquals("Incorrect number of nodes", numberOfNodes, nodesResponse["total"]) + assertEquals("Failed nodes found during monitor stats call", 0, nodesResponse["failed"]) + assertEquals("More than $numberOfNodes successful node", numberOfNodes, nodesResponse["successful"]) + } + + private fun isMonitorScheduled(monitorId: String, alertingStatsResponse: Map): Boolean { + val nodesInfo = alertingStatsResponse["nodes"] as Map + for (nodeId in nodesInfo.keys) { + val nodeInfo = nodesInfo[nodeId] as Map + val jobsInfo = nodeInfo["jobs_info"] as Map + if (jobsInfo.keys.contains(monitorId)) { + return true + } + } + + return false + } + + private fun assertAlertingStatsSweeperEnabled(alertingStatsResponse: Map, expected: Boolean) { + assertEquals( + "Legacy scheduled job enabled field is not set to $expected", + expected, + alertingStatsResponse[statsResponseOpendistroSweeperEnabledField] + ) + assertEquals( + "Scheduled job is not ${if (expected) "enabled" else "disabled"}", + expected, + alertingStatsResponse[statsResponseOpenSearchSweeperEnabledField] + ) + } } From 4663a1293454c4aba7baa655b14ffea59df00a20 Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 20 Apr 2022 09:19:41 -0700 Subject: [PATCH 02/11] Fix integ tests and cleaup alias logic Signed-off-by: Ashish Agrawal --- .../DocumentReturningMonitorRunner.kt | 134 ++---------- .../resthandler/RestGetFindingsAction.kt | 2 +- .../TransportExecuteMonitorAction.kt | 2 +- .../transport/TransportGetFindingsAction.kt | 22 +- .../transport/TransportIndexMonitorAction.kt | 19 +- .../alerting/util/DocLevelMonitorQueries.kt | 113 ++++++---- .../alerting/AlertingRestTestCase.kt | 9 +- .../alerting/DocumentMonitorRunnerIT.kt | 195 +++++++++++++++++- .../alerting/MonitorRunnerServiceIT.kt | 188 ----------------- .../alerting/resthandler/FindingsRestApiIT.kt | 70 ++++++- .../alerting/core/model/ScheduledJob.kt | 2 +- 11 files changed, 399 insertions(+), 357 deletions(-) diff --git 
a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt index 397e453ff..e7621fc82 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt @@ -58,17 +58,16 @@ object DocumentReturningMonitorRunner : MonitorRunner() { periodEnd: Instant, dryrun: Boolean ): MonitorRunResult { - logger.info("Document-level-monitor is running ...") + logger.debug("Document-level-monitor is running ...") var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) - // TODO: is this needed from Charlie? try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex() monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex() monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex() } catch (e: Exception) { val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id - logger.error("Error loading alerts for monitor: $id", e) + logger.error("Error setting up alerts and findings indices for monitor: $id", e) return monitorResult.copy(error = e) } @@ -86,33 +85,10 @@ object DocumentReturningMonitorRunner : MonitorRunner() { val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID val lastRunContext = if (monitor.lastRunContext.isNullOrEmpty()) mutableMapOf() else monitor.lastRunContext.toMutableMap() as MutableMap> -// var lastRunContext = monitor.lastRunContext.toMutableMap() -// try { -// if (lastRunContext.isNullOrEmpty()) { -// lastRunContext = createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, index).toMutableMap() -// } -// } catch (e: Exception) { -// logger.info("Failed to start Document-level-monitor $index. Error: ${e.message}") -// return monitorResult.copy(error = e) -// } - -// val count: Int = lastRunContext["shards_count"] as Int val updatedLastRunContext = lastRunContext.toMutableMap() -// for (i: Int in 0 until count) { -// val shard = i.toString() -// val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) -// updatedLastRunContext[shard] = maxSeqNo -// -// // update lastRunContext if its a temp monitor as we only want to view the last bit of data then -// // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data -// if (isTempMonitor) { -// lastRunContext[shard] = max(-1, maxSeqNo - 1) -// } -// } val queryToDocIds = mutableMapOf>() val docsToQueries = mutableMapOf>() - val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext) val idQueryMap = mutableMapOf() try { @@ -120,8 +96,8 @@ object DocumentReturningMonitorRunner : MonitorRunner() { val getAliasesResponse = monitorCtx.client!!.admin().indices().getAliases(getAliasesRequest).actionGet() val aliasIndices = getAliasesResponse.aliases.keys().map { it.value } - // TODO: Add log message if index is/is not alias? 
val isAlias = aliasIndices.isNotEmpty() + logger.debug("index, $index, is an alias index: $isAlias") // If the input index is an alias, creating a list of all indices associated with that alias; // else creating a list containing the single index input @@ -141,10 +117,21 @@ object DocumentReturningMonitorRunner : MonitorRunner() { ) as MutableMap updatedLastRunContext[indexName] = indexUpdatedRunContext + val count: Int = indexLastRunContext["shards_count"] as Int + for (i: Int in 0 until count) { + val shard = i.toString() + + // update lastRunContext if its a temp monitor as we only want to view the last bit of data then + // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data + if (isTempMonitor) { + indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 1) + } + } + // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun) + val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index) if (matchingDocs.isNotEmpty()) { val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) @@ -173,73 +160,11 @@ object DocumentReturningMonitorRunner : MonitorRunner() { } } } - -// val queryResults = getDocLevelQueryResults( -// monitorCtx, -// docExecutionContext, -// indexName, -// queries -// ) - -// // Only need to compile the query results together when executing the monitor for more than 1 index -// if (indices.size > 1) { -// val newQueryToDocIds = mutableMapOf>() -// queryResults["queryDocIds"]?.forEach { -// val docLevelQuery = it.key -// val matchingDocIds = it.value -// val existingQueryToDocIds = queryToDocIds.getOrDefault(docLevelQuery, setOf()).toMutableSet() -// existingQueryToDocIds.addAll(matchingDocIds) -// newQueryToDocIds[it.key as DocLevelQuery] = existingQueryToDocIds -// } -// queryToDocIds.putAll(newQueryToDocIds) -// -// val newDocsToQueries = mutableMapOf>() -// queryResults["docsToQueries"]?.forEach { -// val docId = it.key -// val queryIds = it.value -// val existingQueryIds = docsToQueries.getOrDefault(docId, mutableListOf()) -// existingQueryIds.addAll(queryIds) -// newDocsToQueries[docId as String] = existingQueryIds -// } -// docsToQueries.putAll(newDocsToQueries) -// } else { -// queryToDocIds.putAll(queryResults["queryDocIds"] as MutableMap>) -// docsToQueries.putAll(queryResults["docsToQueries"] as MutableMap>) -// } } } catch (e: Exception) { - logger.info("Failed to start Document-level-monitor $index. Error: ${e.message}") + logger.error("Failed to start Document-level-monitor $index. 
Error: ${e.message}", e) } -// val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun) -// -// if (matchingDocs.isNotEmpty()) { -// val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) -// -// matchedQueriesForDocs.forEach { hit -> -// val (id, query) = Pair( -// hit.id.replace("_${monitor.id}", ""), -// ((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] -// ) -// val docLevelQuery = DocLevelQuery(id, id, query.toString()) -// -// val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } -// docIndices.forEach { idx -> -// if (queryToDocIds.containsKey(docLevelQuery)) { -// queryToDocIds[docLevelQuery]?.add(matchingDocs[idx].first) -// } else { -// queryToDocIds[docLevelQuery] = mutableSetOf(matchingDocs[idx].first) -// } -// -// if (docsToQueries.containsKey(matchingDocs[idx].first)) { -// docsToQueries[matchingDocs[idx].first]?.add(id) -// } else { -// docsToQueries[matchingDocs[idx].first] = mutableListOf(id) -// } -// } -// } -// } - val queryInputResults = queryToDocIds.mapKeys { it.key.id } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(queryInputResults))) val queryIds = queries.map { @@ -292,7 +217,6 @@ object DocumentReturningMonitorRunner : MonitorRunner() { logger.info("trigger results") logger.info(triggerResult.triggeredDocs.toString()) - // TODO: modify findings such that there is a finding per document val findings = mutableListOf() val findingDocPairs = mutableListOf>() @@ -358,10 +282,8 @@ object DocumentReturningMonitorRunner : MonitorRunner() { ) val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - // change this to debug. - logger.info("Findings: $findingStr") + logger.debug("Findings: $findingStr") - // todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management. val indexRequest = IndexRequest(FINDING_HISTORY_WRITE_INDEX) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(findingStr, XContentType.JSON) @@ -377,10 +299,6 @@ object DocumentReturningMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, index: String ): Map { - // TODO DRAFT: Is it better to get shards_count by calling getShardsCount - // as opposed to retrieving the count from lastRunContext? The number of shards - // could changing between monitor executions. 
-// val count: Int = lastRunContext["shards_count"] as Int val count: Int = getShardsCount(monitorCtx.clusterService!!, index) val updatedLastRunContext = lastRunContext.toMutableMap() for (i: Int in 0 until count) { @@ -455,28 +373,16 @@ object DocumentReturningMonitorRunner : MonitorRunner() { monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, - index: String, - dryrun: Boolean + index: String ): List> { - val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int + val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int val matchingDocs = mutableListOf>() for (i: Int in 0 until count) { val shard = i.toString() try { logger.info("Monitor execution for shard: $shard") - val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() - logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") - - // If dryrun, set the previous sequence number as 1 less than the max sequence number or 0 - val prevSeqNo = if (dryrun || monitor.id == Monitor.NO_ID) - max(-1, maxSeqNo - 1) - else docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() - - if (dryrun) { - logger.info("it is a dryrun") - } - + val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() logger.info("prevSeq: $prevSeqNo, maxSeq: $maxSeqNo") val hits: SearchHits = searchShard( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetFindingsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetFindingsAction.kt index e71412a2b..69b65d142 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetFindingsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetFindingsAction.kt @@ -39,7 +39,7 @@ class RestGetFindingsAction : BaseRestHandler() { log.info("${request.method()} ${request.path()}") val findingID: String? = request.param("findingId") - val sortString = request.param("sortString", "id.keyword") + val sortString = request.param("sortString", "id") val sortOrder = request.param("sortOrder", "asc") val missing: String? 
= request.param("missing") val size = request.paramAsInt("size", 20) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index d5a52495f..830541cbb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -42,7 +42,7 @@ import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import java.time.Instant -private val log = LogManager.getLogger(TransportGetMonitorAction::class.java) +private val log = LogManager.getLogger(TransportExecuteMonitorAction::class.java) class TransportExecuteMonitorAction @Inject constructor( transportService: TransportService, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt index f91d0efd4..a67644171 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting.transport import org.apache.logging.log4j.LogManager +import org.apache.lucene.search.join.ScoreMode import org.opensearch.action.ActionListener import org.opensearch.action.get.MultiGetRequest import org.opensearch.action.search.SearchRequest @@ -90,12 +91,23 @@ class TransportGetFindingsSearchAction @Inject constructor( if (!tableProp.searchString.isNullOrBlank()) { queryBuilder - .must( + .should( QueryBuilders .queryStringQuery(tableProp.searchString) - .defaultOperator(Operator.AND) - .field("queries.tags") - .field("queries.name") + ) + .should( + QueryBuilders.nestedQuery( + "queries", + QueryBuilders.boolQuery() + .must( + QueryBuilders + .queryStringQuery(tableProp.searchString) + .defaultOperator(Operator.AND) + .field("queries.tags") + .field("queries.name") + ), + ScoreMode.Avg + ) ) } @@ -131,7 +143,7 @@ class TransportGetFindingsSearchAction @Inject constructor( mgetRequest.add(MultiGetRequest.Item(finding.index, docId)) } } - val documents = searchDocument(mgetRequest) + val documents = if (mgetRequest.items.isEmpty()) mutableMapOf() else searchDocument(mgetRequest) findings.forEach { val documentIds = it.relatedDocIds val relatedDocs = mutableListOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 246ef3439..6157cb580 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchSecurityException import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.GetRequest @@ -381,7 +382,8 @@ class TransportIndexMonitorAction @Inject constructor( private fun indexMonitor() { if (request.monitor.monitorType == 
Monitor.MonitorType.DOC_LEVEL_MONITOR) { val monitorIndex = (request.monitor.inputs[0] as DocLevelMonitorInput).indices[0] - val lastRunContext = DocumentReturningMonitorRunner.createRunContext(clusterService, client, monitorIndex).toMutableMap() + val lastRunContext = createFullRunContext(monitorIndex) + log.info("index last run context: $lastRunContext") request.monitor = request.monitor.copy(lastRunContext = lastRunContext) } request.monitor = request.monitor.copy(schemaVersion = IndexUtils.scheduledJobIndexSchemaVersion) @@ -511,7 +513,7 @@ class TransportIndexMonitorAction @Inject constructor( request.monitor.lastRunContext.toMutableMap().isNullOrEmpty() ) { val monitorIndex = (request.monitor.inputs[0] as DocLevelMonitorInput).indices[0] - val lastRunContext = DocumentReturningMonitorRunner.createRunContext(clusterService, client, monitorIndex).toMutableMap() + val lastRunContext = createFullRunContext(monitorIndex) request.monitor = request.monitor.copy(lastRunContext = lastRunContext) } @@ -570,6 +572,19 @@ class TransportIndexMonitorAction @Inject constructor( ) } + private fun createFullRunContext(index: String): MutableMap> { + val getAliasesRequest = GetAliasesRequest(index) + val getAliasesResponse = client.admin().indices().getAliases(getAliasesRequest).actionGet() + val aliasIndices = getAliasesResponse.aliases.keys().map { it.value } + val isAlias = aliasIndices.isNotEmpty() + val indices = if (isAlias) getAliasesResponse.aliases.keys().map { it.value } else listOf(index) + val lastRunContext = mutableMapOf>() + indices.forEach { indexName -> + lastRunContext[indexName] = DocumentReturningMonitorRunner.createRunContext(clusterService, client, indexName) + } + return lastRunContext + } + private fun checkShardsFailure(response: IndexResponse): String? 
{ val failureReasons = StringBuilder() if (response.shardInfo.failed > 0) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index badb76370..3bb84fc8a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -7,6 +7,8 @@ package org.opensearch.alerting.util import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest +import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest @@ -69,47 +71,82 @@ class DocLevelMonitorQueries(private val client: AdminClient, private val cluste val queries: List = docLevelMonitorInput.queries val clusterState = clusterService.state() - if (clusterState.routingTable.hasIndex(index)) { - val indexMetadata = clusterState.metadata.index(index) - - if (indexMetadata.mapping() != null) { - val properties = ((indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map>) - val updatedProperties = properties.entries.associate { "${it.key}_$monitorId" to it.value }.toMutableMap() - - val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) - updateMappingRequest.source(mapOf("properties" to updatedProperties)) - - queryClient.admin().indices().putMapping( - updateMappingRequest, - object : ActionListener { - override fun onResponse(response: AcknowledgedResponse) { - log.info("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} updated with new mappings") - - val request = BulkRequest().setRefreshPolicy(refreshPolicy).timeout(indexTimeout) - - queries.forEach { - var query = it.query - - properties.forEach { prop -> - query = query.replace("${prop.key}:", "${prop.key}_$monitorId:") - } - val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) - .id(it.id + "_$monitorId") - .source(mapOf("query" to mapOf("query_string" to mapOf("query" to query)), "monitor_id" to monitorId)) - request.add(indexRequest) - } - - queryClient.bulk(request, docLevelQueryIndexListener) - } - override fun onFailure(e: Exception) { - if (indexMonitorActionListener != null) { - indexMonitorActionListener.onFailure(AlertingException.wrap(e)) - } else executeMonitorActionListener?.onFailure(AlertingException.wrap(e)) + val getAliasesRequest = GetAliasesRequest(index) + queryClient.admin().indices().getAliases( + getAliasesRequest, + object : ActionListener { + override fun onResponse(getAliasesResponse: GetAliasesResponse?) 
{ + val aliasIndices = getAliasesResponse?.aliases?.keys()?.map { it.value } + val isAlias = aliasIndices != null && aliasIndices.isNotEmpty() + val indices = if (isAlias) getAliasesResponse?.aliases?.keys()?.map { it.value } else listOf(index) + val indexRequests = mutableListOf() + log.info("indices: $indices") + indices?.forEach { indexName -> + if (clusterState.routingTable.hasIndex(indexName)) { + val indexMetadata = clusterState.metadata.index(indexName) + + if (indexMetadata.mapping() != null) { + log.info("Index name: $indexName") + val properties = ( + (indexMetadata.mapping()?.sourceAsMap?.get("properties")) + as Map> + ) + val updatedProperties = properties.entries.associate { "${it.key}_$monitorId" to it.value }.toMutableMap() + + val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + updateMappingRequest.source(mapOf("properties" to updatedProperties)) + + queryClient.admin().indices().putMapping( + updateMappingRequest, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + + queries.forEach { + var query = it.query + + properties.forEach { prop -> + query = query.replace("${prop.key}:", "${prop.key}_$monitorId:") + } + val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .id(it.id + "_$monitorId") + .source( + mapOf( + "query" to mapOf("query_string" to mapOf("query" to query)), + "monitor_id" to monitorId + ) + ) + indexRequests.add(indexRequest) + } + if (indexRequests.isNotEmpty()) { + queryClient.bulk( + BulkRequest().setRefreshPolicy(refreshPolicy).timeout(indexTimeout).add(indexRequests), + docLevelQueryIndexListener + ) + } + return + } + + override fun onFailure(e: Exception) { + log.error("This is a failure", e) + if (indexMonitorActionListener != null) { + indexMonitorActionListener.onFailure(AlertingException.wrap(e)) + } else executeMonitorActionListener?.onFailure(AlertingException.wrap(e)) + return + } + } + ) + } } } - ) + } + + override fun onFailure(e: Exception) { + if (indexMonitorActionListener != null) { + indexMonitorActionListener.onFailure(AlertingException.wrap(e)) + } else executeMonitorActionListener?.onFailure(AlertingException.wrap(e)) + } } - } + ) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index b821f9917..49d1718d7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -18,6 +18,7 @@ import org.opensearch.alerting.AlertingPlugin.Companion.EMAIL_ACCOUNT_BASE_URI import org.opensearch.alerting.AlertingPlugin.Companion.EMAIL_GROUP_BASE_URI import org.opensearch.alerting.action.GetFindingsResponse import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.alerts.AlertIndices.Companion.FINDING_HISTORY_WRITE_INDEX import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob @@ -574,7 +575,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - indexDoc(".opensearch-alerting-findings", finding.id, findingStr) + indexDoc(FINDING_HISTORY_WRITE_INDEX, finding.id, findingStr) return finding.id } @@ -787,7 +788,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { 
protected fun createTestAlias( alias: String = randomAlphaOfLength(10).toLowerCase(Locale.ROOT), numOfAliasIndices: Int = randomIntBetween(1, 10), - includeWriteIndex: Boolean = randomBoolean() + includeWriteIndex: Boolean = true ): MutableMap> { return createTestAlias(alias = alias, indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex)) } @@ -797,9 +798,11 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { indices: Map = randomAliasIndices( alias = alias, num = randomIntBetween(1, 10), - includeWriteIndex = randomBoolean() + includeWriteIndex = true ) ): MutableMap> { + logger.info("number of indices behind alias: ${indices.size}") + logger.info("the alias indices: $indices") val indicesMap = mutableMapOf() val indicesJson = jsonBuilder().startObject().startArray("actions") indices.keys.map { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index bb401dd5d..b017a7830 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -7,6 +7,9 @@ package org.opensearch.alerting import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery +import org.opensearch.client.Response +import org.opensearch.client.ResponseException +import org.opensearch.script.Script import java.time.ZonedDateTime import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit.MILLIS @@ -83,7 +86,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { @Suppress("UNCHECKED_CAST") val matchingDocsToQuery = searchResult[docQuery.id] as List assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) - assertTrue("Incorrect search result", matchingDocsToQuery.contains("5")) + assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex")) } fun `test execute monitor generates alerts and findings`() { @@ -103,7 +106,6 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) assertNotNull(monitor.id) - Thread.sleep(2000) indexDoc(testIndex, "1", testDoc) indexDoc(testIndex, "5", testDoc) @@ -117,7 +119,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { @Suppress("UNCHECKED_CAST") val matchingDocsToQuery = searchResult[docQuery.id] as List assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) - assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1", "5"))) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex"))) val alerts = searchAlertsWithFilter(monitor) assertEquals("Alert saved for test monitor", 2, alerts.size) @@ -129,6 +131,193 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) } + fun `test document-level monitor when alias only has write index with 0 docs`() { + // Monitor should execute, but create 0 findings. 
+ val alias = createTestAlias(includeWriteIndex = false) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + + val response: Response + try { + response = executeMonitor(monitor.id) + } catch (e: ResponseException) { + assertNotNull("Expected an error message: $e", e.message) + e.message?.let { + assertTrue("Unexpected exception: $e", it.contains("""reason":"no such index [.opensearch-alerting-findings]""")) + } + assertEquals(404, e.response.statusLine.statusCode) + return + } + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = if (searchResult == null) listOf() + else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.forEach { + val findingQueryId = it["queryId"] + assertNotEquals("No findings should exist with queryId ${query.id}, but found: $it", query.id, findingQueryId) + } + } + + fun `test document-level monitor when docs exist prior to monitor creation`() { + // FIXME: Consider renaming this test case + // Only new docs should create findings. + val alias = createTestAlias(includeWriteIndex = false) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + + val preExistingDocIds = mutableSetOf() + indices.forEach { index -> + val docId = index.hashCode().toString() + val doc = """{ "message" : "${query.query}" }""" + preExistingDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + assertEquals(indices.size, preExistingDocIds.size) + + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = if (searchResult == null) listOf() + else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.forEach { + val findingDocId = it["id"] as String + assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) + } + } + + fun `test document-level monitor when alias indices only contain docs that match query`() { + // Only new docs should create findings. 
+ val alias = createTestAlias(includeWriteIndex = false) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + + val preExistingDocIds = mutableSetOf() + indices.forEach { index -> + val docId = index.hashCode().toString() + val doc = """{ "message" : "${query.query}" }""" + preExistingDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + assertEquals(indices.size, preExistingDocIds.size) + + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + executeMonitor(monitor.id) + + val newDocIds = mutableSetOf() + indices.forEach { index -> + (1..5).map { + val docId = "${index.hashCode()}$it" + val doc = """{ "message" : "${query.query}" }""" + newDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + } + assertEquals(indices.size * 5, newDocIds.size) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = if (searchResult == null) listOf() + else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.forEach { + val findingDocId = it["id"] as String + assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) + assertTrue("Found an unexpected finding $it", newDocIds.contains(findingDocId)) + } + } + + fun `test document-level monitor when alias indices contain docs that do and do not match query`() { + // Only matching docs should create findings. 
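+        // For every matching doc indexed below, a non-matching counterpart is also indexed
+        // (its message is the query string with the token "difference" spliced into it, so it
+        // no longer matches); findings are then asserted to contain only the matching doc IDs.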
+ val alias = createTestAlias(includeWriteIndex = false) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + + val preExistingDocIds = mutableSetOf() + indices.forEach { index -> + val docId = index.hashCode().toString() + val doc = """{ "message" : "${query.query}" }""" + preExistingDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + assertEquals(indices.size, preExistingDocIds.size) + + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + executeMonitor(monitor.id) + + val matchingDocIds = mutableSetOf() + val nonMatchingDocIds = mutableSetOf() + indices.forEach { index -> + (1..5).map { + val matchingDocId = "${index.hashCode()}$it" + val matchingDoc = """{ "message" : "${query.query}" }""" + indexDoc(index = index, id = matchingDocId, doc = matchingDoc) + matchingDocIds.add(matchingDocId) + + val nonMatchingDocId = "${index.hashCode()}${it}2" + var nonMatchingDoc = StringBuilder(query.query).insert(2, "difference").toString() + nonMatchingDoc = """{ "message" : "$nonMatchingDoc" }""" + indexDoc(index = index, id = nonMatchingDocId, doc = nonMatchingDoc) + nonMatchingDocIds.add(nonMatchingDocId) + } + } + assertEquals(indices.size * 5, matchingDocIds.size) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = if (searchResult == null) listOf() + else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.forEach { + val findingDocId = it["id"] as String + assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) + assertFalse("Found doc that doesn't match query: $it", nonMatchingDocIds.contains(findingDocId)) + assertTrue("Found an unexpected finding $it", matchingDocIds.contains(findingDocId)) + } + } + @Suppress("UNCHECKED_CAST") /** helper that returns a field in a json map whose values are all json objects */ private fun Map.objectMap(key: String): Map> { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 0b7c6c3cb..540fd166c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -29,7 +29,6 @@ import org.opensearch.alerting.model.destination.email.Email import org.opensearch.alerting.model.destination.email.Recipient import org.opensearch.alerting.util.DestinationType import org.opensearch.alerting.util.getBucketKeysHash -import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.WarningFailureException import org.opensearch.common.settings.Settings @@ -1658,193 +1657,6 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { } } - fun `test document-level 
monitor when alias only has write index with 0 docs`() { - // Monitor should execute, but create 0 findings. - val alias = createTestAlias(includeWriteIndex = false) - val indices = alias[alias.keys.first()]?.keys?.toList() as List - val query = randomDocLevelQuery(tags = listOf()) - val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) - val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) - val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) - - val response: Response - try { - response = executeMonitor(monitor.id) - } catch (e: ResponseException) { - assertNotNull("Expected an error message: $e", e.message) - e.message?.let { - assertTrue("Unexpected exception: $e", it.contains("""reason":"no such index [.opensearch-alerting-findings]""")) - } - assertEquals(404, e.response.statusLine.statusCode) - return - } - - val output = entityAsMap(response) - val inputResults = output.stringMap("input_results") - val errorMessage = inputResults?.get("error") - @Suppress("UNCHECKED_CAST") - val searchResult = (inputResults?.get("results") as List>).firstOrNull() - @Suppress("UNCHECKED_CAST") - val findings = if (searchResult == null) listOf() - else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> - - assertEquals(monitor.name, output["monitor_name"]) - assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) - findings.forEach { - val findingQueryId = it["queryId"] - assertNotEquals("No findings should exist with queryId ${query.id}, but found: $it", query.id, findingQueryId) - } - } - - fun `test document-level monitor when docs exist prior to monitor creation`() { - // FIXME: Consider renaming this test case - // Only new docs should create findings. - val alias = createTestAlias(includeWriteIndex = false) - val indices = alias[alias.keys.first()]?.keys?.toList() as List - val query = randomDocLevelQuery(tags = listOf()) - val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) - val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) - - val preExistingDocIds = mutableSetOf() - indices.forEach { index -> - val docId = index.hashCode().toString() - val doc = """{ "message" : "${query.query}" }""" - preExistingDocIds.add(docId) - indexDoc(index = index, id = docId, doc = doc) - } - assertEquals(indices.size, preExistingDocIds.size) - - val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) - - val response = executeMonitor(monitor.id) - - val output = entityAsMap(response) - val inputResults = output.stringMap("input_results") - val errorMessage = inputResults?.get("error") - @Suppress("UNCHECKED_CAST") - val searchResult = (inputResults?.get("results") as List>).firstOrNull() - @Suppress("UNCHECKED_CAST") - val findings = if (searchResult == null) listOf() - else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> - - assertEquals(monitor.name, output["monitor_name"]) - assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) - findings.forEach { - val findingDocId = it["id"] as String - assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) - } - } - - fun `test document-level monitor when alias indices only contain docs that match query`() { - // Only new docs should create findings. 
- val alias = createTestAlias(includeWriteIndex = false) - val indices = alias[alias.keys.first()]?.keys?.toList() as List - val query = randomDocLevelQuery(tags = listOf()) - val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) - val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) - - val preExistingDocIds = mutableSetOf() - indices.forEach { index -> - val docId = index.hashCode().toString() - val doc = """{ "message" : "${query.query}" }""" - preExistingDocIds.add(docId) - indexDoc(index = index, id = docId, doc = doc) - } - assertEquals(indices.size, preExistingDocIds.size) - - val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) - executeMonitor(monitor.id) - - val newDocIds = mutableSetOf() - indices.forEach { index -> - (1..5).map { - val docId = "${index.hashCode()}$it" - val doc = """{ "message" : "${query.query}" }""" - newDocIds.add(docId) - indexDoc(index = index, id = docId, doc = doc) - } - } - assertEquals(indices.size * 5, newDocIds.size) - - val response = executeMonitor(monitor.id) - - val output = entityAsMap(response) - val inputResults = output.stringMap("input_results") - val errorMessage = inputResults?.get("error") - @Suppress("UNCHECKED_CAST") - val searchResult = (inputResults?.get("results") as List>).firstOrNull() - @Suppress("UNCHECKED_CAST") - val findings = if (searchResult == null) listOf() - else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> - - assertEquals(monitor.name, output["monitor_name"]) - assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) - findings.forEach { - val findingDocId = it["id"] as String - assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) - assertTrue("Found an unexpected finding $it", newDocIds.contains(findingDocId)) - } - } - - fun `test document-level monitor when alias indices contain docs that do and do not match query`() { - // Only matching docs should create findings. 
- val alias = createTestAlias(includeWriteIndex = false) - val indices = alias[alias.keys.first()]?.keys?.toList() as List - val query = randomDocLevelQuery(tags = listOf()) - val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) - val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) - - val preExistingDocIds = mutableSetOf() - indices.forEach { index -> - val docId = index.hashCode().toString() - val doc = """{ "message" : "${query.query}" }""" - preExistingDocIds.add(docId) - indexDoc(index = index, id = docId, doc = doc) - } - assertEquals(indices.size, preExistingDocIds.size) - - val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) - executeMonitor(monitor.id) - - val matchingDocIds = mutableSetOf() - val nonMatchingDocIds = mutableSetOf() - indices.forEach { index -> - (1..5).map { - val matchingDocId = "${index.hashCode()}$it" - val matchingDoc = """{ "message" : "${query.query}" }""" - indexDoc(index = index, id = matchingDocId, doc = matchingDoc) - matchingDocIds.add(matchingDocId) - - val nonMatchingDocId = "${index.hashCode()}${it}2" - var nonMatchingDoc = StringBuilder(query.query).insert(2, "difference").toString() - nonMatchingDoc = """{ "message" : "$nonMatchingDoc" }""" - indexDoc(index = index, id = nonMatchingDocId, doc = nonMatchingDoc) - nonMatchingDocIds.add(nonMatchingDocId) - } - } - assertEquals(indices.size * 5, matchingDocIds.size) - - val response = executeMonitor(monitor.id) - - val output = entityAsMap(response) - val inputResults = output.stringMap("input_results") - val errorMessage = inputResults?.get("error") - @Suppress("UNCHECKED_CAST") - val searchResult = (inputResults?.get("results") as List>).firstOrNull() - @Suppress("UNCHECKED_CAST") - val findings = if (searchResult == null) listOf() - else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> - - assertEquals(monitor.name, output["monitor_name"]) - assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) - findings.forEach { - val findingDocId = it["id"] as String - assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) - assertFalse("Found doc that doesn't match query: $it", nonMatchingDocIds.contains(findingDocId)) - assertTrue("Found an unexpected finding $it", matchingDocIds.contains(findingDocId)) - } - } - private fun prepareTestAnomalyResult(detectorId: String, user: User) { val adResultIndex = ".opendistro-anomaly-results-history-2020.10.17" try { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt index b48235330..6a425c089 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt @@ -5,8 +5,12 @@ package org.opensearch.alerting.resthandler +import org.opensearch.alerting.ALWAYS_RUN import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery +import org.opensearch.alerting.randomDocumentLevelMonitor +import org.opensearch.alerting.randomDocumentLevelTrigger import org.opensearch.test.junit.annotations.TestLogging @TestLogging("level:DEBUG", reason = "Debug for tests.") @@ -14,7 +18,12 @@ import 
org.opensearch.test.junit.annotations.TestLogging class FindingsRestApiIT : AlertingRestTestCase() { fun `test find Finding where doc is not retrieved`() { - + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) createFinding(matchingDocIds = listOf("someId")) val response = searchFindings() assertEquals(1, response.totalFindings) @@ -35,6 +44,12 @@ class FindingsRestApiIT : AlertingRestTestCase() { }""" indexDoc(testIndex, "someId2", testDoc2) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) + val findingWith1 = createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingWith2 = createFinding(matchingDocIds = listOf("someId", "someId2"), index = testIndex) val response = searchFindings() @@ -69,6 +84,12 @@ class FindingsRestApiIT : AlertingRestTestCase() { }""" indexDoc(testIndex, "someId2", testDoc2) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) + createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingId = createFinding(matchingDocIds = listOf("someId", "someId2"), index = testIndex) val response = searchFindings(mapOf(Pair("findingId", findingId))) @@ -95,6 +116,12 @@ class FindingsRestApiIT : AlertingRestTestCase() { indexDoc(testIndex, "someId2", testDoc2) val docLevelQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "realQuery", tags = listOf("sigma")) + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docLevelQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) + +// val docLevelQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "realQuery", tags = listOf("sigma")) createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingId = createFinding( matchingDocIds = listOf("someId", "someId2"), @@ -125,6 +152,11 @@ class FindingsRestApiIT : AlertingRestTestCase() { indexDoc(testIndex, "someId2", testDoc2) val docLevelQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "realQuery", tags = listOf("sigma")) + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docLevelQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + 
val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) + createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingId = createFinding( matchingDocIds = listOf("someId", "someId2"), @@ -140,4 +172,40 @@ class FindingsRestApiIT : AlertingRestTestCase() { assertEquals(testDoc, response.findings[0].documents[0].document) assertEquals(testDoc2, response.findings[0].documents[1].document) } + + fun `test find Finding by monitor id`() { + val testIndex = createTestIndex() + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_field" : "us-west-2" + }""" + indexDoc(testIndex, "someId", testDoc) + val testDoc2 = """{ + "message" : "This is an error2 from IAD region", + "test_field" : "us-west-3" + }""" + indexDoc(testIndex, "someId2", testDoc2) + + val docLevelQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "realQuery", tags = listOf("sigma")) + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docLevelQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) + + createFinding(matchingDocIds = listOf("someId"), index = testIndex) + val findingId = createFinding( + monitorId = "monitorToFind", + matchingDocIds = listOf("someId", "someId2"), + index = testIndex, + docLevelQueries = listOf(docLevelQuery) + ) + val response = searchFindings(mapOf(Pair("searchString", "monitorToFind"))) + assertEquals(1, response.totalFindings) + assertEquals(findingId, response.findings[0].finding.id) + assertEquals(2, response.findings[0].documents.size) + assertTrue(response.findings[0].documents[0].found) + assertTrue(response.findings[0].documents[1].found) + assertEquals(testDoc, response.findings[0].documents[0].document) + assertEquals(testDoc2, response.findings[0].documents[1].document) + } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt index 95e48d7e5..fb595d9f0 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt @@ -36,7 +36,7 @@ interface ScheduledJob : Writeable, ToXContentObject { companion object { /** The name of the ElasticSearch index in which we store jobs */ const val SCHEDULED_JOBS_INDEX = ".opendistro-alerting-config" - const val DOC_LEVEL_QUERIES_INDEX = ".opendistro-alerting-queries" + const val DOC_LEVEL_QUERIES_INDEX = ".opensearch-alerting-queries" const val NO_ID = "" From 090da721b822dada36bb87438d6c48ede3b10cd5 Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 20 Apr 2022 09:21:16 -0700 Subject: [PATCH 03/11] remove comment Signed-off-by: Ashish Agrawal --- .../org/opensearch/alerting/resthandler/FindingsRestApiIT.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt index 6a425c089..494ed2615 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt +++ 
b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt @@ -121,7 +121,6 @@ class FindingsRestApiIT : AlertingRestTestCase() { val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) -// val docLevelQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "realQuery", tags = listOf("sigma")) createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingId = createFinding( matchingDocIds = listOf("someId", "someId2"), From 1217669e24433de40491bc61b945e9cbcd0f0cf3 Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 20 Apr 2022 09:46:55 -0700 Subject: [PATCH 04/11] fix integ test Signed-off-by: Ashish Agrawal --- .../kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index b017a7830..d6296995e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -172,7 +172,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { fun `test document-level monitor when docs exist prior to monitor creation`() { // FIXME: Consider renaming this test case // Only new docs should create findings. - val alias = createTestAlias(includeWriteIndex = false) + val alias = createTestAlias(includeWriteIndex = true) val indices = alias[alias.keys.first()]?.keys?.toList() as List val query = randomDocLevelQuery(tags = listOf()) val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) From fd3ef18c6eed694737ce7736485e927a3e4e69a5 Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 20 Apr 2022 11:34:53 -0700 Subject: [PATCH 05/11] fix percolate stuff Signed-off-by: Ashish Agrawal --- .../alerting/DocumentReturningMonitorRunner.kt | 17 +++++++++-------- .../alerting/util/DocLevelMonitorQueries.kt | 9 +++++---- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt index e7621fc82..812b4d9f9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt @@ -131,14 +131,14 @@ object DocumentReturningMonitorRunner : MonitorRunner() { // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index) + val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName) if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) + val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor, indexName) matchedQueriesForDocs.forEach { hit -> val (id, query) = Pair( - hit.id.replace("_${monitor.id}", ""), + hit.id.replace("_${indexName}_${monitor.id}", ""), ((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] ) val docLevelQuery = 
DocLevelQuery(id, id, query.toString()) @@ -396,7 +396,7 @@ object DocumentReturningMonitorRunner : MonitorRunner() { logger.info("Search hits for shard_$shard is: ${hits.hits.size}") if (hits.hits.isNotEmpty()) { - matchingDocs.addAll(getAllDocs(hits, monitor.id)) + matchingDocs.addAll(getAllDocs(hits, index, monitor.id)) } } catch (e: Exception) { logger.info("Failed to run for shard $shard. Error: ${e.message}") @@ -443,9 +443,10 @@ object DocumentReturningMonitorRunner : MonitorRunner() { private fun getMatchedQueries( monitorCtx: MonitorRunnerExecutionContext, docs: List, - monitor: Monitor + monitor: Monitor, + index: String ): SearchHits { - val boolQueryBuilder = BoolQueryBuilder() + val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.matchQuery("index", index)) val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON) if (monitor.id.isNotEmpty()) { @@ -466,13 +467,13 @@ object DocumentReturningMonitorRunner : MonitorRunner() { return response.hits } - private fun getAllDocs(hits: SearchHits, monitorId: String): List> { + private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List> { return hits.map { hit -> val sourceMap = hit.sourceAsMap var xContentBuilder = XContentFactory.jsonBuilder().startObject() sourceMap.forEach { (k, v) -> - xContentBuilder = xContentBuilder.field("${k}_$monitorId", v) + xContentBuilder = xContentBuilder.field("${k}_${index}_$monitorId", v) } xContentBuilder = xContentBuilder.endObject() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 3bb84fc8a..2070534cd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -92,7 +92,7 @@ class DocLevelMonitorQueries(private val client: AdminClient, private val cluste (indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map> ) - val updatedProperties = properties.entries.associate { "${it.key}_$monitorId" to it.value }.toMutableMap() + val updatedProperties = properties.entries.associate { "${it.key}_${indexName}_$monitorId" to it.value }.toMutableMap() val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) updateMappingRequest.source(mapOf("properties" to updatedProperties)) @@ -106,14 +106,15 @@ class DocLevelMonitorQueries(private val client: AdminClient, private val cluste var query = it.query properties.forEach { prop -> - query = query.replace("${prop.key}:", "${prop.key}_$monitorId:") + query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:") } val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) - .id(it.id + "_$monitorId") + .id(it.id + "_${indexName}_$monitorId") .source( mapOf( "query" to mapOf("query_string" to mapOf("query" to query)), - "monitor_id" to monitorId + "monitor_id" to monitorId, + "index" to indexName ) ) indexRequests.add(indexRequest) From 9c794fa09cbe6b16fd4c8e80d394144f71c3758d Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 20 Apr 2022 15:24:53 -0700 Subject: [PATCH 06/11] minor fixes Signed-off-by: Ashish Agrawal --- .../org/opensearch/alerting/DocumentReturningMonitorRunner.kt | 1 + .../resources/org/opensearch/alerting/alerts/alert_mapping.json | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git 
a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt index 812b4d9f9..2287854ab 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt @@ -163,6 +163,7 @@ object DocumentReturningMonitorRunner : MonitorRunner() { } } catch (e: Exception) { logger.error("Failed to start Document-level-monitor $index. Error: ${e.message}", e) + return monitorResult.copy(error = e) } val queryInputResults = queryToDocIds.mapKeys { it.key.id } diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json index abb377b6c..fcb1d1c94 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json @@ -4,7 +4,7 @@ "required": true }, "_meta" : { - "schema_version": 3 + "schema_version": 4 }, "properties": { "schema_version": { From 4f17881a5ac2f0bc049db3198f34c238f7b97458 Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 20 Apr 2022 15:34:03 -0700 Subject: [PATCH 07/11] fix test Signed-off-by: Ashish Agrawal --- .../kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index dcf229fe4..a37d0d8f2 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -52,7 +52,7 @@ class AlertIndicesIT : AlertingRestTestCase() { putAlertMappings( AlertIndices.alertMapping().trimStart('{').trimEnd('}') - .replace("\"schema_version\": 3", "\"schema_version\": 0") + .replace("\"schema_version\": 4", "\"schema_version\": 0") ) assertIndexExists(AlertIndices.ALERT_INDEX) assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) From 91f03583cc8664ec574314956176b975bb44df13 Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 20 Apr 2022 15:43:12 -0700 Subject: [PATCH 08/11] fix assert in test Signed-off-by: Ashish Agrawal --- .../kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index a37d0d8f2..8e4e821a7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -63,8 +63,8 @@ class AlertIndicesIT : AlertingRestTestCase() { assertIndexExists(AlertIndices.ALERT_INDEX) assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 5) - verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 3) - verifyIndexSchemaVersion(AlertIndices.ALERT_HISTORY_WRITE_INDEX, 3) + verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 4) + verifyIndexSchemaVersion(AlertIndices.ALERT_HISTORY_WRITE_INDEX, 4) } fun `test update finding index mapping with new schema version`() { From c838f31c5d9c3a3568c30ae4ed32a6876cdbdef4 Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 20 Apr 
2022 15:59:38 -0700 Subject: [PATCH 09/11] add null check for bad indices Signed-off-by: Ashish Agrawal --- .../opensearch/alerting/util/DocLevelMonitorQueries.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 2070534cd..c507e1195 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -85,14 +85,14 @@ class DocLevelMonitorQueries(private val client: AdminClient, private val cluste indices?.forEach { indexName -> if (clusterState.routingTable.hasIndex(indexName)) { val indexMetadata = clusterState.metadata.index(indexName) - - if (indexMetadata.mapping() != null) { - log.info("Index name: $indexName") + if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { val properties = ( (indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map> ) - val updatedProperties = properties.entries.associate { "${it.key}_${indexName}_$monitorId" to it.value }.toMutableMap() + val updatedProperties = properties.entries.associate { + "${it.key}_${indexName}_$monitorId" to it.value + }.toMutableMap() val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) updateMappingRequest.source(mapOf("properties" to updatedProperties)) From 3dfd72def149ebcc2ce0a4387511aa141802ce0e Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 20 Apr 2022 16:46:20 -0700 Subject: [PATCH 10/11] make alias tests more robust Signed-off-by: Ashish Agrawal --- .../alerting/DocumentMonitorRunnerIT.kt | 57 ++++++++++--------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index d6296995e..6809ec9fe 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -133,7 +133,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { fun `test document-level monitor when alias only has write index with 0 docs`() { // Monitor should execute, but create 0 findings. 
- val alias = createTestAlias(includeWriteIndex = false) + val alias = createTestAlias(includeWriteIndex = true) val indices = alias[alias.keys.first()]?.keys?.toList() as List val query = randomDocLevelQuery(tags = listOf()) val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) @@ -158,14 +158,13 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { @Suppress("UNCHECKED_CAST") val searchResult = (inputResults?.get("results") as List>).firstOrNull() @Suppress("UNCHECKED_CAST") - val findings = if (searchResult == null) listOf() - else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + val findings = searchFindings() assertEquals(monitor.name, output["monitor_name"]) assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) - findings.forEach { - val findingQueryId = it["queryId"] - assertNotEquals("No findings should exist with queryId ${query.id}, but found: $it", query.id, findingQueryId) + findings.findings.forEach { + val queryIds = it.finding.docLevelQueries.map { query -> query.id } + assertFalse("No findings should exist with queryId ${query.id}, but found: $it", queryIds.contains(query.id)) } } @@ -197,20 +196,22 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { @Suppress("UNCHECKED_CAST") val searchResult = (inputResults?.get("results") as List>).firstOrNull() @Suppress("UNCHECKED_CAST") - val findings = if (searchResult == null) listOf() - else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + val findings = searchFindings() assertEquals(monitor.name, output["monitor_name"]) assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) - findings.forEach { - val findingDocId = it["id"] as String - assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) + findings.findings.forEach { + val docIds = it.finding.relatedDocIds + assertTrue( + "Findings index should not contain a pre-existing doc, but found $it", + preExistingDocIds.intersect(docIds).isEmpty() + ) } } fun `test document-level monitor when alias indices only contain docs that match query`() { // Only new docs should create findings. 
- val alias = createTestAlias(includeWriteIndex = false) + val alias = createTestAlias(includeWriteIndex = true) val indices = alias[alias.keys.first()]?.keys?.toList() as List val query = randomDocLevelQuery(tags = listOf()) val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) @@ -247,21 +248,23 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { @Suppress("UNCHECKED_CAST") val searchResult = (inputResults?.get("results") as List>).firstOrNull() @Suppress("UNCHECKED_CAST") - val findings = if (searchResult == null) listOf() - else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + val findings = searchFindings() assertEquals(monitor.name, output["monitor_name"]) assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) - findings.forEach { - val findingDocId = it["id"] as String - assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) - assertTrue("Found an unexpected finding $it", newDocIds.contains(findingDocId)) + findings.findings.forEach { + val docIds = it.finding.relatedDocIds + assertTrue( + "Findings index should not contain a pre-existing doc, but found $it", + preExistingDocIds.intersect(docIds).isEmpty() + ) + assertTrue("Found an unexpected finding $it", newDocIds.intersect(docIds).isNotEmpty()) } } fun `test document-level monitor when alias indices contain docs that do and do not match query`() { // Only matching docs should create findings. - val alias = createTestAlias(includeWriteIndex = false) + val alias = createTestAlias(includeWriteIndex = true) val indices = alias[alias.keys.first()]?.keys?.toList() as List val query = randomDocLevelQuery(tags = listOf()) val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) @@ -305,16 +308,18 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { @Suppress("UNCHECKED_CAST") val searchResult = (inputResults?.get("results") as List>).firstOrNull() @Suppress("UNCHECKED_CAST") - val findings = if (searchResult == null) listOf() - else searchResult?.stringMap("hits")?.getOrDefault("hits", listOf>>()) as List> + val findings = searchFindings() assertEquals(monitor.name, output["monitor_name"]) assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) - findings.forEach { - val findingDocId = it["id"] as String - assertFalse("Findings index should not contain a pre-existing doc, but found $it", preExistingDocIds.contains(findingDocId)) - assertFalse("Found doc that doesn't match query: $it", nonMatchingDocIds.contains(findingDocId)) - assertTrue("Found an unexpected finding $it", matchingDocIds.contains(findingDocId)) + findings.findings.forEach { + val docIds = it.finding.relatedDocIds + assertTrue( + "Findings index should not contain a pre-existing doc, but found $it", + preExistingDocIds.intersect(docIds).isEmpty() + ) + assertTrue("Found doc that doesn't match query: $it", nonMatchingDocIds.intersect(docIds).isEmpty()) + assertFalse("Found an unexpected finding $it", matchingDocIds.intersect(docIds).isNotEmpty()) } } From 390ab08eb292b55e785e44217e6e9502efab5dfb Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Wed, 20 Apr 2022 16:47:39 -0700 Subject: [PATCH 11/11] minor refactor Signed-off-by: Ashish Agrawal --- .../org/opensearch/alerting/util/DocLevelMonitorQueries.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt 
b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index c507e1195..7ee9c97b5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -79,7 +79,7 @@ class DocLevelMonitorQueries(private val client: AdminClient, private val cluste override fun onResponse(getAliasesResponse: GetAliasesResponse?) { val aliasIndices = getAliasesResponse?.aliases?.keys()?.map { it.value } val isAlias = aliasIndices != null && aliasIndices.isNotEmpty() - val indices = if (isAlias) getAliasesResponse?.aliases?.keys()?.map { it.value } else listOf(index) + val indices = if (isAlias) aliasIndices else listOf(index) val indexRequests = mutableListOf() log.info("indices: $indices") indices?.forEach { indexName ->
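                // For each concrete index behind the alias (or the single index when the target is not
                // an alias), the monitor's queries are registered separately: mapping field names and
                // query doc ids are suffixed with "_<indexName>_<monitorId>" and the stored source keeps
                // the index name, so runtime percolation can be filtered to the index a doc came from.
                // e.g. (hypothetical monitor/index names) a query on field "message" for monitor "m1"
                // against backing index "logs-1" would be stored under id "<queryId>_logs-1_m1" with
                // its field rewritten to "message_logs-1_m1".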