diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index cc0bfbd97..7326c7c01 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -284,7 +284,9 @@ class AlertService( retryPolicy: BackoffPolicy, allowUpdatingAcknowledgedAlert: Boolean = false ) { - val alertIndex = dataSources.alertsIndex + val alertsIndex = dataSources.alertsIndex + val alertsHistoryIndex = dataSources.alertsHistoryIndex + var requestsToRetry = alerts.flatMap { alert -> // We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts. // In the rare event that a user acknowledges an alert between when it's read and when it's written @@ -293,7 +295,7 @@ class AlertService( when (alert.state) { Alert.State.ACTIVE, Alert.State.ERROR -> { listOf>( - IndexRequest(alertIndex) + IndexRequest(alertsIndex) .routing(alert.monitorId) .source(alert.toXContentWithUser(XContentFactory.jsonBuilder())) .id(if (alert.id != Alert.NO_ID) alert.id else null) @@ -304,7 +306,7 @@ class AlertService( // and updated by the MonitorRunner if (allowUpdatingAcknowledgedAlert) { listOf>( - IndexRequest(alertIndex) + IndexRequest(alertsIndex) .routing(alert.monitorId) .source(alert.toXContentWithUser(XContentFactory.jsonBuilder())) .id(if (alert.id != Alert.NO_ID) alert.id else null) @@ -318,11 +320,11 @@ class AlertService( } Alert.State.COMPLETED -> { listOfNotNull>( - DeleteRequest(alertIndex, alert.id) + DeleteRequest(alertsIndex, alert.id) .routing(alert.monitorId), // Only add completed alert to history index if history is enabled - if (alertIndices.isAlertHistoryEnabled(dataSources)) { - IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX) + if (alertIndices.isAlertHistoryEnabled()) { + IndexRequest(alertsHistoryIndex) .routing(alert.monitorId) .source(alert.toXContentWithUser(XContentFactory.jsonBuilder())) .id(alert.id) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index bbeb7e99f..471fc1c83 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -52,7 +52,6 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon private val logger = LogManager.getLogger(javaClass) var monitorCtx: MonitorRunnerExecutionContext = MonitorRunnerExecutionContext() - private lateinit var runnerSupervisor: Job override val coroutineContext: CoroutineContext get() = Dispatchers.Default + runnerSupervisor @@ -184,7 +183,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon launch { try { monitorCtx.moveAlertsRetryPolicy!!.retry(logger) { - if (monitorCtx.alertIndices!!.isAlertInitialized()) { + if (monitorCtx.alertIndices!!.isAlertInitialized(job.dataSources)) { moveAlerts(monitorCtx.client!!, job.id, job) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt index e813e8d0d..c56621745 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt @@ -231,13 +231,25 @@ class AlertIndices( return alertIndexInitialized && alertHistoryIndexInitialized } - fun isAlertHistoryEnabled(dataSources: DataSources): Boolean { - if (dataSources.alertsIndex == ALERT_INDEX) { - return alertHistoryEnabled + fun isAlertInitialized(dataSources: DataSources): Boolean { + val alertsIndex = dataSources.alertsIndex + val alertsHistoryIndex = dataSources.alertsHistoryIndex + if (alertsIndex == ALERT_INDEX && alertsHistoryIndex == ALERT_HISTORY_WRITE_INDEX) { + return alertIndexInitialized && alertHistoryIndexInitialized + } + if ( + clusterService.state().metadata.indices.containsKey(alertsIndex) && + clusterService.state().metadata.hasAlias(alertsHistoryIndex) + ) { + return true } return false } + fun isAlertHistoryEnabled(): Boolean { + return alertHistoryEnabled + } + fun isFindingHistoryEnabled(): Boolean = findingHistoryEnabled suspend fun createOrUpdateAlertIndex() { @@ -265,6 +277,19 @@ class AlertIndices( if (dataSources.alertsIndex == ALERT_INDEX) { return createOrUpdateInitialAlertHistoryIndex() } + if (!clusterService.state().metadata.hasAlias(dataSources.alertsHistoryIndex)) { + createIndex( + dataSources.alertsHistoryIndexPattern ?: ALERT_HISTORY_INDEX_PATTERN, + alertMapping(), + dataSources.alertsHistoryIndex + ) + } else { + updateIndexMapping( + dataSources.alertsHistoryIndex ?: ALERT_HISTORY_WRITE_INDEX, + alertMapping(), + true + ) + } } suspend fun createOrUpdateInitialAlertHistoryIndex() { if (!alertHistoryIndexInitialized) { @@ -300,13 +325,15 @@ class AlertIndices( return createOrUpdateInitialFindingHistoryIndex() } val findingsIndex = dataSources.findingsIndex + val findingsIndexPattern = dataSources.findingsIndexPattern ?: FINDING_HISTORY_INDEX_PATTERN if (!clusterService.state().routingTable().hasIndex(findingsIndex)) { createIndex( - findingsIndex, - findingMapping() + findingsIndexPattern, + findingMapping(), + findingsIndex ) } else { - updateIndexMapping(findingsIndex, findingMapping(), false) + updateIndexMapping(findingsIndex, findingMapping(), true) } } @@ -339,6 +366,7 @@ class AlertIndices( targetIndex = IndexUtils.getIndexNameWithAlias(clusterState, index) } + // TODO call getMapping and compare actual mappings here instead of this if (targetIndex == IndexUtils.lastUpdatedAlertHistoryIndex || targetIndex == IndexUtils.lastUpdatedFindingHistoryIndex) { return } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt index 100281a87..058e02c22 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt @@ -37,11 +37,14 @@ import org.opensearch.search.builder.SearchSourceBuilder * 1. Find active alerts: * a. matching monitorId if no monitor is provided (postDelete) * b. matching monitorId and no triggerIds if monitor is provided (postIndex) - * 2. Move alerts over to [ALERT_HISTORY_WRITE_INDEX] as DELETED - * 3. Delete alerts from [ALERT_INDEX] + * 2. Move alerts over to DataSources.alertsHistoryIndex as DELETED + * 3. Delete alerts from monitor's DataSources.alertsIndex * 4. Schedule a retry if there were any failures */ -suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = null) { +suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor?) { + var alertIndex = monitor?.dataSources?.alertsIndex ?: ALERT_INDEX + var alertHistoryIndex = monitor?.dataSources?.alertsHistoryIndex ?: ALERT_HISTORY_WRITE_INDEX + val boolQuery = QueryBuilders.boolQuery() .filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId)) @@ -53,7 +56,7 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu .query(boolQuery) .version(true) - val activeAlertsRequest = SearchRequest(AlertIndices.ALERT_INDEX) + val activeAlertsRequest = SearchRequest(alertIndex) .routing(monitorId) .source(activeAlertsQuery) val response: SearchResponse = client.suspendUntil { search(activeAlertsRequest, it) } @@ -61,7 +64,7 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu // If no alerts are found, simply return if (response.hits.totalHits?.value == 0L) return val indexRequests = response.hits.map { hit -> - IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX) + IndexRequest(alertHistoryIndex) .routing(monitorId) .source( Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version) @@ -76,7 +79,7 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu val copyResponse: BulkResponse = client.suspendUntil { bulk(copyRequest, it) } val deleteRequests = copyResponse.items.filterNot { it.isFailed }.map { - DeleteRequest(AlertIndices.ALERT_INDEX, it.id) + DeleteRequest(alertIndex, it.id) .routing(monitorId) .version(it.version) .versionType(VersionType.EXTERNAL_GTE) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt index 4e6ee8bbf..f95520f55 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt @@ -147,6 +147,7 @@ class TransportAcknowledgeAlertAction @Inject constructor( } private suspend fun onSearchResponse(response: SearchResponse, monitor: Monitor) { + val alertsHistoryIndex = monitor.dataSources.alertsHistoryIndex val updateRequests = mutableListOf() val copyRequests = mutableListOf() response.hits.forEach { hit -> @@ -176,7 +177,7 @@ class TransportAcknowledgeAlertAction @Inject constructor( ) updateRequests.add(updateRequest) } else { - val copyRequest = IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX) + val copyRequest = IndexRequest(alertsHistoryIndex) .routing(request.monitorId) .id(alert.id) .source( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index d602a449e..767da7f22 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -15,14 +15,17 @@ import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest import org.opensearch.commons.alerting.action.AlertingActions import org.opensearch.commons.alerting.action.GetAlertsRequest +import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.commons.alerting.model.Table +import org.opensearch.test.OpenSearchTestCase import java.time.ZonedDateTime import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit.MILLIS +import java.util.concurrent.TimeUnit import java.util.stream.Collectors class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { @@ -465,6 +468,54 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } } + fun `test search custom alerts history index`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customAlertsIndex = "custom_alerts_index" + val customAlertsHistoryIndex = "custom_alerts_history_index" + val customAlertsHistoryIndexPattern = "" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger1, trigger2), + dataSources = DataSources( + alertsIndex = customAlertsIndex, + alertsHistoryIndex = customAlertsHistoryIndex, + alertsHistoryIndexPattern = customAlertsHistoryIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + indexDoc(index, "1", testDoc) + val monitorId = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, monitorId, false) + var alertsBefore = searchAlerts(monitorId, customAlertsIndex) + Assert.assertEquals(2, alertsBefore.size) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 2) + // Remove 1 trigger from monitor to force moveAlerts call to move alerts to history index + monitor = monitor.copy(triggers = listOf(trigger1)) + updateMonitor(monitor, monitorId) + + var alerts = listOf() + OpenSearchTestCase.waitUntil({ + alerts = searchAlerts(monitorId, customAlertsHistoryIndex) + if (alerts.size == 1) { + return@waitUntil true + } + return@waitUntil false + }, 30, TimeUnit.SECONDS) + assertEquals("Alerts from custom history index", 1, alerts.size) + } + fun `test get alerts by list of monitors containing both existent and non-existent ids`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))