diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt index e430d8f67..2287854ab 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest @@ -57,17 +58,16 @@ object DocumentReturningMonitorRunner : MonitorRunner() { periodEnd: Instant, dryrun: Boolean ): MonitorRunResult { - logger.info("Document-level-monitor is running ...") + logger.debug("Document-level-monitor is running ...") var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) - // TODO: is this needed from Charlie? try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex() monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex() monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex() } catch (e: Exception) { val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id - logger.error("Error loading alerts for monitor: $id", e) + logger.error("Error setting up alerts and findings indices for monitor: $id", e) return monitorResult.copy(error = e) } @@ -83,62 +83,87 @@ object DocumentReturningMonitorRunner : MonitorRunner() { val queries: List = docLevelMonitorInput.queries val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID - var lastRunContext = monitor.lastRunContext.toMutableMap() - try { - if (lastRunContext.isNullOrEmpty()) { - lastRunContext = createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, index).toMutableMap() - } - } catch (e: Exception) { - logger.info("Failed to start Document-level-monitor $index. Error: ${e.message}") - return monitorResult.copy(error = e) - } - - val count: Int = lastRunContext["shards_count"] as Int + val lastRunContext = if (monitor.lastRunContext.isNullOrEmpty()) mutableMapOf() + else monitor.lastRunContext.toMutableMap() as MutableMap> val updatedLastRunContext = lastRunContext.toMutableMap() - for (i: Int in 0 until count) { - val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) - updatedLastRunContext[shard] = maxSeqNo - - // update lastRunContext if its a temp monitor as we only want to view the last bit of data then - // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data - if (isTempMonitor) { - lastRunContext[shard] = max(-1, maxSeqNo - 1) - } - } val queryToDocIds = mutableMapOf>() val docsToQueries = mutableMapOf>() - val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext) val idQueryMap = mutableMapOf() - val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun) + try { + val getAliasesRequest = GetAliasesRequest(index) + val getAliasesResponse = monitorCtx.client!!.admin().indices().getAliases(getAliasesRequest).actionGet() + val aliasIndices = getAliasesResponse.aliases.keys().map { it.value } - if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) + val isAlias = aliasIndices.isNotEmpty() + logger.debug("index, $index, is an alias index: $isAlias") - matchedQueriesForDocs.forEach { hit -> - val (id, query) = Pair( - hit.id.replace("_${monitor.id}", ""), - ((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] - ) - val docLevelQuery = DocLevelQuery(id, id, query.toString()) - - val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } - docIndices.forEach { idx -> - if (queryToDocIds.containsKey(docLevelQuery)) { - queryToDocIds[docLevelQuery]?.add(matchingDocs[idx].first) - } else { - queryToDocIds[docLevelQuery] = mutableSetOf(matchingDocs[idx].first) + // If the input index is an alias, creating a list of all indices associated with that alias; + // else creating a list containing the single index input + val indices = if (isAlias) getAliasesResponse.aliases.keys().map { it.value } else listOf(index) + + indices.forEach { indexName -> + // Prepare lastRunContext for each index + val indexLastRunContext = lastRunContext.getOrPut(indexName) { + createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, indexName) + } + + // Prepare updatedLastRunContext for each index + val indexUpdatedRunContext = updateLastRunContext( + indexLastRunContext.toMutableMap(), + monitorCtx, + indexName + ) as MutableMap + updatedLastRunContext[indexName] = indexUpdatedRunContext + + val count: Int = indexLastRunContext["shards_count"] as Int + for (i: Int in 0 until count) { + val shard = i.toString() + + // update lastRunContext if its a temp monitor as we only want to view the last bit of data then + // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data + if (isTempMonitor) { + indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 1) } + } - if (docsToQueries.containsKey(matchingDocs[idx].first)) { - docsToQueries[matchingDocs[idx].first]?.add(id) - } else { - docsToQueries[matchingDocs[idx].first] = mutableListOf(id) + // Prepare DocumentExecutionContext for each index + val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) + + val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName) + + if (matchingDocs.isNotEmpty()) { + val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor, indexName) + + matchedQueriesForDocs.forEach { hit -> + val (id, query) = Pair( + hit.id.replace("_${indexName}_${monitor.id}", ""), + ((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] + ) + val docLevelQuery = DocLevelQuery(id, id, query.toString()) + + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${matchingDocs[idx].first}|$indexName" + if (queryToDocIds.containsKey(docLevelQuery)) { + queryToDocIds[docLevelQuery]?.add(docIndex) + } else { + queryToDocIds[docLevelQuery] = mutableSetOf(docIndex) + } + + if (docsToQueries.containsKey(docIndex)) { + docsToQueries[docIndex]?.add(id) + } else { + docsToQueries[docIndex] = mutableListOf(id) + } + } } } } + } catch (e: Exception) { + logger.error("Failed to start Document-level-monitor $index. Error: ${e.message}", e) + return monitorResult.copy(error = e) } val queryInputResults = queryToDocIds.mapKeys { it.key.id } @@ -193,9 +218,6 @@ object DocumentReturningMonitorRunner : MonitorRunner() { logger.info("trigger results") logger.info(triggerResult.triggeredDocs.toString()) - val index = (monitor.inputs[0] as DocLevelMonitorInput).indices[0] - - // TODO: modify findings such that there is a finding per document val findings = mutableListOf() val findingDocPairs = mutableListOf>() @@ -203,7 +225,7 @@ object DocumentReturningMonitorRunner : MonitorRunner() { if (!dryrun && monitor.id != Monitor.NO_ID) { docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - val findingId = createFindings(monitor, monitorCtx, index, triggeredQueries, listOf(it.key)) + val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key) findings.add(findingId) if (triggerResult.triggeredDocs.contains(it.key)) { @@ -244,25 +266,25 @@ object DocumentReturningMonitorRunner : MonitorRunner() { private fun createFindings( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - index: String, docLevelQueries: List, - matchingDocIds: List + matchingDocId: String ): String { + // Before the "|" is the doc id and after the "|" is the index + val docIndex = matchingDocId.split("|") + val finding = Finding( id = UUID.randomUUID().toString(), - relatedDocIds = matchingDocIds, + relatedDocIds = listOf(docIndex[0]), monitorId = monitor.id, monitorName = monitor.name, - index = index, + index = docIndex[1], docLevelQueries = docLevelQueries, timestamp = Instant.now() ) val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - // change this to debug. - logger.info("Findings: $findingStr") + logger.debug("Findings: $findingStr") - // todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management. val indexRequest = IndexRequest(FINDING_HISTORY_WRITE_INDEX) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(findingStr, XContentType.JSON) @@ -273,6 +295,21 @@ object DocumentReturningMonitorRunner : MonitorRunner() { return finding.id } + private fun updateLastRunContext( + lastRunContext: Map, + monitorCtx: MonitorRunnerExecutionContext, + index: String + ): Map { + val count: Int = getShardsCount(monitorCtx.clusterService!!, index) + val updatedLastRunContext = lastRunContext.toMutableMap() + for (i: Int in 0 until count) { + val shard = i.toString() + val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) + updatedLastRunContext[shard] = maxSeqNo.toString() + } + return updatedLastRunContext + } + private fun validate(monitor: Monitor) { if (monitor.inputs.size > 1) { throw IOException("Only one input is supported with document-level-monitor.") @@ -337,28 +374,16 @@ object DocumentReturningMonitorRunner : MonitorRunner() { monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, - index: String, - dryrun: Boolean + index: String ): List> { - val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int + val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int val matchingDocs = mutableListOf>() for (i: Int in 0 until count) { val shard = i.toString() try { logger.info("Monitor execution for shard: $shard") - val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() - logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") - - // If dryrun, set the previous sequence number as 1 less than the max sequence number or 0 - val prevSeqNo = if (dryrun || monitor.id == Monitor.NO_ID) - max(-1, maxSeqNo - 1) - else docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() - - if (dryrun) { - logger.info("it is a dryrun") - } - + val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() logger.info("prevSeq: $prevSeqNo, maxSeq: $maxSeqNo") val hits: SearchHits = searchShard( @@ -372,7 +397,7 @@ object DocumentReturningMonitorRunner : MonitorRunner() { logger.info("Search hits for shard_$shard is: ${hits.hits.size}") if (hits.hits.isNotEmpty()) { - matchingDocs.addAll(getAllDocs(hits, monitor.id)) + matchingDocs.addAll(getAllDocs(hits, index, monitor.id)) } } catch (e: Exception) { logger.info("Failed to run for shard $shard. Error: ${e.message}") @@ -419,9 +444,10 @@ object DocumentReturningMonitorRunner : MonitorRunner() { private fun getMatchedQueries( monitorCtx: MonitorRunnerExecutionContext, docs: List, - monitor: Monitor + monitor: Monitor, + index: String ): SearchHits { - val boolQueryBuilder = BoolQueryBuilder() + val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.matchQuery("index", index)) val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON) if (monitor.id.isNotEmpty()) { @@ -442,13 +468,13 @@ object DocumentReturningMonitorRunner : MonitorRunner() { return response.hits } - private fun getAllDocs(hits: SearchHits, monitorId: String): List> { + private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List> { return hits.map { hit -> val sourceMap = hit.sourceAsMap var xContentBuilder = XContentFactory.jsonBuilder().startObject() sourceMap.forEach { (k, v) -> - xContentBuilder = xContentBuilder.field("${k}_$monitorId", v) + xContentBuilder = xContentBuilder.field("${k}_${index}_$monitorId", v) } xContentBuilder = xContentBuilder.endObject() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt index 787e5d778..cc11e92f8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt @@ -197,9 +197,11 @@ data class Monitor( // Outputting type with each Trigger so that the generic Trigger.readFrom() can read it out.writeVInt(triggers.size) triggers.forEach { - if (it is QueryLevelTrigger) out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER) - else if (it is DocumentLevelTrigger) out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER) - else out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER) + when (it) { + is BucketLevelTrigger -> out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER) + is DocumentLevelTrigger -> out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER) + else -> out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER) + } it.writeTo(out) } out.writeMap(lastRunContext) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetFindingsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetFindingsAction.kt index e71412a2b..69b65d142 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetFindingsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetFindingsAction.kt @@ -39,7 +39,7 @@ class RestGetFindingsAction : BaseRestHandler() { log.info("${request.method()} ${request.path()}") val findingID: String? = request.param("findingId") - val sortString = request.param("sortString", "id.keyword") + val sortString = request.param("sortString", "id") val sortOrder = request.param("sortOrder", "asc") val missing: String? = request.param("missing") val size = request.paramAsInt("size", 20) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index d5a52495f..830541cbb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -42,7 +42,7 @@ import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import java.time.Instant -private val log = LogManager.getLogger(TransportGetMonitorAction::class.java) +private val log = LogManager.getLogger(TransportExecuteMonitorAction::class.java) class TransportExecuteMonitorAction @Inject constructor( transportService: TransportService, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt index f91d0efd4..a67644171 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting.transport import org.apache.logging.log4j.LogManager +import org.apache.lucene.search.join.ScoreMode import org.opensearch.action.ActionListener import org.opensearch.action.get.MultiGetRequest import org.opensearch.action.search.SearchRequest @@ -90,12 +91,23 @@ class TransportGetFindingsSearchAction @Inject constructor( if (!tableProp.searchString.isNullOrBlank()) { queryBuilder - .must( + .should( QueryBuilders .queryStringQuery(tableProp.searchString) - .defaultOperator(Operator.AND) - .field("queries.tags") - .field("queries.name") + ) + .should( + QueryBuilders.nestedQuery( + "queries", + QueryBuilders.boolQuery() + .must( + QueryBuilders + .queryStringQuery(tableProp.searchString) + .defaultOperator(Operator.AND) + .field("queries.tags") + .field("queries.name") + ), + ScoreMode.Avg + ) ) } @@ -131,7 +143,7 @@ class TransportGetFindingsSearchAction @Inject constructor( mgetRequest.add(MultiGetRequest.Item(finding.index, docId)) } } - val documents = searchDocument(mgetRequest) + val documents = if (mgetRequest.items.isEmpty()) mutableMapOf() else searchDocument(mgetRequest) findings.forEach { val documentIds = it.relatedDocIds val relatedDocs = mutableListOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 246ef3439..6157cb580 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchSecurityException import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.GetRequest @@ -381,7 +382,8 @@ class TransportIndexMonitorAction @Inject constructor( private fun indexMonitor() { if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { val monitorIndex = (request.monitor.inputs[0] as DocLevelMonitorInput).indices[0] - val lastRunContext = DocumentReturningMonitorRunner.createRunContext(clusterService, client, monitorIndex).toMutableMap() + val lastRunContext = createFullRunContext(monitorIndex) + log.info("index last run context: $lastRunContext") request.monitor = request.monitor.copy(lastRunContext = lastRunContext) } request.monitor = request.monitor.copy(schemaVersion = IndexUtils.scheduledJobIndexSchemaVersion) @@ -511,7 +513,7 @@ class TransportIndexMonitorAction @Inject constructor( request.monitor.lastRunContext.toMutableMap().isNullOrEmpty() ) { val monitorIndex = (request.monitor.inputs[0] as DocLevelMonitorInput).indices[0] - val lastRunContext = DocumentReturningMonitorRunner.createRunContext(clusterService, client, monitorIndex).toMutableMap() + val lastRunContext = createFullRunContext(monitorIndex) request.monitor = request.monitor.copy(lastRunContext = lastRunContext) } @@ -570,6 +572,19 @@ class TransportIndexMonitorAction @Inject constructor( ) } + private fun createFullRunContext(index: String): MutableMap> { + val getAliasesRequest = GetAliasesRequest(index) + val getAliasesResponse = client.admin().indices().getAliases(getAliasesRequest).actionGet() + val aliasIndices = getAliasesResponse.aliases.keys().map { it.value } + val isAlias = aliasIndices.isNotEmpty() + val indices = if (isAlias) getAliasesResponse.aliases.keys().map { it.value } else listOf(index) + val lastRunContext = mutableMapOf>() + indices.forEach { indexName -> + lastRunContext[indexName] = DocumentReturningMonitorRunner.createRunContext(clusterService, client, indexName) + } + return lastRunContext + } + private fun checkShardsFailure(response: IndexResponse): String? { val failureReasons = StringBuilder() if (response.shardInfo.failed > 0) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index badb76370..7ee9c97b5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -7,6 +7,8 @@ package org.opensearch.alerting.util import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest +import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest @@ -69,47 +71,83 @@ class DocLevelMonitorQueries(private val client: AdminClient, private val cluste val queries: List = docLevelMonitorInput.queries val clusterState = clusterService.state() - if (clusterState.routingTable.hasIndex(index)) { - val indexMetadata = clusterState.metadata.index(index) - if (indexMetadata.mapping() != null) { - val properties = ((indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map>) - val updatedProperties = properties.entries.associate { "${it.key}_$monitorId" to it.value }.toMutableMap() - - val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) - updateMappingRequest.source(mapOf("properties" to updatedProperties)) - - queryClient.admin().indices().putMapping( - updateMappingRequest, - object : ActionListener { - override fun onResponse(response: AcknowledgedResponse) { - log.info("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} updated with new mappings") - - val request = BulkRequest().setRefreshPolicy(refreshPolicy).timeout(indexTimeout) - - queries.forEach { - var query = it.query - - properties.forEach { prop -> - query = query.replace("${prop.key}:", "${prop.key}_$monitorId:") - } - val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) - .id(it.id + "_$monitorId") - .source(mapOf("query" to mapOf("query_string" to mapOf("query" to query)), "monitor_id" to monitorId)) - request.add(indexRequest) + val getAliasesRequest = GetAliasesRequest(index) + queryClient.admin().indices().getAliases( + getAliasesRequest, + object : ActionListener { + override fun onResponse(getAliasesResponse: GetAliasesResponse?) { + val aliasIndices = getAliasesResponse?.aliases?.keys()?.map { it.value } + val isAlias = aliasIndices != null && aliasIndices.isNotEmpty() + val indices = if (isAlias) aliasIndices else listOf(index) + val indexRequests = mutableListOf() + log.info("indices: $indices") + indices?.forEach { indexName -> + if (clusterState.routingTable.hasIndex(indexName)) { + val indexMetadata = clusterState.metadata.index(indexName) + if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { + val properties = ( + (indexMetadata.mapping()?.sourceAsMap?.get("properties")) + as Map> + ) + val updatedProperties = properties.entries.associate { + "${it.key}_${indexName}_$monitorId" to it.value + }.toMutableMap() + + val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + updateMappingRequest.source(mapOf("properties" to updatedProperties)) + + queryClient.admin().indices().putMapping( + updateMappingRequest, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + + queries.forEach { + var query = it.query + + properties.forEach { prop -> + query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:") + } + val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .id(it.id + "_${indexName}_$monitorId") + .source( + mapOf( + "query" to mapOf("query_string" to mapOf("query" to query)), + "monitor_id" to monitorId, + "index" to indexName + ) + ) + indexRequests.add(indexRequest) + } + if (indexRequests.isNotEmpty()) { + queryClient.bulk( + BulkRequest().setRefreshPolicy(refreshPolicy).timeout(indexTimeout).add(indexRequests), + docLevelQueryIndexListener + ) + } + return + } + + override fun onFailure(e: Exception) { + log.error("This is a failure", e) + if (indexMonitorActionListener != null) { + indexMonitorActionListener.onFailure(AlertingException.wrap(e)) + } else executeMonitorActionListener?.onFailure(AlertingException.wrap(e)) + return + } + } + ) } - - queryClient.bulk(request, docLevelQueryIndexListener) - } - - override fun onFailure(e: Exception) { - if (indexMonitorActionListener != null) { - indexMonitorActionListener.onFailure(AlertingException.wrap(e)) - } else executeMonitorActionListener?.onFailure(AlertingException.wrap(e)) } } - ) + } + + override fun onFailure(e: Exception) { + if (indexMonitorActionListener != null) { + indexMonitorActionListener.onFailure(AlertingException.wrap(e)) + } else executeMonitorActionListener?.onFailure(AlertingException.wrap(e)) + } } - } + ) } } diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json index abb377b6c..fcb1d1c94 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json @@ -4,7 +4,7 @@ "required": true }, "_meta" : { - "schema_version": 3 + "schema_version": 4 }, "properties": { "schema_version": { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index a5cd3738a..49d1718d7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -18,6 +18,8 @@ import org.opensearch.alerting.AlertingPlugin.Companion.EMAIL_ACCOUNT_BASE_URI import org.opensearch.alerting.AlertingPlugin.Companion.EMAIL_GROUP_BASE_URI import org.opensearch.alerting.action.GetFindingsResponse import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.alerts.AlertIndices.Companion.FINDING_HISTORY_WRITE_INDEX +import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.SearchInput @@ -88,6 +90,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { mutableListOf( Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, + DocLevelMonitorInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY, DocumentLevelTrigger.XCONTENT_REGISTRY @@ -572,7 +575,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - indexDoc(".opensearch-alerting-findings", finding.id, findingStr) + indexDoc(FINDING_HISTORY_WRITE_INDEX, finding.id, findingStr) return finding.id } @@ -782,6 +785,60 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return index } + protected fun createTestAlias( + alias: String = randomAlphaOfLength(10).toLowerCase(Locale.ROOT), + numOfAliasIndices: Int = randomIntBetween(1, 10), + includeWriteIndex: Boolean = true + ): MutableMap> { + return createTestAlias(alias = alias, indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex)) + } + + protected fun createTestAlias( + alias: String = randomAlphaOfLength(10).toLowerCase(Locale.ROOT), + indices: Map = randomAliasIndices( + alias = alias, + num = randomIntBetween(1, 10), + includeWriteIndex = true + ) + ): MutableMap> { + logger.info("number of indices behind alias: ${indices.size}") + logger.info("the alias indices: $indices") + val indicesMap = mutableMapOf() + val indicesJson = jsonBuilder().startObject().startArray("actions") + indices.keys.map { + val indexName = createTestIndex(index = it.toLowerCase(Locale.ROOT), mapping = "") + val isWriteIndex = indices.getOrDefault(indexName, false) + indicesMap[indexName] = isWriteIndex + val indexMap = mapOf( + "add" to mapOf( + "index" to indexName, + "alias" to alias, + "is_write_index" to isWriteIndex + ) + ) + indicesJson.value(indexMap) + } + val requestBody = indicesJson.endArray().endObject().string() + client().makeRequest("POST", "/_aliases", emptyMap(), StringEntity(requestBody, APPLICATION_JSON)) + return mutableMapOf(alias to indicesMap) + } + + protected fun randomAliasIndices( + alias: String, + num: Int = randomIntBetween(1, 10), + includeWriteIndex: Boolean = true + ): Map { + val indices = mutableMapOf() + val writeIndex = randomIntBetween(0, num) + for (i: Int in 0 until num) { + var indexName = randomAlphaOfLength(10) + while (indexName.equals(alias) || indices.containsKey(indexName)) + indexName = randomAlphaOfLength(10) + indices[indexName] = includeWriteIndex && i == writeIndex + } + return indices + } + protected fun insertSampleTimeSerializedData(index: String, data: List) { data.forEachIndexed { i, value -> val twoMinsAgo = ZonedDateTime.now().minus(2, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MILLIS) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index bb401dd5d..6809ec9fe 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -7,6 +7,9 @@ package org.opensearch.alerting import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery +import org.opensearch.client.Response +import org.opensearch.client.ResponseException +import org.opensearch.script.Script import java.time.ZonedDateTime import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit.MILLIS @@ -83,7 +86,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { @Suppress("UNCHECKED_CAST") val matchingDocsToQuery = searchResult[docQuery.id] as List assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) - assertTrue("Incorrect search result", matchingDocsToQuery.contains("5")) + assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex")) } fun `test execute monitor generates alerts and findings`() { @@ -103,7 +106,6 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) assertNotNull(monitor.id) - Thread.sleep(2000) indexDoc(testIndex, "1", testDoc) indexDoc(testIndex, "5", testDoc) @@ -117,7 +119,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { @Suppress("UNCHECKED_CAST") val matchingDocsToQuery = searchResult[docQuery.id] as List assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) - assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1", "5"))) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex"))) val alerts = searchAlertsWithFilter(monitor) assertEquals("Alert saved for test monitor", 2, alerts.size) @@ -129,6 +131,198 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) } + fun `test document-level monitor when alias only has write index with 0 docs`() { + // Monitor should execute, but create 0 findings. + val alias = createTestAlias(includeWriteIndex = true) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + + val response: Response + try { + response = executeMonitor(monitor.id) + } catch (e: ResponseException) { + assertNotNull("Expected an error message: $e", e.message) + e.message?.let { + assertTrue("Unexpected exception: $e", it.contains("""reason":"no such index [.opensearch-alerting-findings]""")) + } + assertEquals(404, e.response.statusLine.statusCode) + return + } + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = searchFindings() + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.findings.forEach { + val queryIds = it.finding.docLevelQueries.map { query -> query.id } + assertFalse("No findings should exist with queryId ${query.id}, but found: $it", queryIds.contains(query.id)) + } + } + + fun `test document-level monitor when docs exist prior to monitor creation`() { + // FIXME: Consider renaming this test case + // Only new docs should create findings. + val alias = createTestAlias(includeWriteIndex = true) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + + val preExistingDocIds = mutableSetOf() + indices.forEach { index -> + val docId = index.hashCode().toString() + val doc = """{ "message" : "${query.query}" }""" + preExistingDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + assertEquals(indices.size, preExistingDocIds.size) + + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = searchFindings() + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.findings.forEach { + val docIds = it.finding.relatedDocIds + assertTrue( + "Findings index should not contain a pre-existing doc, but found $it", + preExistingDocIds.intersect(docIds).isEmpty() + ) + } + } + + fun `test document-level monitor when alias indices only contain docs that match query`() { + // Only new docs should create findings. + val alias = createTestAlias(includeWriteIndex = true) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + + val preExistingDocIds = mutableSetOf() + indices.forEach { index -> + val docId = index.hashCode().toString() + val doc = """{ "message" : "${query.query}" }""" + preExistingDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + assertEquals(indices.size, preExistingDocIds.size) + + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + executeMonitor(monitor.id) + + val newDocIds = mutableSetOf() + indices.forEach { index -> + (1..5).map { + val docId = "${index.hashCode()}$it" + val doc = """{ "message" : "${query.query}" }""" + newDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + } + assertEquals(indices.size * 5, newDocIds.size) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = searchFindings() + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.findings.forEach { + val docIds = it.finding.relatedDocIds + assertTrue( + "Findings index should not contain a pre-existing doc, but found $it", + preExistingDocIds.intersect(docIds).isEmpty() + ) + assertTrue("Found an unexpected finding $it", newDocIds.intersect(docIds).isNotEmpty()) + } + } + + fun `test document-level monitor when alias indices contain docs that do and do not match query`() { + // Only matching docs should create findings. + val alias = createTestAlias(includeWriteIndex = true) + val indices = alias[alias.keys.first()]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = indices, queries = listOf(query)) + val trigger = randomDocLevelTrigger(condition = Script("params${query.id}")) + + val preExistingDocIds = mutableSetOf() + indices.forEach { index -> + val docId = index.hashCode().toString() + val doc = """{ "message" : "${query.query}" }""" + preExistingDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + assertEquals(indices.size, preExistingDocIds.size) + + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + executeMonitor(monitor.id) + + val matchingDocIds = mutableSetOf() + val nonMatchingDocIds = mutableSetOf() + indices.forEach { index -> + (1..5).map { + val matchingDocId = "${index.hashCode()}$it" + val matchingDoc = """{ "message" : "${query.query}" }""" + indexDoc(index = index, id = matchingDocId, doc = matchingDoc) + matchingDocIds.add(matchingDocId) + + val nonMatchingDocId = "${index.hashCode()}${it}2" + var nonMatchingDoc = StringBuilder(query.query).insert(2, "difference").toString() + nonMatchingDoc = """{ "message" : "$nonMatchingDoc" }""" + indexDoc(index = index, id = nonMatchingDocId, doc = nonMatchingDoc) + nonMatchingDocIds.add(nonMatchingDocId) + } + } + assertEquals(indices.size * 5, matchingDocIds.size) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = searchFindings() + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.findings.forEach { + val docIds = it.finding.relatedDocIds + assertTrue( + "Findings index should not contain a pre-existing doc, but found $it", + preExistingDocIds.intersect(docIds).isEmpty() + ) + assertTrue("Found doc that doesn't match query: $it", nonMatchingDocIds.intersect(docIds).isEmpty()) + assertFalse("Found an unexpected finding $it", matchingDocIds.intersect(docIds).isNotEmpty()) + } + } + @Suppress("UNCHECKED_CAST") /** helper that returns a field in a json map whose values are all json objects */ private fun Map.objectMap(key: String): Map> { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 9f4e8717e..d6d3191bb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -645,6 +645,7 @@ fun xContentRegistry(): NamedXContentRegistry { return NamedXContentRegistry( listOf( SearchInput.XCONTENT_REGISTRY, + DocLevelMonitorInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY, DocumentLevelTrigger.XCONTENT_REGISTRY diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index dcf229fe4..8e4e821a7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -52,7 +52,7 @@ class AlertIndicesIT : AlertingRestTestCase() { putAlertMappings( AlertIndices.alertMapping().trimStart('{').trimEnd('}') - .replace("\"schema_version\": 3", "\"schema_version\": 0") + .replace("\"schema_version\": 4", "\"schema_version\": 0") ) assertIndexExists(AlertIndices.ALERT_INDEX) assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) @@ -63,8 +63,8 @@ class AlertIndicesIT : AlertingRestTestCase() { assertIndexExists(AlertIndices.ALERT_INDEX) assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 5) - verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 3) - verifyIndexSchemaVersion(AlertIndices.ALERT_HISTORY_WRITE_INDEX, 3) + verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 4) + verifyIndexSchemaVersion(AlertIndices.ALERT_HISTORY_WRITE_INDEX, 4) } fun `test update finding index mapping with new schema version`() { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt index b48235330..494ed2615 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt @@ -5,8 +5,12 @@ package org.opensearch.alerting.resthandler +import org.opensearch.alerting.ALWAYS_RUN import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery +import org.opensearch.alerting.randomDocumentLevelMonitor +import org.opensearch.alerting.randomDocumentLevelTrigger import org.opensearch.test.junit.annotations.TestLogging @TestLogging("level:DEBUG", reason = "Debug for tests.") @@ -14,7 +18,12 @@ import org.opensearch.test.junit.annotations.TestLogging class FindingsRestApiIT : AlertingRestTestCase() { fun `test find Finding where doc is not retrieved`() { - + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) createFinding(matchingDocIds = listOf("someId")) val response = searchFindings() assertEquals(1, response.totalFindings) @@ -35,6 +44,12 @@ class FindingsRestApiIT : AlertingRestTestCase() { }""" indexDoc(testIndex, "someId2", testDoc2) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) + val findingWith1 = createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingWith2 = createFinding(matchingDocIds = listOf("someId", "someId2"), index = testIndex) val response = searchFindings() @@ -69,6 +84,12 @@ class FindingsRestApiIT : AlertingRestTestCase() { }""" indexDoc(testIndex, "someId2", testDoc2) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) + createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingId = createFinding(matchingDocIds = listOf("someId", "someId2"), index = testIndex) val response = searchFindings(mapOf(Pair("findingId", findingId))) @@ -95,6 +116,11 @@ class FindingsRestApiIT : AlertingRestTestCase() { indexDoc(testIndex, "someId2", testDoc2) val docLevelQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "realQuery", tags = listOf("sigma")) + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docLevelQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) + createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingId = createFinding( matchingDocIds = listOf("someId", "someId2"), @@ -125,6 +151,11 @@ class FindingsRestApiIT : AlertingRestTestCase() { indexDoc(testIndex, "someId2", testDoc2) val docLevelQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "realQuery", tags = listOf("sigma")) + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docLevelQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) + createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingId = createFinding( matchingDocIds = listOf("someId", "someId2"), @@ -140,4 +171,40 @@ class FindingsRestApiIT : AlertingRestTestCase() { assertEquals(testDoc, response.findings[0].documents[0].document) assertEquals(testDoc2, response.findings[0].documents[1].document) } + + fun `test find Finding by monitor id`() { + val testIndex = createTestIndex() + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_field" : "us-west-2" + }""" + indexDoc(testIndex, "someId", testDoc) + val testDoc2 = """{ + "message" : "This is an error2 from IAD region", + "test_field" : "us-west-3" + }""" + indexDoc(testIndex, "someId2", testDoc2) + + val docLevelQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "realQuery", tags = listOf("sigma")) + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docLevelQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id, mapOf(Pair("dryrun", "true"))) + + createFinding(matchingDocIds = listOf("someId"), index = testIndex) + val findingId = createFinding( + monitorId = "monitorToFind", + matchingDocIds = listOf("someId", "someId2"), + index = testIndex, + docLevelQueries = listOf(docLevelQuery) + ) + val response = searchFindings(mapOf(Pair("searchString", "monitorToFind"))) + assertEquals(1, response.totalFindings) + assertEquals(findingId, response.findings[0].finding.id) + assertEquals(2, response.findings[0].documents.size) + assertTrue(response.findings[0].documents[0].found) + assertTrue(response.findings[0].documents[1].found) + assertEquals(testDoc, response.findings[0].documents[0].document) + assertEquals(testDoc2, response.findings[0].documents[1].document) + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt index 6e00302c2..773316a86 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt @@ -1082,38 +1082,6 @@ class MonitorRestApiIT : AlertingRestTestCase() { } } - private fun validateAlertingStatsNodeResponse(nodesResponse: Map) { - assertEquals("Incorrect number of nodes", numberOfNodes, nodesResponse["total"]) - assertEquals("Failed nodes found during monitor stats call", 0, nodesResponse["failed"]) - assertEquals("More than $numberOfNodes successful node", numberOfNodes, nodesResponse["successful"]) - } - - private fun isMonitorScheduled(monitorId: String, alertingStatsResponse: Map): Boolean { - val nodesInfo = alertingStatsResponse["nodes"] as Map - for (nodeId in nodesInfo.keys) { - val nodeInfo = nodesInfo[nodeId] as Map - val jobsInfo = nodeInfo["jobs_info"] as Map - if (jobsInfo.keys.contains(monitorId)) { - return true - } - } - - return false - } - - private fun assertAlertingStatsSweeperEnabled(alertingStatsResponse: Map, expected: Boolean) { - assertEquals( - "Legacy scheduled job enabled field is not set to $expected", - expected, - alertingStatsResponse[statsResponseOpendistroSweeperEnabledField] - ) - assertEquals( - "Scheduled job is not ${if (expected) "enabled" else "disabled"}", - expected, - alertingStatsResponse[statsResponseOpenSearchSweeperEnabledField] - ) - } - @Throws(Exception::class) fun `test creating a document monitor`() { val testIndex = createTestIndex() @@ -1227,4 +1195,36 @@ class MonitorRestApiIT : AlertingRestTestCase() { ) } } + + private fun validateAlertingStatsNodeResponse(nodesResponse: Map) { + assertEquals("Incorrect number of nodes", numberOfNodes, nodesResponse["total"]) + assertEquals("Failed nodes found during monitor stats call", 0, nodesResponse["failed"]) + assertEquals("More than $numberOfNodes successful node", numberOfNodes, nodesResponse["successful"]) + } + + private fun isMonitorScheduled(monitorId: String, alertingStatsResponse: Map): Boolean { + val nodesInfo = alertingStatsResponse["nodes"] as Map + for (nodeId in nodesInfo.keys) { + val nodeInfo = nodesInfo[nodeId] as Map + val jobsInfo = nodeInfo["jobs_info"] as Map + if (jobsInfo.keys.contains(monitorId)) { + return true + } + } + + return false + } + + private fun assertAlertingStatsSweeperEnabled(alertingStatsResponse: Map, expected: Boolean) { + assertEquals( + "Legacy scheduled job enabled field is not set to $expected", + expected, + alertingStatsResponse[statsResponseOpendistroSweeperEnabledField] + ) + assertEquals( + "Scheduled job is not ${if (expected) "enabled" else "disabled"}", + expected, + alertingStatsResponse[statsResponseOpenSearchSweeperEnabledField] + ) + } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt index 95e48d7e5..fb595d9f0 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt @@ -36,7 +36,7 @@ interface ScheduledJob : Writeable, ToXContentObject { companion object { /** The name of the ElasticSearch index in which we store jobs */ const val SCHEDULED_JOBS_INDEX = ".opendistro-alerting-config" - const val DOC_LEVEL_QUERIES_INDEX = ".opendistro-alerting-queries" + const val DOC_LEVEL_QUERIES_INDEX = ".opensearch-alerting-queries" const val NO_ID = ""