From 418c6c70fef61c535348eadb3dba31261c8c666c Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 2 Oct 2023 20:14:32 +0000 Subject: [PATCH] Add primary first calls for different monitor types (#1205) Signed-off-by: Ashish Agrawal (cherry picked from commit 2197d46eceab683da4f3b9a4d290723725247f42) Signed-off-by: github-actions[bot] --- .../opensearch/alerting/DocumentLevelMonitorRunner.kt | 4 +++- .../main/kotlin/org/opensearch/alerting/InputService.kt | 9 +++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 35630aa51..a4b9310d6 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -30,6 +30,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.xcontent.XContentFactory @@ -642,6 +643,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .query(boolQueryBuilder) .size(10000) // fixme: make this configurable. ) + .preference(Preference.PRIMARY_FIRST.type()) val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { throw IOException("Failed to search shard: $shard") @@ -674,7 +676,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) ) } - val searchRequest = SearchRequest(queryIndex) + val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type()) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 38e86d594..b31e21d5f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -19,6 +19,7 @@ import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap import org.opensearch.alerting.util.getRoleFilterEnabled import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.settings.Settings @@ -99,7 +100,9 @@ class InputService( .newInstance(searchParams) .execute() - val searchRequest = SearchRequest().indices(*input.indices.toTypedArray()) + val searchRequest = SearchRequest() + .indices(*input.indices.toTypedArray()) + .preference(Preference.PRIMARY_FIRST.type()) XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { searchRequest.source(SearchSourceBuilder.fromXContent(it)) } @@ -191,7 +194,9 @@ class InputService( .newInstance(searchParams) .execute() - val searchRequest = SearchRequest().indices(*input.indices.toTypedArray()) + val searchRequest = SearchRequest() + .indices(*input.indices.toTypedArray()) + .preference(Preference.PRIMARY_FIRST.type()) XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { searchRequest.source(SearchSourceBuilder.fromXContent(it)) }