From 16cb3990470854c98aeb3496297695824a6b3473 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sat, 14 Oct 2023 03:13:37 +0000 Subject: [PATCH] Set the rollover action to idempotent (#986) (#990) (cherry picked from commit f543d930ab698157967e38d9e474717d2ea45bd4) Signed-off-by: bowenlan-amzn Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] (cherry picked from commit 4dc79977fe6786e262b2fb4753e9f0516439914c) Signed-off-by: github-actions[bot] --- .../step/rollover/AttemptRolloverStep.kt | 2 +- .../action/RolloverActionIT.kt | 62 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index 663e1bee2..4c7356e07 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -366,7 +366,7 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) { info = mutableInfo.toMap() } - override fun isIdempotent(): Boolean = false + override fun isIdempotent(): Boolean = true @Suppress("TooManyFunctions") companion object { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt index a2d8b8553..c5569dd9c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt @@ -27,6 +27,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry import org.opensearch.indexmanagement.waitFor import org.opensearch.rest.RestRequest import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.test.OpenSearchTestCase import java.time.Instant import java.time.temporal.ChronoUnit @@ -733,4 +734,65 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { Assert.assertEquals(alias.containsKey("test_alias2"), true) Assert.assertEquals(alias.containsKey("test_alias3"), true) } + + fun `test rollover detects transient failure and continues executing`() { + val aliasName = "${testIndexName}_alias" + val indexNameBase = "${testIndexName}_index" + val firstIndex = "$indexNameBase-1" + val policyID = "${testIndexName}_testPolicyName_1" + val actionConfig = RolloverAction(null, 1, null, null, false, 0) + val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + + createPolicy(policy, policyID) + createIndex(firstIndex, policyID, aliasName) + + val managedIndexConfig = getExistingManagedIndexConfig(firstIndex) + + // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndex).policyID) } + + // Insert data to trigger rollover + insertSampleData(index = firstIndex, docCount = 5, delay = 0) + // Need to speed up to second execution where it will trigger the attempt rollover step + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + val info = getExplainManagedIndexMetaData(firstIndex).info as Map + assertEquals("Index did not rollover.", AttemptRolloverStep.getSuccessMessage(firstIndex), info["message"]) + } + // Manually produce transaction failure + val response = client().makeRequest( + "POST", "$INDEX_MANAGEMENT_INDEX/_update/${managedIndexConfig.id}%23metadata", + StringEntity( + "{\n" + + " \"script\": {\n" + + " \"source\": \"ctx._source.managed_index_metadata.step.step_status = params.step_status\",\n" + + " \"lang\": \"painless\",\n" + + " \"params\": {\n" + + " \"step_status\": \"starting\"\n" + + " }\n" + + " }\n" + + "}", + ContentType.APPLICATION_JSON + ) + ) + assertEquals("Request failed", RestStatus.OK, response.restStatus()) + + // Execute again to see the transaction failure + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + val metadata = getExplainManagedIndexMetaData(firstIndex) + assertEquals("Executing the wrong step", "attempt_rollover", metadata.stepMetaData?.name) + assertEquals("rollover step did not continue executing after detecting the transient failure.", Step.StepStatus.COMPLETED, metadata.stepMetaData?.stepStatus) + } + } }