Skip to content

Commit

Permalink
Fix LRON concurrent indexing throw ResourceAlreadyExists (opensearch-…
Browse files Browse the repository at this point in the history
…project#831)

* fix LRON unwrap exception and some IT

Signed-off-by: zhichao-aws <zhichaog@amazon.com>

* remove useless log

Signed-off-by: zhichao-aws <zhichaog@amazon.com>

---------

Signed-off-by: zhichao-aws <zhichaog@amazon.com>
Co-authored-by: Hailong Cui <ihailong@amazon.com>
  • Loading branch information
zhichao-aws and Hailong-am committed Jul 3, 2023
1 parent bdc9dad commit f07fa2b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.indexmanagement.controlcenter.notification

import org.opensearch.ExceptionsHelper
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.create.CreateIndexRequest
Expand All @@ -31,7 +32,7 @@ class ControlCenterIndices(
indexRequest,
object : ActionListener<CreateIndexResponse> {
override fun onFailure(e: Exception) {
if (e is ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) is ResourceAlreadyExistsException) {
/* if two request create the control center index at the same time, may raise this exception */
/* but we don't take it as error */
actionListener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.IndexManagementRestTestCase
import org.opensearch.indexmanagement.controlcenter.notification.initNodeIdsInRestIT
import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig
import org.opensearch.indexmanagement.controlcenter.notification.nodeIdsInRestIT
import org.opensearch.indexmanagement.controlcenter.notification.randomLRONConfig
import org.opensearch.indexmanagement.controlcenter.notification.randomTaskId
import org.opensearch.indexmanagement.controlcenter.notification.toJsonString
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.rest.RestStatus
Expand All @@ -30,8 +27,6 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() {
setDebugLogLevel()
/* init cluster node ids in integ test */
initNodeIdsInRestIT(client())
/* index a random doc to create .opensearch-control-center index */
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
}

@After
Expand Down Expand Up @@ -74,8 +69,13 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() {

companion object {
@AfterClass
@JvmStatic fun clearIndicesAfterClass() {
wipeAllIndices()
@JvmStatic fun removeControlCenterIndex() {
try {
adminClient().makeRequest("DELETE", IndexManagementPlugin.CONTROL_CENTER_INDEX, emptyMap())
} catch (e: ResponseException) {
/* ignore if the index has not been created */
assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class RestDeleteLRONConfigActionIT : LRONConfigRestTestCase() {
}

fun `test delete nonexist LRONConfig response`() {
/* index a random doc to create .opensearch-control-center index */
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
val response = client().makeRequest("DELETE", getResourceURI(lronConfig.taskId, lronConfig.actionName))
assertEquals("delete LRONConfig failed", RestStatus.OK, response.restStatus())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() {
}

fun `test get nonexist LRONConfig fails`() {
/* index a random doc to create .opensearch-control-center index */
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
try {
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
client().makeRequest("GET", getResourceURI(lronConfig.taskId, lronConfig.actionName))
Expand All @@ -51,7 +53,7 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() {

fun `test get all LRONConfigs`() {
/* LRONConfigRestTestCase index a doc to auto create the index, here we wipe the index before count doc number */
wipeAllIndices()
removeControlCenterIndex()
val lronConfigResponses = randomList(1, 15) {
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))).asMap()
}
Expand Down Expand Up @@ -90,16 +92,11 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() {
}

fun `test get all LRONConfig if index not exists`() {
try {
wipeAllIndices()
val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI)
assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus())
val responseBody = response.asMap()
val totalNumber = responseBody["total_number"]
OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber)
} finally {
/* index a random doc to create .opensearch-control-center index */
createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())))
}
removeControlCenterIndex()
val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI)
assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus())
val responseBody = response.asMap()
val totalNumber = responseBody["total_number"]
OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package org.opensearch.indexmanagement.controlcenter.notification.resthandler

import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.opensearch.client.ResponseException
import org.opensearch.common.xcontent.XContentType
Expand All @@ -21,6 +25,7 @@ import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.util.DRY_RUN
import org.opensearch.rest.RestStatus
import java.util.concurrent.Executors

@Suppress("UNCHECKED_CAST")
class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() {
Expand Down Expand Up @@ -155,21 +160,21 @@ class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() {
}

fun `test autocreate index for indexLRONConfig action`() {
wipeAllIndices()
removeControlCenterIndex()
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
var response = createLRONConfig(lronConfig)
assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus())
wipeAllIndices()
removeControlCenterIndex()
response = client().makeRequest(
"PUT",
getResourceURI(lronConfig.taskId, lronConfig.actionName),
lronConfig.toHttpEntity()
)
assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus())
wipeAllIndices()
}

fun `test mappings after LRONConfig creation`() {
removeControlCenterIndex()
val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))
createLRONConfig(lronConfig)

Expand All @@ -185,4 +190,29 @@ class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() {

assertEquals("Mappings are different", expectedMap, mappingsMap)
}

fun `test concurrent indexing requests auto create index and not throw exception`() {
removeControlCenterIndex()
val threadSize = 5
val lronConfigs = List(threadSize) { randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) }
val threadPool = Executors.newFixedThreadPool(threadSize)
try {
runBlocking {
val dispatcher = threadPool.asCoroutineDispatcher()
val responses = lronConfigs.map {
async(dispatcher) {
createLRONConfig(it)
}
}.awaitAll()
responses.forEach { assertEquals("Create LRONConfig failed", RestStatus.OK, it.restStatus()) }
}
} finally {
threadPool.shutdown()
}
val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI)
assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus())
val responseBody = response.asMap()
val totalNumber = responseBody["total_number"]
assertEquals("wrong LRONConfigs number", threadSize, totalNumber)
}
}

0 comments on commit f07fa2b

Please sign in to comment.