Addressing review comments
Signed-off-by: Megha Goyal <goyamegh@amazon.com>
goyamegh committed Feb 5, 2024
1 parent 03b86ae commit ae32748
Showing 3 changed files with 20 additions and 23 deletions.
DocumentLevelMonitorRunner.kt

@@ -9,13 +9,14 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
+ import org.opensearch.action.admin.indices.refresh.RefreshAction
+ import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
- import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
@@ -395,7 +396,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

val actionCtx = triggerCtx.copy(
triggeredDocs = triggerResult.triggeredDocs,
- // confirm if this is right or only trigger-able findings should be present in this list
relatedFindings = findingToDocPairs.map { it.first },
error = monitorResult.error ?: triggerResult.error
)
@@ -477,10 +477,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val findingDocPairs = mutableListOf<Pair<String, String>>()
val findings = mutableListOf<Finding>()
val indexRequests = mutableListOf<IndexRequest>()
- monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
- monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) {
- monitorCtx.findingsIndexBatchSize = it
- }

docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
@@ -507,22 +503,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.string()
logger.debug("Findings: $findingStr")

- if (indexRequests.size > monitorCtx.findingsIndexBatchSize) {
- // make bulk indexing call here and flush the indexRequest object
- bulkIndexFindings(monitor, monitorCtx, indexRequests)
- indexRequests.clear()
- } else {
- if (shouldCreateFinding) {
- indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
- .source(findingStr, XContentType.JSON)
- .id(finding.id)
- .routing(finding.id)
- .opType(DocWriteRequest.OpType.INDEX)
- }
+ if (shouldCreateFinding) {
+ indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
+ .source(findingStr, XContentType.JSON)
+ .id(finding.id)
+ .opType(DocWriteRequest.OpType.CREATE)
+ }
}

- if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) {
+ if (indexRequests.isNotEmpty()) {
bulkIndexFindings(monitor, monitorCtx, indexRequests)
}

@@ -542,9 +531,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
indexRequests: List<IndexRequest>
) {
- if (indexRequests.isNotEmpty()) {
+ monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)

+ indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
- bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
+ bulk(BulkRequest().add(indexRequests), it)
}
if (bulkResponse.hasFailures()) {
bulkResponse.items.forEach { item ->
@@ -556,6 +547,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
}
}
+ monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex))
}

private fun publishFinding(
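Note: the change above drops the per-request WriteRequest.RefreshPolicy.IMMEDIATE in favor of chunked bulk calls followed by one explicit refresh of the findings index. Below is a minimal standalone sketch of that pattern, assuming a blocking org.opensearch.client.Client rather than the plugin's suspendUntil extension; the function and parameter names are illustrative, not the plugin's.

import org.opensearch.action.admin.indices.refresh.RefreshAction
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.client.Client

// Hypothetical helper; only the batching-then-refresh pattern mirrors the change above.
fun bulkIndexFindingsInBatches(
    client: Client,
    findingsIndex: String,
    indexRequests: List<IndexRequest>,
    batchSize: Int
) {
    // Split the accumulated requests into batches no larger than the configured size.
    indexRequests.chunked(batchSize).forEach { batch ->
        val bulkResponse: BulkResponse = client.bulk(BulkRequest().add(batch)).actionGet()
        if (bulkResponse.hasFailures()) {
            // Surface per-item failures; real error handling is up to the caller.
            bulkResponse.items.filter { it.isFailed }.forEach { item ->
                System.err.println("Failed to index finding ${item.id}: ${item.failureMessage}")
            }
        }
    }
    // One explicit refresh after all batches, instead of RefreshPolicy.IMMEDIATE on every
    // bulk request, so the findings become searchable before any follow-up queries.
    client.execute(RefreshAction.INSTANCE, RefreshRequest(findingsIndex)).actionGet()
}

Refreshing once after all batches avoids paying the cost of an immediate refresh per bulk call while still making the newly indexed findings visible to subsequent steps.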
MonitorRunnerService.kt

@@ -22,6 +22,7 @@ import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.script.TriggerExecutionContext
+ import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
@@ -169,6 +170,10 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings)

+ monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) {
+ monitorCtx.findingsIndexBatchSize = it
+ }

return this
}

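Note: the consumer registered above is the standard way to pick up dynamic cluster-setting updates without a node restart; the initial value still has to be read from Settings once (which bulkIndexFindings now does). A self-contained sketch of the same pattern, using an illustrative holder class rather than the plugin's MonitorRunnerExecutionContext:

import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Setting
import org.opensearch.common.settings.Settings

// Illustrative copy of the dynamic setting defined in AlertingSettings below.
// In a plugin the setting must also be exposed via the plugin's registered settings,
// otherwise addSettingsUpdateConsumer rejects it.
val FINDINGS_INDEXING_BATCH_SIZE: Setting<Int> = Setting.intSetting(
    "plugins.alerting.alert_findings_indexing_batch_size",
    1000,
    1,
    Setting.Property.NodeScope, Setting.Property.Dynamic
)

// Hypothetical holder standing in for MonitorRunnerExecutionContext.findingsIndexBatchSize.
class FindingsBatchSizeHolder(clusterService: ClusterService, settings: Settings) {
    // Seed with the node's current value; @Volatile because updates arrive on another thread.
    @Volatile
    var batchSize: Int = FINDINGS_INDEXING_BATCH_SIZE.get(settings)
        private set

    init {
        // Runs on every dynamic update of the setting, keeping batchSize in sync.
        clusterService.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) {
            batchSize = it
        }
    }
}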
AlertingSettings.kt

@@ -17,7 +17,7 @@ class AlertingSettings {

companion object {
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
- const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000
+ const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000

val ALERTING_MAX_MONITORS = Setting.intSetting(
"plugins.alerting.monitor.max_monitors",
@@ -158,7 +158,7 @@ class AlertingSettings {
val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting(
"plugins.alerting.alert_findings_indexing_batch_size",
DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
- 0,
+ 1,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
}
