Skip to content

Commit

Permalink
Custom history indicies (#616) (#621)
Browse files Browse the repository at this point in the history
Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
(cherry picked from commit 7546b00)

Co-authored-by: Petar Dzepina <petar.dzepina@gmail.com>
  • Loading branch information
2 people authored and sbcd90 committed Nov 2, 2022
1 parent f66a2b8 commit dd3c801
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 21 deletions.
14 changes: 8 additions & 6 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,9 @@ class AlertService(
retryPolicy: BackoffPolicy,
allowUpdatingAcknowledgedAlert: Boolean = false
) {
val alertIndex = dataSources.alertsIndex
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex

var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
Expand All @@ -293,7 +295,7 @@ class AlertService(
when (alert.state) {
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(
IndexRequest(alertIndex)
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
Expand All @@ -304,7 +306,7 @@ class AlertService(
// and updated by the MonitorRunner
if (allowUpdatingAcknowledgedAlert) {
listOf<DocWriteRequest<*>>(
IndexRequest(alertIndex)
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
Expand All @@ -318,11 +320,11 @@ class AlertService(
}
Alert.State.COMPLETED -> {
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(alertIndex, alert.id)
DeleteRequest(alertsIndex, alert.id)
.routing(alert.monitorId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled(dataSources)) {
IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
if (alertIndices.isAlertHistoryEnabled()) {
IndexRequest(alertsHistoryIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
private val logger = LogManager.getLogger(javaClass)

var monitorCtx: MonitorRunnerExecutionContext = MonitorRunnerExecutionContext()

private lateinit var runnerSupervisor: Job
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + runnerSupervisor
Expand Down Expand Up @@ -184,7 +183,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
launch {
try {
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
if (monitorCtx.alertIndices!!.isAlertInitialized()) {
if (monitorCtx.alertIndices!!.isAlertInitialized(job.dataSources)) {
moveAlerts(monitorCtx.client!!, job.id, job)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,25 @@ class AlertIndices(
return alertIndexInitialized && alertHistoryIndexInitialized
}

fun isAlertHistoryEnabled(dataSources: DataSources): Boolean {
if (dataSources.alertsIndex == ALERT_INDEX) {
return alertHistoryEnabled
fun isAlertInitialized(dataSources: DataSources): Boolean {
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex
if (alertsIndex == ALERT_INDEX && alertsHistoryIndex == ALERT_HISTORY_WRITE_INDEX) {
return alertIndexInitialized && alertHistoryIndexInitialized
}
if (
clusterService.state().metadata.indices.containsKey(alertsIndex) &&
clusterService.state().metadata.hasAlias(alertsHistoryIndex)
) {
return true
}
return false
}

fun isAlertHistoryEnabled(): Boolean {
return alertHistoryEnabled
}

fun isFindingHistoryEnabled(): Boolean = findingHistoryEnabled

suspend fun createOrUpdateAlertIndex() {
Expand Down Expand Up @@ -265,6 +277,19 @@ class AlertIndices(
if (dataSources.alertsIndex == ALERT_INDEX) {
return createOrUpdateInitialAlertHistoryIndex()
}
if (!clusterService.state().metadata.hasAlias(dataSources.alertsHistoryIndex)) {
createIndex(
dataSources.alertsHistoryIndexPattern ?: ALERT_HISTORY_INDEX_PATTERN,
alertMapping(),
dataSources.alertsHistoryIndex
)
} else {
updateIndexMapping(
dataSources.alertsHistoryIndex ?: ALERT_HISTORY_WRITE_INDEX,
alertMapping(),
true
)
}
}
suspend fun createOrUpdateInitialAlertHistoryIndex() {
if (!alertHistoryIndexInitialized) {
Expand Down Expand Up @@ -300,13 +325,15 @@ class AlertIndices(
return createOrUpdateInitialFindingHistoryIndex()
}
val findingsIndex = dataSources.findingsIndex
val findingsIndexPattern = dataSources.findingsIndexPattern ?: FINDING_HISTORY_INDEX_PATTERN
if (!clusterService.state().routingTable().hasIndex(findingsIndex)) {
createIndex(
findingsIndex,
findingMapping()
findingsIndexPattern,
findingMapping(),
findingsIndex
)
} else {
updateIndexMapping(findingsIndex, findingMapping(), false)
updateIndexMapping(findingsIndex, findingMapping(), true)
}
}

Expand Down Expand Up @@ -339,6 +366,7 @@ class AlertIndices(
targetIndex = IndexUtils.getIndexNameWithAlias(clusterState, index)
}

// TODO call getMapping and compare actual mappings here instead of this
if (targetIndex == IndexUtils.lastUpdatedAlertHistoryIndex || targetIndex == IndexUtils.lastUpdatedFindingHistoryIndex) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ import org.opensearch.search.builder.SearchSourceBuilder
* 1. Find active alerts:
* a. matching monitorId if no monitor is provided (postDelete)
* b. matching monitorId and no triggerIds if monitor is provided (postIndex)
* 2. Move alerts over to [ALERT_HISTORY_WRITE_INDEX] as DELETED
* 3. Delete alerts from [ALERT_INDEX]
* 2. Move alerts over to DataSources.alertsHistoryIndex as DELETED
* 3. Delete alerts from monitor's DataSources.alertsIndex
* 4. Schedule a retry if there were any failures
*/
suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = null) {
suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor?) {
var alertIndex = monitor?.dataSources?.alertsIndex ?: ALERT_INDEX
var alertHistoryIndex = monitor?.dataSources?.alertsHistoryIndex ?: ALERT_HISTORY_WRITE_INDEX

val boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))

Expand All @@ -53,15 +56,15 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu
.query(boolQuery)
.version(true)

val activeAlertsRequest = SearchRequest(AlertIndices.ALERT_INDEX)
val activeAlertsRequest = SearchRequest(alertIndex)
.routing(monitorId)
.source(activeAlertsQuery)
val response: SearchResponse = client.suspendUntil { search(activeAlertsRequest, it) }

// If no alerts are found, simply return
if (response.hits.totalHits?.value == 0L) return
val indexRequests = response.hits.map { hit ->
IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
IndexRequest(alertHistoryIndex)
.routing(monitorId)
.source(
Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version)
Expand All @@ -76,7 +79,7 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu
val copyResponse: BulkResponse = client.suspendUntil { bulk(copyRequest, it) }

val deleteRequests = copyResponse.items.filterNot { it.isFailed }.map {
DeleteRequest(AlertIndices.ALERT_INDEX, it.id)
DeleteRequest(alertIndex, it.id)
.routing(monitorId)
.version(it.version)
.versionType(VersionType.EXTERNAL_GTE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class TransportAcknowledgeAlertAction @Inject constructor(
}

private suspend fun onSearchResponse(response: SearchResponse, monitor: Monitor) {
val alertsHistoryIndex = monitor.dataSources.alertsHistoryIndex
val updateRequests = mutableListOf<UpdateRequest>()
val copyRequests = mutableListOf<IndexRequest>()
response.hits.forEach { hit ->
Expand Down Expand Up @@ -176,7 +177,7 @@ class TransportAcknowledgeAlertAction @Inject constructor(
)
updateRequests.add(updateRequest)
} else {
val copyRequest = IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
val copyRequest = IndexRequest(alertsHistoryIndex)
.routing(request.monitorId)
.id(alert.id)
.source(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.Table
import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit.MILLIS
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors

class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Expand Down Expand Up @@ -465,6 +468,54 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}
}

fun `test search custom alerts history index`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex = "custom_alerts_index"
val customAlertsHistoryIndex = "custom_alerts_history_index"
val customAlertsHistoryIndexPattern = "<custom_alerts_history_index-{now/d}-1>"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger1, trigger2),
dataSources = DataSources(
alertsIndex = customAlertsIndex,
alertsHistoryIndex = customAlertsHistoryIndex,
alertsHistoryIndexPattern = customAlertsHistoryIndexPattern
)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
var alertsBefore = searchAlerts(monitorId, customAlertsIndex)
Assert.assertEquals(2, alertsBefore.size)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 2)
// Remove 1 trigger from monitor to force moveAlerts call to move alerts to history index
monitor = monitor.copy(triggers = listOf(trigger1))
updateMonitor(monitor, monitorId)

var alerts = listOf<Alert>()
OpenSearchTestCase.waitUntil({
alerts = searchAlerts(monitorId, customAlertsHistoryIndex)
if (alerts.size == 1) {
return@waitUntil true
}
return@waitUntil false
}, 30, TimeUnit.SECONDS)
assertEquals("Alerts from custom history index", 1, alerts.size)
}

fun `test get alerts by list of monitors containing both existent and non-existent ids`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down

0 comments on commit dd3c801

Please sign in to comment.