[Backport 2.3] Manual backports of several PRs (#960)
* Added exception check once the .opendistro-alerting-config index is b… (#650)

* Added exception check once the .opendistro-alerting-config index is being created

During creation of the .opendistro-alerting-config index, if a ResourceAlreadyExists exception is raised, the flow first checks whether the index is in yellow state and then retries indexing the monitor.

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

* Formatting of the file fixed

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

* refactored DeleteMonitor Action to be synchronous (#628) (#630)

* refactored DeleteMonitor Action to be synchronous

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
Co-authored-by: Petar Dzepina <petar.dzepina@gmail.com>

* [Backport 2.x] QueryIndex rollover when field mapping limit is reached (#729)

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* Mappings fix backport 2.x (#730)

* Added support for "nested" mappings (#645)

* example

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* fixed updating mappings for queryIndex

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* mappings traversal bug fix (#669)

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* Added unwrapping exception from core; added more debug logs (#728)

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* Add DataSources test for future backports

Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>

* fix percolator mapping error when having field name 'type' (#726)

Signed-off-by: Raj Chakravarthi <raj@icedome.ca>

* [BUG] ExecuteMonitor inserting metadata doc during dry run (#758)

* execute monitor bugfix

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* added IT

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* fixed created retval when skipIndex=true

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

---------

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* fix for ERROR alert state generation in doc-level monitors (#768)

Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>

* Adjusting max field index setting dynamically for query index (#776)

* added adjusting max field index setting dynamically for query index

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* Multiple indices support in DocLevelMonitorInput (#784)

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* Doc transform 2.x backport (#853)

* conflict resolve - backport from main to 2.x

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* fixed module class names

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

---------

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* Update config index schema if needed at the start of each monitor execution (#849)

* Update config index schema if needed at the start of each monitor execution

Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>

* Mappings parsing fix (#851)

* fixed mappings parsing when a field named "properties" exists in mappings

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* message typo fix

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

---------

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* [Backport 2.x] Notification security fix (#861)

* Notification security fix (#852)

* added injecting whole user object in threadContext before calling notification APIs so that backend roles are available to notification plugin

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* compile fix

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* refactored user_info injection to use InjectSecurity

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* ktlint fix

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

---------

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
(cherry picked from commit e0b7a5a)

* remove unneeded import

Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>

---------

Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>
Co-authored-by: Petar Dzepina <petar.dzepina@gmail.com>
Co-authored-by: Ashish Agrawal <ashisagr@amazon.com>

* Fixed a bug that prevented alerts from being generated for doc level monitors that use wildcard characters in index names. (#894)

Signed-off-by: AWSHurneyt <hurneyt@amazon.com>

* fixed security tests (#484) (#794)

* fixed security tests

Signed-off-by: Raj Chakravarthi <raj@icedome.ca>
(cherry picked from commit c51940f)

---------

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>
Signed-off-by: Raj Chakravarthi <raj@icedome.ca>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: AWSHurneyt <hurneyt@amazon.com>
Co-authored-by: Stevan Buzejic <30922513+stevanbz@users.noreply.github.com>
Co-authored-by: Surya Sashank Nistala <snistala@amazon.com>
Co-authored-by: Petar Dzepina <petar.dzepina@gmail.com>
Co-authored-by: RAJ CHAKRAVARTHI <49325334+raj-chak@users.noreply.github.com>
Co-authored-by: Subhobrata Dey <sbcd90@gmail.com>
Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Co-authored-by: AWSHurneyt <hurneyt@amazon.com>
Co-authored-by: RAJ CHAKRAVARTHI <raj@icedome.ca>
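
The first backported change (#650) describes a recovery path: if creating `.opendistro-alerting-config` raises a ResourceAlreadyExists exception, the flow checks that the index is in yellow state and then retries indexing the monitor. The following is only a rough sketch of that control flow. `ConfigIndexClient`, `FakeClient`, `IndexAlreadyExistsException`, `Health`, and `indexMonitorWithRetry` are all hypothetical stand-ins, not the plugin's classes, and the single health check here simplifies whatever waiting the real code may do.

```kotlin
// Hypothetical stand-ins for illustration only; none of these are OpenSearch or alerting-plugin APIs.
class IndexAlreadyExistsException(index: String) : RuntimeException("index [$index] already exists")

enum class Health { RED, YELLOW, GREEN }

interface ConfigIndexClient {
    fun createIndex(name: String) // may throw IndexAlreadyExistsException
    fun health(name: String): Health
    fun indexMonitor(name: String, monitorJson: String): String // returns a document id
}

// In-memory fake so the sketch can be exercised without a cluster.
class FakeClient(private val alreadyExists: Boolean, private val state: Health) : ConfigIndexClient {
    override fun createIndex(name: String) {
        if (alreadyExists) throw IndexAlreadyExistsException(name)
    }

    override fun health(name: String): Health = state

    override fun indexMonitor(name: String, monitorJson: String): String = "doc-1"
}

fun indexMonitorWithRetry(client: ConfigIndexClient, index: String, monitorJson: String): String {
    try {
        client.createIndex(index)
    } catch (e: IndexAlreadyExistsException) {
        // The index was created concurrently. Only continue once it is at least yellow.
        if (client.health(index) == Health.RED) {
            throw IllegalStateException("index [$index] exists but has not reached yellow state", e)
        }
    }
    // Reached both after a fresh create and after the already-exists check passes.
    return client.indexMonitor(index, monitorJson)
}
```

With the fake client, a call such as `indexMonitorWithRetry(FakeClient(alreadyExists = true, state = Health.YELLOW), ".opendistro-alerting-config", "{}")` returns the document id instead of surfacing the already-exists error to the caller.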
9 people authored Jun 8, 2023
1 parent f9fdd57 commit 6d1f5e3
Showing 32 changed files with 2,563 additions and 501 deletions.
@@ -221,6 +221,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
.registerindexNameExpressionResolver(indexNameExpressionResolver)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerThreadPool(threadPool)
@@ -238,6 +239,14 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
this.threadPool = threadPool
this.clusterService = clusterService

MonitorMetadataService.initialize(
client,
clusterService,
xContentRegistry,
settings
)

return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
}

@@ -7,8 +7,7 @@ package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchAction
@@ -21,23 +20,24 @@ import org.opensearch.alerting.core.model.DocLevelQuery
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.model.ActionExecutionResult
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.AlertingConfigAccessor.Companion.getMonitorMetadata
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.Finding
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.action.PerAlertActionScope
import org.opensearch.alerting.opensearchapi.string
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.updateMonitorMetadata
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.bytes.BytesReference
@@ -46,6 +46,7 @@ import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.rest.RestStatus
@@ -54,8 +55,7 @@ import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
import java.util.UUID
import kotlin.collections.HashMap
import java.util.*
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
@@ -69,6 +69,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
dryrun: Boolean
): MonitorRunResult<DocumentLevelTriggerRunResult> {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)

try {
@@ -78,33 +79,26 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
} catch (e: Exception) {
val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
logger.error("Error setting up alerts and findings indices for monitor: $id", e)
return monitorResult.copy(error = AlertingException.wrap(e))
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
}

try {
validate(monitor)
} catch (e: Exception) {
logger.error("Failed to start Document-level-monitor. Error: ${e.message}")
return monitorResult.copy(error = AlertingException.wrap(e))
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
}

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex()
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
monitorId = monitor.id,
indexTimeout = monitorCtx.indexTimeout!!
createWithRunContext = false,
skipIndex = isTempMonitor
)

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val index = docLevelMonitorInput.indices[0]
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

var monitorMetadata = getMonitorMetadata(monitorCtx.client!!, monitorCtx.xContentRegistry!!, "${monitor.id}-metadata")
if (monitorMetadata == null) {
monitorMetadata = createMonitorMetadata(monitor.id)
}
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>

@@ -115,11 +109,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val docsToQueries = mutableMapOf<String, MutableList<String>>()

try {
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.admin().indices().getIndex(getIndexRequest, it)
}
val indices = getIndexResponse.indices()
// Resolve all passed indices to concrete indices
val indices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex()
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
monitor = monitor,
monitorId = monitor.id,
monitorMetadata,
indexTimeout = monitorCtx.indexTimeout!!
)

// cleanup old indices that are not monitored anymore from the same monitor
for (ind in updatedLastRunContext.keys) {
@@ -131,8 +134,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indices.forEach { indexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse)
createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, indexName, indexCreatedRecently)
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(indexName)
)
MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently)
}

// Prepare updatedLastRunContext for each index
@@ -160,7 +168,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName)

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor, indexName)
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
monitor,
monitorMetadata,
indexName
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id.replace("_${indexName}_${monitor.id}", "")
@@ -214,7 +228,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

// Don't update monitor if this is a test monitor
if (!isTempMonitor) {
updateMonitorMetadata(monitorCtx.client!!, monitorCtx.settings!!, monitorMetadata.copy(lastRunContext = updatedLastRunContext))
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}

// TODO: Update the Document as part of the Trigger and return back the trigger action result
@@ -265,6 +282,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
alerts.add(alert)
}

if (findingDocPairs.isEmpty() && monitorResult.error != null) {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(),
listOf(),
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError()
)
alerts.add(alert)
}

val shouldDefaultToPerExecution = defaultToPerExecutionAction(
monitorCtx.maxActionableAlertCount,
monitorId = monitor.id,
@@ -366,42 +393,22 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
throw IOException("Invalid input with document-level-monitor.")
}

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
if (docLevelMonitorInput.indices.size > 1) {
throw IOException("Only one index is supported with document-level-monitor.")
}
}

suspend fun createRunContext(
clusterService: ClusterService,
client: Client,
index: String,
createdRecently: Boolean = false
): HashMap<String, Any> {
val lastRunContext = HashMap<String, Any>()
lastRunContext["index"] = index
val count = getShardsCount(clusterService, index)
lastRunContext["shards_count"] = count

for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = if (createdRecently) -1L else getMaxSeqNo(client, index, shard)
lastRunContext[shard] = maxSeqNo
if ((monitor.inputs[0] as DocLevelMonitorInput).indices.isEmpty()) {
throw IllegalArgumentException("DocLevelMonitorInput has no indices")
}
return lastRunContext
}

// Checks if the index was created from the last execution run or when the monitor was last updated to ensure that
// new index is monitored from the beginning of that index
private fun createdRecently(
monitor: Monitor,
index: String,
periodStart: Instant,
periodEnd: Instant,
getIndexResponse: GetIndexResponse
indexMetadata: IndexMetadata
): Boolean {
val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart
return getIndexResponse.settings.get(index).getAsLong("index.creation_date", 0L) > lastExecutionTime.toEpochMilli()
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
return indexCreationDate > lastExecutionTime.toEpochMilli()
}
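
The rewritten `createdRecently` above reads `index.creation_date` from cluster-state index metadata and compares it with the last execution time. It falls back to `monitor.lastUpdateTime` when `periodStart == periodEnd`. A minimal standalone sketch of that comparison follows. The flattened signature is an assumption made so the example runs without OpenSearch types: a plain `Map` stands in for `IndexMetadata.settings` and an `Instant` for the monitor's last update time.

```kotlin
import java.time.Instant

// Simplified, hypothetical signature: the diff passes a Monitor and IndexMetadata,
// here the values it reads from them are passed in directly.
fun createdRecently(
    monitorLastUpdateTime: Instant,
    periodStart: Instant,
    periodEnd: Instant,
    indexSettings: Map<String, String>
): Boolean {
    // Same fallback as the diff: equal period bounds mean "use the monitor's last update time".
    val lastExecutionTime = if (periodStart == periodEnd) monitorLastUpdateTime else periodStart
    // A missing setting is treated as epoch 0, so the index is never considered new.
    val indexCreationDate = indexSettings["index.creation_date"]?.toLong() ?: 0L
    return indexCreationDate > lastExecutionTime.toEpochMilli()
}
```

An index whose creation date falls after the reference time is treated as new, which per the comment in the diff lets the monitor read it from the beginning.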

/**
@@ -506,17 +513,27 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
docs: List<BytesReference>,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
index: String
): SearchHits {
val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.matchQuery("index", index))
val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND))

val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON)
if (monitor.id.isNotEmpty()) {
boolQueryBuilder.filter(QueryBuilders.matchQuery("monitor_id", monitor.id))
boolQueryBuilder.must(QueryBuilders.matchQuery("monitor_id", monitor.id).operator(Operator.AND))
}
boolQueryBuilder.filter(percolateQueryBuilder)

val searchRequest = SearchRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id]
if (queryIndex == null) {
val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" +
" sourceIndex:$index queryIndex:${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}"
logger.error(message)
throw AlertingException.wrap(
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
)
}
val searchRequest = SearchRequest(queryIndex)
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)
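
In the `getMatchedQueries` change above, the search no longer targets `ScheduledJob.DOC_LEVEL_QUERIES_INDEX` directly. Instead it looks up a concrete query index in `monitorMetadata.sourceToQueryIndexMapping`, keyed by the source index name concatenated with the monitor id, and fails if no entry exists. A small sketch of that lookup in isolation follows. `resolveQueryIndex` is a hypothetical helper, and it throws `IllegalStateException` where the diff throws a wrapped `OpenSearchStatusException`, purely to keep the example dependency-free.

```kotlin
// Hypothetical helper; the map mirrors monitorMetadata.sourceToQueryIndexMapping from the diff.
fun resolveQueryIndex(
    sourceToQueryIndexMapping: Map<String, String>,
    sourceIndex: String,
    monitorId: String
): String {
    // The diff builds the key as index + monitor.id, with no separator.
    val key = sourceIndex + monitorId
    return sourceToQueryIndexMapping[key]
        ?: throw IllegalStateException(
            "Failed to resolve concrete queryIndex from sourceIndex during monitor execution! " +
                "sourceIndex:$sourceIndex"
        )
}
```

Failing fast on a missing entry, as the diff does, avoids silently percolating against an index that holds no queries for this monitor.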