diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 9c719ac43..ba488892b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -57,7 +57,9 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler import org.opensearch.alerting.core.schedule.JobScheduler import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings import org.opensearch.alerting.core.settings.ScheduledJobSettings +import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.model.QueryLevelTrigger import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction import org.opensearch.alerting.resthandler.RestDeleteDestinationAction import org.opensearch.alerting.resthandler.RestDeleteEmailAccountAction @@ -135,8 +137,8 @@ import java.util.function.Supplier /** * Entry point of the OpenDistro for Elasticsearch alerting plugin * This class initializes the [RestGetMonitorAction], [RestDeleteMonitorAction], [RestIndexMonitorAction] rest handlers. - * It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY] to the - * [NamedXContentRegistry] so that we are able to deserialize the custom named objects. + * It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY], + * [BucketLevelTrigger.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects. */ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() { @@ -224,7 +226,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R } override fun getNamedXContent(): List { - return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY) + return listOf( + Monitor.XCONTENT_REGISTRY, + SearchInput.XCONTENT_REGISTRY, + QueryLevelTrigger.XCONTENT_REGISTRY, + BucketLevelTrigger.XCONTENT_REGISTRY + ) } override fun createComponents( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteMonitorResponse.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteMonitorResponse.kt index b00733205..44b3064f3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteMonitorResponse.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteMonitorResponse.kt @@ -37,9 +37,9 @@ import java.io.IOException class ExecuteMonitorResponse : ActionResponse, ToXContentObject { - val monitorRunResult: MonitorRunResult + val monitorRunResult: MonitorRunResult<*> - constructor(monitorRunResult: MonitorRunResult) : super() { + constructor(monitorRunResult: MonitorRunResult<*>) : super() { this.monitorRunResult = monitorRunResult } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt index f1053d8ee..5283a61e9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt @@ -31,6 +31,7 @@ import org.opensearch.alerting.elasticapi.instant import org.opensearch.alerting.elasticapi.optionalTimeField import org.opensearch.alerting.elasticapi.optionalUserField import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION +import org.opensearch.commons.authuser.User import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable @@ -39,7 +40,6 @@ import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken -import org.opensearch.commons.authuser.User import java.io.IOException import java.time.Instant @@ -61,7 +61,8 @@ data class Alert( val errorMessage: String? = null, val errorHistory: List, val severity: String, - val actionExecutionResults: List + val actionExecutionResults: List, + val aggregationResultBucket: AggregationResultBucket? = null ) : Writeable, ToXContent { init { @@ -72,7 +73,7 @@ data class Alert( constructor( monitor: Monitor, - trigger: Trigger, + trigger: QueryLevelTrigger, startTime: Instant, lastNotificationTime: Instant?, state: State = State.ACTIVE, @@ -80,12 +81,44 @@ data class Alert( errorHistory: List = mutableListOf(), actionExecutionResults: List = mutableListOf(), schemaVersion: Int = NO_SCHEMA_VERSION - ) : this( - monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, + ) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, - severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion - ) + severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, + aggregationResultBucket = null) + + constructor( + monitor: Monitor, + trigger: BucketLevelTrigger, + startTime: Instant, + lastNotificationTime: Instant?, + state: State = State.ACTIVE, + errorMessage: String? = null, + errorHistory: List = mutableListOf(), + actionExecutionResults: List = mutableListOf(), + schemaVersion: Int = NO_SCHEMA_VERSION + ) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, + triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, + lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, + severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, + aggregationResultBucket = null) + + constructor( + monitor: Monitor, + trigger: BucketLevelTrigger, + startTime: Instant, + lastNotificationTime: Instant?, + state: State = State.ACTIVE, + errorMessage: String? = null, + errorHistory: List = mutableListOf(), + actionExecutionResults: List = mutableListOf(), + schemaVersion: Int = NO_SCHEMA_VERSION, + aggregationResultBucket: AggregationResultBucket + ) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, + triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, + lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, + severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, + aggregationResultBucket = aggregationResultBucket) enum class State { ACTIVE, ACKNOWLEDGED, COMPLETED, ERROR, DELETED @@ -112,7 +145,8 @@ data class Alert( errorMessage = sin.readOptionalString(), errorHistory = sin.readList(::AlertError), severity = sin.readString(), - actionExecutionResults = sin.readList(::ActionExecutionResult) + actionExecutionResults = sin.readList(::ActionExecutionResult), + aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null ) fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED) @@ -138,6 +172,12 @@ data class Alert( out.writeCollection(errorHistory) out.writeString(severity) out.writeCollection(actionExecutionResults) + if (aggregationResultBucket != null) { + out.writeBoolean(true) + aggregationResultBucket.writeTo(out) + } else { + out.writeBoolean(false) + } } companion object { @@ -160,7 +200,8 @@ data class Alert( const val ALERT_HISTORY_FIELD = "alert_history" const val SEVERITY_FIELD = "severity" const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results" - + const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS + const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH const val NO_ID = "" const val NO_VERSION = Versions.NOT_FOUND @@ -183,8 +224,8 @@ data class Alert( var acknowledgedTime: Instant? = null var errorMessage: String? = null val errorHistory: MutableList = mutableListOf() - var actionExecutionResults: MutableList = mutableListOf() - + val actionExecutionResults: MutableList = mutableListOf() + var aggAlertBucket: AggregationResultBucket? = null ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { val fieldName = xcp.currentName() @@ -217,18 +258,27 @@ data class Alert( actionExecutionResults.add(ActionExecutionResult.parse(xcp)) } } + AggregationResultBucket.CONFIG_NAME -> { + // If an Alert with aggAlertBucket contents is indexed into the alerts index first, then + // that field will be added to the mappings. + // In this case, that field will default to null when it isn't present for Alerts created by Query-Level Monitors + // (even though the toXContent doesn't output the field) so null is being accounted for here. + aggAlertBucket = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + null + } else { + AggregationResultBucket.parse(xcp) + } + } } } - return Alert( - id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId), + return Alert(id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId), monitorName = requireNotNull(monitorName), monitorVersion = monitorVersion, monitorUser = monitorUser, triggerId = requireNotNull(triggerId), triggerName = requireNotNull(triggerName), state = requireNotNull(state), startTime = requireNotNull(startTime), endTime = endTime, lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = severity, - actionExecutionResults = actionExecutionResults - ) + actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket) } @JvmStatic @@ -239,7 +289,7 @@ data class Alert( } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - return builder.startObject() + builder.startObject() .field(ALERT_ID_FIELD, id) .field(ALERT_VERSION_FIELD, version) .field(MONITOR_ID_FIELD, monitorId) @@ -258,7 +308,9 @@ data class Alert( .optionalTimeField(LAST_NOTIFICATION_TIME_FIELD, lastNotificationTime) .optionalTimeField(END_TIME_FIELD, endTime) .optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime) - .endObject() + aggregationResultBucket?.innerXContent(builder) + builder.endObject() + return builder } fun asTemplateArg(): Map { @@ -271,7 +323,10 @@ data class Alert( LAST_NOTIFICATION_TIME_FIELD to lastNotificationTime?.toEpochMilli(), SEVERITY_FIELD to severity, START_TIME_FIELD to startTime.toEpochMilli(), - STATE_FIELD to state.toString() + STATE_FIELD to state.toString(), + // Converting bucket keys to comma separated String to avoid manipulation in Action mustache templates + BUCKET_KEYS to aggregationResultBucket?.bucketKeys?.joinToString(","), + PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath ) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/BucketLevelTrigger.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/BucketLevelTrigger.kt new file mode 100644 index 000000000..137715ba7 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/BucketLevelTrigger.kt @@ -0,0 +1,152 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder +import org.opensearch.alerting.model.Trigger.Companion.ACTIONS_FIELD +import org.opensearch.alerting.model.Trigger.Companion.ID_FIELD +import org.opensearch.alerting.model.Trigger.Companion.NAME_FIELD +import org.opensearch.alerting.model.Trigger.Companion.SEVERITY_FIELD +import org.opensearch.alerting.model.action.Action +import org.opensearch.common.CheckedFunction +import org.opensearch.common.ParseField +import org.opensearch.common.UUIDs +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParser.Token +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import java.io.IOException + +/** + * A multi-alert Trigger available with Bucket-Level Monitors that filters aggregation buckets via a pipeline + * aggregator. + */ +data class BucketLevelTrigger( + override val id: String = UUIDs.base64UUID(), + override val name: String, + override val severity: String, + val bucketSelector: BucketSelectorExtAggregationBuilder, + override val actions: List +) : Trigger { + + @Throws(IOException::class) + constructor(sin: StreamInput): this( + sin.readString(), // id + sin.readString(), // name + sin.readString(), // severity + BucketSelectorExtAggregationBuilder(sin), // condition + sin.readList(::Action) // actions + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .startObject(BUCKET_LEVEL_TRIGGER_FIELD) + .field(ID_FIELD, id) + .field(NAME_FIELD, name) + .field(SEVERITY_FIELD, severity) + .startObject(CONDITION_FIELD) + bucketSelector.internalXContent(builder, params) + builder.endObject() + .field(ACTIONS_FIELD, actions.toTypedArray()) + .endObject() + .endObject() + return builder + } + + override fun name(): String { + return BUCKET_LEVEL_TRIGGER_FIELD + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + out.writeString(severity) + bucketSelector.writeTo(out) + out.writeCollection(actions) + } + + fun asTemplateArg(): Map { + return mapOf( + ID_FIELD to id, + NAME_FIELD to name, + SEVERITY_FIELD to severity, + ACTIONS_FIELD to actions.map { it.asTemplateArg() }, + PARENT_BUCKET_PATH to getParentBucketPath() + ) + } + + fun getParentBucketPath(): String { + return bucketSelector.parentBucketPath + } + + companion object { + const val BUCKET_LEVEL_TRIGGER_FIELD = "bucket_level_trigger" + const val CONDITION_FIELD = "condition" + const val PARENT_BUCKET_PATH = "parentBucketPath" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Trigger::class.java, ParseField(BUCKET_LEVEL_TRIGGER_FIELD), + CheckedFunction { parseInner(it) }) + + @JvmStatic + @Throws(IOException::class) + fun parseInner(xcp: XContentParser): BucketLevelTrigger { + var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified + lateinit var name: String + lateinit var severity: String + val actions: MutableList = mutableListOf() + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + lateinit var bucketSelector: BucketSelectorExtAggregationBuilder + + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + + xcp.nextToken() + when (fieldName) { + ID_FIELD -> id = xcp.text() + NAME_FIELD -> name = xcp.text() + SEVERITY_FIELD -> severity = xcp.text() + CONDITION_FIELD -> { + // Using the trigger id as the name in the bucket selector since it is validated for uniqueness within Monitors. + // The contents of the trigger definition are round-tripped through parse and toXContent during Monitor creation + // ensuring that the id is available here in the version of the Monitor object that will be executed, even if the + // user submitted a custom trigger id after the condition definition. + bucketSelector = BucketSelectorExtAggregationBuilder.parse(id, xcp) + } + ACTIONS_FIELD -> { + ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_ARRAY) { + actions.add(Action.parse(xcp)) + } + } + } + } + + return BucketLevelTrigger( + id = requireNotNull(id) { "Trigger id is null." }, + name = requireNotNull(name) { "Trigger name is null" }, + severity = requireNotNull(severity) { "Trigger severity is null" }, + bucketSelector = requireNotNull(bucketSelector) { "Trigger condition is null" }, + actions = requireNotNull(actions) { "Trigger actions are null" }) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): BucketLevelTrigger { + return BucketLevelTrigger(sin) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/BucketLevelTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/BucketLevelTriggerRunResult.kt new file mode 100644 index 000000000..5e05032d2 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/BucketLevelTriggerRunResult.kt @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import java.io.IOException + +data class BucketLevelTriggerRunResult( + override var triggerName: String, + override var error: Exception? = null, + var aggregationResultBuckets: Map, + var actionResultsMap: MutableMap> = mutableMapOf() +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput): this( + sin.readString(), + sin.readException() as Exception?, // error + sin.readMap(StreamInput::readString, ::AggregationResultBucket), + sin.readMap() as MutableMap> + ) + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder + .field(AGG_RESULT_BUCKETS, aggregationResultBuckets) + .field(ACTIONS_RESULTS, actionResultsMap as Map) + } + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeMap(aggregationResultBuckets, StreamOutput::writeString) { + valueOut: StreamOutput, aggResultBucket: AggregationResultBucket -> aggResultBucket.writeTo(valueOut) + } + out.writeMap(actionResultsMap as Map) + } + + companion object { + const val AGG_RESULT_BUCKETS = "agg_result_buckets" + const val ACTIONS_RESULTS = "action_results" + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return BucketLevelTriggerRunResult(sin) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt index 9df01b0c4..4cc346070 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt @@ -39,6 +39,8 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_T import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.alerting.util._ID import org.opensearch.alerting.util._VERSION +import org.opensearch.alerting.util.isBucketLevelMonitor +import org.opensearch.commons.authuser.User import org.opensearch.common.CheckedFunction import org.opensearch.common.ParseField import org.opensearch.common.io.stream.StreamInput @@ -49,9 +51,9 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken -import org.opensearch.commons.authuser.User import java.io.IOException import java.time.Instant +import java.util.Locale /** * A value object that represents a Monitor. Monitors are used to periodically execute a source query and check the @@ -65,6 +67,9 @@ data class Monitor( override val schedule: Schedule, override val lastUpdateTime: Instant, override val enabledTime: Instant?, + // TODO: Check how this behaves during rolling upgrade/multi-version cluster + // Can read/write and parsing break if it's done from an old -> new version of the plugin? + val monitorType: MonitorType, val user: User?, val schemaVersion: Int = NO_SCHEMA_VERSION, val inputs: List, @@ -79,6 +84,13 @@ data class Monitor( val triggerIds = mutableSetOf() triggers.forEach { trigger -> require(triggerIds.add(trigger.id)) { "Duplicate trigger id: ${trigger.id}. Trigger ids must be unique." } + // Verify Trigger type based on Monitor type + when (monitorType) { + MonitorType.QUERY_LEVEL_MONITOR -> + require(trigger is QueryLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" } + MonitorType.BUCKET_LEVEL_MONITOR -> + require(trigger is BucketLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" } + } } if (enabled) { requireNotNull(enabledTime) @@ -87,10 +99,20 @@ data class Monitor( } require(inputs.size <= MONITOR_MAX_INPUTS) { "Monitors can only have $MONITOR_MAX_INPUTS search input." } require(triggers.size <= MONITOR_MAX_TRIGGERS) { "Monitors can only support up to $MONITOR_MAX_TRIGGERS triggers." } + if (this.isBucketLevelMonitor()) { + inputs.forEach { input -> + require(input is SearchInput) { "Unsupported input [$input] for Monitor" } + // TODO: Keeping query validation simple for now, only term aggregations have full support for the "group by" on the + // initial release. Should either add tests for other aggregation types or add validation to prevent using them. + require(input.query.aggregations() != null && !input.query.aggregations().aggregatorFactories.isEmpty()) { + "At least one aggregation is required for the input [$input]" + } + } + } } @Throws(IOException::class) - constructor(sin: StreamInput) : this( + constructor(sin: StreamInput): this( id = sin.readString(), version = sin.readLong(), name = sin.readString(), @@ -98,14 +120,27 @@ data class Monitor( schedule = Schedule.readFrom(sin), lastUpdateTime = sin.readInstant(), enabledTime = sin.readOptionalInstant(), + monitorType = sin.readEnum(MonitorType::class.java), user = if (sin.readBoolean()) { User(sin) } else null, schemaVersion = sin.readInt(), inputs = sin.readList(::SearchInput), - triggers = sin.readList(::Trigger), + triggers = sin.readList((Trigger)::readFrom), uiMetadata = suppressWarning(sin.readMap()) ) + + // This enum classifies different Monitors + // This is different from 'type' which denotes the Scheduled Job type + enum class MonitorType(val value: String) { + QUERY_LEVEL_MONITOR("query_level_monitor"), + BUCKET_LEVEL_MONITOR("bucket_level_monitor"); + + override fun toString(): String { + return value + } + } + fun toXContent(builder: XContentBuilder): XContentBuilder { return toXContent(builder, ToXContent.EMPTY_PARAMS) } @@ -121,6 +156,7 @@ data class Monitor( builder.field(TYPE_FIELD, type) .field(SCHEMA_VERSION_FIELD, schemaVersion) .field(NAME_FIELD, name) + .field(MONITOR_TYPE_FIELD, monitorType) .optionalUserField(USER_FIELD, user) .field(ENABLED_FIELD, enabled) .optionalTimeField(ENABLED_TIME_FIELD, enabledTime) @@ -149,17 +185,25 @@ data class Monitor( schedule.writeTo(out) out.writeInstant(lastUpdateTime) out.writeOptionalInstant(enabledTime) + out.writeEnum(monitorType) out.writeBoolean(user != null) user?.writeTo(out) out.writeInt(schemaVersion) out.writeCollection(inputs) - out.writeCollection(triggers) + // Outputting type with each Trigger so that the generic Trigger.readFrom() can read it + out.writeVInt(triggers.size) + triggers.forEach { + if (it is QueryLevelTrigger) out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER) + else out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER) + it.writeTo(out) + } out.writeMap(uiMetadata) } companion object { const val MONITOR_TYPE = "monitor" const val TYPE_FIELD = "type" + const val MONITOR_TYPE_FIELD = "monitor_type" const val SCHEMA_VERSION_FIELD = "schema_version" const val NAME_FIELD = "name" const val USER_FIELD = "user" @@ -175,17 +219,17 @@ data class Monitor( // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all // the different subclasses and creating circular dependencies - val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( - ScheduledJob::class.java, + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(ScheduledJob::class.java, ParseField(MONITOR_TYPE), - CheckedFunction { parse(it) } - ) + CheckedFunction { parse(it) }) @JvmStatic @JvmOverloads @Throws(IOException::class) fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): Monitor { lateinit var name: String + // Default to QUERY_LEVEL_MONITOR to cover Monitors that existed before the addition of MonitorType + var monitorType: String = MonitorType.QUERY_LEVEL_MONITOR.toString() var user: User? = null lateinit var schedule: Schedule var lastUpdateTime: Instant? = null @@ -204,6 +248,13 @@ data class Monitor( when (fieldName) { SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue() NAME_FIELD -> name = xcp.text() + MONITOR_TYPE_FIELD -> { + monitorType = xcp.text() + val allowedTypes = MonitorType.values().map { it.value } + if (!allowedTypes.contains(monitorType)) { + throw IllegalStateException("Monitor type should be one of $allowedTypes") + } + } USER_FIELD -> user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp) ENABLED_FIELD -> enabled = xcp.booleanValue() SCHEDULE_FIELD -> schedule = Schedule.parse(xcp) @@ -233,20 +284,19 @@ data class Monitor( } else if (!enabled) { enabledTime = null } - return Monitor( - id, + return Monitor(id, version, requireNotNull(name) { "Monitor name is null" }, enabled, requireNotNull(schedule) { "Monitor schedule is null" }, lastUpdateTime ?: Instant.now(), enabledTime, + MonitorType.valueOf(monitorType.toUpperCase(Locale.ROOT)), user, schemaVersion, inputs.toList(), triggers.toList(), - uiMetadata - ) + uiMetadata) } @JvmStatic diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt index 28a6dec05..eef9899f7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt @@ -27,9 +27,9 @@ package org.opensearch.alerting.model import org.apache.logging.log4j.LogManager -import org.opensearch.OpenSearchException import org.opensearch.alerting.alerts.AlertError import org.opensearch.alerting.elasticapi.optionalTimeField +import org.opensearch.OpenSearchException import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable @@ -39,23 +39,24 @@ import org.opensearch.script.ScriptException import java.io.IOException import java.time.Instant -data class MonitorRunResult( +data class MonitorRunResult( val monitorName: String, val periodStart: Instant, val periodEnd: Instant, val error: Exception? = null, val inputResults: InputRunResults = InputRunResults(), - val triggerResults: Map = mapOf() + val triggerResults: Map = mapOf() ) : Writeable, ToXContent { @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") constructor(sin: StreamInput) : this( sin.readString(), // monitorName sin.readInstant(), // periodStart sin.readInstant(), // periodEnd sin.readException(), // error InputRunResults.readFrom(sin), // inputResults - suppressWarning(sin.readMap()) // triggerResults + suppressWarning(sin.readMap()) as Map // triggerResults ) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -88,7 +89,7 @@ data class MonitorRunResult( companion object { @JvmStatic @Throws(IOException::class) - fun readFrom(sin: StreamInput): MonitorRunResult { + fun readFrom(sin: StreamInput): MonitorRunResult { return MonitorRunResult(sin) } @@ -109,7 +110,11 @@ data class MonitorRunResult( } } -data class InputRunResults(val results: List> = listOf(), val error: Exception? = null) : Writeable, ToXContent { +data class InputRunResults( + val results: List> = listOf(), + val error: Exception? = null, + val aggTriggersAfterKey: MutableMap?>? = null +) : Writeable, ToXContent { override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return builder.startObject() @@ -144,66 +149,14 @@ data class InputRunResults(val results: List> = listOf(), val e return map as Map } } -} -data class TriggerRunResult( - val triggerName: String, - val triggered: Boolean, - val error: Exception? = null, - val actionResults: MutableMap = mutableMapOf() -) : Writeable, ToXContent { - - @Throws(IOException::class) - constructor(sin: StreamInput) : this( - sin.readString(), // triggerName - sin.readBoolean(), // triggered - sin.readException(), // error - suppressWarning(sin.readMap()) // actionResults - ) - - override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - var msg = error?.message - if (error is ScriptException) msg = error.toJsonString() - return builder.startObject() - .field("name", triggerName) - .field("triggered", triggered) - .field("error", msg) - .field("action_results", actionResults as Map) - .endObject() - } - - /** Returns error information to store in the Alert. Currently it's just the stack trace but it can be more */ - fun alertError(): AlertError? { - if (error != null) { - return AlertError(Instant.now(), "Failed evaluating trigger:\n${error.userErrorMessage()}") - } - for (actionResult in actionResults.values) { - if (actionResult.error != null) { - return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + fun afterKeysPresent(): Boolean { + aggTriggersAfterKey?.forEach { + if (it.value != null) { + return true } } - return null - } - - @Throws(IOException::class) - override fun writeTo(out: StreamOutput) { - out.writeString(triggerName) - out.writeBoolean(triggered) - out.writeException(error) - out.writeMap(actionResults as Map) - } - - companion object { - @JvmStatic - @Throws(IOException::class) - fun readFrom(sin: StreamInput): TriggerRunResult { - return TriggerRunResult(sin) - } - - @Suppress("UNCHECKED_CAST") - fun suppressWarning(map: MutableMap?): MutableMap { - return map as MutableMap - } + return false } } @@ -264,7 +217,7 @@ data class ActionRunResult( private val logger = LogManager.getLogger(MonitorRunResult::class.java) /** Constructs an error message from an exception suitable for human consumption. */ -private fun Throwable.userErrorMessage(): String { +fun Throwable.userErrorMessage(): String { return when { this is ScriptException -> this.scriptStack.joinToString(separator = "\n", limit = 100) this is OpenSearchException -> this.detailedMessage diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTrigger.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTrigger.kt new file mode 100644 index 000000000..442de276a --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTrigger.kt @@ -0,0 +1,185 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.model.Trigger.Companion.ACTIONS_FIELD +import org.opensearch.alerting.model.Trigger.Companion.ID_FIELD +import org.opensearch.alerting.model.Trigger.Companion.NAME_FIELD +import org.opensearch.alerting.model.Trigger.Companion.SEVERITY_FIELD +import org.opensearch.alerting.model.action.Action +import org.opensearch.common.CheckedFunction +import org.opensearch.common.ParseField +import org.opensearch.common.UUIDs +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParser.Token +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.script.Script +import java.io.IOException + +/** + * A single-alert Trigger that uses Painless scripts which execute on the response of the Monitor input query to define + * alerting conditions. + */ +data class QueryLevelTrigger( + override val id: String = UUIDs.base64UUID(), + override val name: String, + override val severity: String, + override val actions: List, + val condition: Script +) : Trigger { + + @Throws(IOException::class) + constructor(sin: StreamInput): this( + sin.readString(), // id + sin.readString(), // name + sin.readString(), // severity + sin.readList(::Action), // actions + Script(sin) // condition + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .startObject(QUERY_LEVEL_TRIGGER_FIELD) + .field(ID_FIELD, id) + .field(NAME_FIELD, name) + .field(SEVERITY_FIELD, severity) + .startObject(CONDITION_FIELD) + .field(SCRIPT_FIELD, condition) + .endObject() + .field(ACTIONS_FIELD, actions.toTypedArray()) + .endObject() + .endObject() + return builder + } + + override fun name(): String { + return QUERY_LEVEL_TRIGGER_FIELD + } + + /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ + fun asTemplateArg(): Map { + return mapOf(ID_FIELD to id, NAME_FIELD to name, SEVERITY_FIELD to severity, + ACTIONS_FIELD to actions.map { it.asTemplateArg() }) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + out.writeString(severity) + out.writeCollection(actions) + condition.writeTo(out) + } + + companion object { + const val QUERY_LEVEL_TRIGGER_FIELD = "query_level_trigger" + const val CONDITION_FIELD = "condition" + const val SCRIPT_FIELD = "script" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Trigger::class.java, ParseField(QUERY_LEVEL_TRIGGER_FIELD), + CheckedFunction { parseInner(it) }) + + /** + * This parse method needs to account for both the old and new Trigger format. + * In the old format, only one Trigger existed (which is now QueryLevelTrigger) and it was + * not a named object. + * + * The parse() method in the Trigger interface needs to consume the outer START_OBJECT to be able + * to infer whether it is dealing with the old or new Trigger format. This means that the currentToken at + * the time this parseInner method is called could differ based on which format is being dealt with. + * + * Old Format + * ---------- + * { + * "id": ..., + * ^ + * Current token starts here + * "name" ..., + * ... + * } + * + * New Format + * ---------- + * { + * "query_level_trigger": { + * "id": ..., ^ Current token starts here + * "name": ..., + * ... + * } + * } + * + * It isn't typically conventional but this parse method will account for both START_OBJECT + * and FIELD_NAME as the starting token to cover both cases. + */ + @JvmStatic @Throws(IOException::class) + fun parseInner(xcp: XContentParser): QueryLevelTrigger { + var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified + lateinit var name: String + lateinit var severity: String + lateinit var condition: Script + val actions: MutableList = mutableListOf() + + if (xcp.currentToken() != Token.START_OBJECT && xcp.currentToken() != Token.FIELD_NAME) { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation) + } + + // If the parser began on START_OBJECT, move to the next token so that the while loop enters on + // the fieldName (or END_OBJECT if it's empty). + if (xcp.currentToken() == Token.START_OBJECT) xcp.nextToken() + + while (xcp.currentToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + + xcp.nextToken() + when (fieldName) { + ID_FIELD -> id = xcp.text() + NAME_FIELD -> name = xcp.text() + SEVERITY_FIELD -> severity = xcp.text() + CONDITION_FIELD -> { + xcp.nextToken() + condition = Script.parse(xcp) + require(condition.lang == Script.DEFAULT_SCRIPT_LANG) { + "Invalid script language. Allowed languages are [${Script.DEFAULT_SCRIPT_LANG}]" + } + xcp.nextToken() + } + ACTIONS_FIELD -> { + ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_ARRAY) { + actions.add(Action.parse(xcp)) + } + } + } + xcp.nextToken() + } + + return QueryLevelTrigger( + name = requireNotNull(name) { "Trigger name is null" }, + severity = requireNotNull(severity) { "Trigger severity is null" }, + condition = requireNotNull(condition) { "Trigger condition is null" }, + actions = requireNotNull(actions) { "Trigger actions are null" }, + id = requireNotNull(id) { "Trigger id is null." }) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): QueryLevelTrigger { + return QueryLevelTrigger(sin) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt new file mode 100644 index 000000000..0ed7f9b1a --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.alerts.AlertError +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class QueryLevelTriggerRunResult( + override var triggerName: String, + var triggered: Boolean, + override var error: Exception?, + var actionResults: MutableMap = mutableMapOf() +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean(), + actionResults = sin.readMap() as MutableMap + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + return builder + .field("triggered", triggered) + .field("action_results", actionResults as Map) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + out.writeMap(actionResults as Map) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return QueryLevelTriggerRunResult(sin) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Trigger.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Trigger.kt index 6cee62bfc..127240748 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Trigger.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Trigger.kt @@ -26,120 +26,77 @@ package org.opensearch.alerting.model +import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.alerting.model.action.Action -import org.opensearch.common.UUIDs import org.opensearch.common.io.stream.StreamInput -import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable -import org.opensearch.common.xcontent.ToXContent -import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken -import org.opensearch.script.Script import java.io.IOException -data class Trigger( - val name: String, - val severity: String, - val condition: Script, - val actions: List, - val id: String = UUIDs.base64UUID() -) : Writeable, ToXContent { +interface Trigger : Writeable, ToXContentObject { - @Throws(IOException::class) - constructor(sin: StreamInput) : this( - sin.readString(), // name - sin.readString(), // severity - Script(sin), // condition - sin.readList(::Action), // actions - sin.readString() // id - ) - override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - builder.startObject() - .field(ID_FIELD, id) - .field(NAME_FIELD, name) - .field(SEVERITY_FIELD, severity) - .startObject(CONDITION_FIELD) - .field(SCRIPT_FIELD, condition) - .endObject() - .field(ACTIONS_FIELD, actions.toTypedArray()) - .endObject() - return builder - } - - /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ - fun asTemplateArg(): Map { - return mapOf( - ID_FIELD to id, NAME_FIELD to name, SEVERITY_FIELD to severity, - ACTIONS_FIELD to actions.map { it.asTemplateArg() } - ) - } + enum class Type(val value: String) { + QUERY_LEVEL_TRIGGER(QueryLevelTrigger.QUERY_LEVEL_TRIGGER_FIELD), + BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD); - @Throws(IOException::class) - override fun writeTo(out: StreamOutput) { - out.writeString(name) - out.writeString(severity) - condition.writeTo(out) - out.writeCollection(actions) - out.writeString(id) + override fun toString(): String { + return value + } } companion object { const val ID_FIELD = "id" const val NAME_FIELD = "name" const val SEVERITY_FIELD = "severity" - const val CONDITION_FIELD = "condition" const val ACTIONS_FIELD = "actions" - const val SCRIPT_FIELD = "script" - @JvmStatic @Throws(IOException::class) + @Throws(IOException::class) fun parse(xcp: XContentParser): Trigger { - var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified - lateinit var name: String - lateinit var severity: String - lateinit var condition: Script - val actions: MutableList = mutableListOf() - ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + val trigger: Trigger - while (xcp.nextToken() != Token.END_OBJECT) { - val fieldName = xcp.currentName() - - xcp.nextToken() - when (fieldName) { - ID_FIELD -> id = xcp.text() - NAME_FIELD -> name = xcp.text() - SEVERITY_FIELD -> severity = xcp.text() - CONDITION_FIELD -> { - xcp.nextToken() - condition = Script.parse(xcp) - require(condition.lang == Script.DEFAULT_SCRIPT_LANG) { - "Invalid script language. Allowed languages are [${Script.DEFAULT_SCRIPT_LANG}]" - } - xcp.nextToken() - } - ACTIONS_FIELD -> { - ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp) - while (xcp.nextToken() != Token.END_ARRAY) { - actions.add(Action.parse(xcp)) - } - } - } + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + ensureExpectedToken(Token.FIELD_NAME, xcp.nextToken(), xcp) + val triggerTypeNames = Type.values().map { it.toString() } + if (triggerTypeNames.contains(xcp.currentName())) { + ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) + trigger = xcp.namedObject(Trigger::class.java, xcp.currentName(), null) + ensureExpectedToken(Token.END_OBJECT, xcp.nextToken(), xcp) + } else { + // Infer the old Trigger (now called QueryLevelTrigger) when it is not defined as a named + // object to remain backwards compatible when parsing the old format + trigger = QueryLevelTrigger.parseInner(xcp) + ensureExpectedToken(Token.END_OBJECT, xcp.currentToken(), xcp) } - - return Trigger( - name = requireNotNull(name) { "Trigger name is null" }, - severity = requireNotNull(severity) { "Trigger severity is null" }, - condition = requireNotNull(condition) { "Trigger is null" }, - actions = requireNotNull(actions) { "Trigger actions are null" }, - id = requireNotNull(id) { "Trigger id is null." } - ) + return trigger } @JvmStatic @Throws(IOException::class) fun readFrom(sin: StreamInput): Trigger { - return Trigger(sin) + return when (val type = sin.readEnum(Trigger.Type::class.java)) { + Type.QUERY_LEVEL_TRIGGER -> QueryLevelTrigger(sin) + Type.BUCKET_LEVEL_TRIGGER -> BucketLevelTrigger(sin) + // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns + // enum can be null in Java + else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") + } } } + + /** The id of the Trigger in the [SCHEDULED_JOBS_INDEX] */ + val id: String + + /** The name of the Trigger */ + val name: String + + /** The severity of the Trigger, used to classify the subsequent Alert */ + val severity: String + + /** The actions executed if the Trigger condition evaluates to true */ + val actions: List + + fun name(): String } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/TriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/TriggerRunResult.kt new file mode 100644 index 000000000..cd05bcdff --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/TriggerRunResult.kt @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting.model + +import org.opensearch.alerting.alerts.AlertError +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import java.io.IOException +import java.time.Instant + +abstract class TriggerRunResult( + open var triggerName: String, + open var error: Exception? = null +) : Writeable, ToXContent { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("name", triggerName) + + internalXContent(builder, params) + val msg = error?.message + + builder.field("error", msg) + .endObject() + return builder + } + + abstract fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder + + /** Returns error information to store in the Alert. Currently it's just the stack trace but it can be more */ + open fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(triggerName) + out.writeException(error) + } + + companion object { + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): MutableMap { + return map as MutableMap + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt new file mode 100644 index 000000000..9da6b5b75 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting.script + +import org.opensearch.alerting.model.BucketLevelTrigger +import org.opensearch.alerting.model.BucketLevelTriggerRunResult +import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.model.MonitorRunResult +import java.time.Instant + +data class BucketLevelTriggerExecutionContext( + override val monitor: Monitor, + val trigger: BucketLevelTrigger, + override val results: List>, + override val periodStart: Instant, + override val periodEnd: Instant, + val dedupedAlerts: List = listOf(), + val newAlerts: List = listOf(), + val completedAlerts: List = listOf(), + override val error: Exception? = null +) : TriggerExecutionContext(monitor, results, periodStart, periodEnd, error) { + + constructor( + monitor: Monitor, + trigger: BucketLevelTrigger, + monitorRunResult: MonitorRunResult, + dedupedAlerts: List = listOf(), + newAlerts: List = listOf(), + completedAlerts: List = listOf() + ) : this(monitor, trigger, monitorRunResult.inputResults.results, monitorRunResult.periodStart, monitorRunResult.periodEnd, + dedupedAlerts, newAlerts, completedAlerts, monitorRunResult.scriptContextError(trigger)) + + /** + * Mustache templates need special permissions to reflectively introspect field names. To avoid doing this we + * translate the context to a Map of Strings to primitive types, which can be accessed without reflection. + */ + override fun asTemplateArg(): Map { + val tempArg = super.asTemplateArg().toMutableMap() + tempArg["trigger"] = trigger.asTemplateArg() + tempArg["dedupedAlerts"] = dedupedAlerts.map { it.asTemplateArg() } + tempArg["newAlerts"] = newAlerts.map { it.asTemplateArg() } + tempArg["completedAlerts"] = completedAlerts.map { it.asTemplateArg() } + return tempArg + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/QueryLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/QueryLevelTriggerExecutionContext.kt new file mode 100644 index 000000000..566d503ab --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/QueryLevelTriggerExecutionContext.kt @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.alerting.script + +import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.model.MonitorRunResult +import org.opensearch.alerting.model.QueryLevelTrigger +import org.opensearch.alerting.model.QueryLevelTriggerRunResult +import java.time.Instant + +data class QueryLevelTriggerExecutionContext( + override val monitor: Monitor, + val trigger: QueryLevelTrigger, + override val results: List>, + override val periodStart: Instant, + override val periodEnd: Instant, + val alert: Alert? = null, + override val error: Exception? = null +) : TriggerExecutionContext(monitor, results, periodStart, periodEnd, error) { + + constructor( + monitor: Monitor, + trigger: QueryLevelTrigger, + monitorRunResult: MonitorRunResult, + alert: Alert? = null + ) : this(monitor, trigger, monitorRunResult.inputResults.results, monitorRunResult.periodStart, monitorRunResult.periodEnd, + alert, monitorRunResult.scriptContextError(trigger)) + + /** + * Mustache templates need special permissions to reflectively introspect field names. To avoid doing this we + * translate the context to a Map of Strings to primitive types, which can be accessed without reflection. + */ + override fun asTemplateArg(): Map { + val tempArg = super.asTemplateArg().toMutableMap() + tempArg["trigger"] = trigger.asTemplateArg() + tempArg["alert"] = alert?.asTemplateArg() + return tempArg + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerExecutionContext.kt index 0f6c65d6f..46c447fbc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerExecutionContext.kt @@ -26,40 +26,33 @@ package org.opensearch.alerting.script -import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.Trigger import java.time.Instant -data class TriggerExecutionContext( - val monitor: Monitor, - val trigger: Trigger, - val results: List>, - val periodStart: Instant, - val periodEnd: Instant, - val alert: Alert? = null, - val error: Exception? = null +abstract class TriggerExecutionContext( + open val monitor: Monitor, + open val results: List>, + open val periodStart: Instant, + open val periodEnd: Instant, + open val error: Exception? = null ) { - constructor(monitor: Monitor, trigger: Trigger, monitorRunResult: MonitorRunResult, alert: Alert? = null) : - this( - monitor, trigger, monitorRunResult.inputResults.results, monitorRunResult.periodStart, - monitorRunResult.periodEnd, alert, monitorRunResult.scriptContextError(trigger) - ) + constructor(monitor: Monitor, trigger: Trigger, monitorRunResult: MonitorRunResult<*>): + this(monitor, monitorRunResult.inputResults.results, monitorRunResult.periodStart, + monitorRunResult.periodEnd, monitorRunResult.scriptContextError(trigger)) /** * Mustache templates need special permissions to reflectively introspect field names. To avoid doing this we * translate the context to a Map of Strings to primitive types, which can be accessed without reflection. */ - fun asTemplateArg(): Map { + open fun asTemplateArg(): Map { return mapOf( "monitor" to monitor.asTemplateArg(), - "trigger" to trigger.asTemplateArg(), "results" to results, "periodStart" to periodStart, "periodEnd" to periodEnd, - "alert" to alert?.asTemplateArg(), "error" to error ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerScript.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerScript.kt index 722f8c9a0..3cf345be6 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerScript.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerScript.kt @@ -58,7 +58,7 @@ abstract class TriggerScript(_scriptParams: Map) { * * @param ctx - the trigger execution context */ - abstract fun execute(ctx: TriggerExecutionContext): Boolean + abstract fun execute(ctx: QueryLevelTriggerExecutionContext): Boolean interface Factory { fun newInstance(scriptParams: Map): TriggerScript diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index a44a278d1..3563cc2bc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -31,6 +31,7 @@ import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.alerting.destination.message.BaseMessage import org.opensearch.alerting.model.AggregationResultBucket +import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.action.Action import org.opensearch.alerting.model.action.ActionExecutionScope import org.opensearch.alerting.model.destination.Destination @@ -143,6 +144,8 @@ fun checkUserFilterByPermissions( return true } +fun Monitor.isBucketLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.BUCKET_LEVEL_MONITOR + /** * Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used * as the key for a HashMap to easily retrieve [AggregationResultBucket] based on the bucket key values. diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json index c30dfbfd1..4ebba38fa 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json @@ -4,7 +4,7 @@ "required": true }, "_meta" : { - "schema_version": 2 + "schema_version": 3 }, "properties": { "schema_version": { @@ -125,6 +125,17 @@ "type": "integer" } } + }, + "agg_alert_content": { + "dynamic": true, + "properties": { + "parent_bucket_path": { + "type": "text" + }, + "bucket_key": { + "type": "text" + } + } } } } \ No newline at end of file diff --git a/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt b/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt index a5d7b9cea..8359204ca 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt +++ b/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt @@ -4,7 +4,7 @@ class org.opensearch.alerting.script.TriggerScript { Map getParams() - boolean execute(TriggerExecutionContext) + boolean execute(QueryLevelTriggerExecutionContext) String[] PARAMETERS } @@ -14,7 +14,15 @@ class org.opensearch.alerting.script.TriggerScript$Factory { class org.opensearch.alerting.script.TriggerExecutionContext { Monitor getMonitor() - Trigger getTrigger() + List getResults() + java.time.Instant getPeriodStart() + java.time.Instant getPeriodEnd() + Exception getError() +} + +class org.opensearch.alerting.script.QueryLevelTriggerExecutionContext { + Monitor getMonitor() + QueryLevelTrigger getTrigger() List getResults() java.time.Instant getPeriodStart() java.time.Instant getPeriodEnd() @@ -29,17 +37,13 @@ class org.opensearch.alerting.model.Monitor { boolean getEnabled() } -class org.opensearch.alerting.model.Trigger { +class org.opensearch.alerting.model.QueryLevelTrigger { String getId() String getName() String getSeverity() List getActions() } -class org.opensearch.alerting.model.action.Action { - String getName() -} - class org.opensearch.alerting.model.Alert { String getId() long getVersion() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/ADTestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/ADTestHelpers.kt index 996419411..18204c559 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/ADTestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/ADTestHelpers.kt @@ -30,6 +30,7 @@ import org.opensearch.alerting.core.model.IntervalSchedule import org.opensearch.alerting.core.model.Schedule import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.model.QueryLevelTrigger import org.opensearch.alerting.model.Trigger import org.opensearch.commons.authuser.User import org.opensearch.index.query.BoolQueryBuilder @@ -490,12 +491,12 @@ fun maxAnomalyGradeSearchInput( return SearchInput(indices = listOf(adResultIndex), query = searchSourceBuilder) } -fun adMonitorTrigger(): Trigger { +fun adMonitorTrigger(): QueryLevelTrigger { val triggerScript = """ return ctx.results[0].aggregations.max_anomaly_grade.value != null && ctx.results[0].aggregations.max_anomaly_grade.value > 0.7 """.trimIndent() - return randomTrigger(condition = Script(triggerScript)) + return randomQueryLevelTrigger(condition = Script(triggerScript)) } fun adSearchInput(detectorId: String): SearchInput { @@ -508,21 +509,17 @@ fun randomADMonitor( inputs: List = listOf(adSearchInput("test_detector_id")), schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), enabled: Boolean = OpenSearchTestCase.randomBoolean(), - triggers: List = (1..OpenSearchTestCase.randomInt(10)).map { randomTrigger() }, + triggers: List = (1..OpenSearchTestCase.randomInt(10)).map { randomQueryLevelTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false ): Monitor { - return Monitor( - name = name, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, - enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, - user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() - ) + return Monitor(name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, + user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()) } fun randomADUser(backendRole: String = OpenSearchRestTestCase.randomAlphaOfLength(10)): User { - return User( - OpenSearchRestTestCase.randomAlphaOfLength(10), listOf(backendRole), - listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), "all_access"), listOf("test_attr=test") - ) + return User(OpenSearchRestTestCase.randomAlphaOfLength(10), listOf(backendRole), + listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), "all_access"), listOf("test_attr=test")) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 703d2c58e..494338737 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -43,7 +43,9 @@ import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.core.settings.ScheduledJobSettings import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.model.QueryLevelTrigger import org.opensearch.alerting.model.destination.Destination import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup @@ -72,6 +74,9 @@ import java.net.URLEncoder import java.nio.file.Files import java.nio.file.Path import java.time.Instant +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit import java.util.Locale import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName @@ -88,10 +93,10 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return NamedXContentRegistry( mutableListOf( Monitor.XCONTENT_REGISTRY, - SearchInput.XCONTENT_REGISTRY - ) + - SearchModule(Settings.EMPTY, false, emptyList()).namedXContents - ) + SearchInput.XCONTENT_REGISTRY, + QueryLevelTrigger.XCONTENT_REGISTRY, + BucketLevelTrigger.XCONTENT_REGISTRY + ) + SearchModule(Settings.EMPTY, false, emptyList()).namedXContents) } fun Response.asMap(): Map { @@ -148,7 +153,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { emptyMap(), destination.toHttpEntity() ) - assertEquals("Unable to create a new destination", RestStatus.OK, response.restStatus()) + assertEquals("Unable to delete destination", RestStatus.OK, response.restStatus()) return response } @@ -398,7 +403,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } protected fun createRandomMonitor(refresh: Boolean = false, withMetadata: Boolean = false): Monitor { - val monitor = randomMonitor(withMetadata = withMetadata) + val monitor = randomQueryLevelMonitor(withMetadata = withMetadata) val monitorId = createMonitor(monitor, refresh).id if (withMetadata) { return getMonitor(monitorId = monitorId, header = BasicHeader(HttpHeaders.USER_AGENT, "OpenSearch-Dashboards")) @@ -533,14 +538,22 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return response } + protected fun deleteDoc(index: String, id: String, refresh: Boolean = true): Response { + val params = if (refresh) mapOf("refresh" to "true") else mapOf() + val response = client().makeRequest("DELETE", "$index/_doc/$id", params) + assertTrue("Unable to delete doc with ID $id in index: '$index'", listOf(RestStatus.OK).contains(response.restStatus())) + return response + } + /** A test index that can be used across tests. Feel free to add new fields but don't remove any. */ protected fun createTestIndex(index: String = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)): String { createIndex( index, Settings.EMPTY, """ - "properties" : { - "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" } - } + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" } + } """.trimIndent() ) return index @@ -556,17 +569,37 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { createIndex( index, Settings.builder().build(), """ - "properties" : { - "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" } - } - """.trimIndent() - ) + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" } + } + """.trimIndent()) } catch (ex: WarningFailureException) { // ignore } return index } + protected fun insertSampleTimeSerializedData(index: String, data: List) { + data.forEachIndexed { i, value -> + val twoMinsAgo = ZonedDateTime.now().minus(2, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MILLIS) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(twoMinsAgo) + val testDoc = """ + { + "test_strict_date_time": "$testTime", + "test_field": "$value" + } + """.trimIndent() + // Indexing documents with deterministic doc id to allow for easy selected deletion during testing + indexDoc(index, (i + 1).toString(), testDoc) + } + } + + protected fun deleteDataWithDocIds(index: String, docIds: List) { + docIds.forEach { + deleteDoc(index, it) + } + } + fun putAlertMappings(mapping: String? = null) { val mappingHack = if (mapping != null) mapping else AlertIndices.alertMapping().trimStart('{').trimEnd('}') val encodedHistoryIndex = URLEncoder.encode(AlertIndices.HISTORY_INDEX_PATTERN, Charsets.UTF_8.toString()) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt index 2d4d0eb39..79ccb09cf 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt @@ -66,7 +66,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor with dryrun`() { val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) - val monitor = randomMonitor(triggers = listOf(randomTrigger(condition = ALWAYS_RUN, actions = listOf(action)))) + val monitor = randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))) val response = executeMonitor(monitor, params = DRYRUN_MONITOR) @@ -101,8 +101,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { return ctx.results[0].hits.hits.size() == 1 """.trimIndent() - val trigger = randomTrigger(condition = Script(triggerScript)) - val monitor = randomMonitor(inputs = listOf(input), triggers = listOf(trigger)) + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript)) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) val response = executeMonitor(monitor, params = DRYRUN_MONITOR) val output = entityAsMap(response) @@ -116,7 +116,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { } fun `test execute monitor not triggered`() { - val monitor = randomMonitor(triggers = listOf(randomTrigger(condition = NEVER_RUN))) + val monitor = randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = NEVER_RUN))) val response = executeMonitor(monitor) @@ -132,7 +132,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test active alert is updated on each run`() { val monitor = createMonitor( - randomMonitor(triggers = listOf(randomTrigger(condition = ALWAYS_RUN, destinationId = createDestination().id))) + randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, destinationId = createDestination().id))) ) executeMonitor(monitor.id) @@ -158,9 +158,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { createIndex("foo", Settings.EMPTY) val input = SearchInput(indices = listOf("foo"), query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) val monitor = createMonitor( - randomMonitor( + randomQueryLevelMonitor( inputs = listOf(input), - triggers = listOf(randomTrigger(condition = NEVER_RUN)) + triggers = listOf(randomQueryLevelTrigger(condition = NEVER_RUN)) ) ) @@ -183,9 +183,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { createIndex("foo", Settings.EMPTY) val input = SearchInput(indices = listOf("foo"), query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) val monitor = createMonitor( - randomMonitor( + randomQueryLevelMonitor( inputs = listOf(input), - triggers = listOf(randomTrigger(condition = NEVER_RUN)) + triggers = listOf(randomQueryLevelTrigger(condition = NEVER_RUN)) ) ) @@ -204,9 +204,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { createIndex("foo", Settings.EMPTY) val input = SearchInput(indices = listOf("foo"), query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) val monitor = createMonitor( - randomMonitor( + randomQueryLevelMonitor( inputs = listOf(input), - triggers = listOf(randomTrigger(condition = ALWAYS_RUN, destinationId = destinationId)) + triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, destinationId = destinationId)) ) ) @@ -231,7 +231,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test acknowledged alert is not updated unnecessarily`() { val monitor = createMonitor( - randomMonitor(triggers = listOf(randomTrigger(condition = ALWAYS_RUN, destinationId = createDestination().id))) + randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, destinationId = createDestination().id))) ) executeMonitor(monitor.id) acknowledgeAlerts(monitor, searchAlerts(monitor).single()) @@ -253,8 +253,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { } fun `test alert completion`() { - val trigger = randomTrigger(condition = Script("ctx.alert == null"), destinationId = createDestination().id) - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val trigger = randomQueryLevelTrigger(condition = Script("ctx.alert == null"), destinationId = createDestination().id) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) executeMonitor(monitor.id) val activeAlert = searchAlerts(monitor).single() @@ -268,8 +268,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor script error`() { // This painless script should cause a syntax error - val trigger = randomTrigger(condition = Script("foo bar baz")) - val monitor = randomMonitor(triggers = listOf(trigger)) + val trigger = randomQueryLevelTrigger(condition = Script("foo bar baz")) + val monitor = randomQueryLevelMonitor(triggers = listOf(trigger)) val response = executeMonitor(monitor) @@ -286,7 +286,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute action template error`() { // Intentional syntax error in mustache template val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name")) - val monitor = randomMonitor(triggers = listOf(randomTrigger(condition = ALWAYS_RUN, actions = listOf(action)))) + val monitor = randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))) val response = executeMonitor(monitor) @@ -320,8 +320,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { return ctx.results[0].hits.hits.size() > 0 """.trimIndent() val destinationId = createDestination().id - val trigger = randomTrigger(condition = Script(triggerScript), destinationId = destinationId) - val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger))) + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript), destinationId = destinationId) + val monitor = createMonitor(randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger))) val response = executeMonitor(monitor.id) @@ -355,8 +355,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { // make sure there is exactly one hit return ctx.results[0].hits.hits.size() == 1 """.trimIndent() - val trigger = randomTrigger(condition = Script(triggerScript)) - val monitor = randomMonitor(inputs = listOf(input), triggers = listOf(trigger)) + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript)) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) val response = executeMonitor(monitor) @@ -397,8 +397,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { // make sure there is exactly one hit return ctx.results[0].hits.hits.size() == 1 """.trimIndent() - val trigger = randomTrigger(condition = Script(triggerScript)) - val monitor = randomMonitor(inputs = listOf(input), triggers = listOf(trigger)) + val trigger = randomQueryLevelTrigger(condition = Script(triggerScript)) + val monitor = randomQueryLevelMonitor(inputs = listOf(input), triggers = listOf(trigger)) val response = executeMonitor(monitor, params = DRYRUN_MONITOR) @@ -420,7 +420,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { destinationId = createDestination().id ) val actions = listOf(goodAction, syntaxErrorAction) - val monitor = createMonitor(randomMonitor(triggers = listOf(randomTrigger(condition = ALWAYS_RUN, actions = actions)))) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = actions)))) val output = entityAsMap(executeMonitor(monitor.id)) @@ -447,8 +447,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { putAlertMappings() // Required as we do not have a create alert API. // This template script has a parsing error to purposefully create an errorMessage during runMonitor val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name")) - val trigger = randomTrigger(condition = ALWAYS_RUN, actions = listOf(action)) - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val trigger = randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) val listOfFiveErrorMessages = (1..5).map { i -> AlertError(timestamp = Instant.now(), message = "error message $i") } val activeAlert = createAlert( randomAlert(monitor).copy( @@ -473,7 +473,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test latest error is not lost when alert is completed`() { // Creates an active alert the first time it's run and completes it the second time the monitor is run. - val trigger = randomTrigger( + val trigger = randomQueryLevelTrigger( condition = Script( """ if (ctx.alert == null) { @@ -484,7 +484,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { """.trimIndent() ) ) - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) executeMonitor(monitor.id) val errorAlert = searchAlerts(monitor).single() @@ -501,14 +501,14 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test throw script exception`() { // Creates an active alert the first time it's run and completes it the second time the monitor is run. - val trigger = randomTrigger( + val trigger = randomQueryLevelTrigger( condition = Script( """ param[0]; return true """.trimIndent() ) ) - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) executeMonitor(monitor.id) val errorAlert = searchAlerts(monitor).single() @@ -524,8 +524,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { putAlertMappings() // Required as we do not have a create alert API. // This template script has a parsing error to purposefully create an errorMessage during runMonitor val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name")) - val trigger = randomTrigger(condition = ALWAYS_RUN, actions = listOf(action)) - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val trigger = randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) val listOfTenErrorMessages = (1..10).map { i -> AlertError(timestamp = Instant.now(), message = "error message $i") } val activeAlert = createAlert( randomAlert(monitor).copy( @@ -551,8 +551,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor creates alert for trigger with no actions`() { putAlertMappings() // Required as we do not have a create alert API. - val trigger = randomTrigger(condition = ALWAYS_RUN, actions = emptyList(), destinationId = createDestination().id) - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val trigger = randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = emptyList(), destinationId = createDestination().id) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) executeMonitor(monitor.id) @@ -563,9 +563,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor non-dryrun`() { val monitor = createMonitor( - randomMonitor( + randomQueryLevelMonitor( triggers = listOf( - randomTrigger( + randomQueryLevelTrigger( condition = ALWAYS_RUN, actions = listOf(randomAction(destinationId = createDestination().id)) ) @@ -583,9 +583,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor with already active alert`() { val monitor = createMonitor( - randomMonitor( + randomQueryLevelMonitor( triggers = listOf( - randomTrigger( + randomQueryLevelTrigger( condition = ALWAYS_RUN, actions = listOf(randomAction(destinationId = createDestination().id)) ) @@ -612,7 +612,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { putAlertMappings() val newMonitor = createMonitor( - randomMonitor(triggers = listOf(randomTrigger(condition = NEVER_RUN, actions = listOf(randomAction())))) + randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = NEVER_RUN, actions = listOf(randomAction())))) ) val deleteNewMonitorResponse = client().makeRequest("DELETE", "$ALERTING_BASE_URI/${newMonitor.id}") @@ -620,7 +620,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { } fun `test update monitor stays on schedule`() { - val monitor = createMonitor(randomMonitor(enabled = true)) + val monitor = createMonitor(randomQueryLevelMonitor(enabled = true)) updateMonitor(monitor.copy(enabledTime = Instant.now())) @@ -629,19 +629,19 @@ class MonitorRunnerIT : AlertingRestTestCase() { } fun `test enabled time by disabling and re-enabling monitor`() { - val monitor = createMonitor(randomMonitor(enabled = true)) + val monitor = createMonitor(randomQueryLevelMonitor(enabled = true)) assertNotNull("Enabled time is null on a enabled monitor.", getMonitor(monitor.id).enabledTime) - val disabledMonitor = updateMonitor(randomMonitor(enabled = false).copy(id = monitor.id)) + val disabledMonitor = updateMonitor(randomQueryLevelMonitor(enabled = false).copy(id = monitor.id)) assertNull("Enabled time is not null on a disabled monitor.", disabledMonitor.enabledTime) - val enabledMonitor = updateMonitor(randomMonitor(enabled = true).copy(id = monitor.id)) + val enabledMonitor = updateMonitor(randomQueryLevelMonitor(enabled = true).copy(id = monitor.id)) assertNotNull("Enabled time is null on a enabled monitor.", enabledMonitor.enabledTime) } fun `test enabled time by providing enabled time`() { val enabledTime = Instant.ofEpochSecond(1538164858L) // This is 2018-09-27 20:00:58 GMT - val monitor = createMonitor(randomMonitor(enabled = true, enabledTime = enabledTime)) + val monitor = createMonitor(randomQueryLevelMonitor(enabled = true, enabledTime = enabledTime)) val retrievedMonitor = getMonitor(monitorId = monitor.id) assertTrue("Monitor is not enabled", retrievedMonitor.enabled) @@ -661,8 +661,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { ) val actions = listOf(actionThrottleEnabled, actionThrottleNotEnabled) val monitor = createMonitor( - randomMonitor( - triggers = listOf(randomTrigger(condition = ALWAYS_RUN, actions = actions)), + randomQueryLevelMonitor( + triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = actions)), schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES) ) ) @@ -716,9 +716,9 @@ class MonitorRunnerIT : AlertingRestTestCase() { throttleEnabled = true, throttle = Throttle(value = 5, unit = MINUTES) ) val actions = listOf(actionThrottleEnabled) - val trigger = randomTrigger(condition = ALWAYS_RUN, actions = actions) + val trigger = randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = actions) val monitor = createMonitor( - randomMonitor( + randomQueryLevelMonitor( triggers = listOf(trigger), schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES) ) @@ -777,8 +777,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { ) ) val action = randomAction(destinationId = destination.id) - val trigger = randomTrigger(condition = ALWAYS_RUN, actions = listOf(action)) - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val trigger = randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) executeMonitor(monitor.id) @@ -803,8 +803,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { ) ) val action = randomAction(destinationId = destination.id) - val trigger = randomTrigger(condition = ALWAYS_RUN, actions = listOf(action)) - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val trigger = randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) executeMonitor(adminClient(), monitor.id) val alerts = searchAlerts(monitor) @@ -832,8 +832,8 @@ class MonitorRunnerIT : AlertingRestTestCase() { ) ) val action = randomAction(destinationId = destination.id) - val trigger = randomTrigger(condition = ALWAYS_RUN, actions = listOf(action)) - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val trigger = randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) executeMonitor(adminClient(), monitor.id) val alerts = searchAlerts(monitor) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorTests.kt index 46c0bde8e..0e8f05ec0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorTests.kt @@ -34,7 +34,7 @@ import java.time.Instant class MonitorTests : OpenSearchTestCase() { fun `test enabled time`() { - val monitor = randomMonitor() + val monitor = randomQueryLevelMonitor() val enabledMonitor = monitor.copy(enabled = true, enabledTime = Instant.now()) try { enabledMonitor.copy(enabled = false) @@ -52,11 +52,11 @@ class MonitorTests : OpenSearchTestCase() { } fun `test max triggers`() { - val monitor = randomMonitor() + val monitor = randomQueryLevelMonitor() val tooManyTriggers = mutableListOf() for (i in 0..10) { - tooManyTriggers.add(randomTrigger()) + tooManyTriggers.add(randomQueryLevelTrigger()) } try { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index ed62473df..09756e1e2 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -25,8 +25,8 @@ */ package org.opensearch.alerting -import org.apache.http.Header -import org.apache.http.HttpEntity +import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder +import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.IntervalSchedule import org.opensearch.alerting.core.model.Schedule @@ -34,17 +34,30 @@ import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.ActionExecutionResult import org.opensearch.alerting.model.ActionRunResult +import org.opensearch.alerting.model.AggregationResultBucket +import org.opensearch.alerting.model.BucketLevelTrigger +import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.MonitorRunResult +import org.opensearch.alerting.model.QueryLevelTrigger +import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.model.Trigger -import org.opensearch.alerting.model.TriggerRunResult import org.opensearch.alerting.model.action.Action +import org.opensearch.alerting.model.action.ActionExecutionScope +import org.opensearch.alerting.model.action.ActionExecutionPolicy +import org.opensearch.alerting.model.action.AlertCategory +import org.opensearch.alerting.model.action.PerAlertActionScope +import org.opensearch.alerting.model.action.PerExecutionActionScope import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailEntry import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.util.getBucketKeysHash +import org.opensearch.commons.authuser.User +import org.apache.http.Header +import org.apache.http.HttpEntity import org.opensearch.client.Request import org.opensearch.client.RequestOptions import org.opensearch.client.Response @@ -59,70 +72,120 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentType -import org.opensearch.commons.authuser.User import org.opensearch.index.query.QueryBuilders import org.opensearch.script.Script import org.opensearch.script.ScriptType import org.opensearch.search.SearchModule +import org.opensearch.search.aggregations.bucket.terms.IncludeExclude +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder -import org.opensearch.test.OpenSearchTestCase +import org.opensearch.test.OpenSearchTestCase.randomBoolean import org.opensearch.test.OpenSearchTestCase.randomInt import org.opensearch.test.OpenSearchTestCase.randomIntBetween import org.opensearch.test.rest.OpenSearchRestTestCase import java.time.Instant import java.time.temporal.ChronoUnit -fun randomMonitor( +fun randomQueryLevelMonitor( name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), user: User = randomUser(), inputs: List = listOf(SearchInput(emptyList(), SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))), schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), - enabled: Boolean = OpenSearchTestCase.randomBoolean(), - triggers: List = (1..randomInt(10)).map { randomTrigger() }, + enabled: Boolean = randomBoolean(), + triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false ): Monitor { - return Monitor( - name = name, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, - enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, - user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() - ) + return Monitor(name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()) } // Monitor of older versions without security. -fun randomMonitorWithoutUser( +fun randomQueryLevelMonitorWithoutUser( name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), inputs: List = listOf(SearchInput(emptyList(), SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))), schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), - enabled: Boolean = OpenSearchTestCase.randomBoolean(), - triggers: List = (1..randomInt(10)).map { randomTrigger() }, + enabled: Boolean = randomBoolean(), + triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false ): Monitor { - return Monitor( - name = name, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, - enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, - user = null, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() - ) + return Monitor(name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = null, + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()) +} + +fun randomBucketLevelMonitor( + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + user: User = randomUser(), + inputs: List = listOf( + SearchInput( + emptyList(), + SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).aggregation(TermsAggregationBuilder("test_agg")) + ) + ), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = randomBoolean(), + triggers: List = (1..randomInt(10)).map { randomBucketLevelTrigger() }, + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + withMetadata: Boolean = false +): Monitor { + return Monitor(name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()) } -fun randomTrigger( +fun randomQueryLevelTrigger( id: String = UUIDs.base64UUID(), name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), severity: String = "1", condition: Script = randomScript(), actions: List = mutableListOf(), destinationId: String = "" -): Trigger { - return Trigger( +): QueryLevelTrigger { + return QueryLevelTrigger( id = id, name = name, severity = severity, condition = condition, - actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions - ) + actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions) +} + +fun randomBucketLevelTrigger( + id: String = UUIDs.base64UUID(), + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + severity: String = "1", + bucketSelector: BucketSelectorExtAggregationBuilder = randomBucketSelectorExtAggregationBuilder(name = id), + actions: List = mutableListOf(), + destinationId: String = "" +): BucketLevelTrigger { + return BucketLevelTrigger( + id = id, + name = name, + severity = severity, + bucketSelector = bucketSelector, + actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions) +} + +fun randomBucketSelectorExtAggregationBuilder( + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + bucketsPathsMap: MutableMap = mutableMapOf("avg" to "10"), + script: Script = randomBucketSelectorScript(params = bucketsPathsMap), + parentBucketPath: String = "testPath", + filter: BucketSelectorExtFilter = BucketSelectorExtFilter(IncludeExclude("foo*", "bar*")) +): BucketSelectorExtAggregationBuilder { + return BucketSelectorExtAggregationBuilder(name, bucketsPathsMap, script, parentBucketPath, filter) +} + +fun randomBucketSelectorScript( + idOrCode: String = "params.avg >= 0", + params: Map = mutableMapOf("avg" to "10") +): Script { + return Script(Script.DEFAULT_SCRIPT_TYPE, Script.DEFAULT_SCRIPT_LANG, idOrCode, emptyMap(), params) } fun randomEmailAccount( @@ -172,21 +235,43 @@ fun randomAction( template: Script = randomTemplateScript("Hello World"), destinationId: String = "", throttleEnabled: Boolean = false, - throttle: Throttle = randomThrottle() -) = Action(name, destinationId, template, template, throttleEnabled, throttle) + throttle: Throttle = randomThrottle(), + actionExecutionPolicy: ActionExecutionPolicy = randomActionExecutionPolicy() +) = Action(name, destinationId, template, template, throttleEnabled, throttle, actionExecutionPolicy = actionExecutionPolicy) fun randomThrottle( value: Int = randomIntBetween(60, 120), unit: ChronoUnit = ChronoUnit.MINUTES ) = Throttle(value, unit) -fun randomAlert(monitor: Monitor = randomMonitor()): Alert { - val trigger = randomTrigger() +fun randomActionExecutionPolicy( + throttle: Throttle = randomThrottle(), + actionExecutionScope: ActionExecutionScope = randomActionExecutionFrequency() +) = ActionExecutionPolicy(throttle, actionExecutionScope) + +fun randomActionExecutionFrequency(): ActionExecutionScope { + return if (randomBoolean()) { + val alertCategories = AlertCategory.values() + PerAlertActionScope( + actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet()) + } else { + PerExecutionActionScope() + } +} + +fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert { + val trigger = randomQueryLevelTrigger() val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) - return Alert( - monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, - actionExecutionResults = actionExecutionResults - ) + return Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, + actionExecutionResults = actionExecutionResults) +} + +fun randomAlertWithAggregationResultBucket(monitor: Monitor = randomBucketLevelMonitor()): Alert { + val trigger = randomBucketLevelTrigger() + val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult()) + return Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, + actionExecutionResults = actionExecutionResults, aggregationResultBucket = AggregationResultBucket("parent_bucket_path_1", + listOf("bucket_key_1"), mapOf("k1" to "val1", "k2" to "val2"))) } fun randomEmailAccountMethod(): EmailAccount.MethodType { @@ -201,9 +286,24 @@ fun randomActionExecutionResult( throttledCount: Int = randomInt() ) = ActionExecutionResult(actionId, lastExecutionTime, throttledCount) -fun randomMonitorRunResult(): MonitorRunResult { - val triggerResults = mutableMapOf() - val triggerRunResult = randomTriggerRunResult() +fun randomQueryLevelMonitorRunResult(): MonitorRunResult { + val triggerResults = mutableMapOf() + val triggerRunResult = randomQueryLevelTriggerRunResult() + triggerResults.plus(Pair("test", triggerRunResult)) + + return MonitorRunResult( + "test-monitor", + Instant.now(), + Instant.now(), + null, + randomInputRunResults(), + triggerResults + ) +} + +fun randomBucketLevelMonitorRunResult(): MonitorRunResult { + val triggerResults = mutableMapOf() + val triggerRunResult = randomBucketLevelTriggerRunResult() triggerResults.plus(Pair("test", triggerRunResult)) return MonitorRunResult( @@ -220,11 +320,29 @@ fun randomInputRunResults(): InputRunResults { return InputRunResults(listOf(), null) } -fun randomTriggerRunResult(): TriggerRunResult { +fun randomQueryLevelTriggerRunResult(): QueryLevelTriggerRunResult { val map = mutableMapOf() map.plus(Pair("key1", randomActionRunResult())) map.plus(Pair("key2", randomActionRunResult())) - return TriggerRunResult("trigger-name", true, null, map) + return QueryLevelTriggerRunResult("trigger-name", true, null, map) +} + +fun randomBucketLevelTriggerRunResult(): BucketLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + + val aggBucket1 = AggregationResultBucket("parent_bucket_path_1", listOf("bucket_key_1"), + mapOf("k1" to "val1", "k2" to "val2")) + val aggBucket2 = AggregationResultBucket("parent_bucket_path_2", listOf("bucket_key_2"), + mapOf("k1" to "val1", "k2" to "val2")) + + val actionResultsMap: MutableMap> = mutableMapOf() + actionResultsMap[aggBucket1.getBucketKeysHash()] = map + actionResultsMap[aggBucket2.getBucketKeysHash()] = map + + return BucketLevelTriggerRunResult("trigger-name", null, + mapOf(aggBucket1.getBucketKeysHash() to aggBucket1, aggBucket2.getBucketKeysHash() to aggBucket2), actionResultsMap) } fun randomActionRunResult(): ActionRunResult { @@ -243,15 +361,8 @@ fun Monitor.toJsonString(): String { } fun randomUser(): User { - return User( - OpenSearchRestTestCase.randomAlphaOfLength(10), - listOf( - OpenSearchRestTestCase.randomAlphaOfLength(10), - OpenSearchRestTestCase.randomAlphaOfLength(10) - ), - listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), "all_access"), - listOf("test_attr=test") - ) + return User(OpenSearchRestTestCase.randomAlphaOfLength(10), listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), + OpenSearchRestTestCase.randomAlphaOfLength(10)), listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), "all_access"), listOf("test_attr=test")) } fun randomUserEmpty(): User { @@ -329,10 +440,7 @@ fun parser(xc: String): XContentParser { } fun xContentRegistry(): NamedXContentRegistry { - return NamedXContentRegistry( - listOf( - SearchInput.XCONTENT_REGISTRY - ) + - SearchModule(Settings.EMPTY, false, emptyList()).namedXContents - ) + return NamedXContentRegistry(listOf( + SearchInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY) + + SearchModule(Settings.EMPTY, false, emptyList()).namedXContents) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/AcknowledgeAlertResponseTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/AcknowledgeAlertResponseTests.kt index 4b95a9e7c..c816b2248 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/AcknowledgeAlertResponseTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/AcknowledgeAlertResponseTests.kt @@ -45,7 +45,7 @@ class AcknowledgeAlertResponseTests : OpenSearchTestCase() { "1234", 0L, 1, "monitor-1234", "test-monitor", 0L, randomUser(), "trigger-14", "test-trigger", Alert.State.ACKNOWLEDGED, Instant.now(), Instant.now(), Instant.now(), Instant.now(), null, ArrayList(), - "sev-2", ArrayList() + "sev-2", ArrayList(), null ) ) val failed = mutableListOf( @@ -53,7 +53,7 @@ class AcknowledgeAlertResponseTests : OpenSearchTestCase() { "1234", 0L, 1, "monitor-1234", "test-monitor", 0L, randomUser(), "trigger-14", "test-trigger", Alert.State.ERROR, Instant.now(), Instant.now(), Instant.now(), Instant.now(), null, mutableListOf(AlertError(Instant.now(), "Error msg")), - "sev-2", mutableListOf(ActionExecutionResult("7890", null, 0)) + "sev-2", mutableListOf(ActionExecutionResult("7890", null, 0)), null ) ) val missing = mutableListOf("1", "2", "3", "4") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/ExecuteMonitorRequestTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/ExecuteMonitorRequestTests.kt index ebd2373f8..2e7f95e19 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/ExecuteMonitorRequestTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/ExecuteMonitorRequestTests.kt @@ -27,7 +27,7 @@ package org.opensearch.alerting.action import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.randomMonitor +import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.unit.TimeValue @@ -52,7 +52,7 @@ class ExecuteMonitorRequestTests : OpenSearchTestCase() { } fun `test execute monitor request with monitor`() { - val monitor = randomMonitor().copy(inputs = listOf(SearchInput(emptyList(), SearchSourceBuilder()))) + val monitor = randomQueryLevelMonitor().copy(inputs = listOf(SearchInput(emptyList(), SearchSourceBuilder()))) val req = ExecuteMonitorRequest(false, TimeValue.timeValueSeconds(100L), null, monitor) assertNotNull(req.monitor) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/ExecuteMonitorResponseTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/ExecuteMonitorResponseTests.kt index 7002bc098..d33127183 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/ExecuteMonitorResponseTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/ExecuteMonitorResponseTests.kt @@ -27,15 +27,29 @@ package org.opensearch.alerting.action import org.junit.Assert -import org.opensearch.alerting.randomMonitorRunResult +import org.opensearch.alerting.randomBucketLevelMonitorRunResult +import org.opensearch.alerting.randomQueryLevelMonitorRunResult import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.StreamInput import org.opensearch.test.OpenSearchTestCase class ExecuteMonitorResponseTests : OpenSearchTestCase() { - fun `test exec monitor response`() { - val req = ExecuteMonitorResponse(randomMonitorRunResult()) + fun `test exec query-level monitor response`() { + val req = ExecuteMonitorResponse(randomQueryLevelMonitorRunResult()) + Assert.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = ExecuteMonitorResponse(sin) + assertNotNull(newReq.monitorRunResult) + assertEquals("test-monitor", newReq.monitorRunResult.monitorName) + assertNotNull(newReq.monitorRunResult.inputResults) + } + + fun `test exec bucket-level monitor response`() { + val req = ExecuteMonitorResponse(randomBucketLevelMonitorRunResult()) Assert.assertNotNull(req) val out = BytesStreamOutput() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt index afacfbd36..66931a415 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt @@ -72,7 +72,8 @@ class GetAlertsResponseTests : OpenSearchTestCase() { null, Collections.emptyList(), "severity", - Collections.emptyList() + Collections.emptyList(), + null ) val req = GetAlertsResponse(listOf(alert), 1) assertNotNull(req) @@ -107,7 +108,8 @@ class GetAlertsResponseTests : OpenSearchTestCase() { null, Collections.emptyList(), "severity", - Collections.emptyList() + Collections.emptyList(), + null ) val req = GetAlertsResponse(listOf(alert), 1) var actualXContentString = req.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetMonitorResponseTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetMonitorResponseTests.kt index 7516fea75..f2023fd6d 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetMonitorResponseTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetMonitorResponseTests.kt @@ -57,13 +57,22 @@ class GetMonitorResponseTests : OpenSearchTestCase() { val testInstance = Instant.ofEpochSecond(1538164858L) val cronSchedule = CronSchedule(cronExpression, ZoneId.of("Asia/Kolkata"), testInstance) - val req = GetMonitorResponse( - "1234", 1L, 2L, 0L, RestStatus.OK, - Monitor( - "123", 0L, "test-monitor", true, cronSchedule, Instant.now(), - Instant.now(), randomUser(), 0, mutableListOf(), mutableListOf(), mutableMapOf() - ) + val monitor = Monitor( + id = "123", + version = 0L, + name = "test-monitor", + enabled = true, + schedule = cronSchedule, + lastUpdateTime = Instant.now(), + enabledTime = Instant.now(), + monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, + user = randomUser(), + schemaVersion = 0, + inputs = mutableListOf(), + triggers = mutableListOf(), + uiMetadata = mutableMapOf() ) + val req = GetMonitorResponse("1234", 1L, 2L, 0L, RestStatus.OK, monitor) assertNotNull(req) val out = BytesStreamOutput() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/IndexMonitorRequestTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/IndexMonitorRequestTests.kt index b38f2aee1..5c9ce3b78 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/IndexMonitorRequestTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/IndexMonitorRequestTests.kt @@ -28,7 +28,7 @@ package org.opensearch.alerting.action import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.randomMonitor +import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.StreamInput import org.opensearch.rest.RestRequest @@ -41,7 +41,7 @@ class IndexMonitorRequestTests : OpenSearchTestCase() { val req = IndexMonitorRequest( "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, - randomMonitor().copy(inputs = listOf(SearchInput(emptyList(), SearchSourceBuilder()))) + randomQueryLevelMonitor().copy(inputs = listOf(SearchInput(emptyList(), SearchSourceBuilder()))) ) assertNotNull(req) @@ -60,7 +60,7 @@ class IndexMonitorRequestTests : OpenSearchTestCase() { val req = IndexMonitorRequest( "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, - randomMonitor().copy(inputs = listOf(SearchInput(emptyList(), SearchSourceBuilder()))) + randomQueryLevelMonitor().copy(inputs = listOf(SearchInput(emptyList(), SearchSourceBuilder()))) ) assertNotNull(req) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/IndexMonitorResponseTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/IndexMonitorResponseTests.kt index 444cf2d94..5a7a9b01f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/IndexMonitorResponseTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/IndexMonitorResponseTests.kt @@ -43,13 +43,21 @@ class IndexMonitorResponseTests : OpenSearchTestCase() { val testInstance = Instant.ofEpochSecond(1538164858L) val cronSchedule = CronSchedule(cronExpression, ZoneId.of("Asia/Kolkata"), testInstance) - val req = IndexMonitorResponse( - "1234", 1L, 2L, 0L, RestStatus.OK, - Monitor( - "123", 0L, "test-monitor", true, cronSchedule, Instant.now(), - Instant.now(), randomUser(), 0, mutableListOf(), mutableListOf(), mutableMapOf() - ) - ) + val monitor = Monitor( + id = "123", + version = 0L, + name = "test-monitor", + enabled = true, + schedule = cronSchedule, + lastUpdateTime = Instant.now(), + enabledTime = Instant.now(), + monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, + user = randomUser(), + schemaVersion = 0, + inputs = mutableListOf(), + triggers = mutableListOf(), + uiMetadata = mutableMapOf()) + val req = IndexMonitorResponse("1234", 1L, 2L, 0L, RestStatus.OK, monitor) assertNotNull(req) val out = BytesStreamOutput() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index c7ebcdd84..948066f4d 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -34,8 +34,8 @@ import org.opensearch.alerting.AlertingRestTestCase import org.opensearch.alerting.NEVER_RUN import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.makeRequest -import org.opensearch.alerting.randomMonitor -import org.opensearch.alerting.randomTrigger +import org.opensearch.alerting.randomQueryLevelMonitor +import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent.jsonXContent @@ -44,7 +44,7 @@ import org.opensearch.rest.RestStatus class AlertIndicesIT : AlertingRestTestCase() { fun `test create alert index`() { - executeMonitor(randomMonitor(triggers = listOf(randomTrigger(condition = ALWAYS_RUN)))) + executeMonitor(randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN)))) assertIndexExists(AlertIndices.ALERT_INDEX) assertIndexExists(AlertIndices.HISTORY_WRITE_INDEX) @@ -57,7 +57,7 @@ class AlertIndicesIT : AlertingRestTestCase() { putAlertMappings( AlertIndices.alertMapping().trimStart('{').trimEnd('}') - .replace("\"schema_version\": 2", "\"schema_version\": 0") + .replace("\"schema_version\": 3", "\"schema_version\": 0") ) assertIndexExists(AlertIndices.ALERT_INDEX) assertIndexExists(AlertIndices.HISTORY_WRITE_INDEX) @@ -67,15 +67,15 @@ class AlertIndicesIT : AlertingRestTestCase() { executeMonitor(createRandomMonitor()) assertIndexExists(AlertIndices.ALERT_INDEX) assertIndexExists(AlertIndices.HISTORY_WRITE_INDEX) - verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 3) - verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 2) - verifyIndexSchemaVersion(AlertIndices.HISTORY_WRITE_INDEX, 2) + verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 4) + verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 3) + verifyIndexSchemaVersion(AlertIndices.HISTORY_WRITE_INDEX, 3) } fun `test alert index gets recreated automatically if deleted`() { wipeAllODFEIndices() assertIndexDoesNotExist(AlertIndices.ALERT_INDEX) - val trueMonitor = randomMonitor(triggers = listOf(randomTrigger(condition = ALWAYS_RUN))) + val trueMonitor = randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN))) executeMonitor(trueMonitor) assertIndexExists(AlertIndices.ALERT_INDEX) @@ -95,7 +95,7 @@ class AlertIndicesIT : AlertingRestTestCase() { client().updateSettings(AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD.key, "1s") client().updateSettings(AlertingSettings.ALERT_HISTORY_INDEX_MAX_AGE.key, "1s") - val trueMonitor = randomMonitor(triggers = listOf(randomTrigger(condition = ALWAYS_RUN))) + val trueMonitor = randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN))) executeMonitor(trueMonitor) // Allow for a rollover index. @@ -106,8 +106,8 @@ class AlertIndicesIT : AlertingRestTestCase() { fun `test history disabled`() { resetHistorySettings() - val trigger1 = randomTrigger(condition = ALWAYS_RUN) - val monitor1 = createMonitor(randomMonitor(triggers = listOf(trigger1))) + val trigger1 = randomQueryLevelTrigger(condition = ALWAYS_RUN) + val monitor1 = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger1))) executeMonitor(monitor1.id) // Check if alert is active @@ -126,8 +126,8 @@ class AlertIndicesIT : AlertingRestTestCase() { // Disable alert history client().updateSettings(AlertingSettings.ALERT_HISTORY_ENABLED.key, "false") - val trigger2 = randomTrigger(condition = ALWAYS_RUN) - val monitor2 = createMonitor(randomMonitor(triggers = listOf(trigger2))) + val trigger2 = randomQueryLevelTrigger(condition = ALWAYS_RUN) + val monitor2 = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger2))) executeMonitor(monitor2.id) // Check if second alert is active @@ -151,8 +151,8 @@ class AlertIndicesIT : AlertingRestTestCase() { resetHistorySettings() // Create monitor and execute - val trigger = randomTrigger(condition = ALWAYS_RUN) - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val trigger = randomQueryLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) executeMonitor(monitor.id) // Check if alert is active and alert index is created @@ -179,7 +179,7 @@ class AlertIndicesIT : AlertingRestTestCase() { client().updateSettings(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.key, "1s") // Give some time for history to be rolled over and cleared - Thread.sleep(2000) + Thread.sleep(5000) // Given the max_docs and retention settings above, the history index will rollover and the non-write index will be deleted. // This leaves two indices: alert index and an empty history write index diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertTests.kt index 77e1dbef6..6e2df7dd7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/AlertTests.kt @@ -26,7 +26,9 @@ package org.opensearch.alerting.model +import org.junit.Assert import org.opensearch.alerting.randomAlert +import org.opensearch.alerting.randomAlertWithAggregationResultBucket import org.opensearch.test.OpenSearchTestCase class AlertTests : OpenSearchTestCase() { @@ -46,6 +48,26 @@ class AlertTests : OpenSearchTestCase() { assertEquals("Template args severity does not match", templateArgs[Alert.SEVERITY_FIELD], alert.severity) } + fun `test agg alert as template args`() { + val alert = randomAlertWithAggregationResultBucket().copy(acknowledgedTime = null, lastNotificationTime = null) + + val templateArgs = alert.asTemplateArg() + + assertEquals("Template args id does not match", templateArgs[Alert.ALERT_ID_FIELD], alert.id) + assertEquals("Template args version does not match", templateArgs[Alert.ALERT_VERSION_FIELD], alert.version) + assertEquals("Template args state does not match", templateArgs[Alert.STATE_FIELD], alert.state.toString()) + assertEquals("Template args error message does not match", templateArgs[Alert.ERROR_MESSAGE_FIELD], alert.errorMessage) + assertEquals("Template args acknowledged time does not match", templateArgs[Alert.ACKNOWLEDGED_TIME_FIELD], null) + assertEquals("Template args end time does not", templateArgs[Alert.END_TIME_FIELD], alert.endTime?.toEpochMilli()) + assertEquals("Template args start time does not", templateArgs[Alert.START_TIME_FIELD], alert.startTime.toEpochMilli()) + assertEquals("Template args last notification time does not match", templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null) + assertEquals("Template args severity does not match", templateArgs[Alert.SEVERITY_FIELD], alert.severity) + Assert.assertEquals("Template args bucketKeys do not match", + templateArgs[Alert.BUCKET_KEYS], alert.aggregationResultBucket?.bucketKeys?.joinToString(",")) + Assert.assertEquals("Template args parentBucketPath does not match", + templateArgs[Alert.PARENTS_BUCKET_PATH], alert.aggregationResultBucket?.parentBucketPath) + } + fun `test alert acknowledged`() { val ackAlert = randomAlert().copy(state = Alert.State.ACKNOWLEDGED) assertTrue("Alert is not acknowledged", ackAlert.isAcknowledged()) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt index 0c23906ec..f0c372d2f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt @@ -28,19 +28,24 @@ package org.opensearch.alerting.model import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.model.action.Action +import org.opensearch.alerting.model.action.ActionExecutionPolicy import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup import org.opensearch.alerting.randomAction +import org.opensearch.alerting.randomActionExecutionPolicy import org.opensearch.alerting.randomActionRunResult +import org.opensearch.alerting.randomBucketLevelMonitorRunResult +import org.opensearch.alerting.randomBucketLevelTrigger +import org.opensearch.alerting.randomBucketLevelTriggerRunResult import org.opensearch.alerting.randomEmailAccount import org.opensearch.alerting.randomEmailGroup import org.opensearch.alerting.randomInputRunResults -import org.opensearch.alerting.randomMonitor -import org.opensearch.alerting.randomMonitorRunResult +import org.opensearch.alerting.randomQueryLevelMonitor +import org.opensearch.alerting.randomQueryLevelMonitorRunResult import org.opensearch.alerting.randomThrottle -import org.opensearch.alerting.randomTrigger -import org.opensearch.alerting.randomTriggerRunResult +import org.opensearch.alerting.randomQueryLevelTrigger +import org.opensearch.alerting.randomQueryLevelTriggerRunResult import org.opensearch.alerting.randomUser import org.opensearch.alerting.randomUserEmpty import org.opensearch.common.io.stream.BytesStreamOutput @@ -96,22 +101,31 @@ class WriteableTests : OpenSearchTestCase() { assertEquals("Round tripping Action doesn't work", action, newAction) } - fun `test monitor as stream`() { - val monitor = randomMonitor().copy(inputs = listOf(SearchInput(emptyList(), SearchSourceBuilder()))) + fun `test query-level monitor as stream`() { + val monitor = randomQueryLevelMonitor().copy(inputs = listOf(SearchInput(emptyList(), SearchSourceBuilder()))) val out = BytesStreamOutput() monitor.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) val newMonitor = Monitor(sin) - assertEquals("Round tripping Monitor doesn't work", monitor, newMonitor) + assertEquals("Round tripping QueryLevelMonitor doesn't work", monitor, newMonitor) } - fun `test trigger as stream`() { - val trigger = randomTrigger() + fun `test query-level trigger as stream`() { + val trigger = randomQueryLevelTrigger() val out = BytesStreamOutput() trigger.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) - val newTrigger = Trigger(sin) - assertEquals("Round tripping Trigger doesn't work", trigger, newTrigger) + val newTrigger = QueryLevelTrigger.readFrom(sin) + assertEquals("Round tripping QueryLevelTrigger doesn't work", trigger, newTrigger) + } + + fun `test bucket-level trigger as stream`() { + val trigger = randomBucketLevelTrigger() + val out = BytesStreamOutput() + trigger.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newTrigger = BucketLevelTrigger.readFrom(sin) + assertEquals("Round tripping BucketLevelTrigger doesn't work", trigger, newTrigger) } fun `test actionrunresult as stream`() { @@ -123,12 +137,21 @@ class WriteableTests : OpenSearchTestCase() { assertEquals("Round tripping ActionRunResult doesn't work", actionRunResult, newActionRunResult) } - fun `test triggerrunresult as stream`() { - val runResult = randomTriggerRunResult() + fun `test query-level triggerrunresult as stream`() { + val runResult = randomQueryLevelTriggerRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = QueryLevelTriggerRunResult(sin) + assertEquals("Round tripping ActionRunResult doesn't work", runResult, newRunResult) + } + + fun `test bucket-level triggerrunresult as stream`() { + val runResult = randomBucketLevelTriggerRunResult() val out = BytesStreamOutput() runResult.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) - val newRunResult = TriggerRunResult(sin) + val newRunResult = BucketLevelTriggerRunResult(sin) assertEquals("Round tripping ActionRunResult doesn't work", runResult, newRunResult) } @@ -141,12 +164,21 @@ class WriteableTests : OpenSearchTestCase() { assertEquals("Round tripping InputRunResults doesn't work", runResult, newRunResult) } - fun `test monitorrunresult as stream`() { - val runResult = randomMonitorRunResult() + fun `test query-level monitorrunresult as stream`() { + val runResult = randomQueryLevelMonitorRunResult() val out = BytesStreamOutput() runResult.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) - val newRunResult = MonitorRunResult(sin) + val newRunResult = MonitorRunResult(sin) + assertEquals("Round tripping MonitorRunResult doesn't work", runResult, newRunResult) + } + + fun `test bucket-level monitorrunresult as stream`() { + val runResult = randomBucketLevelMonitorRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = MonitorRunResult(sin) assertEquals("Round tripping MonitorRunResult doesn't work", runResult, newRunResult) } @@ -194,4 +226,13 @@ class WriteableTests : OpenSearchTestCase() { val newEmailGroup = EmailGroup.readFrom(sin) assertEquals("Round tripping EmailGroup doesn't work", emailGroup, newEmailGroup) } + + fun `test action execution policy as stream`() { + val actionExecutionPolicy = randomActionExecutionPolicy() + val out = BytesStreamOutput() + actionExecutionPolicy.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newActionExecutionPolicy = ActionExecutionPolicy.readFrom(sin) + assertEquals("Round tripping ActionExecutionPolicy doesn't work", actionExecutionPolicy, newActionExecutionPolicy) + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt index 228b86294..f3ce396a8 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt @@ -27,26 +27,33 @@ package org.opensearch.alerting.model import org.opensearch.alerting.builder +import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.action.Action +import org.opensearch.alerting.model.action.ActionExecutionPolicy import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup import org.opensearch.alerting.parser import org.opensearch.alerting.randomAction +import org.opensearch.alerting.randomActionExecutionPolicy import org.opensearch.alerting.randomActionExecutionResult import org.opensearch.alerting.randomAlert +import org.opensearch.alerting.randomBucketLevelMonitor +import org.opensearch.alerting.randomBucketLevelTrigger import org.opensearch.alerting.randomEmailAccount import org.opensearch.alerting.randomEmailGroup -import org.opensearch.alerting.randomMonitor -import org.opensearch.alerting.randomMonitorWithoutUser +import org.opensearch.alerting.randomQueryLevelMonitor +import org.opensearch.alerting.randomQueryLevelMonitorWithoutUser import org.opensearch.alerting.randomThrottle -import org.opensearch.alerting.randomTrigger +import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.randomUser import org.opensearch.alerting.randomUserEmpty import org.opensearch.alerting.toJsonString import org.opensearch.common.xcontent.ToXContent import org.opensearch.commons.authuser.User +import org.opensearch.index.query.QueryBuilders +import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.test.OpenSearchTestCase import kotlin.test.assertFailsWith @@ -56,29 +63,28 @@ class XContentTests : OpenSearchTestCase() { val action = randomAction() val actionString = action.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() val parsedAction = Action.parse(parser(actionString)) - assertEquals("Round tripping Monitor doesn't work", action, parsedAction) + assertEquals("Round tripping Action doesn't work", action, parsedAction) } fun `test action parsing with null subject template`() { val action = randomAction().copy(subjectTemplate = null) val actionString = action.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() val parsedAction = Action.parse(parser(actionString)) - assertEquals("Round tripping Monitor doesn't work", action, parsedAction) + assertEquals("Round tripping Action doesn't work", action, parsedAction) } fun `test action parsing with null throttle`() { val action = randomAction().copy(throttle = null) val actionString = action.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() val parsedAction = Action.parse(parser(actionString)) - assertEquals("Round tripping Monitor doesn't work", action, parsedAction) + assertEquals("Round tripping Action doesn't work", action, parsedAction) } fun `test action parsing with throttled enabled and null throttle`() { val action = randomAction().copy(throttle = null).copy(throttleEnabled = true) val actionString = action.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() assertFailsWith("Action throttle enabled but not set throttle value") { - Action.parse(parser(actionString)) - } + Action.parse(parser(actionString)) } } fun `test throttle parsing`() { @@ -103,21 +109,30 @@ class XContentTests : OpenSearchTestCase() { assertFailsWith("Can only set positive throttle period") { Throttle.parse(parser(throttleString)) } } - fun `test monitor parsing`() { - val monitor = randomMonitor() + fun `test query-level monitor parsing`() { + val monitor = randomQueryLevelMonitor() val monitorString = monitor.toJsonString() val parsedMonitor = Monitor.parse(parser(monitorString)) - assertEquals("Round tripping Monitor doesn't work", monitor, parsedMonitor) + assertEquals("Round tripping QueryLevelMonitor doesn't work", monitor, parsedMonitor) + } + + fun `test query-level trigger parsing`() { + val trigger = randomQueryLevelTrigger() + + val triggerString = trigger.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val parsedTrigger = Trigger.parse(parser(triggerString)) + + assertEquals("Round tripping QueryLevelTrigger doesn't work", trigger, parsedTrigger) } - fun `test trigger parsing`() { - val trigger = randomTrigger() + fun `test bucket-level trigger parsing`() { + val trigger = randomBucketLevelTrigger() val triggerString = trigger.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() val parsedTrigger = Trigger.parse(parser(triggerString)) - assertEquals("Round tripping Trigger doesn't work", trigger, parsedTrigger) + assertEquals("Round tripping BucketLevelTrigger doesn't work", trigger, parsedTrigger) } fun `test alert parsing`() { @@ -162,8 +177,8 @@ class XContentTests : OpenSearchTestCase() { fun `test creating a monitor with duplicate trigger ids fails`() { try { - val repeatedTrigger = randomTrigger() - randomMonitor().copy(triggers = listOf(repeatedTrigger, repeatedTrigger)) + val repeatedTrigger = randomQueryLevelTrigger() + randomQueryLevelMonitor().copy(triggers = listOf(repeatedTrigger, repeatedTrigger)) fail("Creating a monitor with duplicate triggers did not fail.") } catch (ignored: IllegalArgumentException) { } @@ -186,12 +201,12 @@ class XContentTests : OpenSearchTestCase() { assertEquals(0, parsedUser.roles.size) } - fun `test monitor parsing without user`() { - val monitor = randomMonitorWithoutUser() + fun `test query-level monitor parsing without user`() { + val monitor = randomQueryLevelMonitorWithoutUser() val monitorString = monitor.toJsonString() val parsedMonitor = Monitor.parse(parser(monitorString)) - assertEquals("Round tripping Monitor doesn't work", monitor, parsedMonitor) + assertEquals("Round tripping QueryLevelMonitor doesn't work", monitor, parsedMonitor) assertNull(parsedMonitor.user) } @@ -210,4 +225,127 @@ class XContentTests : OpenSearchTestCase() { val parsedEmailGroup = EmailGroup.parse(parser(emailGroupString)) assertEquals("Round tripping EmailGroup doesn't work", emailGroup, parsedEmailGroup) } + + fun `test old monitor format parsing`() { + val monitorString = """ + { + "type": "monitor", + "schema_version": 3, + "name": "asdf", + "user": { + "name": "admin123", + "backend_roles": [], + "roles": [ + "all_access", + "security_manager" + ], + "custom_attribute_names": [], + "user_requested_tenant": null + }, + "enabled": true, + "enabled_time": 1613530078244, + "schedule": { + "period": { + "interval": 1, + "unit": "MINUTES" + } + }, + "inputs": [ + { + "search": { + "indices": [ + "test_index" + ], + "query": { + "size": 0, + "query": { + "bool": { + "filter": [ + { + "range": { + "order_date": { + "from": "{{period_end}}||-1h", + "to": "{{period_end}}", + "include_lower": true, + "include_upper": true, + "format": "epoch_millis", + "boost": 1.0 + } + } + } + ], + "adjust_pure_negative": true, + "boost": 1.0 + } + }, + "aggregations": {} + } + } + } + ], + "triggers": [ + { + "id": "e_sc0XcB98Q42rHjTh4K", + "name": "abc", + "severity": "1", + "condition": { + "script": { + "source": "ctx.results[0].hits.total.value > 100000", + "lang": "painless" + } + }, + "actions": [] + } + ], + "last_update_time": 1614121489719 + } + """.trimIndent() + val parsedMonitor = Monitor.parse(parser(monitorString)) + assertEquals("Incorrect monitor type", Monitor.MonitorType.QUERY_LEVEL_MONITOR, parsedMonitor.monitorType) + assertEquals("Incorrect trigger count", 1, parsedMonitor.triggers.size) + val trigger = parsedMonitor.triggers.first() + assertTrue("Incorrect trigger type", trigger is QueryLevelTrigger) + assertEquals("Incorrect name for parsed trigger", "abc", trigger.name) + } + + fun `test creating an query-level monitor with invalid trigger type fails`() { + try { + val bucketLevelTrigger = randomBucketLevelTrigger() + randomQueryLevelMonitor().copy(triggers = listOf(bucketLevelTrigger)) + fail("Creating a query-level monitor with bucket-level triggers did not fail.") + } catch (ignored: IllegalArgumentException) { + } + } + + fun `test creating an bucket-level monitor with invalid trigger type fails`() { + try { + val queryLevelTrigger = randomQueryLevelTrigger() + randomBucketLevelMonitor().copy(triggers = listOf(queryLevelTrigger)) + fail("Creating a bucket-level monitor with query-level triggers did not fail.") + } catch (ignored: IllegalArgumentException) { + } + } + + fun `test creating an bucket-level monitor with invalid input fails`() { + try { + val invalidInput = SearchInput(emptyList(), SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) + randomBucketLevelMonitor().copy(inputs = listOf(invalidInput)) + fail("Creating an bucket-level monitor with an invalid input did not fail.") + } catch (ignored: IllegalArgumentException) { + } + } + + fun `test action execution policy`() { + val actionExecutionPolicy = randomActionExecutionPolicy() + val actionExecutionPolicyString = actionExecutionPolicy.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val parsedActionExecutionPolicy = ActionExecutionPolicy.parse(parser(actionExecutionPolicyString)) + assertEquals("Round tripping ActionExecutionPolicy doesn't work", actionExecutionPolicy, parsedActionExecutionPolicy) + } + + fun `test action execution policy with null throttle`() { + val actionExecutionPolicy = randomActionExecutionPolicy().copy(throttle = null) + val actionExecutionPolicyString = actionExecutionPolicy.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() + val parsedActionExecutionPolicy = ActionExecutionPolicy.parse(parser(actionExecutionPolicyString)) + assertEquals("Round tripping ActionExecutionPolicy doesn't work", actionExecutionPolicy, parsedActionExecutionPolicy) + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt index d17518e2e..274b1a4bb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt @@ -42,15 +42,16 @@ import org.opensearch.alerting.core.settings.ScheduledJobSettings import org.opensearch.alerting.makeRequest import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.model.QueryLevelTrigger import org.opensearch.alerting.model.Trigger import org.opensearch.alerting.randomADMonitor import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomAlert import org.opensearch.alerting.randomAnomalyDetector import org.opensearch.alerting.randomAnomalyDetectorWithUser -import org.opensearch.alerting.randomMonitor +import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.alerting.randomThrottle -import org.opensearch.alerting.randomTrigger +import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.client.ResponseException import org.opensearch.client.WarningFailureException @@ -102,7 +103,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { @Throws(Exception::class) fun `test creating a monitor`() { - val monitor = randomMonitor() + val monitor = randomQueryLevelMonitor() val createResponse = client().makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) @@ -116,7 +117,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { } fun `test creating a monitor with legacy ODFE`() { - val monitor = randomMonitor() + val monitor = randomQueryLevelMonitor() val createResponse = client().makeRequest("POST", LEGACY_OPENDISTRO_ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) assertEquals("Create monitor failed", RestStatus.CREATED, createResponse.restStatus()) val responseBody = createResponse.asMap() @@ -161,7 +162,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test creating a monitor with PUT fails`() { try { - val monitor = randomMonitor() + val monitor = randomQueryLevelMonitor() client().makeRequest("PUT", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) fail("Expected 405 Method Not Allowed response") } catch (e: ResponseException) { @@ -172,7 +173,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test creating a monitor with illegal index name`() { try { val si = SearchInput(listOf("_#*IllegalIndexCharacters"), SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) - val monitor = randomMonitor() + val monitor = randomQueryLevelMonitor() client().makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.copy(inputs = listOf(si)).toHttpEntity()) } catch (e: ResponseException) { // When an index with invalid name is mentioned, instead of returning invalid_index_name_exception security plugin throws security_exception. @@ -319,7 +320,14 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test updating conditions for a monitor`() { val monitor = createRandomMonitor() - val updatedTriggers = listOf(Trigger("foo", "1", Script("return true"), emptyList())) + val updatedTriggers = listOf( + QueryLevelTrigger( + name = "foo", + severity = "1", + condition = Script("return true"), + actions = emptyList() + ) + ) val updateResponse = client().makeRequest( "PUT", monitor.relativeUrl(), emptyMap(), monitor.copy(triggers = updatedTriggers).toHttpEntity() @@ -715,8 +723,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test delete trigger moves alerts`() { client().updateSettings(ScheduledJobSettings.SWEEPER_ENABLED.key, true) putAlertMappings() - val trigger = randomTrigger() - val monitor = createMonitor(randomMonitor(triggers = listOf(trigger))) + val trigger = randomQueryLevelTrigger() + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(trigger))) val alert = createAlert(randomAlert(monitor).copy(triggerId = trigger.id, state = Alert.State.ACTIVE)) refreshIndex("*") val updatedMonitor = monitor.copy(triggers = emptyList()) @@ -740,9 +748,9 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test delete trigger moves alerts only for deleted trigger`() { client().updateSettings(ScheduledJobSettings.SWEEPER_ENABLED.key, true) putAlertMappings() - val triggerToDelete = randomTrigger() - val triggerToKeep = randomTrigger() - val monitor = createMonitor(randomMonitor(triggers = listOf(triggerToDelete, triggerToKeep))) + val triggerToDelete = randomQueryLevelTrigger() + val triggerToKeep = randomQueryLevelTrigger() + val monitor = createMonitor(randomQueryLevelMonitor(triggers = listOf(triggerToDelete, triggerToKeep))) val alertKeep = createAlert(randomAlert(monitor).copy(triggerId = triggerToKeep.id, state = Alert.State.ACTIVE)) val alertDelete = createAlert(randomAlert(monitor).copy(triggerId = triggerToDelete.id, state = Alert.State.ACTIVE)) refreshIndex("*") @@ -878,7 +886,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { private fun randomMonitorWithThrottle(value: Int, unit: ChronoUnit = ChronoUnit.MINUTES): Monitor { val throttle = randomThrottle(value, unit) val action = randomAction().copy(throttle = throttle) - val trigger = randomTrigger(actions = listOf(action)) - return randomMonitor(triggers = listOf(trigger)) + val trigger = randomQueryLevelTrigger(actions = listOf(action)) + return randomQueryLevelMonitor(triggers = listOf(trigger)) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt index 2a4b8244e..ee10bedc4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt @@ -24,9 +24,9 @@ import org.opensearch.alerting.makeRequest import org.opensearch.alerting.model.Alert import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomAlert -import org.opensearch.alerting.randomMonitor +import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.alerting.randomTemplateScript -import org.opensearch.alerting.randomTrigger +import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.RestClient @@ -72,7 +72,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { createUserRolesMapping("alerting_full_access", arrayOf(user)) try { // randomMonitor has a dummy user, api ignores the User passed as part of monitor, it picks user info from the logged-in user. - val monitor = randomMonitor().copy( + val monitor = randomQueryLevelMonitor().copy( inputs = listOf( SearchInput( indices = listOf("hr_data"), query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) @@ -106,7 +106,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { createUserWithTestData(user, "hr_data", "hr_role", "HR") try { - val monitor = randomMonitor().copy( + val monitor = randomQueryLevelMonitor().copy( inputs = listOf( SearchInput( indices = listOf("hr_data"), query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) @@ -129,7 +129,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { createUserWithTestData(user, "hr_data", "hr_role", "HR") createUserRolesMapping("alerting_full_access", arrayOf(user)) try { - val monitor = randomMonitor().copy( + val monitor = randomQueryLevelMonitor().copy( inputs = listOf( SearchInput( indices = listOf("not_hr_data"), query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) @@ -150,14 +150,14 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { fun `test create monitor with disable filter by`() { disableFilterBy() - val monitor = randomMonitor() + val monitor = randomQueryLevelMonitor() val createResponse = client().makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) assertEquals("Create monitor failed", RestStatus.CREATED, createResponse.restStatus()) } fun `test create monitor with enable filter by`() { enableFilterBy() - val monitor = randomMonitor() + val monitor = randomQueryLevelMonitor() if (securityEnabled()) { // when security is enabled. No errors, must succeed. @@ -185,7 +185,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { // Query Monitors related security tests fun `test update monitor with disable filter by`() { disableFilterBy() - val monitor = randomMonitor(enabled = true) + val monitor = randomQueryLevelMonitor(enabled = true) val createdMonitor = createMonitor(monitor = monitor) @@ -205,7 +205,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { // refer: `test create monitor with enable filter by` return } - val monitor = randomMonitor(enabled = true) + val monitor = randomQueryLevelMonitor(enabled = true) val createdMonitor = createMonitor(monitor = monitor) @@ -220,7 +220,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { fun `test delete monitor with disable filter by`() { disableFilterBy() - val monitor = randomMonitor(enabled = true) + val monitor = randomQueryLevelMonitor(enabled = true) val createdMonitor = createMonitor(monitor = monitor) @@ -254,7 +254,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { // refer: `test create monitor with enable filter by` return } - val monitor = randomMonitor(enabled = true) + val monitor = randomQueryLevelMonitor(enabled = true) val createdMonitor = createMonitor(monitor = monitor) @@ -463,7 +463,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) ) ) - val monitor = randomMonitor(triggers = listOf(randomTrigger(condition = ALWAYS_RUN, actions = listOf(action))), inputs = inputs) + val monitor = randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))), inputs = inputs) // Make sure the elevating the permissions fails execute. val adminUser = User("admin", listOf("admin"), listOf("all_access"), listOf()) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/AnomalyDetectionUtilsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/AnomalyDetectionUtilsTests.kt index c51ad0332..15b41229d 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/AnomalyDetectionUtilsTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/AnomalyDetectionUtilsTests.kt @@ -29,7 +29,7 @@ package org.opensearch.alerting.util import org.opensearch.alerting.ANOMALY_RESULT_INDEX import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.randomMonitor +import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder @@ -41,7 +41,7 @@ import org.opensearch.test.OpenSearchTestCase class AnomalyDetectionUtilsTests : OpenSearchTestCase() { fun `test is ad monitor`() { - val monitor = randomMonitor( + val monitor = randomQueryLevelMonitor( inputs = listOf( SearchInput( listOf(ANOMALY_RESULT_INDEX), @@ -54,14 +54,14 @@ class AnomalyDetectionUtilsTests : OpenSearchTestCase() { fun `test not ad monitor if monitor have no inputs`() { - val monitor = randomMonitor( + val monitor = randomQueryLevelMonitor( inputs = listOf() ) assertFalse(isADMonitor(monitor)) } fun `test not ad monitor if monitor input is not search input`() { - val monitor = randomMonitor( + val monitor = randomQueryLevelMonitor( inputs = listOf(object : Input { override fun name(): String { TODO("Not yet implemented") @@ -80,7 +80,7 @@ class AnomalyDetectionUtilsTests : OpenSearchTestCase() { } fun `test not ad monitor if monitor input has more than 1 indices`() { - val monitor = randomMonitor( + val monitor = randomQueryLevelMonitor( inputs = listOf( SearchInput( listOf(randomAlphaOfLength(5), randomAlphaOfLength(5)), @@ -92,7 +92,7 @@ class AnomalyDetectionUtilsTests : OpenSearchTestCase() { } fun `test not ad monitor if monitor input's index name is not AD result index`() { - val monitor = randomMonitor( + val monitor = randomQueryLevelMonitor( inputs = listOf(SearchInput(listOf(randomAlphaOfLength(5)), SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))) ) assertFalse(isADMonitor(monitor)) diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index af013e01c..af3d10086 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 3 + "schema_version": 4 }, "properties": { "monitor": { @@ -18,6 +18,9 @@ } } }, + "monitor_type": { + "type": "keyword" + }, "user": { "properties": { "name": { @@ -115,6 +118,15 @@ } } }, + "group_by_fields": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, "triggers": { "type": "nested", "properties": { @@ -171,6 +183,64 @@ } } } + }, + "query_level_trigger": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "min_time_between_executions": { + "type": "integer" + }, + "condition": { + "type": "object", + "enabled": false + }, + "actions": { + "type": "nested", + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "destination_id": { + "type": "keyword" + }, + "subject_template": { + "type": "object", + "enabled": false + }, + "message_template": { + "type": "object", + "enabled": false + }, + "throttle_enabled": { + "type": "boolean" + }, + "throttle": { + "properties": { + "value": { + "type": "integer" + }, + "unit": { + "type": "keyword" + } + } + } + } + } + } } } },