Skip to content

Commit

Permalink
log errors and clean up monitor when indexing doc level queries or me…
Browse files Browse the repository at this point in the history
…tadata creation fails

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep committed May 10, 2023
1 parent 461e95f commit 6d28218
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
Expand All @@ -25,6 +24,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.WriteRequest
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.opensearchapi.suspendUntil
Expand Down Expand Up @@ -57,6 +57,7 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)

class TransportDeleteMonitorAction @Inject constructor(
Expand Down Expand Up @@ -87,8 +88,7 @@ class TransportDeleteMonitorAction @Inject constructor(
if (!validateUserBackendRoles(user, actionListener)) {
return
}

GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) {
scope.launch {
DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart()
}
}
Expand All @@ -109,16 +109,15 @@ class TransportDeleteMonitorAction @Inject constructor(
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)

if (canDelete) {
val deleteResponse = deleteMonitor(monitor)
deleteDocLevelMonitorQueriesAndIndices(monitor)
deleteMetadata(monitor)
val deleteResponse = deleteAllResourcesForMonitor(client, monitor, deleteRequest, monitorId)
actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version))
} else {
actionListener.onFailure(
AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException())
)
}
} catch (t: Exception) {
log.error("Failed to delete monitor ${deleteRequest.id()}", t)
actionListener.onFailure(AlertingException.wrap(t))
}
}
Expand All @@ -140,68 +139,102 @@ class TransportDeleteMonitorAction @Inject constructor(
)
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
}
}

companion object {
@JvmStatic
suspend fun deleteAllResourcesForMonitor(
client: Client,
monitor: Monitor,
deleteRequest: DeleteRequest,
monitorId: String,
): DeleteResponse {
val deleteResponse = deleteMonitorDocument(client, deleteRequest)
deleteMetadata(client, monitor)
deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId)
return deleteResponse
}

private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse {
private suspend fun deleteMonitorDocument(client: Client, deleteRequest: DeleteRequest): DeleteResponse {
return client.suspendUntil { delete(deleteRequest, it) }
}

private suspend fun deleteMetadata(monitor: Monitor) {
suspend fun deleteMetadata(client: Client, monitor: Monitor) {
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata")
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
try {
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
log.debug("Monitor metadata: ${deleteResponse.id} deletion result: ${deleteResponse.result}")
} catch (e: Exception) {
// we only log the error and don't fail the request because if monitor document has been deleted,
// we cannot retry based on this failure
log.error("Failed to delete workflow metadata for monitor ${monitor.id}.", e)
}
}

private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) {
val clusterState = clusterService.state()
val metadata = MonitorMetadataService.getMetadata(monitor)
metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->

val indicesExistsResponse: IndicesExistsResponse =
client.suspendUntil {
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
suspend fun deleteDocLevelMonitorQueriesAndIndices(
client: Client,
monitor: Monitor,
monitorId: String,
) {
try {
val metadata = MonitorMetadataService.getMetadata(monitor)
metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->

val indicesExistsResponse: IndicesExistsResponse =
client.suspendUntil {
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
}
if (indicesExistsResponse.isExists == false) {
return
}
if (indicesExistsResponse.isExists == false) {
return
}
// Check if there's any queries from other monitors in this queryIndex,
// to avoid unnecessary doc deletion, if we could just delete index completely
val searchResponse: SearchResponse = client.suspendUntil {
search(
SearchRequest(queryIndex).source(
SearchSourceBuilder()
.size(0)
.query(
QueryBuilders.boolQuery().mustNot(
QueryBuilders.matchQuery("monitor_id", monitorId)
// Check if there's any queries from other monitors in this queryIndex,
// to avoid unnecessary doc deletion, if we could just delete index completely
val searchResponse: SearchResponse = client.suspendUntil {
search(
SearchRequest(queryIndex).source(
SearchSourceBuilder()
.size(0)
.query(
QueryBuilders.boolQuery().mustNot(
QueryBuilders.matchQuery("monitor_id", monitorId)
)
)
)
).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
it
)
}
if (searchResponse.hits.totalHits.value == 0L) {
val ack: AcknowledgedResponse = client.suspendUntil {
client.admin().indices().delete(
DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it
).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
it
)
}
if (ack.isAcknowledged == false) {
log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
}
} else {
// Delete all queries added by this monitor
val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(queryIndex)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) = cont.resumeWithException(t)
}
if (searchResponse.hits.totalHits.value == 0L) {
val ack: AcknowledgedResponse = client.suspendUntil {
client.admin().indices().delete(
DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it
)
}
if (ack.isAcknowledged == false) {
log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
}
} else {
// Delete all queries added by this monitor
val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(queryIndex)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) {
cont.resumeWithException(t)
}
}
)
}
}
}
} catch (e: Exception) {
// we only log the error and don't fail the request because if monitor document has been deleted successfully,
// we cannot retry based on this failure
log.error("Failed to delete workflow metadata for monitor ${monitor.id}.", e)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthAction
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.index.IndexRequest
Expand Down Expand Up @@ -177,7 +178,7 @@ class TransportIndexMonitorAction @Inject constructor(
client: Client,
actionListener: ActionListener<IndexMonitorResponse>,
request: IndexMonitorRequest,
user: User?
user: User?,
) {
val indices = mutableListOf<String>()
// todo: for doc level alerting: check if index is present before monitor is created.
Expand Down Expand Up @@ -230,7 +231,7 @@ class TransportIndexMonitorAction @Inject constructor(
client: Client,
actionListener: ActionListener<IndexMonitorResponse>,
request: IndexMonitorRequest,
user: User?
user: User?,
) {
client.threadPool().threadContext.stashContext().use {
IndexMonitorHandler(client, actionListener, request, user).resolveUserAndStartForAD()
Expand All @@ -241,7 +242,7 @@ class TransportIndexMonitorAction @Inject constructor(
private val client: Client,
private val actionListener: ActionListener<IndexMonitorResponse>,
private val request: IndexMonitorRequest,
private val user: User?
private val user: User?,
) {

fun resolveUserAndStart() {
Expand Down Expand Up @@ -492,16 +493,30 @@ class TransportIndexMonitorAction @Inject constructor(
)
return
}
request.monitor = request.monitor.copy(id = indexResponse.id)
var (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
if (created == false) {
log.warn("Metadata doc id:${metadata.id} exists, but it shouldn't!")
var metadata: MonitorMetadata?
try { // delete monitor if metadata creation fails, log the right error and re-throw the error to fail listener
request.monitor = request.monitor.copy(id = indexResponse.id)
var (monitorMetadata: MonitorMetadata, created: Boolean) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
if (created == false) {
log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
}
metadata = monitorMetadata
} catch (t: Exception) {
log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor")
cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
throw t
}
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
try {
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
MonitorMetadataService.upsertMetadata(metadata, updating = true)
} catch (t: Exception) {
log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t)
cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
throw t
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
MonitorMetadataService.upsertMetadata(metadata, updating = true)

actionListener.onResponse(
IndexMonitorResponse(
Expand All @@ -514,6 +529,24 @@ class TransportIndexMonitorAction @Inject constructor(
}
}

private suspend fun cleanupMonitorAfterPartialFailure(monitor: Monitor, indexMonitorResponse: IndexResponse) {
// we simply log the success (debug log) or failure (error log) when we try clean up partially failed monitor creation request
try {
TransportDeleteMonitorAction.deleteAllResourcesForMonitor(
client,
monitor = monitor,
DeleteRequest(SCHEDULED_JOBS_INDEX, indexMonitorResponse.id).setRefreshPolicy(RefreshPolicy.IMMEDIATE),
indexMonitorResponse.id
)
log.debug(
"Cleaned up monitor related resources after monitor creation request partial failure. " +
"Monitor id : ${indexMonitorResponse.id}"
)
} catch (e: Exception) {
log.error("Failed to clean up monitor after monitor creation request partial failure", e)
}
}

@Suppress("UNCHECKED_CAST")
private suspend fun indexDocLevelMonitorQueries(
monitor: Monitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.close.CloseIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
Expand Down Expand Up @@ -755,6 +756,59 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
}

fun `test cleanup monitor on partial create monitor failure`() {
val docQuery = DocLevelQuery(query = "dnbkjndsfkjbnds:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customQueryIndex = "custom_alerts_index"
val analyzer = "dfbdfbafd"
val testDoc = """{
"rule": {"title": "some_title"},
"message": "msg 1 2 3 4"
}"""
indexDoc(index, "2", testDoc)
client().admin().indices()
.create(
CreateIndexRequest(customQueryIndex + "-000001").alias(Alias(customQueryIndex))
.mapping(
"""
{
"_meta": {
"schema_version": 1
},
"properties": {
"query": {
"type": "percolator_ext"
},
"monitor_id": {
"type": "text"
},
"index": {
"type": "text"
}
}
}
""".trimIndent()
)
).get()

client().admin().indices().close(CloseIndexRequest(customQueryIndex + "-000001")).get()
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex,
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
)
)
try {
createMonitor(monitor)
fail("monitor creation should fail due to incorrect analyzer name in test setup")
} catch (e: Exception) {
Assert.assertEquals(client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits.size, 0)
}
}

fun `test execute monitor without create when no monitors exists`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down
Loading

0 comments on commit 6d28218

Please sign in to comment.