Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize to fetch only fields relevant to doc level queries in doc level monitor instead of entire _source for each doc #1441

Merged
merged 3 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT,
LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE,
LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED,
DestinationSettings.EMAIL_USERNAME,
DestinationSettings.EMAIL_PASSWORD,
DestinationSettings.ALLOW_LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
Expand Down Expand Up @@ -236,6 +234,28 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}
}

val fieldsToBeQueried = mutableSetOf<String>()
if (monitorCtx.fetchOnlyQueryFieldNames) {
for (it in queries) {
if (it.queryFieldNames.isEmpty()) {
fieldsToBeQueried.clear()
logger.debug(
"Monitor ${monitor.id} : " +
"Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " +
"Cannot optimize monitor to fetch only query-relevant fields. " +
"Querying entire doc source."
)
break
}
fieldsToBeQueried.addAll(it.queryFieldNames)
}
if (fieldsToBeQueried.isNotEmpty())
logger.debug(
"Monitor ${monitor.id} Querying only fields " +
"${fieldsToBeQueried.joinToString()} instead of entire _source of documents"
)
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

Expand All @@ -252,6 +272,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries,
updatedIndexNames,
concreteIndicesSeenSoFar,
ArrayList(fieldsToBeQueried)
)
}
}
Expand Down Expand Up @@ -683,6 +704,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries: MutableMap<String, MutableList<String>>,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
fieldsToBeQueried: List<String>,
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand All @@ -697,8 +719,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
shard,
prevSeqNo,
maxSeqNo,
null,
docIds
docIds,
fieldsToBeQueried
)
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
Expand Down Expand Up @@ -789,19 +811,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
shard: String,
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?,
docIds: List<String>? = null,
fieldsToFetch: List<String>,
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
}
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

if (query != null) {
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
}

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}
Expand All @@ -816,6 +834,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.size(10000)
)
.preference(Preference.PRIMARY_FIRST.type())

if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) {
request.source().fetchSource(false)
for (field in fieldsToFetch) {
request.source().fetchField(field)
}
}
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
logger.error("Failed search shard. Response: $response")
Expand Down Expand Up @@ -906,7 +931,11 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
): List<Pair<String, TransformedDocDto>> {
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, TransformedDocDto>? {
try {
val sourceMap = hit.sourceAsMap
val sourceMap = if (hit.hasSource()) {
hit.sourceAsMap
} else {
constructSourceMapFromFieldsInHit(hit)
}
transformDocumentFieldNames(
sourceMap,
conflictingFields,
Expand All @@ -927,6 +956,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
})
}

private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap<String, Any> {
if (hit.fields == null)
return mutableMapOf()
val sourceMap: MutableMap<String, Any> = mutableMapOf()
for (field in hit.fields) {
if (field.value.values != null && field.value.values.isNotEmpty())
if (field.value.values.size == 1) {
sourceMap[field.key] = field.value.values[0]
} else sourceMap[field.key] = field.value.values
}
return sourceMap
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names
* but different mappings & [fieldNameSuffixPattern] to field names which have unique names.
Expand Down Expand Up @@ -984,15 +1026,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
*
*/
private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean {
var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings)
var thresholdPercentage = monitorCtx.percQueryDocsSizeMemoryPercentageLimit
val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes
val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes

return docsBytesSize > thresholdBytes
}

private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean {
var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings)
var maxNumDocsThreshold = monitorCtx.percQueryMaxNumDocsInMemory
return numDocs >= maxNumDocsThreshold
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,9 @@ data class MonitorRunnerExecutionContext(

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
@Volatile var fetchOnlyQueryFieldNames: Boolean = true,
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int =
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY
import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings
Expand Down Expand Up @@ -182,6 +185,23 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.findingsIndexBatchSize = it
}

monitorCtx.fetchOnlyQueryFieldNames = DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED) {
monitorCtx.fetchOnlyQueryFieldNames = it
}

monitorCtx.percQueryMaxNumDocsInMemory = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) {
monitorCtx.percQueryMaxNumDocsInMemory = it
}

monitorCtx.percQueryDocsSizeMemoryPercentageLimit =
PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings
.addSettingsUpdateConsumer(PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT) {
monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it
}

return this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class AlertingSettings {
companion object {
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000
const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000
const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10

val ALERTING_MAX_MONITORS = Setting.intSetting(
"plugins.alerting.monitor.max_monitors",
Expand All @@ -44,7 +46,17 @@ class AlertingSettings {
*/
val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting(
"plugins.alerting.monitor.percolate_query_max_num_docs_in_memory",
300000, 1000,
DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, 1000,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/**
* Boolean setting to enable/disable optimizing doc level monitors by fetchign only fields mentioned in queries.
* Enabled by default. If disabled, will fetch entire source of documents while fetch data from shards.
*/
val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting(
"plugins.alerting.monitor.doc_level_monitor_query_field_names_enabled",
true,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.hc.core5.http.io.entity.StringEntity
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.common.xcontent.json.JsonXContent
Expand Down Expand Up @@ -75,6 +76,152 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test dryrun execute monitor with queryFieldNames set up with correct field`() {

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()

val docQuery =
DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf(), queryFieldNames = listOf("test_field"))
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(1, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() {

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()
// using wrong field name
val docQuery = DocLevelQuery(
query = "test_field:\"us-west-2\"",
name = "3",
fields = listOf(),
queryFieldNames = listOf("wrong_field")
)
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(0, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test fetch_query_field_names setting is disabled by configuring queryFieldNames set up with wrong field still works`() {
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.key, "false")
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()
// using wrong field name
val docQuery = DocLevelQuery(
query = "test_field:\"us-west-2\"",
name = "3",
fields = listOf(),
queryFieldNames = listOf("wrong_field")
)
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(1, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test execute monitor returns search result with dryrun`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down
Loading
Loading