forked from opensearch-project/index-management
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds plugin version sweep background job (opensearch-project#434)
* [207]: Added 5 min scheduled job for sweeping ISM plugin version in the case of version discrepancy Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com> * [207]: Created pluginVersionSweepCoordinator component responsible for scheduling the skip execution task. Annotated tests in order to prevent thread leak error during integrational tests Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com> * [207]: Increased retry period for background job that sets the skip flag up to 5 mins Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com> * Empty-Commit Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com> Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com> Co-authored-by: Stevan Buzejic <buzejic.stevan@gmail.com>
- Loading branch information
Showing
6 changed files
with
116 additions
and
27 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 91 additions & 0 deletions
91
...tlin/org/opensearch/indexmanagement/indexstatemanagement/PluginVersionSweepCoordinator.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.indexmanagement.indexstatemanagement | ||
|
||
import kotlinx.coroutines.CoroutineName | ||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.SupervisorJob | ||
import kotlinx.coroutines.launch | ||
import org.apache.logging.log4j.LogManager | ||
import org.opensearch.cluster.ClusterChangedEvent | ||
import org.opensearch.cluster.ClusterStateListener | ||
import org.opensearch.cluster.service.ClusterService | ||
import org.opensearch.common.component.LifecycleListener | ||
import org.opensearch.common.settings.Settings | ||
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings | ||
import org.opensearch.indexmanagement.util.OpenForTesting | ||
import org.opensearch.threadpool.Scheduler | ||
import org.opensearch.threadpool.ThreadPool | ||
|
||
class PluginVersionSweepCoordinator( | ||
private val skipExecution: SkipExecution, | ||
settings: Settings, | ||
private val threadPool: ThreadPool, | ||
clusterService: ClusterService, | ||
) : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ISMPluginSweepCoordinator")), | ||
LifecycleListener(), | ||
ClusterStateListener { | ||
private val logger = LogManager.getLogger(javaClass) | ||
|
||
private var scheduledSkipExecution: Scheduler.Cancellable? = null | ||
|
||
@Volatile | ||
private var sweepSkipPeriod = ManagedIndexSettings.SWEEP_SKIP_PERIOD.get(settings) | ||
|
||
@Volatile | ||
private var indexStateManagementEnabled = ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.get(settings) | ||
|
||
init { | ||
clusterService.addLifecycleListener(this) | ||
clusterService.addListener(this) | ||
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.SWEEP_SKIP_PERIOD) { | ||
sweepSkipPeriod = it | ||
initBackgroundSweepISMPluginVersionExecution() | ||
} | ||
} | ||
|
||
override fun afterStart() { | ||
initBackgroundSweepISMPluginVersionExecution() | ||
} | ||
|
||
override fun beforeStop() { | ||
scheduledSkipExecution?.cancel() | ||
} | ||
|
||
override fun clusterChanged(event: ClusterChangedEvent) { | ||
if (event.nodesChanged() || event.isNewCluster) { | ||
skipExecution.sweepISMPluginVersion() | ||
initBackgroundSweepISMPluginVersionExecution() | ||
} | ||
} | ||
|
||
@OpenForTesting | ||
fun initBackgroundSweepISMPluginVersionExecution() { | ||
// If ISM is disabled return early | ||
if (!isIndexStateManagementEnabled()) return | ||
// Cancel existing background sweep | ||
scheduledSkipExecution?.cancel() | ||
val scheduledJob = Runnable { | ||
launch { | ||
try { | ||
if (!skipExecution.flag) { | ||
logger.info("Canceling sweep ism plugin version job") | ||
scheduledSkipExecution?.cancel() | ||
} else { | ||
skipExecution.sweepISMPluginVersion() | ||
} | ||
} catch (e: Exception) { | ||
logger.error("Failed to sweep ism plugin version", e) | ||
} | ||
} | ||
} | ||
scheduledSkipExecution = | ||
threadPool.scheduleWithFixedDelay(scheduledJob, sweepSkipPeriod, ThreadPool.Names.MANAGEMENT) | ||
} | ||
|
||
private fun isIndexStateManagementEnabled(): Boolean = indexStateManagementEnabled == true | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters