Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up doc level queries on dry run #1430

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
} else {
// Clean up any queries created by the dry run monitor
monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata)
}

// TODO: Update the Document as part of the Trigger and return back the trigger action result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
Expand All @@ -38,8 +40,16 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.reindex.BulkByScrollResponse
import org.opensearch.index.reindex.DeleteByQueryAction
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)

Expand Down Expand Up @@ -134,6 +144,42 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return true
}

suspend fun deleteDocLevelQueriesOnDryRun(monitorMetadata: MonitorMetadata) {
try {
monitorMetadata.sourceToQueryIndexMapping.forEach { (_, queryIndex) ->
val indicesExistsResponse: IndicesExistsResponse =
client.suspendUntil {
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
}
if (indicesExistsResponse.isExists == false) {
return
}

val queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.existsQuery("monitor_id"))
.mustNot(QueryBuilders.wildcardQuery("monitor_id", "*"))

val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(queryIndex)
.filter(queryBuilder)
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) = cont.resumeWithException(t)
}
)
}
response.bulkFailures.forEach {
log.error("Failed deleting queries while removing dry run queries: [${it.id}] cause: [${it.cause}] ")
}
}
} catch (e: Exception) {
log.error("Failed to delete doc level queries on dry run", e)
}
}

fun docLevelQueryIndexExists(dataSources: DataSources): Boolean {
val clusterState = clusterService.state()
return clusterState.metadata.hasAlias(dataSources.queryIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)

// ensure doc level query is deleted on dry run
val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
}

fun `test execute monitor returns search result with dryrun`() {
Expand Down Expand Up @@ -105,6 +120,120 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex"))

// ensure doc level query is deleted on dry run
val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
}

fun `test execute monitor returns search result with dryrun then without dryrun ensure dry run query not saved`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "2", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
assertTrue("Incorrect search result", matchingDocsToQuery.contains("2|$testIndex"))

// ensure doc level query is deleted on dry run
val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals(0L, it.value) }

// create and execute second monitor not as dryrun
val testIndex2 = createTestIndex("test1")
val testTime2 = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc2 = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime2",
"test_field" : "us-east-1"
}"""

val docQuery2 = DocLevelQuery(query = "test_field:\"us-east-1\"", name = "3", fields = listOf())
val docLevelInput2 = DocLevelMonitorInput("description", listOf(testIndex2), listOf(docQuery2))

val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor2 = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput2), triggers = listOf(trigger2)))
assertNotNull(monitor2.id)

indexDoc(testIndex2, "1", testDoc2)
indexDoc(testIndex2, "5", testDoc2)

val response2 = executeMonitor(monitor2.id)
val output2 = entityAsMap(response2)

assertEquals(monitor2.name, output2["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult2 = (output2.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery2 = searchResult2[docQuery2.id] as List<String>
assertEquals("Incorrect search result", 2, matchingDocsToQuery2.size)
assertTrue("Incorrect search result", matchingDocsToQuery2.containsAll(listOf("1|$testIndex2", "5|$testIndex2")))

val alerts = searchAlertsWithFilter(monitor2)
assertEquals("Alert saved for test monitor", 2, alerts.size)

val findings = searchFindings(monitor2)
assertEquals("Findings saved for test monitor", 2, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))

// ensure query from second monitor was saved
val expectedQueries = listOf("test_field_test1_${monitor2.id}:\"us-east-1\"")
httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.forEach { hit ->
val query = ((hit.sourceAsMap["query"] as Map<String, Any>)["query_string"] as Map<String, Any>)["query"]
assertTrue(expectedQueries.contains(query))
}
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 1L, it.value) }
}

fun `test execute monitor generates alerts and findings`() {
Expand Down
Loading