Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log error messages and clean up monitor when indexing doc level queries or metadata creation fails #900

Merged
merged 2 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
Expand All @@ -25,6 +24,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.WriteRequest
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.opensearchapi.suspendUntil
Expand Down Expand Up @@ -57,6 +57,7 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)

class TransportDeleteMonitorAction @Inject constructor(
Expand Down Expand Up @@ -87,8 +88,7 @@ class TransportDeleteMonitorAction @Inject constructor(
if (!validateUserBackendRoles(user, actionListener)) {
return
}

GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) {
scope.launch {
DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart()
}
}
Expand All @@ -109,16 +109,15 @@ class TransportDeleteMonitorAction @Inject constructor(
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)

if (canDelete) {
val deleteResponse = deleteMonitor(monitor)
deleteDocLevelMonitorQueriesAndIndices(monitor)
deleteMetadata(monitor)
val deleteResponse = deleteAllResourcesForMonitor(client, monitor, deleteRequest, monitorId)
actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version))
} else {
actionListener.onFailure(
AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException())
)
}
} catch (t: Exception) {
log.error("Failed to delete monitor ${deleteRequest.id()}", t)
actionListener.onFailure(AlertingException.wrap(t))
}
}
Expand All @@ -140,68 +139,102 @@ class TransportDeleteMonitorAction @Inject constructor(
)
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
}
}

companion object {
@JvmStatic
suspend fun deleteAllResourcesForMonitor(
client: Client,
monitor: Monitor,
deleteRequest: DeleteRequest,
monitorId: String,
): DeleteResponse {
val deleteResponse = deleteMonitorDocument(client, deleteRequest)
deleteMetadata(client, monitor)
deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId)
return deleteResponse
}

private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse {
private suspend fun deleteMonitorDocument(client: Client, deleteRequest: DeleteRequest): DeleteResponse {
return client.suspendUntil { delete(deleteRequest, it) }
}

private suspend fun deleteMetadata(monitor: Monitor) {
suspend fun deleteMetadata(client: Client, monitor: Monitor) {
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata")
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
try {
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
log.debug("Monitor metadata: ${deleteResponse.id} deletion result: ${deleteResponse.result}")
} catch (e: Exception) {
// we only log the error and don't fail the request because if monitor document has been deleted,
// we cannot retry based on this failure
log.error("Failed to delete workflow metadata for monitor ${monitor.id}.", e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

monitor metadata instead of workflow metadata

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets put the workflow metadata id in here, so it gives us the ability to delete it manually if needed.

}
}

private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) {
val clusterState = clusterService.state()
val metadata = MonitorMetadataService.getMetadata(monitor)
metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->

val indicesExistsResponse: IndicesExistsResponse =
client.suspendUntil {
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
suspend fun deleteDocLevelMonitorQueriesAndIndices(
client: Client,
monitor: Monitor,
monitorId: String,
) {
try {
val metadata = MonitorMetadataService.getMetadata(monitor)
metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->

val indicesExistsResponse: IndicesExistsResponse =
client.suspendUntil {
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
}
if (indicesExistsResponse.isExists == false) {
return
}
if (indicesExistsResponse.isExists == false) {
return
}
// Check if there's any queries from other monitors in this queryIndex,
// to avoid unnecessary doc deletion, if we could just delete index completely
val searchResponse: SearchResponse = client.suspendUntil {
search(
SearchRequest(queryIndex).source(
SearchSourceBuilder()
.size(0)
.query(
QueryBuilders.boolQuery().mustNot(
QueryBuilders.matchQuery("monitor_id", monitorId)
// Check if there's any queries from other monitors in this queryIndex,
// to avoid unnecessary doc deletion, if we could just delete index completely
val searchResponse: SearchResponse = client.suspendUntil {
search(
SearchRequest(queryIndex).source(
SearchSourceBuilder()
.size(0)
.query(
QueryBuilders.boolQuery().mustNot(
QueryBuilders.matchQuery("monitor_id", monitorId)
)
)
)
).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
it
)
}
if (searchResponse.hits.totalHits.value == 0L) {
val ack: AcknowledgedResponse = client.suspendUntil {
client.admin().indices().delete(
DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it
).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
it
)
}
if (ack.isAcknowledged == false) {
log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
}
} else {
// Delete all queries added by this monitor
val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(queryIndex)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) = cont.resumeWithException(t)
}
if (searchResponse.hits.totalHits.value == 0L) {
val ack: AcknowledgedResponse = client.suspendUntil {
client.admin().indices().delete(
DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it
)
}
if (ack.isAcknowledged == false) {
log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
}
} else {
// Delete all queries added by this monitor
val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(queryIndex)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) {
cont.resumeWithException(t)
}
}
)
}
}
}
} catch (e: Exception) {
// we only log the error and don't fail the request because if monitor document has been deleted successfully,
// we cannot retry based on this failure
log.error("Failed to delete workflow metadata for monitor ${monitor.id}.", e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

monitor metadata instead of workflow metadata

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthAction
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.index.IndexRequest
Expand Down Expand Up @@ -177,7 +178,7 @@ class TransportIndexMonitorAction @Inject constructor(
client: Client,
actionListener: ActionListener<IndexMonitorResponse>,
request: IndexMonitorRequest,
user: User?
user: User?,
) {
val indices = mutableListOf<String>()
// todo: for doc level alerting: check if index is present before monitor is created.
Expand Down Expand Up @@ -230,7 +231,7 @@ class TransportIndexMonitorAction @Inject constructor(
client: Client,
actionListener: ActionListener<IndexMonitorResponse>,
request: IndexMonitorRequest,
user: User?
user: User?,
) {
client.threadPool().threadContext.stashContext().use {
IndexMonitorHandler(client, actionListener, request, user).resolveUserAndStartForAD()
Expand All @@ -241,7 +242,7 @@ class TransportIndexMonitorAction @Inject constructor(
private val client: Client,
private val actionListener: ActionListener<IndexMonitorResponse>,
private val request: IndexMonitorRequest,
private val user: User?
private val user: User?,
) {

fun resolveUserAndStart() {
Expand Down Expand Up @@ -492,16 +493,30 @@ class TransportIndexMonitorAction @Inject constructor(
)
return
}
request.monitor = request.monitor.copy(id = indexResponse.id)
var (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
if (created == false) {
log.warn("Metadata doc id:${metadata.id} exists, but it shouldn't!")
var metadata: MonitorMetadata?
try { // delete monitor if metadata creation fails, log the right error and re-throw the error to fail listener
request.monitor = request.monitor.copy(id = indexResponse.id)
var (monitorMetadata: MonitorMetadata, created: Boolean) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
if (created == false) {
log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
}
metadata = monitorMetadata
} catch (t: Exception) {
log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor")
cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
throw t
}
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
try {
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
MonitorMetadataService.upsertMetadata(metadata, updating = true)
} catch (t: Exception) {
log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t)
cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
throw t
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
MonitorMetadataService.upsertMetadata(metadata, updating = true)

actionListener.onResponse(
IndexMonitorResponse(
Expand All @@ -514,6 +529,24 @@ class TransportIndexMonitorAction @Inject constructor(
}
}

private suspend fun cleanupMonitorAfterPartialFailure(monitor: Monitor, indexMonitorResponse: IndexResponse) {
// we simply log the success (debug log) or failure (error log) when we try clean up partially failed monitor creation request
try {
TransportDeleteMonitorAction.deleteAllResourcesForMonitor(
client,
monitor = monitor,
DeleteRequest(SCHEDULED_JOBS_INDEX, indexMonitorResponse.id).setRefreshPolicy(RefreshPolicy.IMMEDIATE),
indexMonitorResponse.id
)
log.debug(
"Cleaned up monitor related resources after monitor creation request partial failure. " +
"Monitor id : ${indexMonitorResponse.id}"
)
} catch (e: Exception) {
log.error("Failed to clean up monitor after monitor creation request partial failure", e)
}
}

@Suppress("UNCHECKED_CAST")
private suspend fun indexDocLevelMonitorQueries(
monitor: Monitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.close.CloseIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
Expand Down Expand Up @@ -755,6 +756,59 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
}

fun `test cleanup monitor on partial create monitor failure`() {
val docQuery = DocLevelQuery(query = "dnbkjndsfkjbnds:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customQueryIndex = "custom_alerts_index"
val analyzer = "dfbdfbafd"
val testDoc = """{
"rule": {"title": "some_title"},
"message": "msg 1 2 3 4"
}"""
indexDoc(index, "2", testDoc)
client().admin().indices()
.create(
CreateIndexRequest(customQueryIndex + "-000001").alias(Alias(customQueryIndex))
.mapping(
"""
{
"_meta": {
"schema_version": 1
},
"properties": {
"query": {
"type": "percolator_ext"
},
"monitor_id": {
"type": "text"
},
"index": {
"type": "text"
}
}
}
""".trimIndent()
)
).get()

client().admin().indices().close(CloseIndexRequest(customQueryIndex + "-000001")).get()
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex,
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
)
)
try {
createMonitor(monitor)
fail("monitor creation should fail due to incorrect analyzer name in test setup")
} catch (e: Exception) {
Assert.assertEquals(client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits.size, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert queryIndex has 0 docs and no new mappings applied

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cant assert that because we dont fail delete monitor request when query index docs clean up fails.
So even though we call delete monitor there is no guarantee of that. It's only a best effort clean up

}
}

fun `test execute monitor without create when no monitors exists`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down
Loading