Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding replication (CCR) plugin interface and classes to common-utils #667

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication

import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_TYPE
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.Writeable

/**
* Transport action plugin interfaces for the cross-cluster-replication plugin.
*/
object ReplicationPluginInterface {

/**
* Stop replication.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/

fun stopReplication(
client: NodeClient,
request: StopIndexReplicationRequest,
listener: ActionListener<AcknowledgedResponse>
) {
return client.execute(
UNFOLLOW_REPLICATION_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
AcknowledgedResponse(it)
}
}
)
}

/**
* Wrap action listener on concrete response class by a new created one on ActionResponse.
* This is required because the response may be loaded by different classloader across plugins.
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate
* the response object.
*/
@Suppress("UNCHECKED_CAST")
private fun <Response : AcknowledgedResponse> wrapActionListener(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the existing code in AlertingPluginInterface, I see we are doing <Response: BaseResponse>, does that have problem here? can we also use BaseResponse, instead of AcknowledgedResponse.

listener: ActionListener<Response>,
recreate: (Writeable) -> Response
): ActionListener<Response> {
return object : ActionListener<ActionResponse> {
override fun onResponse(response: ActionResponse) {
val recreated = response as? Response ?: recreate(response)
listener.onResponse(recreated)
}

override fun onFailure(exception: java.lang.Exception) {
listener.onFailure(exception)
}
} as ActionListener<Response>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.opensearch.action.ActionType
import org.opensearch.action.support.master.AcknowledgedResponse

/**
* Information related to the transport stop replication action for the Replication plugin
*/
object ReplicationActions {

/**
* Action names for stopping replication
* STOP_REPLICATION_ACTION_NAME: action used for _stop REST API
* UNFOLLOW_REPLICATION_ACTION_NAME: internal action used for inter-plugin communication i.e. by ism to invoke stop
* replication.
*/
const val STOP_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/stop"
const val UNFOLLOW_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/unfollow"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably can use some better name here, I'm thinking

indices:admin/plugins/replication/index/stop_from_ism 😅

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agreee 😅
But I was wondering if stop_from_ism would limit it to being only from ism? and what if some other plugin in future intends to invoke stop-replication using these libs. 🤔
Would something like stop_from_transport / transport_stop / stop_from_plugin work?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, better to be intuitive for our user.

Looking at all the existing ones, I am thinking indices:transport/plugins/replication/index/stop
Hopefully transport is a common word for OpenSearch user.

https://github.com/opensearch-project/cross-cluster-replication/blob/d6e7636d42ef87cd483f7439d8a2cf43bf397727/HANDBOOK.md?plain=1#L108-L121

# Index Level Permissions

indices:admin/close
indices:admin/close[s]
indices:admin/create
indices:admin/mapping/put
indices:admin/open
indices:admin/plugins/replication/index/start
indices:admin/plugins/replication/index/stop
indices:data/read/plugins/replication/file_metadata
indices:data/write/index
indices:data/write/plugins/replication/changes
indices:data/write/replication
indices:monitor/stats

@cwperks to give some suggestion about the naming of the new transport action
This new transport action being added here is only supposed to be called from ISM plugin, and doesn't have any REST API associated with it. ISM user will need this permission to do the stop replication action using ISM.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bowenlan-amzn indices:transport is not one of the valid action prefixes tracked in core. You can introduce a new action prefix by opening a PR in core to add to that list.

I see there is indices:internal in the list of prefixes. Is that prefix reserved for actions defined in OpenSearch core, or is it possible for a plugin to use that prefix as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

indices:internal makes sense here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try with indices:internal/plugins/replication/index/stop then 🙂
Thank you @cwperks and @bowenlan-amzn for these inputs.


/**
* Stop replication transport action types.
*/
val STOP_REPLICATION_ACTION_TYPE =
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse)
val UNFOLLOW_REPLICATION_ACTION_TYPE =
ActionType(UNFOLLOW_REPLICATION_ACTION_NAME, ::AcknowledgedResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.opensearch.commons.replication.action

import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.IndicesRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedRequest
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ObjectParser
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser

class StopIndexReplicationRequest :
AcknowledgedRequest<StopIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {
lateinit var indexName: String
constructor(indexName: String) {
this.indexName = indexName
}

private constructor() {
}

constructor(inp: StreamInput) : super(inp) {
indexName = inp.readString()
}
companion object {
private val PARSER = ObjectParser<StopIndexReplicationRequest, Void>("StopReplicationRequestParser") {
StopIndexReplicationRequest()
}

fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest {
val stopIndexReplicationRequest = PARSER.parse(parser, null)
stopIndexReplicationRequest.indexName = followerIndex
return stopIndexReplicationRequest
}
}

override fun validate(): ActionRequestValidationException? {
return null
}

override fun indices(vararg indices: String?): IndicesRequest {
return this
}
override fun indices(): Array<String> {
return arrayOf(indexName)
}

override fun indicesOptions(): IndicesOptions {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed()
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field("indexName", indexName)
builder.endObject()
return builder
}

override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(indexName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.replication

import com.nhaarman.mockitokotlin2.whenever
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Answers
import org.mockito.ArgumentMatchers
import org.mockito.Mock
import org.mockito.Mockito
import org.mockito.junit.jupiter.MockitoExtension
import org.opensearch.action.ActionType
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.core.action.ActionListener

@Suppress("UNCHECKED_CAST")
@ExtendWith(MockitoExtension::class)
internal class ReplicationPluginInterfaceTests {

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private lateinit var client: NodeClient

@Test
fun stopReplication() {
val request = Mockito.mock(StopIndexReplicationRequest::class.java)
val response = AcknowledgedResponse(true)
val listener: ActionListener<AcknowledgedResponse> =
Mockito.mock(ActionListener::class.java) as ActionListener<AcknowledgedResponse>

Mockito.doAnswer {
(it.getArgument(2) as ActionListener<AcknowledgedResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())

ReplicationPluginInterface.stopReplication(client, request, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure about what this test is checking, would you explain a bit

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I've written this test on similar lines as done in NotificationsPluginInterfaceTests.kt and AlertingPluginInterfaceTests.kt

  • An AcknowledgedResponse response object is created with a value of true (expected response).
  • Have mocked the client object such that, when stopReplication() invokes client.execute(), it would trigger the onResponse method of the ActionListener with the successful response object created earlier.
  • The test runs stopReplication() and verifies that the onResponse() method of the listener was called exactly once with the provided response object as an argument.
  • This is to simulate a successful execution and verifies that the stopReplication method correctly handles the response.

Copy link
Member

@bowenlan-amzn bowenlan-amzn Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explain

What I feel weird is we mock the client.execute to do listener.onResponse
Then we call stopReplication which just does client.execute, and verify listener.onResponse called once

In the end, it seems just testing stopReplication does client.execute exactly once 🤔
but it's fine, just curious what it actually test...

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Test
import org.opensearch.commons.utils.recreateObject

internal class StopIndexReplicationRequestTests {
@Test
fun `Stop Replication request serialize and deserialize transport object should be equal`() {
val index = "test-idx"
val request = StopIndexReplicationRequest(index)
val recreatedRequest = recreateObject(request) { StopIndexReplicationRequest(it) }
assertNotNull(recreatedRequest)
assertEquals(request.indexName, recreatedRequest.indexName)
assertNull(recreatedRequest.validate())
}
}
Loading