Skip to content

Commit

Permalink
Backport bug fixes to 2.9 (#1265)
Browse files Browse the repository at this point in the history
* fix workflow execution for first run (#1227)

Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>

* optimize doc-level monitor workflow for index patterns (#1122)

Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>

* [Backport 2.x] fix for concurrentmodificationexception with linkedhashmap (#1259)

Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>

* ignore bwc test

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* replace ignore annotation with awaitsFix annotation

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

---------

Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
Co-authored-by: Subhobrata Dey <sbcd90@gmail.com>
Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 12, 2023
1 parent d0fcce4 commit 0269dd1
Show file tree
Hide file tree
Showing 9 changed files with 1,030 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
Expand Down Expand Up @@ -118,11 +119,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
// Resolve all passed indices to concrete indices
val indices = IndexUtils.resolveAllIndices(
val concreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (concreteIndices.isEmpty()) {
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
}

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
Expand All @@ -133,74 +138,93 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)

// cleanup old indices that are not monitored anymore from the same monitor
for (ind in updatedLastRunContext.keys) {
if (!indices.contains(ind)) {
val runContextKeys = updatedLastRunContext.keys.toMutableSet()
for (ind in runContextKeys) {
if (!concreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}

// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

indices.forEach { indexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(indexName)
)
MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently)
}
docLevelMonitorInput.indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
val updatedIndexName = indexName.replace("*", "_")
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
concreteIndices
)

// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
indexName
) as MutableMap<String, Any>
updatedLastRunContext[indexName] = indexUpdatedRunContext

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()

// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
concreteIndices.forEach { concreteIndexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(concreteIndexName)
)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
}
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()

// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
}
}

val matchingDocs = getMatchingDocs(
monitor,
monitorCtx,
docExecutionContext,
indexName,
matchingDocIdsPerIndex?.get(indexName)
)
// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
val matchingDocs = getMatchingDocs(
monitor,
monitorMetadata,
indexName
monitorCtx,
docExecutionContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName)
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id.replace("_${indexName}_${monitor.id}", "")

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${matchingDocs[idx].first}|$indexName"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
monitor,
monitorMetadata,
updatedIndexName,
concreteIndexName
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id
.replace("_${updatedIndexName}_${monitor.id}", "")
.replace("_${concreteIndexName}_${monitor.id}", "")

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${matchingDocs[idx].first}|$concreteIndexName"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
}
}
}
Expand Down Expand Up @@ -555,6 +579,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
index: String,
concreteIndex: String,
conflictingFields: List<String>,
docIds: List<String>? = null
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
Expand All @@ -567,7 +593,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

val hits: SearchHits = searchShard(
monitorCtx,
index,
concreteIndex,
shard,
prevSeqNo,
maxSeqNo,
Expand All @@ -576,7 +602,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)

if (hits.hits.isNotEmpty()) {
matchingDocs.addAll(getAllDocs(hits, index, monitor.id))
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
}
} catch (e: Exception) {
logger.warn("Failed to run for shard $shard. Error: ${e.message}")
Expand Down Expand Up @@ -629,7 +655,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docs: List<BytesReference>,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
index: String
index: String,
concreteIndex: String
): SearchHits {
val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND))

Expand All @@ -642,7 +669,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id]
if (queryIndex == null) {
val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" +
" sourceIndex:$index queryIndex:${monitor.dataSources.queryIndex}"
" sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}"
logger.error(message)
throw AlertingException.wrap(
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
Expand Down Expand Up @@ -670,11 +697,23 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return response.hits
}

private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List<Pair<String, BytesReference>> {
private fun getAllDocs(
hits: SearchHits,
index: String,
concreteIndex: String,
monitorId: String,
conflictingFields: List<String>
): List<Pair<String, BytesReference>> {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap

transformDocumentFieldNames(sourceMap, "_${index}_$monitorId")
transformDocumentFieldNames(
sourceMap,
conflictingFields,
"_${index}_$monitorId",
"_${concreteIndex}_$monitorId",
""
)

var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)

Expand All @@ -687,7 +726,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffix] to field names.
* Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names
* but different mappings & [fieldNameSuffixPattern] to field names which have unique names.
*
* Example for index name is my_log_index and Monitor ID is TReewWdsf2gdJFV:
* { {
Expand All @@ -701,17 +741,36 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
*/
private fun transformDocumentFieldNames(
jsonAsMap: MutableMap<String, Any>,
fieldNameSuffix: String
conflictingFields: List<String>,
fieldNameSuffixPattern: String,
fieldNameSuffixIndex: String,
fieldNamePrefix: String
) {
val tempMap = mutableMapOf<String, Any>()
val it: MutableIterator<Map.Entry<String, Any>> = jsonAsMap.entries.iterator()
while (it.hasNext()) {
val entry = it.next()
if (entry.value is Map<*, *>) {
transformDocumentFieldNames(entry.value as MutableMap<String, Any>, fieldNameSuffix)
} else if (entry.key.endsWith(fieldNameSuffix) == false) {
tempMap["${entry.key}$fieldNameSuffix"] = entry.value
it.remove()
transformDocumentFieldNames(
entry.value as MutableMap<String, Any>,
conflictingFields,
fieldNameSuffixPattern,
fieldNameSuffixIndex,
if (fieldNamePrefix == "") entry.key else "$fieldNamePrefix.${entry.key}"
)
} else if (!entry.key.endsWith(fieldNameSuffixPattern) && !entry.key.endsWith(fieldNameSuffixIndex)) {
var alreadyReplaced = false
conflictingFields.forEach { conflictingField ->
if (conflictingField == "$fieldNamePrefix.${entry.key}" || (fieldNamePrefix == "" && conflictingField == entry.key)) {
tempMap["${entry.key}$fieldNameSuffixIndex"] = entry.value
it.remove()
alreadyReplaced = true
}
}
if (!alreadyReplaced) {
tempMap["${entry.key}$fieldNameSuffixPattern"] = entry.value
it.remove()
}
}
}
jsonAsMap.putAll(tempMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.MonitorRunnerService.monitorCtx
import org.opensearch.alerting.WorkflowMetadataService
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.addFilter
Expand All @@ -43,6 +46,7 @@ import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.alerting.util.isQueryLevelMonitor
import org.opensearch.alerting.workflow.CompositeWorkflowRunner
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
Expand Down Expand Up @@ -371,6 +375,37 @@ class TransportIndexWorkflowAction @Inject constructor(
)
return
}

val createdWorkflow = request.workflow.copy(id = indexResponse.id)
val executionId = CompositeWorkflowRunner.generateExecutionId(false, createdWorkflow)

val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata(
workflow = createdWorkflow,
skipIndex = false,
executionId = executionId
)

val delegates = (createdWorkflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order }
val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)

for (monitor in monitors) {
var (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = true,
workflowMetadataId = workflowMetadata.id
)

if (created == false) {
log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
}

if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor)
monitorMetadata = monitorMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping)
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
MonitorMetadataService.upsertMetadata(monitorMetadata, updating = true)
}
actionListener.onResponse(
IndexWorkflowResponse(
indexResponse.id, indexResponse.version, indexResponse.seqNo,
Expand Down Expand Up @@ -498,6 +533,33 @@ class TransportIndexWorkflowAction @Inject constructor(
)
return
}

val updatedWorkflow = request.workflow.copy(id = indexResponse.id)
val executionId = CompositeWorkflowRunner.generateExecutionId(false, updatedWorkflow)

val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata(
workflow = updatedWorkflow,
skipIndex = false,
executionId = executionId
)

val delegates = (updatedWorkflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order }
val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)

for (monitor in monitors) {
val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = true,
workflowMetadataId = workflowMetadata.id
)

if (created == false && monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
var updatedMetadata = MonitorMetadataService.recreateRunContext(monitorMetadata, monitor)
val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor)
updatedMetadata = updatedMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping)
MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true)
}
}
actionListener.onResponse(
IndexWorkflowResponse(
indexResponse.id, indexResponse.version, indexResponse.seqNo,
Expand Down
Loading

0 comments on commit 0269dd1

Please sign in to comment.