diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index 5bbb32718..cfa10431c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException import org.opensearch.action.DocWriteRequest import org.opensearch.action.DocWriteResponse import org.opensearch.action.admin.indices.get.GetIndexRequest @@ -78,35 +79,51 @@ object MonitorMetadataService : @Suppress("ComplexMethod", "ReturnCount") suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata { try { - val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(metadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) - .id(metadata.id) - .routing(metadata.monitorId) - .setIfSeqNo(metadata.seqNo) - .setIfPrimaryTerm(metadata.primaryTerm) - .timeout(indexTimeout) + if (clusterService.state().routingTable.hasIndex(ScheduledJob.SCHEDULED_JOBS_INDEX)) { + val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source( + metadata.toXContent( + XContentFactory.jsonBuilder(), + ToXContent.MapParams(mapOf("with_type" to "true")) + ) + ) + .id(metadata.id) + .routing(metadata.monitorId) + .setIfSeqNo(metadata.seqNo) + .setIfPrimaryTerm(metadata.primaryTerm) + .timeout(indexTimeout) - if (updating) { - indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm) - } else { - indexRequest.opType(DocWriteRequest.OpType.CREATE) - } - val response: IndexResponse = client.suspendUntil { index(indexRequest, it) } - when (response.result) { - DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> { - val failureReason = "The upsert metadata call failed with a ${response.result?.lowercase} result" - log.error(failureReason) - throw AlertingException(failureReason, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureReason)) + if (updating) { + indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm) + } else { + indexRequest.opType(DocWriteRequest.OpType.CREATE) } - DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> { - log.debug("Successfully upserted MonitorMetadata:${metadata.id} ") + val response: IndexResponse = client.suspendUntil { index(indexRequest, it) } + when (response.result) { + DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> { + val failureReason = + "The upsert metadata call failed with a ${response.result?.lowercase} result" + log.error(failureReason) + throw AlertingException( + failureReason, + RestStatus.INTERNAL_SERVER_ERROR, + IllegalStateException(failureReason) + ) + } + + DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> { + log.debug("Successfully upserted MonitorMetadata:${metadata.id} ") + } } + return metadata.copy( + seqNo = response.seqNo, + primaryTerm = response.primaryTerm + ) + } else { + val failureReason = "Job index ${ScheduledJob.SCHEDULED_JOBS_INDEX} does not exist to update monitor metadata" + throw OpenSearchStatusException(failureReason, RestStatus.INTERNAL_SERVER_ERROR) } - return metadata.copy( - seqNo = response.seqNo, - primaryTerm = response.primaryTerm - ) } catch (e: Exception) { throw AlertingException.wrap(e) }