optimize doc-level monitor workflow for index patterns #1097

Merged 1 commit on Sep 6, 2023
@@ -51,6 +51,7 @@ import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
@@ -118,11 +119,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
// Resolve all passed indices to concrete indices
val indices = IndexUtils.resolveAllIndices(
val concreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (concreteIndices.isEmpty()) {
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
}

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
@@ -134,73 +139,91 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

// cleanup old indices that are not monitored anymore from the same monitor
for (ind in updatedLastRunContext.keys) {
if (!indices.contains(ind)) {
if (!concreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}

// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

indices.forEach { indexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(indexName)
)
MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently)
}
docLevelMonitorInput.indices.forEach { indexName ->
Member: Why did we change the list being looped over from the computed concrete indices list to the monitor's configured indices list?

Collaborator (Author): This is because we want to expand one index pattern and process all of its concrete indices separately. The previous logic expanded all index patterns first and then processed all the concrete indices one by one.
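A minimal, self-contained Kotlin sketch of that loop restructuring (illustrative only; resolve, process, and the index names below are stand-ins I made up, not the PR's code):

// Stand-in for IndexUtils.resolveAllIndices: pretend each pattern expands to two backing indices.
fun resolve(pattern: String): List<String> =
    listOf(pattern.replace("*", "000001"), pattern.replace("*", "000002"))

fun process(concreteIndex: String) = println("process $concreteIndex")

fun main() {
    val monitorIndices = listOf("app-logs-*", "audit-logs-*")

    // Before: every pattern is expanded up front and the flattened list is processed one index at a time.
    val allConcrete = monitorIndices.flatMap { resolve(it) }
    allConcrete.forEach { process(it) }

    // After: each pattern is expanded on its own, so per-pattern work (query index lookup,
    // conflicting-field detection, the "*" -> "_" rename) is computed once per pattern and
    // shared by all of that pattern's concrete indices.
    monitorIndices.forEach { pattern ->
        val concreteForPattern = resolve(pattern)
        concreteForPattern.forEach { process(it) }
    }
}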

val concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
val updatedIndexName = indexName.replace("*", "_")
Member: Why do we do this indexName.replace("*", "_")?

Collaborator (Author): This is because query string query fields containing * have a different meaning, and * is also not allowed in some cases.
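A tiny standalone illustration of that sanitization, under my assumption (not spelled out in the PR) that the pattern name ends up inside field-name suffixes and query values where a literal * would be read as a wildcard:

fun main() {
    val indexPattern = "app-logs-*"
    val monitorId = "TReewWdsf2gdJFV"

    // A raw '*' in a field-name suffix or query value could be treated as a wildcard
    // (and is rejected outright in some places), so the pattern is normalized first.
    val updatedIndexName = indexPattern.replace("*", "_")
    val fieldNameSuffix = "_${updatedIndexName}_$monitorId"

    println(fieldNameSuffix) // prints: _app-logs-__TReewWdsf2gdJFV
}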

val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
concreteIndices
)

// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
indexName
) as MutableMap<String, Any>
updatedLastRunContext[indexName] = indexUpdatedRunContext

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()

// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
concreteIndices.forEach { concreteIndexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(concreteIndexName)
)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
}
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()

// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
}
}

val matchingDocs = getMatchingDocs(
monitor,
monitorCtx,
docExecutionContext,
indexName,
matchingDocIdsPerIndex?.get(indexName)
)
// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
val matchingDocs = getMatchingDocs(
monitor,
monitorMetadata,
indexName
monitorCtx,
docExecutionContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName)
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id.replace("_${indexName}_${monitor.id}", "")

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${matchingDocs[idx].first}|$indexName"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
monitor,
monitorMetadata,
updatedIndexName,
concreteIndexName
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id
.replace("_${updatedIndexName}_${monitor.id}", "")
.replace("_${concreteIndexName}_${monitor.id}", "")

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${matchingDocs[idx].first}|$concreteIndexName"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
}
}
}
@@ -554,6 +577,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
index: String,
concreteIndex: String,
conflictingFields: List<String>,
docIds: List<String>? = null
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
@@ -566,7 +591,7 @@

val hits: SearchHits = searchShard(
monitorCtx,
index,
concreteIndex,
shard,
prevSeqNo,
maxSeqNo,
@@ -575,7 +600,7 @@
)

if (hits.hits.isNotEmpty()) {
matchingDocs.addAll(getAllDocs(hits, index, monitor.id))
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
}
} catch (e: Exception) {
logger.warn("Failed to run for shard $shard. Error: ${e.message}")
@@ -628,7 +653,8 @@
docs: List<BytesReference>,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
index: String
index: String,
concreteIndex: String
): SearchHits {
val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND))

@@ -641,7 +667,7 @@
val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id]
if (queryIndex == null) {
val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" +
" sourceIndex:$index queryIndex:${monitor.dataSources.queryIndex}"
" sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}"
logger.error(message)
throw AlertingException.wrap(
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
@@ -669,11 +695,23 @@
return response.hits
}

private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List<Pair<String, BytesReference>> {
private fun getAllDocs(
hits: SearchHits,
index: String,
concreteIndex: String,
monitorId: String,
conflictingFields: List<String>
): List<Pair<String, BytesReference>> {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap

transformDocumentFieldNames(sourceMap, "_${index}_$monitorId")
transformDocumentFieldNames(
sourceMap,
conflictingFields,
"_${index}_$monitorId",
"_${concreteIndex}_$monitorId",
""
)

var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)

@@ -686,7 +724,8 @@
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffix] to field names.
* Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names that share a name
* but have different mappings, and [fieldNameSuffixPattern] to field names that are unique.
*
* Example for index name is my_log_index and Monitor ID is TReewWdsf2gdJFV:
* { {
@@ -700,17 +739,36 @@
*/
private fun transformDocumentFieldNames(
Member: This is a very critical piece of code. Can we update the Javadoc description of this method to incorporate the new change?

Collaborator (Author): Updated the documentation for the method.

jsonAsMap: MutableMap<String, Any>,
fieldNameSuffix: String
conflictingFields: List<String>,
fieldNameSuffixPattern: String,
fieldNameSuffixIndex: String,
fieldNamePrefix: String
) {
val tempMap = mutableMapOf<String, Any>()
val it: MutableIterator<Map.Entry<String, Any>> = jsonAsMap.entries.iterator()
while (it.hasNext()) {
val entry = it.next()
if (entry.value is Map<*, *>) {
transformDocumentFieldNames(entry.value as MutableMap<String, Any>, fieldNameSuffix)
} else if (entry.key.endsWith(fieldNameSuffix) == false) {
tempMap["${entry.key}$fieldNameSuffix"] = entry.value
it.remove()
transformDocumentFieldNames(
entry.value as MutableMap<String, Any>,
conflictingFields,
fieldNameSuffixPattern,
fieldNameSuffixIndex,
if (fieldNamePrefix == "") entry.key else "$fieldNamePrefix.${entry.key}"
)
} else if (!entry.key.endsWith(fieldNameSuffixPattern) && !entry.key.endsWith(fieldNameSuffixIndex)) {
var alreadyReplaced = false
conflictingFields.forEach { conflictingField ->
if (conflictingField == "$fieldNamePrefix.${entry.key}" || (fieldNamePrefix == "" && conflictingField == entry.key)) {
tempMap["${entry.key}$fieldNameSuffixIndex"] = entry.value
it.remove()
alreadyReplaced = true
}
}
if (!alreadyReplaced) {
tempMap["${entry.key}$fieldNameSuffixPattern"] = entry.value
it.remove()
}
}
}
jsonAsMap.putAll(tempMap)
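To make the suffixing rule above concrete, here is a simplified, flat-map-only Kotlin sketch (my own standalone illustration with made-up names, not the PR's method; the real transformDocumentFieldNames also recurses into nested objects and tracks a field-name prefix):

// Fields whose mappings conflict across a pattern's concrete indices get the concrete-index
// suffix; all remaining fields share the pattern-level suffix.
fun suffixFields(
    doc: MutableMap<String, Any>,
    conflictingFields: List<String>,
    patternSuffix: String,  // e.g. "_app-logs-__m1"
    indexSuffix: String     // e.g. "_app-logs-000001_m1"
) {
    val renamed = mutableMapOf<String, Any>()
    val iter = doc.entries.iterator()
    while (iter.hasNext()) {
        val (key, value) = iter.next()
        val suffix = if (key in conflictingFields) indexSuffix else patternSuffix
        renamed["$key$suffix"] = value
        iter.remove()
    }
    doc.putAll(renamed)
}

fun main() {
    val doc = mutableMapOf<String, Any>("message" to "error", "status" to 500)
    suffixFields(doc, listOf("status"), "_app-logs-__m1", "_app-logs-000001_m1")
    println(doc) // {message_app-logs-__m1=error, status_app-logs-000001_m1=500}
}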