Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added exception check once the .opendistro-alerting-config index is b… #650

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchException
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthAction
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
Expand Down Expand Up @@ -297,10 +302,30 @@ class TransportIndexMonitorAction @Inject constructor(
if (!scheduledJobIndices.scheduledJobIndexExists()) {
scheduledJobIndices.initScheduledJobIndex(object : ActionListener<CreateIndexResponse> {
override fun onResponse(response: CreateIndexResponse) {
onCreateMappingsResponse(response)
onCreateMappingsResponse(response.isAcknowledged)
}
override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
// https://github.com/opensearch-project/alerting/issues/646
if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) {
scope.launch {
// Wait for the yellow status
val request = ClusterHealthRequest()
.indices(SCHEDULED_JOBS_INDEX)
.waitForYellowStatus()
val response: ClusterHealthResponse = client.suspendUntil {
execute(ClusterHealthAction.INSTANCE, request, it)
}
if (response.isTimedOut) {
actionListener.onFailure(
OpenSearchException("Cannot determine that the $SCHEDULED_JOBS_INDEX index is healthy")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be Alerting exception too?

)
}
// Retry mapping of monitor
onCreateMappingsResponse(true)
}
} else {
actionListener.onFailure(AlertingException.wrap(t))
}
}
})
} else if (!IndexUtils.scheduledJobIndexUpdated) {
Expand Down Expand Up @@ -346,6 +371,7 @@ class TransportIndexMonitorAction @Inject constructor(
val query = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("${Monitor.MONITOR_TYPE}.type", Monitor.MONITOR_TYPE))
val searchSource = SearchSourceBuilder().query(query).timeout(requestTimeout)
val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz revert this new line

client.search(
searchRequest,
object : ActionListener<SearchResponse> {
Expand Down Expand Up @@ -401,8 +427,8 @@ class TransportIndexMonitorAction @Inject constructor(
}
}

private fun onCreateMappingsResponse(response: CreateIndexResponse) {
if (response.isAcknowledged) {
private fun onCreateMappingsResponse(isAcknowledged: Boolean) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did we make this change?

if (isAcknowledged) {
log.info("Created $SCHEDULED_JOBS_INDEX with mappings.")
prepareMonitorIndexing()
IndexUtils.scheduledJobIndexUpdated()
Expand Down