Skip to content

Commit

Permalink
optimize to fetch only fields relevant to doc level queries in doc le…
Browse files Browse the repository at this point in the history
…vel monitor instead of entire _source for each doc (opensearch-project#1441)

* optimize to fetch only fields relevant to doc level queries in doc level monitor

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* fix test for settings check

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* fix ktlint

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

---------

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep committed Mar 7, 2024
1 parent ecc7713 commit 8be9fba
Show file tree
Hide file tree
Showing 7 changed files with 438 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT,
LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE,
LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED,
DestinationSettings.EMAIL_USERNAME,
DestinationSettings.EMAIL_PASSWORD,
DestinationSettings.ALLOW_LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
Expand Down Expand Up @@ -236,6 +234,28 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}
}

val fieldsToBeQueried = mutableSetOf<String>()
if (monitorCtx.fetchOnlyQueryFieldNames) {
for (it in queries) {
if (it.queryFieldNames.isEmpty()) {
fieldsToBeQueried.clear()
logger.debug(
"Monitor ${monitor.id} : " +
"Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " +
"Cannot optimize monitor to fetch only query-relevant fields. " +
"Querying entire doc source."
)
break
}
fieldsToBeQueried.addAll(it.queryFieldNames)
}
if (fieldsToBeQueried.isNotEmpty())
logger.debug(
"Monitor ${monitor.id} Querying only fields " +
"${fieldsToBeQueried.joinToString()} instead of entire _source of documents"
)
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

Expand All @@ -252,6 +272,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries,
updatedIndexNames,
concreteIndicesSeenSoFar,
ArrayList(fieldsToBeQueried)
)
}
}
Expand Down Expand Up @@ -683,6 +704,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries: MutableMap<String, MutableList<String>>,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
fieldsToBeQueried: List<String>,
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand All @@ -697,8 +719,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
shard,
prevSeqNo,
maxSeqNo,
null,
docIds
docIds,
fieldsToBeQueried
)
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
Expand Down Expand Up @@ -789,19 +811,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
shard: String,
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?,
docIds: List<String>? = null,
fieldsToFetch: List<String>,
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
}
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

if (query != null) {
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
}

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}
Expand All @@ -816,6 +834,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.size(10000)
)
.preference(Preference.PRIMARY_FIRST.type())

if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) {
request.source().fetchSource(false)
for (field in fieldsToFetch) {
request.source().fetchField(field)
}
}
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
logger.error("Failed search shard. Response: $response")
Expand Down Expand Up @@ -906,7 +931,11 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
): List<Pair<String, TransformedDocDto>> {
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, TransformedDocDto>? {
try {
val sourceMap = hit.sourceAsMap
val sourceMap = if (hit.hasSource()) {
hit.sourceAsMap
} else {
constructSourceMapFromFieldsInHit(hit)
}
transformDocumentFieldNames(
sourceMap,
conflictingFields,
Expand All @@ -927,6 +956,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
})
}

private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap<String, Any> {
if (hit.fields == null)
return mutableMapOf()
val sourceMap: MutableMap<String, Any> = mutableMapOf()
for (field in hit.fields) {
if (field.value.values != null && field.value.values.isNotEmpty())
if (field.value.values.size == 1) {
sourceMap[field.key] = field.value.values[0]
} else sourceMap[field.key] = field.value.values
}
return sourceMap
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names
* but different mappings & [fieldNameSuffixPattern] to field names which have unique names.
Expand Down Expand Up @@ -984,15 +1026,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
*
*/
private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean {
var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings)
var thresholdPercentage = monitorCtx.percQueryDocsSizeMemoryPercentageLimit
val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes
val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes

return docsBytesSize > thresholdBytes
}

private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean {
var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings)
var maxNumDocsThreshold = monitorCtx.percQueryMaxNumDocsInMemory
return numDocs >= maxNumDocsThreshold
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,9 @@ data class MonitorRunnerExecutionContext(

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
@Volatile var fetchOnlyQueryFieldNames: Boolean = true,
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int =
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY
import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings
Expand Down Expand Up @@ -182,6 +185,23 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.findingsIndexBatchSize = it
}

monitorCtx.fetchOnlyQueryFieldNames = DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED) {
monitorCtx.fetchOnlyQueryFieldNames = it
}

monitorCtx.percQueryMaxNumDocsInMemory = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) {
monitorCtx.percQueryMaxNumDocsInMemory = it
}

monitorCtx.percQueryDocsSizeMemoryPercentageLimit =
PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings
.addSettingsUpdateConsumer(PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT) {
monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it
}

return this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class AlertingSettings {
companion object {
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000
const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000
const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10

val ALERTING_MAX_MONITORS = Setting.intSetting(
"plugins.alerting.monitor.max_monitors",
Expand All @@ -44,7 +46,17 @@ class AlertingSettings {
*/
val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting(
"plugins.alerting.monitor.percolate_query_max_num_docs_in_memory",
300000, 1000,
DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, 1000,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/**
* Boolean setting to enable/disable optimizing doc level monitors by fetchign only fields mentioned in queries.
* Enabled by default. If disabled, will fetch entire source of documents while fetch data from shards.
*/
val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting(
"plugins.alerting.monitor.doc_level_monitor_query_field_names_enabled",
true,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.common.xcontent.json.JsonXContent
Expand Down Expand Up @@ -75,6 +76,152 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test dryrun execute monitor with queryFieldNames set up with correct field`() {

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()

val docQuery =
DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf(), queryFieldNames = listOf("test_field"))
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(1, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() {

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()
// using wrong field name
val docQuery = DocLevelQuery(
query = "test_field:\"us-west-2\"",
name = "3",
fields = listOf(),
queryFieldNames = listOf("wrong_field")
)
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(0, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test fetch_query_field_names setting is disabled by configuring queryFieldNames set up with wrong field still works`() {
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.key, "false")
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()
// using wrong field name
val docQuery = DocLevelQuery(
query = "test_field:\"us-west-2\"",
name = "3",
fields = listOf(),
queryFieldNames = listOf("wrong_field")
)
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(1, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test execute monitor returns search result with dryrun`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down
Loading

0 comments on commit 8be9fba

Please sign in to comment.