Skip to content

Commit

Permalink
Merge branch 'main' into flag
Browse files Browse the repository at this point in the history
  • Loading branch information
jdconrad committed Jan 29, 2025
2 parents 9746396 + afdd453 commit b053670
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 154 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/121124.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 121124
summary: Run `TransportGetEnrichPolicyAction` on local node
area: Ingest Node
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,12 @@ tests:
- class: org.elasticsearch.xpack.security.CoreWithSecurityClientYamlTestSuiteIT
method: test {yaml=nodes.stats/11_indices_metrics/Metric - blank for indices mappings}
issue: https://github.com/elastic/elasticsearch/issues/121238
- class: org.elasticsearch.xpack.security.CoreWithSecurityClientYamlTestSuiteIT
method: test {yaml=indices.get_alias/10_basic/Get aliases via /_alias/_all}
issue: https://github.com/elastic/elasticsearch/issues/121242
- class: org.elasticsearch.xpack.security.CoreWithSecurityClientYamlTestSuiteIT
method: test {yaml=cluster.stats/10_basic/Sparse vector stats}
issue: https://github.com/elastic/elasticsearch/issues/121246

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"params": {
"master_timeout":{
"type":"time",
"description":"Timeout for processing on master node"
"description":"Timeout for waiting for new cluster state in case it is blocked"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
Expand All @@ -33,7 +37,7 @@ private GetEnrichPolicyAction() {
super(NAME);
}

public static class Request extends MasterNodeReadRequest<Request> {
public static class Request extends LocalClusterStateRequest {

private final List<String> names;

Expand All @@ -42,6 +46,11 @@ public Request(TimeValue masterNodeTimeout, String... names) {
this.names = List.of(names);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public Request(StreamInput in) throws IOException {
super(in);
this.names = in.readStringCollectionAsImmutableList();
Expand All @@ -52,14 +61,13 @@ public ActionRequestValidationException validate() {
return null;
}

public List<String> getNames() {
return names;
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringCollection(names);
public List<String> getNames() {
return names;
}

@Override
Expand Down Expand Up @@ -89,10 +97,11 @@ public Response(Map<String, EnrichPolicy> policies) {
.collect(Collectors.toList());
}

public Response(StreamInput in) throws IOException {
policies = in.readCollectionAsList(EnrichPolicy.NamedPolicy::new);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(policies);
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/enrich/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
testImplementation project(path: ':modules:legacy-geo')
testImplementation project(xpackModule('spatial'))
testImplementation(testArtifact(project(xpackModule('monitoring'))))
internalClusterTestImplementation project(':modules:rest-root')
}

addQaCheckDependencies(project)
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.enrich;

import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.support.CancellableActionTestPlugin;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.oneOf;

public class EnrichRestActionCancellationIT extends ESIntegTestCase {

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
.build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(getTestTransportPlugin(), MainRestPlugin.class, CancellableActionTestPlugin.class, EnrichPlugin.class);
}

public void testGetEnrichPolicyCancellation() throws IOException {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/policy"), GetEnrichPolicyAction.NAME);
}

private void runRestActionCancellationTest(Request request, String actionName) {
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

try (
var restClient = createRestClient(node);
var capturingAction = CancellableActionTestPlugin.capturingActionOnNode(actionName, node)
) {
final var responseFuture = new PlainActionFuture<Response>();
final var restInvocation = restClient.performRequestAsync(request, wrapAsRestResponseListener(responseFuture));

if (randomBoolean()) {
// cancel by aborting the REST request
capturingAction.captureAndCancel(restInvocation::cancel);
expectThrows(ExecutionException.class, CancellationException.class, () -> responseFuture.get(10, TimeUnit.SECONDS));
} else {
// cancel via the task management API
final var cancelFuture = new PlainActionFuture<Void>();
capturingAction.captureAndCancel(
() -> SubscribableListener

.<ObjectPath>newForked(
l -> restClient.performRequestAsync(
getListTasksRequest(node, actionName),
wrapAsRestResponseListener(l.map(ObjectPath::createFromResponse))
)
)

.<Void>andThen((l, listTasksResponse) -> {
final var taskCount = listTasksResponse.evaluateArraySize("tasks");
assertThat(taskCount, greaterThan(0));
try (var listeners = new RefCountingListener(l)) {
for (int i = 0; i < taskCount; i++) {
final var taskPrefix = "tasks." + i + ".";
assertTrue(listTasksResponse.evaluate(taskPrefix + "cancellable"));
assertFalse(listTasksResponse.evaluate(taskPrefix + "cancelled"));
restClient.performRequestAsync(
getCancelTaskRequest(
listTasksResponse.evaluate(taskPrefix + "node"),
listTasksResponse.evaluate(taskPrefix + "id")
),
wrapAsRestResponseListener(listeners.acquire(EnrichRestActionCancellationIT::assertOK))
);
}
}
})

.addListener(cancelFuture)
);
cancelFuture.get(10, TimeUnit.SECONDS);
expectThrows(Exception.class, () -> responseFuture.get(10, TimeUnit.SECONDS));
}

assertAllTasksHaveFinished(actionName);
} catch (Exception e) {
fail(e);
}
}

private static Request getListTasksRequest(String taskNode, String actionName) {
final var listTasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks");
listTasksRequest.addParameter("nodes", taskNode);
listTasksRequest.addParameter("actions", actionName);
listTasksRequest.addParameter("group_by", "none");
return listTasksRequest;
}

private static Request getCancelTaskRequest(String taskNode, int taskId) {
final var cancelTaskRequest = new Request(HttpPost.METHOD_NAME, Strings.format("/_tasks/%s:%d/_cancel", taskNode, taskId));
cancelTaskRequest.addParameter("wait_for_completion", null);
return cancelTaskRequest;
}

public static void assertOK(Response response) {
assertThat(response.getStatusLine().getStatusCode(), oneOf(200, 201));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
Expand All @@ -26,32 +27,38 @@
import java.util.HashMap;
import java.util.Map;

public class TransportGetEnrichPolicyAction extends TransportMasterNodeReadAction<
public class TransportGetEnrichPolicyAction extends TransportLocalClusterStateAction<
GetEnrichPolicyAction.Request,
GetEnrichPolicyAction.Response> {

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject
public TransportGetEnrichPolicyAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
public TransportGetEnrichPolicyAction(TransportService transportService, ClusterService clusterService, ActionFilters actionFilters) {
super(
GetEnrichPolicyAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
GetEnrichPolicyAction.Request::new,
GetEnrichPolicyAction.Response::new,
transportService.getTaskManager(),
clusterService,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
GetEnrichPolicyAction.Request::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}

@Override
protected void masterOperation(
protected void localClusterStateOperation(
Task task,
GetEnrichPolicyAction.Request request,
ClusterState state,
Expand All @@ -71,6 +78,7 @@ protected void masterOperation(
}
}
}
((CancellableTask) task).ensureNotCancelled();
listener.onResponse(new GetEnrichPolicyAction.Response(policies));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;

Expand All @@ -39,6 +40,10 @@ protected RestChannelConsumer prepareRequest(final RestRequest restRequest, fina
RestUtils.getMasterNodeTimeout(restRequest),
Strings.splitStringByCommaToArray(restRequest.param("name"))
);
return channel -> client.execute(GetEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
GetEnrichPolicyAction.INSTANCE,
request,
new RestToXContentListener<>(channel)
);
}
}

This file was deleted.

Loading

0 comments on commit b053670

Please sign in to comment.