Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Monitor and Trigger to split into Query-Level and Bucket-Lev… #150

Merged
merged 2 commits into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestDeleteDestinationAction
import org.opensearch.alerting.resthandler.RestDeleteEmailAccountAction
Expand Down Expand Up @@ -135,8 +137,8 @@ import java.util.function.Supplier
/**
* Entry point of the OpenDistro for Elasticsearch alerting plugin
* This class initializes the [RestGetMonitorAction], [RestDeleteMonitorAction], [RestIndexMonitorAction] rest handlers.
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY] to the
* [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY],
* [BucketLevelTrigger.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() {

Expand Down Expand Up @@ -224,7 +226,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY)
return listOf(
Monitor.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY
)
}

override fun createComponents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import java.io.IOException

class ExecuteMonitorResponse : ActionResponse, ToXContentObject {

val monitorRunResult: MonitorRunResult
val monitorRunResult: MonitorRunResult<*>

constructor(monitorRunResult: MonitorRunResult) : super() {
constructor(monitorRunResult: MonitorRunResult<*>) : super() {
this.monitorRunResult = monitorRunResult
}

Expand Down
91 changes: 73 additions & 18 deletions alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.opensearch.alerting.elasticapi.instant
import org.opensearch.alerting.elasticapi.optionalTimeField
import org.opensearch.alerting.elasticapi.optionalUserField
import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.commons.authuser.User
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
Expand All @@ -39,7 +40,6 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.commons.authuser.User
import java.io.IOException
import java.time.Instant

Expand All @@ -61,7 +61,8 @@ data class Alert(
val errorMessage: String? = null,
val errorHistory: List<AlertError>,
val severity: String,
val actionExecutionResults: List<ActionExecutionResult>
val actionExecutionResults: List<ActionExecutionResult>,
val aggregationResultBucket: AggregationResultBucket? = null
) : Writeable, ToXContent {

init {
Expand All @@ -72,20 +73,52 @@ data class Alert(

constructor(
monitor: Monitor,
trigger: Trigger,
trigger: QueryLevelTrigger,
startTime: Instant,
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion
)
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null)

constructor(
monitor: Monitor,
trigger: BucketLevelTrigger,
startTime: Instant,
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null)

constructor(
monitor: Monitor,
trigger: BucketLevelTrigger,
startTime: Instant,
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION,
aggregationResultBucket: AggregationResultBucket
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = aggregationResultBucket)

enum class State {
ACTIVE, ACKNOWLEDGED, COMPLETED, ERROR, DELETED
Expand All @@ -112,7 +145,8 @@ data class Alert(
errorMessage = sin.readOptionalString(),
errorHistory = sin.readList(::AlertError),
severity = sin.readString(),
actionExecutionResults = sin.readList(::ActionExecutionResult)
actionExecutionResults = sin.readList(::ActionExecutionResult),
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null
)

fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED)
Expand All @@ -138,6 +172,12 @@ data class Alert(
out.writeCollection(errorHistory)
out.writeString(severity)
out.writeCollection(actionExecutionResults)
if (aggregationResultBucket != null) {
out.writeBoolean(true)
aggregationResultBucket.writeTo(out)
} else {
out.writeBoolean(false)
}
}

companion object {
Expand All @@ -160,7 +200,8 @@ data class Alert(
const val ALERT_HISTORY_FIELD = "alert_history"
const val SEVERITY_FIELD = "severity"
const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results"

const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND

Expand All @@ -183,8 +224,8 @@ data class Alert(
var acknowledgedTime: Instant? = null
var errorMessage: String? = null
val errorHistory: MutableList<AlertError> = mutableListOf()
var actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()

val actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()
var aggAlertBucket: AggregationResultBucket? = null
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
Expand Down Expand Up @@ -217,18 +258,27 @@ data class Alert(
actionExecutionResults.add(ActionExecutionResult.parse(xcp))
}
}
AggregationResultBucket.CONFIG_NAME -> {
// If an Alert with aggAlertBucket contents is indexed into the alerts index first, then
// that field will be added to the mappings.
// In this case, that field will default to null when it isn't present for Alerts created by Query-Level Monitors
// (even though the toXContent doesn't output the field) so null is being accounted for here.
aggAlertBucket = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) {
null
} else {
AggregationResultBucket.parse(xcp)
}
}
}
}

return Alert(
id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId),
return Alert(id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId),
monitorName = requireNotNull(monitorName), monitorVersion = monitorVersion, monitorUser = monitorUser,
triggerId = requireNotNull(triggerId), triggerName = requireNotNull(triggerName),
state = requireNotNull(state), startTime = requireNotNull(startTime), endTime = endTime,
lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime,
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity,
actionExecutionResults = actionExecutionResults
)
actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket)
}

@JvmStatic
Expand All @@ -239,7 +289,7 @@ data class Alert(
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
builder.startObject()
.field(ALERT_ID_FIELD, id)
.field(ALERT_VERSION_FIELD, version)
.field(MONITOR_ID_FIELD, monitorId)
Expand All @@ -258,7 +308,9 @@ data class Alert(
.optionalTimeField(LAST_NOTIFICATION_TIME_FIELD, lastNotificationTime)
.optionalTimeField(END_TIME_FIELD, endTime)
.optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime)
.endObject()
aggregationResultBucket?.innerXContent(builder)
builder.endObject()
return builder
}

fun asTemplateArg(): Map<String, Any?> {
Expand All @@ -271,7 +323,10 @@ data class Alert(
LAST_NOTIFICATION_TIME_FIELD to lastNotificationTime?.toEpochMilli(),
SEVERITY_FIELD to severity,
START_TIME_FIELD to startTime.toEpochMilli(),
STATE_FIELD to state.toString()
STATE_FIELD to state.toString(),
// Converting bucket keys to comma separated String to avoid manipulation in Action mustache templates
BUCKET_KEYS to aggregationResultBucket?.bucketKeys?.joinToString(","),
PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.alerting.model

import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.alerting.model.Trigger.Companion.ACTIONS_FIELD
import org.opensearch.alerting.model.Trigger.Companion.ID_FIELD
import org.opensearch.alerting.model.Trigger.Companion.NAME_FIELD
import org.opensearch.alerting.model.Trigger.Companion.SEVERITY_FIELD
import org.opensearch.alerting.model.action.Action
import org.opensearch.common.CheckedFunction
import org.opensearch.common.ParseField
import org.opensearch.common.UUIDs
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException

/**
* A multi-alert Trigger available with Bucket-Level Monitors that filters aggregation buckets via a pipeline
* aggregator.
*/
data class BucketLevelTrigger(
override val id: String = UUIDs.base64UUID(),
override val name: String,
override val severity: String,
val bucketSelector: BucketSelectorExtAggregationBuilder,
override val actions: List<Action>
) : Trigger {

@Throws(IOException::class)
constructor(sin: StreamInput): this(
sin.readString(), // id
sin.readString(), // name
sin.readString(), // severity
BucketSelectorExtAggregationBuilder(sin), // condition
sin.readList(::Action) // actions
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.startObject(BUCKET_LEVEL_TRIGGER_FIELD)
.field(ID_FIELD, id)
.field(NAME_FIELD, name)
.field(SEVERITY_FIELD, severity)
.startObject(CONDITION_FIELD)
bucketSelector.internalXContent(builder, params)
builder.endObject()
.field(ACTIONS_FIELD, actions.toTypedArray())
.endObject()
.endObject()
return builder
}

override fun name(): String {
return BUCKET_LEVEL_TRIGGER_FIELD
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeString(name)
out.writeString(severity)
bucketSelector.writeTo(out)
out.writeCollection(actions)
}

fun asTemplateArg(): Map<String, Any> {
return mapOf(
ID_FIELD to id,
NAME_FIELD to name,
SEVERITY_FIELD to severity,
ACTIONS_FIELD to actions.map { it.asTemplateArg() },
PARENT_BUCKET_PATH to getParentBucketPath()
)
}

fun getParentBucketPath(): String {
return bucketSelector.parentBucketPath
}

companion object {
const val BUCKET_LEVEL_TRIGGER_FIELD = "bucket_level_trigger"
const val CONDITION_FIELD = "condition"
const val PARENT_BUCKET_PATH = "parentBucketPath"

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Trigger::class.java, ParseField(BUCKET_LEVEL_TRIGGER_FIELD),
CheckedFunction { parseInner(it) })

@JvmStatic
@Throws(IOException::class)
fun parseInner(xcp: XContentParser): BucketLevelTrigger {
var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified
lateinit var name: String
lateinit var severity: String
val actions: MutableList<Action> = mutableListOf()
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
lateinit var bucketSelector: BucketSelectorExtAggregationBuilder

while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()

xcp.nextToken()
when (fieldName) {
ID_FIELD -> id = xcp.text()
NAME_FIELD -> name = xcp.text()
SEVERITY_FIELD -> severity = xcp.text()
CONDITION_FIELD -> {
// Using the trigger id as the name in the bucket selector since it is validated for uniqueness within Monitors.
// The contents of the trigger definition are round-tripped through parse and toXContent during Monitor creation
// ensuring that the id is available here in the version of the Monitor object that will be executed, even if the
// user submitted a custom trigger id after the condition definition.
bucketSelector = BucketSelectorExtAggregationBuilder.parse(id, xcp)
}
ACTIONS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
actions.add(Action.parse(xcp))
}
}
}
}

return BucketLevelTrigger(
id = requireNotNull(id) { "Trigger id is null." },
name = requireNotNull(name) { "Trigger name is null" },
severity = requireNotNull(severity) { "Trigger severity is null" },
bucketSelector = requireNotNull(bucketSelector) { "Trigger condition is null" },
actions = requireNotNull(actions) { "Trigger actions are null" })
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): BucketLevelTrigger {
return BucketLevelTrigger(sin)
}
}
}
Loading