Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing Usages of Action Get Call and using listeners #100

Merged
merged 1 commit into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[![Test and Build Workflow](https://github.com/opensearch-project/index-management/workflows/Test%20and%20Build%20Workflow/badge.svg)](https://github.com/opensearch-project/index-management/actions)
[![codecov](https://codecov.io/gh/opensearch-project/index-management/branch/main/graph/badge.svg)](https://codecov.io/gh/opensearch-project/index-management)
[![Documentation](https://img.shields.io/badge/api-reference-blue.svg)](https://docs-beta.opensearch.org/im-plugin/index/)
[![Documentation](https://img.shields.io/badge/api-reference-blue.svg)](https://opensearch.org/docs/im-plugin/index/)
[![Chat](https://img.shields.io/badge/chat-on%20forums-blue)](https://discuss.opendistrocommunity.dev/c/index-management/)
![PRs welcome!](https://img.shields.io/badge/PRs-welcome!-success)

Expand Down Expand Up @@ -58,7 +58,7 @@ See [developer guide](DEVELOPER_GUIDE.md) and [how to contribute to this project

If you find a bug, or have a feature request, please don't hesitate to open an issue in this repository.

For more information, see [project website](https://opensearch.org/) and [documentation](https://docs-beta.opensearch.org/). If you need help and are unsure where to open an issue, try [forums](https://discuss.opendistrocommunity.dev/).
For more information, see [project website](https://opensearch.org/) and [documentation](https://opensearch.org/docs/). If you need help and are unsure where to open an issue, try [forums](https://discuss.opendistrocommunity.dev/).

## Code of Conduct

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,22 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.LocalNodeMasterListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -61,6 +62,7 @@ import org.opensearch.threadpool.ThreadPool
import java.time.Instant

@OpenForTesting
@Suppress("TooManyFunctions")
class IndexStateManagementHistory(
settings: Settings,
private val client: Client,
Expand Down Expand Up @@ -176,7 +178,6 @@ class IndexStateManagementHistory(

@Suppress("SpreadOperator", "NestedBlockDepth", "ComplexMethod")
private fun deleteOldHistoryIndex() {
val indexToDelete = mutableListOf<String>()

val clusterStateRequest = ClusterStateRequest()
.clear()
Expand All @@ -185,8 +186,28 @@ class IndexStateManagementHistory(
.local(true)
.indicesOptions(IndicesOptions.strictExpand())

val clusterStateResponse = client.admin().cluster().state(clusterStateRequest).actionGet()
client.admin().cluster().state(
clusterStateRequest,
object : ActionListener<ClusterStateResponse> {
override fun onResponse(clusterStateResponse: ClusterStateResponse) {
if (!clusterStateResponse.state.metadata.indices.isEmpty) {
val indicesToDelete = getIndicesToDelete(clusterStateResponse)
logger.info("Deleting old history indices viz $indicesToDelete")
deleteAllOldHistoryIndices(indicesToDelete)
} else {
logger.info("No Old History Indices to delete")
}
}

override fun onFailure(exception: Exception) {
logger.error("Error fetching cluster state ${exception.message}")
}
}
)
}

private fun getIndicesToDelete(clusterStateResponse: ClusterStateResponse): List<String> {
var indicesToDelete = mutableListOf<String>()
for (entry in clusterStateResponse.state.metadata.indices()) {
val indexMetaData = entry.value
val creationTime = indexMetaData.creationDate
Expand All @@ -198,27 +219,51 @@ class IndexStateManagementHistory(
continue
}

indexToDelete.add(indexMetaData.index.name)
indicesToDelete.add(indexMetaData.index.name)
}
}
return indicesToDelete
}

@Suppress("SpreadOperator")
private fun deleteAllOldHistoryIndices(indicesToDelete: List<String>) {
if (indicesToDelete.isNotEmpty()) {
val deleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
client.admin().indices().delete(
deleteRequest,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(deleteIndicesResponse: AcknowledgedResponse) {
if (!deleteIndicesResponse.isAcknowledged) {
logger.error("could not delete one or more ISM history index. $indicesToDelete. Retrying one by one.")
deleteOldHistoryIndex(indicesToDelete)
}
}
override fun onFailure(exception: Exception) {
logger.error("Error deleting old history indices ${exception.message}")
deleteOldHistoryIndex(indicesToDelete)
}
}
)
}
}

if (indexToDelete.isNotEmpty()) {
val deleteRequest = DeleteIndexRequest(*indexToDelete.toTypedArray())
val deleteResponse = client.admin().indices().delete(deleteRequest).actionGet()
if (!deleteResponse.isAcknowledged) {
logger.error("could not delete one or more ISM history index. $indexToDelete. Retrying one by one.")
for (index in indexToDelete) {
try {
val singleDeleteRequest = DeleteIndexRequest(*indexToDelete.toTypedArray())
val singleDeleteResponse = client.admin().indices().delete(singleDeleteRequest).actionGet()
@Suppress("SpreadOperator")
private fun deleteOldHistoryIndex(indicesToDelete: List<String>) {
for (index in indicesToDelete) {
val singleDeleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
client.admin().indices().delete(
singleDeleteRequest,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(singleDeleteResponse: AcknowledgedResponse) {
if (!singleDeleteResponse.isAcknowledged) {
logger.error("could not delete one or more ISM history index. $index.")
}
} catch (e: IndexNotFoundException) {
logger.debug("$index was already deleted. ${e.message}")
}
override fun onFailure(exception: Exception) {
logger.debug("Exception ${exception.message} while deleting the index $index")
}
}
}
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class AttemptRolloverStep(

override fun isIdempotent() = false

@Suppress("TooGenericExceptionCaught")
@Suppress("ComplexMethod", "LongMethod", "TooGenericExceptionCaught")
override suspend fun execute(): AttemptRolloverStep {
val skipRollover = clusterService.state().metadata.index(indexName).getRolloverSkip()
if (skipRollover) {
Expand Down Expand Up @@ -278,6 +278,7 @@ class AttemptRolloverStep(
)
}

@Suppress("TooManyFunctions")
companion object {
fun getFailedMessage(index: String) = "Failed to rollover index [index=$index]"
fun getFailedAliasUpdateMessage(index: String, newIndex: String) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TransportGetTransformAction @Inject constructor(
GetTransformAction.NAME, transportService, actionFilters, ::GetTransformRequest
) {

@Suppress("ReturnCount")
override fun doExecute(task: Task, request: GetTransformRequest, listener: ActionListener<GetTransformResponse>) {
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.id)
.fetchSourceContext(request.srcContext).preference(request.preference)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class TransportIndexTransformAction @Inject constructor(
return modified.toList()
}

@Suppress("SpreadOperator")
private fun putTransform() {
val transform = request.transform.copy(schemaVersion = IndexUtils.indexManagementConfigSchemaVersion)
request.index(INDEX_MANAGEMENT_INDEX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class TransportPreviewTransformAction @Inject constructor(
PreviewTransformAction.NAME, transportService, actionFilters, ::PreviewTransformRequest
) {

@Suppress("SpreadOperator")
override fun doExecute(task: Task, request: PreviewTransformRequest, listener: ActionListener<PreviewTransformResponse>) {
val transform = request.transform

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ data class Transform(
const val TRANSFORM_DOC_ID_FIELD = "$TRANSFORM_TYPE._id"
const val TRANSFORM_DOC_COUNT_FIELD = "$TRANSFORM_TYPE._doc_count"

@Suppress("LongMethod")
@Suppress("ComplexMethod", "LongMethod")
@JvmStatic
@JvmOverloads
fun parse(
Expand Down