Skip to content

Commit

Permalink
Refactor change policy API and the policy in managed index to be non-…
Browse files Browse the repository at this point in the history
…null (opensearch-project#967)

* Refactor the policy to be non null in managed index config

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Update

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* fix bug

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

---------

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
bowenlan-amzn authored and Joshua152 committed Dec 22, 2023
1 parent fe1a859 commit ebd2fe5
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata

class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : IndexMetadataService {
class DefaultIndexMetadataService(private val customUUIDSetting: String? = null) : IndexMetadataService {

/**
* Returns the default index metadata needed for ISM
Expand All @@ -39,7 +39,7 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index

response.state.metadata.indices.forEach {
// TODO waiting to add document count until it is definitely needed
val uuid = getCustomIndexUUID(it.value)
val uuid = getIndexUUID(it.value)
val indexMetadata = ISMIndexMetadata(uuid, it.value.creationDate, -1)
indexNameToMetadata[it.key] = indexMetadata
}
Expand All @@ -48,11 +48,10 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index
}

/*
* If an extension wants Index Management to determine cluster state indices UUID based on a custom index setting if
* present of cluster state, the extension will override this customUUID setting. This allows an index to migrate off
* cluster and back while using this persistent uuid.
* This method prioritize the custom index setting provided by extension to decide the index UUID
* Custom index UUID is needed when index moved out of cluster and re-attach back, it will get a new UUID in cluster metadata
*/
fun getCustomIndexUUID(indexMetadata: IndexMetadata): String {
fun getIndexUUID(indexMetadata: IndexMetadata): String {
return if (customUUIDSetting != null) {
indexMetadata.settings.get(customUUIDSetting, indexMetadata.indexUUID)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class ManagedIndexCoordinator(
// If there is a custom index uuid associated with the index, we do not auto manage it
// This is because cold index uses custom uuid, and we do not auto manage cold-to-warm index
val indexMetadata = clusterState.metadata.index(indexName)
val wasOffCluster = defaultIndexMetadataService.getCustomIndexUUID(indexMetadata) != indexMetadata.indexUUID
val wasOffCluster = defaultIndexMetadataService.getIndexUUID(indexMetadata) != indexMetadata.indexUUID
val ismIndexMetadata = ismIndicesMetadata[indexName]
// We try to find lookup name instead of using index name as datastream indices need the alias to match policy
val lookupName = findIndexLookupName(indexName, clusterState)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ data class ManagedIndexConfig(
val policyID: String,
val policySeqNo: Long?,
val policyPrimaryTerm: Long?,
val policy: Policy?,
val policy: Policy,
val changePolicy: ChangePolicy?,
val jobJitter: Double?
) : ScheduledJobParameter {
Expand Down Expand Up @@ -177,11 +177,13 @@ data class ManagedIndexConfig(
policyID = requireNotNull(policyID) { "ManagedIndexConfig policy id is null" },
policySeqNo = policySeqNo,
policyPrimaryTerm = policyPrimaryTerm,
policy = policy?.copy(
id = policyID,
seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
),
policy = requireNotNull(
policy?.copy(
id = policyID,
seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
)
) { "ManagedIndexConfig policy is null" },
changePolicy = changePolicy,
jobJitter = jitter
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fun getUuidsForClosedIndices(state: ClusterState, defaultIndexMetadataService: D
indexMetadatas.forEach {
// it.key is index name
if (it.value.state == IndexMetadata.State.CLOSE) {
closeList.add(defaultIndexMetadataService.getCustomIndexUUID(it.value))
closeList.add(defaultIndexMetadataService.getIndexUUID(it.value))
}
}
return closeList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings

import org.opensearch.common.settings.Setting
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST_ALL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST_NONE
import java.util.concurrent.TimeUnit
import java.util.function.Function

@Suppress("UtilityClassWithPublicConstructor")
class LegacyOpenDistroManagedIndexSettings {
companion object {
const val DEFAULT_ISM_ENABLED = true
const val DEFAULT_JOB_INTERVAL = 5
private val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList()
val ALLOW_LIST_NONE = emptyList<String>()
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()

val INDEX_STATE_MANAGEMENT_ENABLED: Setting<Boolean> = Setting.boolSetting(
"opendistro.index_state_management.enabled",
DEFAULT_ISM_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings
import org.opensearch.common.settings.Setting
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
import java.util.function.Function

@Suppress("UtilityClassWithPublicConstructor")
Expand All @@ -19,6 +20,7 @@ class ManagedIndexSettings {
const val DEFAULT_JITTER = 0.6
const val DEFAULT_RESTRICTED_PATTERN = "\\.opendistro_security|\\.kibana.*|\\$INDEX_MANAGEMENT_INDEX"
val ALLOW_LIST_NONE = emptyList<String>()
val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList()
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()

val INDEX_STATE_MANAGEMENT_ENABLED: Setting<Boolean> = Setting.boolSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class TransportChangePolicyAction @Inject constructor(
val clusterState = response.state
val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService
clusterState.metadata.indices.forEach {
val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value)
val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value)
indexUuidToIndexMetadata[indexUUID] = it.value
}
// ISMIndexMetadata from the default index metadata service uses lenient expand, we want to use strict expand, filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ class TransportExplainAction @Inject constructor(
"enabled" to managedIndex.enabled.toString()
)
if (showPolicy) {
managedIndex.policy?.let { appliedPolicies[managedIndex.index] = it }
managedIndex.policy.let { appliedPolicies[managedIndex.index] = it }
}
if (validateAction) {
managedIndex.policy?.let { policiesforValidation[managedIndex.index] = it }
managedIndex.policy.let { policiesforValidation[managedIndex.index] = it }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class TransportRetryFailedManagedIndexAction @Inject constructor(
override fun onResponse(response: ClusterStateResponse) {
val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService
response.state.metadata.indices.forEach {
val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value)
val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value)
indexUuidToIndexMetadata[indexUUID] = it.value
}
processResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fun managedIndexConfigIndexRequest(
uuid: String,
policyID: String,
jobInterval: Int,
policy: Policy? = null,
policy: Policy,
jobJitter: Double?
): IndexRequest {
val managedIndexConfig = ManagedIndexConfig(
Expand All @@ -74,8 +74,8 @@ fun managedIndexConfigIndexRequest(
jobEnabledTime = Instant.now(),
policyID = policyID,
policy = policy,
policySeqNo = policy?.seqNo,
policyPrimaryTerm = policy?.primaryTerm,
policySeqNo = policy.seqNo,
policyPrimaryTerm = policy.primaryTerm,
changePolicy = null,
jobJitter = jobJitter
)
Expand Down Expand Up @@ -380,18 +380,16 @@ val ManagedIndexMetaData.isPolicyCompleted: Boolean
get() = this.policyCompleted == true

/**
* We will change the policy if a change policy exists and if we are currently in
* a Transitions action (which means we're safely at the end of a state). If a
* transitionTo exists on the [ManagedIndexMetaData] it should still be fine to
* change policy as we have not actually transitioned yet. If the next action is Transition
* or if the rest API determined it was "safe", meaning the new policy has the same structure
* We will change the policy if a change policy exists and if we are currently in a Transitions action
* which means we're safely at the end of a state.
*
* If the next action is Transition or if the rest API determined it was "safe", meaning the new policy has the same structure
* of the current state, it should be safe to immediately change (even in the middle of the state).
*
* @param managedIndexMetaData current [ManagedIndexMetaData]
* @return {@code true} if we should change policy, {@code false} if not
*/
@Suppress("ReturnCount")
fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action?): Boolean {
fun ManagedIndexConfig.shouldChangePolicy(actionToExecute: Action?): Boolean {
if (this.changePolicy == null) {
return false
}
Expand All @@ -400,25 +398,14 @@ fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMeta
return true
}

// we need this in so that we can change policy before the first transition happens so policy doesnt get completed
// before we have a chance to change policy
if (actionToExecute?.type == TransitionsAction.name) {
return true
}

if (managedIndexMetaData.actionMetaData?.name != TransitionsAction.name) {
return false
}

return true
return actionToExecute?.type == TransitionsAction.name
}

fun ManagedIndexMetaData.hasVersionConflict(managedIndexConfig: ManagedIndexConfig): Boolean =
fun ManagedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig: ManagedIndexConfig): Boolean =
this.policySeqNo != managedIndexConfig.policySeqNo || this.policyPrimaryTerm != managedIndexConfig.policyPrimaryTerm

fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean {
val schedule = this.schedule
when (schedule) {
when (val schedule = this.schedule) {
is IntervalSchedule -> {
return schedule.interval != jobInterval
}
Expand All @@ -427,13 +414,13 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean {
}

/**
* A policy is safe to change to a new policy when each policy has the current state
* the [ManagedIndexConfig] is in and that state has the same actions in the same order.
* A policy is safe to change to a new policy when
* both policies have the current state the [ManagedIndexConfig] is in and that state has the same actions in the same order.
* This allows simple things like configuration updates to happen which won't break the execution/contract
* between [ManagedIndexMetaData] and [ManagedIndexConfig] as the metadata only knows about the current state.
* We never consider a policy safe to immediately change if the ChangePolicy contains a state to transition to
* as this could transition a user into a different state from the middle of the current state which we do not
* want to allow.
*
* If the ChangePolicy contains a state to transition to, we don't consider it's safe to change here
* as this may transition a user into a different state from the middle of the current state.
*
* @param stateName the name of the state the [ManagedIndexConfig] is currently in
* @param newPolicy the new (actual data model) policy we will eventually try to change to
Expand All @@ -442,20 +429,19 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean {
*/
@Suppress("ReturnCount")
fun Policy.isSafeToChange(stateName: String?, newPolicy: Policy, changePolicy: ChangePolicy): Boolean {
// if stateName is null it means we either have not initialized the job (no metadata to pull stateName from)
// if stateName is null it means we either have not initialized the job
// or we failed to load the initial policy, both cases its safe to change the policy
if (stateName == null) return true
if (changePolicy.state != null) return false

val currentState = this.states.find { it.name == stateName }
val newState = newPolicy.states.find { it.name == stateName }
if (currentState == null || newState == null) {
return false
}

if (currentState.actions.size != newState.actions.size) {
return false
}

currentState.actions.forEachIndexed { index, action ->
val newStateAction = newState.actions[index]
if (action.type != newStateAction.type) return@isSafeToChange false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ object RollupFieldValueExpressionResolver {
private lateinit var scriptService: ScriptService
private lateinit var clusterService: ClusterService
lateinit var indexAliasUtils: IndexAliasUtils

fun resolve(rollup: Rollup, fieldValue: String): String {
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,7 @@ fun randomManagedIndexConfig(
schedule: Schedule = IntervalSchedule(Instant.ofEpochMilli(Instant.now().toEpochMilli()), 5, ChronoUnit.MINUTES),
lastUpdatedTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
policyID: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
policy: Policy? = randomPolicy(),
policy: Policy = randomPolicy(),
changePolicy: ChangePolicy? = randomChangePolicy(),
jitter: Double? = 0.0
): ManagedIndexConfig {
Expand All @@ -348,10 +347,10 @@ fun randomManagedIndexConfig(
jobSchedule = schedule,
jobLastUpdatedTime = lastUpdatedTime,
jobEnabledTime = enabledTime,
policyID = policy?.id ?: policyID,
policySeqNo = policy?.seqNo,
policyPrimaryTerm = policy?.primaryTerm,
policy = policy?.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM),
policyID = policy.id,
policySeqNo = policy.seqNo,
policyPrimaryTerm = policy.primaryTerm,
policy = policy.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM),
changePolicy = changePolicy,
jobJitter = jitter
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
// Will use the unique generated description to ensure they are the same policies, the cached policy does not have
// id, seqNo, primaryTerm on the policy itself so cannot directly compare
// TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds
assertEquals("Initialized policy is not the change policy", newPolicy.description, updatedManagedIndexConfig.policy?.description)
assertEquals(
"Initialized policy is not the change policy", newPolicy.description,
updatedManagedIndexConfig.policy.description
)
}

fun `test changing policy on a valid index and log pattern`() {
Expand Down Expand Up @@ -301,7 +304,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
updateManagedIndexConfigStartTime(managedIndexConfig)

// After first execution we should expect the change policy to still be null (since we haven't called it yet)
// and the initial policy should of been cached
// and the initial policy should have been cached
val executedManagedIndexConfig: ManagedIndexConfig = waitFor {
val config = getManagedIndexConfigByDocId(managedIndexConfig.id)
assertNotNull("Executed managed index config is null", config)
Expand Down Expand Up @@ -346,7 +349,6 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
// speed up to second execution we will have a ChangePolicy but not be in Transitions yet
// which means we should still execute the ReadOnlyAction
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor {
val config = getManagedIndexConfigByDocId(managedIndexConfig.id)
assertNotNull("Next managed index config is null", config)
Expand Down Expand Up @@ -386,7 +388,6 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {

// speed up to third execution so that we try to move to transitions and trigger a change policy
updateManagedIndexConfigStartTime(managedIndexConfig)

val changedManagedIndexConfig: ManagedIndexConfig = waitFor {
val config = getManagedIndexConfigByDocId(managedIndexConfig.id)
assertNotNull("Changed managed index config is null", config)
Expand Down Expand Up @@ -512,9 +513,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
RestRequest.Method.POST.toString(),
"${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity()
)

assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList<Any>(), UPDATED_INDICES to 1), response.asMap())

waitFor { assertNotNull(getExistingManagedIndexConfig(index).changePolicy) }

// speed up to first execution where we initialize the policy on the job
Expand All @@ -529,7 +528,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
// Will use the unique generated description to ensure they are the same policies, the cached policy does not have
// id, seqNo, primaryTerm on the policy itself so cannot directly compare
// TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds
assertEquals("Initialized policy is not the change policy", newPolicy.description, config.policy?.description)
assertEquals(
"Initialized policy is not the change policy", newPolicy.description,
config.policy.description
)
config
}

Expand Down
Loading

0 comments on commit ebd2fe5

Please sign in to comment.