Skip to content

Commit

Permalink
Update actionGet to SuspendUntil for ClusterMetrics (#1067)
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>
  • Loading branch information
lezzago authored Aug 2, 2023
1 parent 5fbb18a commit 3baf51a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.opensearch.action.admin.indices.recovery.RecoveryResponse
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest
import org.opensearch.client.Client
Expand All @@ -42,49 +43,50 @@ import kotlin.collections.HashMap
* @param client The [Client] used to call the respective transport action.
* @throws IllegalArgumentException When the requested API is not supported by this feature.
*/
fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
suspend fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
val request = resolveToActionRequest(clusterMetricsInput)
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> {
request as CatIndicesRequestWrapper
val healthResponse: ClusterHealthResponse =
client.admin().cluster().health(request.clusterHealthRequest).get()
val healthResponse: ClusterHealthResponse = client.suspendUntil { admin().cluster().health(request.clusterHealthRequest, it) }
val indexSettingsResponse: GetSettingsResponse =
client.admin().indices().getSettings(request.indexSettingsRequest).get()
client.suspendUntil { admin().indices().getSettings(request.indexSettingsRequest, it) }
val indicesResponse: IndicesStatsResponse =
client.admin().indices().stats(request.indicesStatsRequest).get()
client.suspendUntil { admin().indices().stats(request.indicesStatsRequest, it) }
val stateResponse: ClusterStateResponse =
client.admin().cluster().state(request.clusterStateRequest).get()
client.suspendUntil { admin().cluster().state(request.clusterStateRequest, it) }
return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS ->
client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest).get()
client.suspendUntil<Client, PendingClusterTasksResponse> {
admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest, it)
}
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY ->
client.admin().indices().recoveries(request as RecoveryRequest).get()
client.suspendUntil<Client, RecoveryResponse> { admin().indices().recoveries(request as RecoveryRequest, it) }
ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> {
request as CatShardsRequestWrapper
val stateResponse: ClusterStateResponse =
client.admin().cluster().state(request.clusterStateRequest).get()
client.suspendUntil { admin().cluster().state(request.clusterStateRequest, it) }
val indicesResponse: IndicesStatsResponse =
client.admin().indices().stats(request.indicesStatsRequest).get()
client.suspendUntil { admin().indices().stats(request.indicesStatsRequest, it) }
return CatShardsResponseWrapper(stateResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS ->
client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get()
client.suspendUntil<Client, GetSnapshotsResponse> { admin().cluster().getSnapshots(request as GetSnapshotsRequest, it) }
ClusterMetricsInput.ClusterMetricType.CAT_TASKS ->
client.admin().cluster().listTasks(request as ListTasksRequest).get()
client.suspendUntil<Client, ListTasksResponse> { admin().cluster().listTasks(request as ListTasksRequest, it) }
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH ->
client.admin().cluster().health(request as ClusterHealthRequest).get()
client.suspendUntil<Client, ClusterHealthResponse> { admin().cluster().health(request as ClusterHealthRequest, it) }
ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> {
val stateResponse: ClusterStateResponse =
client.admin().cluster().state(request as ClusterStateRequest).get()
client.suspendUntil { admin().cluster().state(request as ClusterStateRequest, it) }
val metadata: Metadata = stateResponse.state.metadata
return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY)
}
ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS ->
client.admin().cluster().clusterStats(request as ClusterStatsRequest).get()
client.suspendUntil<Client, ClusterStatsResponse> { admin().cluster().clusterStats(request as ClusterStatsRequest, it) }
ClusterMetricsInput.ClusterMetricType.NODES_STATS ->
client.admin().cluster().nodesStats(request as NodesStatsRequest).get()
client.suspendUntil<Client, NodesStatsResponse> { admin().cluster().nodesStats(request as NodesStatsRequest, it) }
else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) }
}

fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
suspend fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down Expand Up @@ -125,7 +125,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
}
}

fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
suspend fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() {
assertThrows(IllegalArgumentException::class.java) { CatShardsRequestWrapper(pathParams = pathParams) }
}

fun `test CatShardsResponseWrapper returns with only indices in pathParams`() {
suspend fun `test CatShardsResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down Expand Up @@ -117,7 +117,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() {
}
}

fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() {
suspend fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down

0 comments on commit 3baf51a

Please sign in to comment.