Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log error messages and clean up monitor when indexing doc level queri… #1478

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
Expand Down Expand Up @@ -114,11 +115,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
// Resolve all passed indices to concrete indices
val indices = IndexUtils.resolveAllIndices(
val allConcreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (allConcreteIndices.isEmpty()) {
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
}

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
Expand All @@ -130,64 +135,112 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

// cleanup old indices that are not monitored anymore from the same monitor
for (ind in updatedLastRunContext.keys) {
if (!indices.contains(ind)) {
if (!allConcreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}

indices.forEach { indexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(indexName)
)
MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently)
docLevelMonitorInput.indices.forEach { indexName ->
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
var lastWriteIndex: String? = null
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}
val updatedIndexName = indexName.replace("*", "_")
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
concreteIndices
)

// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
indexName
) as MutableMap<String, Any>
updatedLastRunContext[indexName] = indexUpdatedRunContext

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()

// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
concreteIndices.forEach { concreteIndexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(concreteIndexName)
)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
}
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>

if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
updatedLastRunContext.remove(lastWriteIndex)
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
} else {
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}

val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName)
val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
}
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

val matchingDocs = getMatchingDocs(
monitor,
monitorMetadata,
indexName
monitorCtx,
docExecutionContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList()
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id.replace("_${indexName}_${monitor.id}", "")

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${matchingDocs[idx].first}|$indexName"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
monitor,
monitorMetadata,
updatedIndexName,
concreteIndexName
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id
.replace("_${updatedIndexName}_${monitor.id}", "")
.replace("_${concreteIndexName}_${monitor.id}", "")

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${matchingDocs[idx].first}|$concreteIndexName"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
}
}
}
Expand Down Expand Up @@ -498,8 +551,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
if (response.hits.hits.isEmpty())
if (response.hits.hits.isEmpty()) {
return -1L
}

return response.hits.hits[0].seqNo
}
Expand All @@ -513,7 +567,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
index: String
index: String,
concreteIndex: String,
conflictingFields: List<String>
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
Expand All @@ -525,15 +581,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

val hits: SearchHits = searchShard(
monitorCtx,
index,
concreteIndex,
shard,
prevSeqNo,
maxSeqNo,
null
)

if (hits.hits.isNotEmpty()) {
matchingDocs.addAll(getAllDocs(hits, index, monitor.id))
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
}
} catch (e: Exception) {
logger.warn("Failed to run for shard $shard. Error: ${e.message}")
Expand Down Expand Up @@ -581,7 +637,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docs: List<BytesReference>,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
index: String
index: String,
concreteIndex: String
): SearchHits {
val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND))

Expand All @@ -594,7 +651,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id]
if (queryIndex == null) {
val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" +
" sourceIndex:$index queryIndex:${monitor.dataSources.queryIndex}"
" sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}"
logger.error(message)
throw AlertingException.wrap(
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
Expand Down Expand Up @@ -622,11 +679,23 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return response.hits
}

private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List<Pair<String, BytesReference>> {
private fun getAllDocs(
hits: SearchHits,
index: String,
concreteIndex: String,
monitorId: String,
conflictingFields: List<String>
): List<Pair<String, BytesReference>> {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap

transformDocumentFieldNames(sourceMap, "_${index}_$monitorId")
transformDocumentFieldNames(
sourceMap,
conflictingFields,
"_${index}_$monitorId",
"_${concreteIndex}_$monitorId",
""
)

var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)

Expand All @@ -639,7 +708,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffix] to field names.
* Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names
* but different mappings & [fieldNameSuffixPattern] to field names which have unique names.
*
* Example for index name is my_log_index and Monitor ID is TReewWdsf2gdJFV:
* { {
Expand All @@ -653,17 +723,36 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
*/
private fun transformDocumentFieldNames(
jsonAsMap: MutableMap<String, Any>,
fieldNameSuffix: String
conflictingFields: List<String>,
fieldNameSuffixPattern: String,
fieldNameSuffixIndex: String,
fieldNamePrefix: String
) {
val tempMap = mutableMapOf<String, Any>()
val it: MutableIterator<Map.Entry<String, Any>> = jsonAsMap.entries.iterator()
while (it.hasNext()) {
val entry = it.next()
if (entry.value is Map<*, *>) {
transformDocumentFieldNames(entry.value as MutableMap<String, Any>, fieldNameSuffix)
} else if (entry.key.endsWith(fieldNameSuffix) == false) {
tempMap["${entry.key}$fieldNameSuffix"] = entry.value
it.remove()
transformDocumentFieldNames(
entry.value as MutableMap<String, Any>,
conflictingFields,
fieldNameSuffixPattern,
fieldNameSuffixIndex,
if (fieldNamePrefix == "") entry.key else "$fieldNamePrefix.${entry.key}"
)
} else if (!entry.key.endsWith(fieldNameSuffixPattern) && !entry.key.endsWith(fieldNameSuffixIndex)) {
var alreadyReplaced = false
conflictingFields.forEach { conflictingField ->
if (conflictingField == "$fieldNamePrefix.${entry.key}" || (fieldNamePrefix == "" && conflictingField == entry.key)) {
tempMap["${entry.key}$fieldNameSuffixIndex"] = entry.value
it.remove()
alreadyReplaced = true
}
}
if (!alreadyReplaced) {
tempMap["${entry.key}$fieldNameSuffixPattern"] = entry.value
it.remove()
}
}
}
jsonAsMap.putAll(tempMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -206,11 +207,18 @@ object MonitorMetadataService :
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
try {
if (index == null) return mutableMapOf()
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
val indices = mutableListOf<String>()
if (IndexUtils.isAlias(index, clusterService.state()) ||
IndexUtils.isDataStream(index, clusterService.state())
) {
IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) }
} else {
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
}
indices.addAll(getIndexResponse.indices())
}
val indices = getIndexResponse.indices()

indices.forEach { indexName ->
if (!lastRunContext.containsKey(indexName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ class TransportDeleteMonitorAction @Inject constructor(
monitorId: String,
): DeleteResponse {
val deleteResponse = deleteMonitorDocument(client, deleteRequest)
deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId)
deleteMetadata(client, monitor)
deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId)
return deleteResponse
}

Expand Down
Loading
Loading