Skip to content

Commit

Permalink
Fixing snapshot bug (#1257)
Browse files Browse the repository at this point in the history
Signed-off-by: Kshitij Tandon <tandonks@amazon.com>
Co-authored-by: bowenlan-amzn <bowenlan23@gmail.com>
(cherry picked from commit 809f85c)
  • Loading branch information
tandonks committed Sep 18, 2024
1 parent 4e86ce3 commit e5fde6f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.snapshot

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse
import org.opensearch.cluster.SnapshotsInProgress.State
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotInfo
import org.opensearch.snapshots.SnapshotState
import org.opensearch.transport.RemoteTransportException

class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
Expand All @@ -34,30 +33,31 @@ class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) {

try {
val snapshotName = getSnapshotName(managedIndexMetadata, indexName) ?: return this
val request = SnapshotsStatusRequest()
val newRequest = GetSnapshotsRequest()
.snapshots(arrayOf(snapshotName))
.repository(repository)
val response: SnapshotsStatusResponse = context.client.admin().cluster().suspendUntil { snapshotsStatus(request, it) }
val status: SnapshotStatus? = response
.snapshots
.find { snapshotStatus ->
snapshotStatus.snapshot.snapshotId.name == snapshotName && snapshotStatus.snapshot.repository == repository
}
val response: GetSnapshotsResponse = context.client.admin().cluster().suspendUntil { getSnapshots(newRequest, it) }
val status: SnapshotInfo? =
response
.snapshots
.find { snapshotInfo ->
snapshotInfo.snapshotId().name == snapshotName
}
if (status != null) {
when (status.state) {
State.INIT, State.STARTED -> {
when (status.state()) {
SnapshotState.IN_PROGRESS -> {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to getSnapshotInProgressMessage(indexName), "state" to status.state.name)
info = mapOf("message" to getSnapshotInProgressMessage(indexName), "state" to status.state().toString())
}
State.SUCCESS -> {
SnapshotState.SUCCESS -> {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName), "state" to status.state.name)
info = mapOf("message" to getSuccessMessage(indexName), "state" to status.state().toString())
}
else -> { // State.FAILED, State.ABORTED
val message = getFailedExistsMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message, "state" to status.state.name)
info = mapOf("message" to message, "state" to status.state().toString())
}
}
} else {
Expand Down Expand Up @@ -97,23 +97,26 @@ class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) {
return actionProperties.snapshotName
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)
}
override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData = currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)

override fun isIdempotent(): Boolean = true

companion object {
const val name = "wait_for_snapshot"

fun getFailedMessage(index: String) = "Failed to get status of snapshot [index=$index]"

fun getFailedExistsMessage(index: String) = "Snapshot doesn't exist [index=$index]"

fun getFailedActionPropertiesMessage(index: String, actionProperties: ActionProperties?) =
"Unable to retrieve [${ActionProperties.Properties.SNAPSHOT_NAME.key}] from ActionProperties=$actionProperties [index=$index]"

fun getSuccessMessage(index: String) = "Successfully created snapshot [index=$index]"

fun getSnapshotInProgressMessage(index: String) = "Snapshot currently in progress [index=$index]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,30 @@ import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.client.AdminClient
import org.opensearch.client.Client
import org.opensearch.client.ClusterAdminClient
import org.opensearch.cluster.SnapshotsInProgress
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.core.action.ActionListener
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.WaitForSnapshotStep
import org.opensearch.indexmanagement.snapshotmanagement.mockInProgressSnapshotInfo
import org.opensearch.indexmanagement.snapshotmanagement.mockSnapshotInfo
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.jobscheduler.spi.utils.LockService
import org.opensearch.script.ScriptService
import org.opensearch.snapshots.Snapshot
import org.opensearch.snapshots.SnapshotId
import org.opensearch.snapshots.SnapshotInfo
import org.opensearch.snapshots.SnapshotState
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.transport.RemoteTransportException

class WaitForSnapshotStepTests : OpenSearchTestCase() {

private val clusterService: ClusterService = mock()
private val scriptService: ScriptService = mock()
private val settings: Settings = Settings.EMPTY
Expand Down Expand Up @@ -71,13 +70,10 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() {
}

fun `test snapshot status states`() {
val snapshotStatus: SnapshotStatus = mock()
val response: SnapshotsStatusResponse = mock()
whenever(response.snapshots).doReturn(listOf(snapshotStatus))
whenever(snapshotStatus.snapshot).doReturn(Snapshot("repo", SnapshotId("snapshot-name", "some_uuid")))
val snapshotInfo: SnapshotInfo = mockInProgressSnapshotInfo(snapshot)
val response: GetSnapshotsResponse = mock()
whenever(response.snapshots).doReturn(listOf(snapshotInfo))
val client = getClient(getAdminClient(getClusterAdminClient(response, null)))

whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.INIT)
runBlocking {
val snapshotAction = SnapshotAction("repo", snapshot, 0)
val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)
Expand All @@ -89,31 +85,34 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() {
assertEquals("Did not get snapshot in progress message", WaitForSnapshotStep.getSnapshotInProgressMessage("test"), updatedManagedIndexMetaData.info!!["message"])
}

whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.STARTED)
val snapshotInfo2: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.SUCCESS)
whenever(response.snapshots).doReturn(listOf(snapshotInfo2))
runBlocking {
val snapshotAction = SnapshotAction("repo", snapshot, 0)
val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)
val step = WaitForSnapshotStep(snapshotAction)
val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService)
step.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
assertEquals("Did not get snapshot in progress message", WaitForSnapshotStep.getSnapshotInProgressMessage("test"), updatedManagedIndexMetaData.info!!["message"])
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
assertEquals("Did not get snapshot completed message", WaitForSnapshotStep.getSuccessMessage("test"), updatedManagedIndexMetaData.info!!["message"])
}

whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.SUCCESS)
val snapshotInfo3: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.FAILED)
whenever(response.snapshots).doReturn(listOf(snapshotInfo3))
runBlocking {
val snapshotAction = SnapshotAction("repo", snapshot, 0)
val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)
val step = WaitForSnapshotStep(snapshotAction)
val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService)
step.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
assertEquals("Did not get snapshot completed message", WaitForSnapshotStep.getSuccessMessage("test"), updatedManagedIndexMetaData.info!!["message"])
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
assertEquals("Did not get snapshot failed message", WaitForSnapshotStep.getFailedExistsMessage("test"), updatedManagedIndexMetaData.info!!["message"])
}

whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.ABORTED)
val snapshotInfo4: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.PARTIAL)
whenever(response.snapshots).doReturn(listOf(snapshotInfo4))
runBlocking {
val snapshotAction = SnapshotAction("repo", snapshot, 0)
val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)
Expand All @@ -125,7 +124,8 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() {
assertEquals("Did not get snapshot failed message", WaitForSnapshotStep.getFailedExistsMessage("test"), updatedManagedIndexMetaData.info!!["message"])
}

whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.FAILED)
val snapshotInfo5: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.INCOMPATIBLE)
whenever(response.snapshots).doReturn(listOf(snapshotInfo5))
runBlocking {
val snapshotAction = SnapshotAction("repo", snapshot, 0)
val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)
Expand All @@ -139,10 +139,9 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() {
}

fun `test snapshot not in response list`() {
val snapshotStatus: SnapshotStatus = mock()
val response: SnapshotsStatusResponse = mock()
whenever(response.snapshots).doReturn(listOf(snapshotStatus))
whenever(snapshotStatus.snapshot).doReturn(Snapshot("repo", SnapshotId("snapshot-different-name", "some_uuid")))
val snapshotInfo: SnapshotInfo = mockSnapshotInfo("snapshot-different-name")
val response: GetSnapshotsResponse = mock()
whenever(response.snapshots).doReturn(listOf(snapshotInfo))
val client = getClient(getAdminClient(getClusterAdminClient(response, null)))

runBlocking {
Expand Down Expand Up @@ -188,18 +187,20 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() {
}

private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient }

private fun getAdminClient(clusterAdminClient: ClusterAdminClient): AdminClient = mock { on { cluster() } doReturn clusterAdminClient }
private fun getClusterAdminClient(snapshotsStatusResponse: SnapshotsStatusResponse?, exception: Exception?): ClusterAdminClient {
assertTrue("Must provide one and only one response or exception", (snapshotsStatusResponse != null).xor(exception != null))

private fun getClusterAdminClient(getSnapshotsResponse: GetSnapshotsResponse?, exception: Exception?): ClusterAdminClient {
assertTrue("Must provide one and only one response or exception", (getSnapshotsResponse != null).xor(exception != null))
return mock {
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<SnapshotsStatusResponse>>(1)
if (snapshotsStatusResponse != null) {
listener.onResponse(snapshotsStatusResponse)
val listener = invocationOnMock.getArgument<ActionListener<GetSnapshotsResponse>>(1)
if (getSnapshotsResponse != null) {
listener.onResponse(getSnapshotsResponse)
} else {
listener.onFailure(exception)
}
}.whenever(this.mock).snapshotsStatus(any(), any())
}.whenever(this.mock).getSnapshots(any(), any())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.snapshots.Snapshot
import org.opensearch.snapshots.SnapshotId
import org.opensearch.snapshots.SnapshotInfo
import org.opensearch.snapshots.SnapshotState
import org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength
import org.opensearch.test.OpenSearchTestCase.randomBoolean
import org.opensearch.test.OpenSearchTestCase.randomIntBetween
Expand Down Expand Up @@ -169,11 +170,14 @@ fun randomSMState(): SMState = SMState.values()[randomIntBetween(0, SMState.valu

fun randomNotificationConfig(): NotificationConfig = NotificationConfig(randomChannel(), randomConditions())

fun randomConditions(): NotificationConfig.Conditions = NotificationConfig.Conditions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())
fun randomConditions(): NotificationConfig.Conditions =
NotificationConfig.Conditions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())

fun ToXContent.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = this.toXContent(XContentFactory.jsonBuilder(), params).string()
fun ToXContent.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String =
this.toXContent(XContentFactory.jsonBuilder(), params).string()

fun ToXContent.toMap(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): Map<String, Any> = this.toXContent(XContentFactory.jsonBuilder(), params).toMap()
fun ToXContent.toMap(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): Map<String, Any> =
this.toXContent(XContentFactory.jsonBuilder(), params).toMap()

fun mockIndexResponse(status: RestStatus = RestStatus.OK): IndexResponse {
val indexResponse: IndexResponse = mock()
Expand Down Expand Up @@ -253,6 +257,18 @@ fun mockInProgressSnapshotInfo(
return SnapshotInfo(entry)
}

fun mockSnapshotInfo(
name: String = randomAlphaOfLength(10),
snapshotState: SnapshotState,
): SnapshotInfo {
return SnapshotInfo(
SnapshotId(name, UUIDs.randomBase64UUID()),
emptyList(),
emptyList(),
snapshotState,
)
}

fun mockGetSnapshotResponse(num: Int): GetSnapshotsResponse {
val getSnapshotsRes: GetSnapshotsResponse = mock()
whenever(getSnapshotsRes.snapshots).doReturn(mockSnapshotInfoList(num))
Expand All @@ -271,4 +287,5 @@ fun mockSnapshotInfoList(num: Int, namePrefix: String = randomAlphaOfLength(10))
return result.toList()
}

fun String.parser(): XContentParser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, this)
fun String.parser(): XContentParser =
XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, this)

0 comments on commit e5fde6f

Please sign in to comment.