Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add listenable TransportRequestHandler in TransportNodesAction #15166

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
- Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707))
- Add listenable TransportRequestHandler in TransportNodesAction ([#15166](https://github.com/opensearch-project/OpenSearch/pull/15166))

### Deprecated
- Performing update operation with default pipeline or final pipeline is deprecated ([#16712](https://github.com/opensearch-project/OpenSearch/pull/16712))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,50 @@
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
}

/**
* @param actionName action name
* @param threadPool thread-pool
* @param clusterService cluster service
* @param transportService transport service
* @param actionFilters action filters
* @param request node request writer
* @param nodeRequest node request reader
* @param nodeExecutor executor to execute node action on
* @param finalExecutor executor to execute final collection of all responses on
* @param listenableHandler true if the handler should be a listenable handler
* @param nodeResponseClass class of the node responses
*/
protected TransportNodesAction(
String actionName,
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<NodesRequest> request,
Writeable.Reader<NodeRequest> nodeRequest,
String nodeExecutor,
String finalExecutor,
boolean listenableHandler,
Class<NodeResponse> nodeResponseClass
) {
super(actionName, transportService, actionFilters, request);
this.threadPool = threadPool;
this.clusterService = Objects.requireNonNull(clusterService);
this.transportService = Objects.requireNonNull(transportService);
this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass);

this.transportNodeAction = actionName + "[n]";
this.finalExecutor = finalExecutor;
if (listenableHandler) {
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new ListenableNodeTransportHandler());
} else {
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
}
}

/**
* Same as {@link #TransportNodesAction(String, ThreadPool, ClusterService, TransportService, ActionFilters, Writeable.Reader,
* Writeable.Reader, String, String, Class)} but executes final response collection on the transport thread except for when the final
* Writeable.Reader, String, String, boolean, Class)} but executes final response collection on the transport thread except for when the final
* node response is received from the local node, in which case {@code nodeExecutor} is used.
* This constructor should only be used for actions for which the creation of the final response is fast enough to be safely executed
* on a transport thread.
Expand All @@ -144,6 +185,7 @@
nodeRequest,
nodeExecutor,
ThreadPool.Names.SAME,
false,
nodeResponseClass
);
}
Expand Down Expand Up @@ -196,6 +238,8 @@

protected abstract NodeResponse nodeOperation(NodeRequest request);

protected void nodeOperation(NodeRequest request, ActionListener<NodeResponse> actionListener) {}

Check warning on line 241 in server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java#L241

Added line #L241 was not covered by tests

protected NodeResponse nodeOperation(NodeRequest request, Task task) {
return nodeOperation(request);
}
Expand Down Expand Up @@ -335,4 +379,14 @@
}
}

class ListenableNodeTransportHandler implements TransportRequestHandler<NodeRequest> {

@Override
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) {
ActionListener<NodeResponse> listener = ActionListener.wrap(channel::sendResponse, e -> {
TransportChannel.sendErrorResponse(channel, actionName, request, e);
});

Check warning on line 388 in server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java#L387-L388

Added lines #L387 - L388 were not covered by tests
nodeOperation(request, listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,19 @@
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
import org.junit.After;
Expand All @@ -74,9 +77,12 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.mockito.ArgumentCaptor;

import static org.opensearch.test.ClusterServiceUtils.createClusterService;
import static org.opensearch.test.ClusterServiceUtils.setState;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class TransportNodesActionTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -198,6 +204,28 @@ public void testTransportNodesActionWithDiscoveryNodesReset() {
capturedTransportNodeRequestList.forEach(capturedRequest -> assertNull(capturedRequest.testNodesRequest.concreteNodes()));
}

public void testCreateTransportNodesActionWithListenableHandler() {
TransportNodesAction action = getListenableHandlerTestTransportNodesAction();
assertTrue(
transport.getRequestHandlers()
.getHandler(action.actionName + "[n]")
.getHandler() instanceof TransportNodesAction.ListenableNodeTransportHandler
);
}

public void testMessageReceivedInListenableNodeTransportHandler() throws Exception {
TransportNodesAction action = getListenableHandlerTestTransportNodesAction();
TransportChannel transportChannel = mock(TransportChannel.class);
transport.getRequestHandlers()
.getHandler(action.actionName + "[n]")
.getHandler()
.messageReceived(new TestNodeRequest(), transportChannel, mock(Task.class));
ArgumentCaptor<TestNodeResponse> argCaptor = ArgumentCaptor.forClass(TestNodeResponse.class);
verify(transportChannel).sendResponse(argCaptor.capture());
TestNodeResponse response = argCaptor.getValue();
assertNotNull(response);
}

private <T> List<T> mockList(Supplier<T> supplier, int size) {
List<T> failures = new ArrayList<>(size);
for (int i = 0; i < size; ++i) {
Expand Down Expand Up @@ -290,6 +318,19 @@ public TestTransportNodesAction getTestTransportNodesAction() {
);
}

public TestTransportNodesAction getListenableHandlerTestTransportNodesAction() {
return new TestTransportNodesAction(
THREAD_POOL,
clusterService,
transportService,
new ActionFilters(Collections.emptySet()),
TestNodesRequest::new,
TestNodeRequest::new,
ThreadPool.Names.SAME,
true
);
}

public DataNodesOnlyTransportNodesAction getDataNodesOnlyTransportNodesAction(TransportService transportService) {
return new DataNodesOnlyTransportNodesAction(
THREAD_POOL,
Expand Down Expand Up @@ -335,6 +376,31 @@ private static class TestTransportNodesAction extends TransportNodesAction<
);
}

TestTransportNodesAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<TestNodesRequest> request,
Writeable.Reader<TestNodeRequest> nodeRequest,
String nodeExecutor,
boolean listenableHandler
) {
super(
"indices:admin/test",
threadPool,
clusterService,
transportService,
actionFilters,
request,
nodeRequest,
nodeExecutor,
nodeExecutor,
listenableHandler,
TestNodeResponse.class
);
}

@Override
protected TestNodesResponse newResponse(
TestNodesRequest request,
Expand All @@ -359,6 +425,11 @@ protected TestNodeResponse nodeOperation(TestNodeRequest request) {
return new TestNodeResponse();
}

@Override
protected void nodeOperation(TestNodeRequest request, ActionListener<TestNodeResponse> actionListener) {
actionListener.onResponse(new TestNodeResponse());
}

}

private static class DataNodesOnlyTransportNodesAction extends TestTransportNodesAction {
Expand Down
Loading