Skip to content

Commit

Permalink
Security improvements (opensearch-project#126)
Browse files Browse the repository at this point in the history
Signed-off-by: Ravi Thaluru <ravi1092@gmail.com>
  • Loading branch information
thalurur authored Aug 26, 2021
1 parent 88cea4a commit 11a1f73
Show file tree
Hide file tree
Showing 15 changed files with 307 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.block.ClusterBlockException
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
Expand All @@ -64,7 +65,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.FailedIndex
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.util.IndexManagementException
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource
Expand All @@ -73,19 +73,21 @@ import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.lang.Exception
import java.lang.IllegalArgumentException
import java.time.Duration
import java.time.Instant

private val log = LogManager.getLogger(TransportAddPolicyAction::class.java)

@Suppress("SpreadOperator")
@Suppress("SpreadOperator", "ReturnCount")
class TransportAddPolicyAction @Inject constructor(
val client: NodeClient,
transportService: TransportService,
actionFilters: ActionFilters,
val settings: Settings,
val clusterService: ClusterService,
val xContentRegistry: NamedXContentRegistry
val xContentRegistry: NamedXContentRegistry,
val indexNameExpressionResolver: IndexNameExpressionResolver
) : HandledTransportAction<AddPolicyRequest, ISMStatusResponse>(
AddPolicyAction.NAME, transportService, actionFilters, ::AddPolicyRequest
) {
Expand Down Expand Up @@ -114,47 +116,80 @@ class TransportAddPolicyAction @Inject constructor(
) {
private lateinit var startTime: Instant
private lateinit var policy: Policy
private val resolvedIndices = mutableListOf<String>()
private val indicesToAdd = mutableMapOf<String, String>() // uuid: name
private val failedIndices: MutableList<FailedIndex> = mutableListOf()

fun start() {
if (!validateUserConfiguration(user, filterByEnabled, actionListener)) {
return
}
val requestedIndices = mutableListOf<String>()
request.indices.forEach { index ->
requestedIndices.addAll(
indexNameExpressionResolver.concreteIndexNames(
clusterService.state(),
IndicesOptions.lenientExpand(),
true,
index
)
)
}
if (requestedIndices.isEmpty()) {
// Nothing to do will ignore since found no matching indices
actionListener.onResponse(ISMStatusResponse(0, failedIndices))
return
}
if (user == null) {
resolvedIndices.addAll(requestedIndices)
getPolicy()
} else {
validateAndGetPolicy()
validateAndGetPolicy(0, requestedIndices)
}
}

private fun validateAndGetPolicy() {
val request = ManagedIndexRequest().indices(*request.indices.toTypedArray())
/**
* We filter the requested indices to the indices user has permission to manage and apply policies only on top of those
*/
private fun validateAndGetPolicy(current: Int, indices: List<String>) {
val request = ManagedIndexRequest().indices(indices[current])
client.execute(
ManagedIndexAction.INSTANCE,
request,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) {
getPolicy()
resolvedIndices.add(indices[current])
proceed(current, indices)
}

override fun onFailure(e: Exception) {
actionListener.onFailure(
IndexManagementException.wrap(
when (e is OpenSearchSecurityException) {
true -> OpenSearchStatusException(
"User doesn't have required index permissions on one or more requested indices: ${e.localizedMessage}",
RestStatus.FORBIDDEN
)
false -> e
}
)
)
when (e is OpenSearchSecurityException) {
true -> {
proceed(current, indices)
}
false -> {
// failing the request for any other exception
actionListener.onFailure(e)
}
}
}
}
)
}

private fun proceed(current: Int, indices: List<String>) {
if (current < indices.count() - 1) {
validateAndGetPolicy(current + 1, indices)
} else {
// sanity check that there are indices - if none then return
if (resolvedIndices.isEmpty()) {
actionListener.onResponse(ISMStatusResponse(0, failedIndices))
return
}
getPolicy()
}
}

private fun getPolicy() {
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.policyID)

Expand All @@ -171,7 +206,12 @@ class TransportAddPolicyAction @Inject constructor(
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.policyID}", RestStatus.NOT_FOUND))
return
}
this.policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
try {
this.policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.policyID}", RestStatus.NOT_FOUND))
return
}
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.policyID, actionListener)) {
return
}
Expand Down Expand Up @@ -205,7 +245,7 @@ class TransportAddPolicyAction @Inject constructor(

val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(*request.indices.toTypedArray())
.indices(*resolvedIndices.toTypedArray())
.metadata(true)
.local(false)
.waitForTimeout(TimeValue.timeValueMillis(ADD_POLICY_TIMEOUT_IN_MILLIS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import org.opensearch.rest.RestStatus
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.lang.IllegalArgumentException

private val log = LogManager.getLogger(TransportChangePolicyAction::class.java)

Expand Down Expand Up @@ -173,7 +174,12 @@ class TransportChangePolicyAction @Inject constructor(
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.changePolicy.policyID}", RestStatus.NOT_FOUND))
return
}
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
try {
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Could not find policy=${request.changePolicy.policyID}", RestStatus.NOT_FOUND))
return
}
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.changePolicy.policyID, actionListener)) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermis
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.lang.IllegalArgumentException

@Suppress("ReturnCount")
class TransportDeletePolicyAction @Inject constructor(
val client: NodeClient,
transportService: TransportService,
Expand Down Expand Up @@ -84,12 +86,7 @@ class TransportDeletePolicyAction @Inject constructor(

fun start() {
client.threadPool().threadContext.stashContext().use {
if (user == null || !filterByEnabled) {
// Security is disabled or filter by is disabled
delete()
} else {
getPolicy()
}
getPolicy()
}
}

Expand All @@ -104,7 +101,13 @@ class TransportDeletePolicyAction @Inject constructor(
return
}

val policy: Policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
val policy: Policy?
try {
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Policy ${request.policyID} is not found", RestStatus.NOT_FOUND))
return
}
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.policyID, actionListener)) {
return
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ package org.opensearch.indexmanagement.indexstatemanagement.transport.action.exp
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
Expand Down Expand Up @@ -63,9 +62,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.mana
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.managedIndex.ManagedIndexRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.isMetadataMoved
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataID
import org.opensearch.indexmanagement.util.IndexManagementException
import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser
import org.opensearch.rest.RestStatus
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext.FETCH_SOURCE
import org.opensearch.search.sort.SortBuilders
Expand Down Expand Up @@ -312,47 +309,78 @@ class TransportExplainAction @Inject constructor(
}
managedIndicesMetaDataMap.clear()

if (user == null) {
if (user == null || indexNames.isEmpty()) {
sendResponse()
} else {
validateAndSendResponse(threadContext)
filterAndSendResponse(threadContext)
}
}

private fun validateAndSendResponse(threadContext: ThreadContext.StoredContext) {
private fun filterAndSendResponse(threadContext: ThreadContext.StoredContext) {
threadContext.restore()
val request = ManagedIndexRequest().indices(*indexNames.toTypedArray())
val filteredIndices = mutableListOf<String>()
val filteredMetadata = mutableListOf<ManagedIndexMetaData?>()
val filteredPolicies = mutableListOf<String?>()
val enabledStatus = mutableMapOf<String, Boolean>()
filter(0, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
}

private fun filter(
current: Int,
filteredIndices: MutableList<String>,
filteredMetadata: MutableList<ManagedIndexMetaData?>,
filteredPolicies: MutableList<String?>,
enabledStatus: MutableMap<String, Boolean>
) {
val request = ManagedIndexRequest().indices(indexNames[current])
client.execute(
ManagedIndexAction.INSTANCE,
request,
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) {
sendResponse()
filteredIndices.add(indexNames[current])
filteredMetadata.add(indexMetadatas[current])
filteredPolicies.add(indexPolicyIDs[current])
enabledStatus[indexNames[current]] = enabledState.getOrDefault(indexNames[current], false)
if (current < indexNames.count() - 1) {
// do nothing - skip the index and go to next one
filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
} else {
sendResponse(filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
}
}

override fun onFailure(e: java.lang.Exception) {
actionListener.onFailure(
IndexManagementException.wrap(
when (e is OpenSearchSecurityException) {
true -> OpenSearchStatusException(
"User doesn't have required index permissions on one or more requested indices: ${e.localizedMessage}",
RestStatus.FORBIDDEN
)
false -> e
override fun onFailure(e: Exception) {
when (e is OpenSearchSecurityException) {
true -> {
if (current < indexNames.count() - 1) {
// do nothing - skip the index and go to next one
filter(current + 1, filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
} else {
sendResponse(filteredIndices, filteredMetadata, filteredPolicies, enabledStatus)
}
)
)
}
false -> {
actionListener.onFailure(e)
}
}
}
}
)
}

private fun sendResponse() {
private fun sendResponse(
indices: List<String> = indexNames,
metadata: List<ManagedIndexMetaData?> = indexMetadatas,
policies: List<String?> = indexPolicyIDs,
enabledStatus: Map<String, Boolean> = enabledState,
totalIndices: Int = totalManagedIndices
) {
if (explainAll) {
actionListener.onResponse(ExplainAllResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState))
actionListener.onResponse(ExplainAllResponse(indices, policies, metadata, totalIndices, enabledStatus))
return
}
actionListener.onResponse(ExplainResponse(indexNames, indexPolicyIDs, indexMetadatas))
actionListener.onResponse(ExplainResponse(indices, policies, metadata))
}

private fun getMetadata(response: GetResponse?): ManagedIndexMetaData? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermis
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.lang.IllegalArgumentException

@Suppress("ReturnCount")
class TransportGetPolicyAction @Inject constructor(
val client: NodeClient,
transportService: TransportService,
Expand Down Expand Up @@ -104,7 +106,13 @@ class TransportGetPolicyAction @Inject constructor(
return
}

val policy: Policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
val policy: Policy?
try {
policy = parseFromGetResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Policy not found", RestStatus.NOT_FOUND))
return
}
if (!userHasPermissionForResource(user, policy.user, filterByEnabled, "policy", request.policyID, actionListener)) {
return
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.lang.Exception

@Suppress("ReturnCount")
class TransportDeleteRollupAction @Inject constructor(
transportService: TransportService,
val client: Client,
Expand Down Expand Up @@ -84,12 +85,7 @@ class TransportDeleteRollupAction @Inject constructor(

fun start() {
client.threadPool().threadContext.stashContext().use {
if (!filterByEnabled || user == null) {
// security is disabled or filter by is disabled
delete()
} else {
getRollup()
}
getRollup()
}
}

Expand All @@ -104,8 +100,14 @@ class TransportDeleteRollupAction @Inject constructor(
return
}

val rollup: Rollup = parseRollup(response, xContentRegistry)
if (!userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", request.id(), actionListener)) {
val rollup: Rollup?
try {
rollup = parseRollup(response, xContentRegistry)
} catch (e: IllegalArgumentException) {
actionListener.onFailure(OpenSearchStatusException("Rollup ${request.id()} is not found", RestStatus.NOT_FOUND))
return
}
if (!userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) {
return
} else {
delete()
Expand Down
Loading

0 comments on commit 11a1f73

Please sign in to comment.