Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add primary first calls for different monitor types #1205

Merged
merged 2 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.xcontent.XContentFactory
Expand Down Expand Up @@ -641,6 +642,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.query(boolQueryBuilder)
.size(10000) // fixme: make this configurable.
)
.preference(Preference.PRIMARY_FIRST.type())
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: $shard")
Expand Down Expand Up @@ -673,7 +675,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
)
}
val searchRequest = SearchRequest(queryIndex)
val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type())
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.use
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -100,7 +101,9 @@ class InputService(
.newInstance(searchParams)
.execute()

val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
Expand Down Expand Up @@ -192,7 +195,9 @@ class InputService(
.newInstance(searchParams)
.execute()

val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
Expand Down
Loading