-
Notifications
You must be signed in to change notification settings - Fork 108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
separate doc-level monitor query indices for externally defined monitors #1664
Changes from 1 commit
452a51c
b280761
a15d5a1
90620a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -374,6 +374,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() { | |
// Clean up any queries created by the dry run monitor | ||
monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata) | ||
} | ||
|
||
if (monitor.dataSources.queryIndex.contains("optimized")) { | ||
val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources) | ||
if (!ack) { | ||
logger.error("Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd!") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plz log monitor id There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed it. |
||
} | ||
} | ||
// TODO: Update the Document as part of the Trigger and return back the trigger action result | ||
return monitorResult.copy(triggerResults = triggerResults, inputResults = inputRunResults) | ||
} catch (e: Exception) { | ||
|
@@ -385,6 +392,14 @@ class DocumentLevelMonitorRunner : MonitorRunner() { | |
RestStatus.INTERNAL_SERVER_ERROR, | ||
e | ||
) | ||
if (monitor.dataSources.queryIndex.contains("optimized") && | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plz dont write this twice There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed it. |
||
monitorCtx.docLevelMonitorQueries!!.docLevelQueryIndexExists(monitor.dataSources) | ||
) { | ||
val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources) | ||
if (!ack) { | ||
logger.error("Retry deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd!") | ||
} | ||
} | ||
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) | ||
} finally { | ||
val endTime = System.currentTimeMillis() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,57 +95,69 @@ object DeleteMonitorService : | |
|
||
private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) { | ||
try { | ||
val metadata = MonitorMetadataService.getMetadata(monitor) | ||
metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> | ||
if (monitor.owner == "alerting") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plz delete based on a boolean flag that defaults to false. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed it based on boolean flag passed from |
||
val metadata = MonitorMetadataService.getMetadata(monitor) | ||
metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> | ||
|
||
val indicesExistsResponse: IndicesExistsResponse = | ||
client.suspendUntil { | ||
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) | ||
val indicesExistsResponse: IndicesExistsResponse = | ||
client.suspendUntil { | ||
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) | ||
} | ||
if (indicesExistsResponse.isExists == false) { | ||
return | ||
} | ||
if (indicesExistsResponse.isExists == false) { | ||
return | ||
} | ||
// Check if there's any queries from other monitors in this queryIndex, | ||
// to avoid unnecessary doc deletion, if we could just delete index completely | ||
val searchResponse: SearchResponse = client.suspendUntil { | ||
search( | ||
SearchRequest(queryIndex).source( | ||
SearchSourceBuilder() | ||
.size(0) | ||
.query( | ||
QueryBuilders.boolQuery().mustNot( | ||
QueryBuilders.matchQuery("monitor_id", monitor.id) | ||
// Check if there's any queries from other monitors in this queryIndex, | ||
// to avoid unnecessary doc deletion, if we could just delete index completely | ||
val searchResponse: SearchResponse = client.suspendUntil { | ||
search( | ||
SearchRequest(queryIndex).source( | ||
SearchSourceBuilder() | ||
.size(0) | ||
.query( | ||
QueryBuilders.boolQuery().mustNot( | ||
QueryBuilders.matchQuery("monitor_id", monitor.id) | ||
) | ||
) | ||
) | ||
).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), | ||
it | ||
) | ||
} | ||
if (searchResponse.hits.totalHits.value == 0L) { | ||
val ack: AcknowledgedResponse = client.suspendUntil { | ||
client.admin().indices().delete( | ||
DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), | ||
).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), | ||
it | ||
) | ||
} | ||
if (ack.isAcknowledged == false) { | ||
log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") | ||
} | ||
} else { | ||
// Delete all queries added by this monitor | ||
val response: BulkByScrollResponse = suspendCoroutine { cont -> | ||
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) | ||
.source(queryIndex) | ||
.filter(QueryBuilders.matchQuery("monitor_id", monitor.id)) | ||
.refresh(true) | ||
.execute( | ||
object : ActionListener<BulkByScrollResponse> { | ||
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) | ||
override fun onFailure(t: Exception) = cont.resumeWithException(t) | ||
} | ||
if (searchResponse.hits.totalHits.value == 0L) { | ||
val ack: AcknowledgedResponse = client.suspendUntil { | ||
client.admin().indices().delete( | ||
DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), | ||
it | ||
) | ||
} | ||
if (ack.isAcknowledged == false) { | ||
log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") | ||
} | ||
} else { | ||
// Delete all queries added by this monitor | ||
val response: BulkByScrollResponse = suspendCoroutine { cont -> | ||
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) | ||
.source(queryIndex) | ||
.filter(QueryBuilders.matchQuery("monitor_id", monitor.id)) | ||
.refresh(true) | ||
.execute( | ||
object : ActionListener<BulkByScrollResponse> { | ||
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) | ||
override fun onFailure(t: Exception) = cont.resumeWithException(t) | ||
} | ||
) | ||
} | ||
} | ||
} | ||
} else { | ||
val ack: AcknowledgedResponse = client.suspendUntil { | ||
client.admin().indices().delete( | ||
DeleteIndexRequest(monitor.dataSources.queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), | ||
it | ||
) | ||
} | ||
if (ack.isAcknowledged == false) { | ||
log.error("Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd!") | ||
} | ||
} | ||
} catch (e: Exception) { | ||
// we only log the error and don't fail the request because if monitor document has been deleted successfully, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -537,7 +537,8 @@ class TransportIndexMonitorAction @Inject constructor( | |
if ( | ||
request.monitor.isMonitorOfStandardType() && | ||
Monitor.MonitorType.valueOf(request.monitor.monitorType.uppercase(Locale.ROOT)) == | ||
Monitor.MonitorType.DOC_LEVEL_MONITOR | ||
Monitor.MonitorType.DOC_LEVEL_MONITOR && | ||
request.monitor.owner == "alerting" | ||
) { | ||
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here source to query mapping is updated but in case of new change query index doesnt even get created and wrong value would come up right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the source to query mapping is dynamically updated during the first monitor run as shown in this logic. https://github.com/opensearch-project/alerting/blob/main/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt#L481 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this method being called for the monitors with dedicated queryindex also? this code snippet will always throw exception right?
because the queryIndex is not an alias anymore? |
||
|
@@ -702,13 +703,22 @@ class TransportIndexMonitorAction @Inject constructor( | |
Monitor.MonitorType.valueOf(currentMonitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR | ||
) { | ||
updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor) | ||
client.suspendUntil<Client, BulkByScrollResponse> { | ||
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) | ||
.source(currentMonitor.dataSources.queryIndex) | ||
.filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) | ||
.execute(it) | ||
if (docLevelMonitorQueries.docLevelQueryIndexExists(currentMonitor.dataSources)) { | ||
client.suspendUntil<Client, BulkByScrollResponse> { | ||
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) | ||
.source(currentMonitor.dataSources.queryIndex) | ||
.filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) | ||
.execute(it) | ||
} | ||
} | ||
if (currentMonitor.owner == "alerting") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refactor to use flag There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed it. |
||
indexDocLevelMonitorQueries( | ||
request.monitor, | ||
currentMonitor.id, | ||
updatedMetadata, | ||
request.refreshPolicy | ||
) | ||
} | ||
indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy) | ||
MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true) | ||
} | ||
actionListener.onResponse( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest | |
import org.opensearch.action.bulk.BulkRequest | ||
import org.opensearch.action.bulk.BulkResponse | ||
import org.opensearch.action.index.IndexRequest | ||
import org.opensearch.action.support.IndicesOptions | ||
import org.opensearch.action.support.WriteRequest.RefreshPolicy | ||
import org.opensearch.action.support.master.AcknowledgedResponse | ||
import org.opensearch.alerting.MonitorRunnerService.monitorCtx | ||
|
@@ -181,6 +182,16 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ | |
} | ||
} | ||
|
||
suspend fun deleteDocLevelQueryIndex(dataSources: DataSources): Boolean { | ||
val ack: AcknowledgedResponse = client.suspendUntil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we do exists check before deleting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed it. |
||
client.admin().indices().delete( | ||
DeleteIndexRequest(dataSources.queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), | ||
it | ||
) | ||
} | ||
return ack.isAcknowledged | ||
} | ||
|
||
fun docLevelQueryIndexExists(dataSources: DataSources): Boolean { | ||
val clusterState = clusterService.state() | ||
return clusterState.metadata.hasAlias(dataSources.queryIndex) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1195,7 +1195,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { | |
dataSources = DataSources( | ||
queryIndex = customQueryIndex, | ||
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))), | ||
) | ||
), | ||
owner = "alerting" | ||
) | ||
try { | ||
createMonitor(monitor) | ||
|
@@ -2379,7 +2380,9 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { | |
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) | ||
var monitor = randomDocumentLevelMonitor( | ||
inputs = listOf(docLevelInput), | ||
triggers = listOf(trigger) | ||
triggers = listOf(trigger), | ||
dataSources = DataSources(), | ||
owner = "alerting" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plz add 2 new tests-
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in first test plz also update monitor with flag false to true and do rollover again and this time assert monitor execution succeeeds |
||
) | ||
// This doc should create close to 10000 (limit) fields in index mapping. It's easier to add mappings like this then via api | ||
val docPayload: StringBuilder = StringBuilder(100000) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
plz add a flag in monitor creation called
deleteQueryIndexBeforeEveryRun
(or something briefer)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added as part of pr opensearch-project/common-utils#734