diff --git a/docs/changelog/121124.yaml b/docs/changelog/121124.yaml new file mode 100644 index 0000000000000..066145386ecb4 --- /dev/null +++ b/docs/changelog/121124.yaml @@ -0,0 +1,5 @@ +pr: 121124 +summary: Run `TransportGetEnrichPolicyAction` on local node +area: Ingest Node +type: enhancement +issues: [] diff --git a/muted-tests.yml b/muted-tests.yml index 8d9bdff3d2572..440a67256ac45 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -331,6 +331,12 @@ tests: - class: org.elasticsearch.xpack.security.CoreWithSecurityClientYamlTestSuiteIT method: test {yaml=nodes.stats/11_indices_metrics/Metric - blank for indices mappings} issue: https://github.com/elastic/elasticsearch/issues/121238 +- class: org.elasticsearch.xpack.security.CoreWithSecurityClientYamlTestSuiteIT + method: test {yaml=indices.get_alias/10_basic/Get aliases via /_alias/_all} + issue: https://github.com/elastic/elasticsearch/issues/121242 +- class: org.elasticsearch.xpack.security.CoreWithSecurityClientYamlTestSuiteIT + method: test {yaml=cluster.stats/10_basic/Sparse vector stats} + issue: https://github.com/elastic/elasticsearch/issues/121246 # Examples: # diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json index aed7397877393..e735a75f67ee9 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json @@ -30,7 +30,7 @@ "params": { "master_timeout":{ "type":"time", - "description":"Timeout for processing on master node" + "description":"Timeout for waiting for new cluster state in case it is blocked" } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java index 7f138dec7ee23..ae02dc781e0dd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java @@ -9,10 +9,14 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV10; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -33,7 +37,7 @@ private GetEnrichPolicyAction() { super(NAME); } - public static class Request extends MasterNodeReadRequest { + public static class Request extends LocalClusterStateRequest { private final List names; @@ -42,6 +46,11 @@ public Request(TimeValue masterNodeTimeout, String... names) { this.names = List.of(names); } + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) public Request(StreamInput in) throws IOException { super(in); this.names = in.readStringCollectionAsImmutableList(); @@ -52,14 +61,13 @@ public ActionRequestValidationException validate() { return null; } - public List getNames() { - return names; + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeStringCollection(names); + public List getNames() { + return names; } @Override @@ -89,10 +97,11 @@ public Response(Map policies) { .collect(Collectors.toList()); } - public Response(StreamInput in) throws IOException { - policies = in.readCollectionAsList(EnrichPolicy.NamedPolicy::new); - } - + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(policies); diff --git a/x-pack/plugin/enrich/build.gradle b/x-pack/plugin/enrich/build.gradle index 352b7a3e64171..46972578ae1fc 100644 --- a/x-pack/plugin/enrich/build.gradle +++ b/x-pack/plugin/enrich/build.gradle @@ -19,6 +19,7 @@ dependencies { testImplementation project(path: ':modules:legacy-geo') testImplementation project(xpackModule('spatial')) testImplementation(testArtifact(project(xpackModule('monitoring')))) + internalClusterTestImplementation project(':modules:rest-root') } addQaCheckDependencies(project) diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestActionCancellationIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestActionCancellationIT.java new file mode 100644 index 0000000000000..a75dd26eaceeb --- /dev/null +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestActionCancellationIT.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.enrich; + +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.elasticsearch.action.support.CancellableActionTestPlugin; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.RefCountingListener; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.root.MainRestPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.transport.netty4.Netty4Plugin; +import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener; +import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.oneOf; + +public class EnrichRestActionCancellationIT extends ESIntegTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME) + .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME) + .build(); + } + + @Override + protected Collection> nodePlugins() { + return List.of(getTestTransportPlugin(), MainRestPlugin.class, CancellableActionTestPlugin.class, EnrichPlugin.class); + } + + public void testGetEnrichPolicyCancellation() throws IOException { + runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/policy"), GetEnrichPolicyAction.NAME); + } + + private void runRestActionCancellationTest(Request request, String actionName) { + final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + + try ( + var restClient = createRestClient(node); + var capturingAction = CancellableActionTestPlugin.capturingActionOnNode(actionName, node) + ) { + final var responseFuture = new PlainActionFuture(); + final var restInvocation = restClient.performRequestAsync(request, wrapAsRestResponseListener(responseFuture)); + + if (randomBoolean()) { + // cancel by aborting the REST request + capturingAction.captureAndCancel(restInvocation::cancel); + expectThrows(ExecutionException.class, CancellationException.class, () -> responseFuture.get(10, TimeUnit.SECONDS)); + } else { + // cancel via the task management API + final var cancelFuture = new PlainActionFuture(); + capturingAction.captureAndCancel( + () -> SubscribableListener + + .newForked( + l -> restClient.performRequestAsync( + getListTasksRequest(node, actionName), + wrapAsRestResponseListener(l.map(ObjectPath::createFromResponse)) + ) + ) + + .andThen((l, listTasksResponse) -> { + final var taskCount = listTasksResponse.evaluateArraySize("tasks"); + assertThat(taskCount, greaterThan(0)); + try (var listeners = new RefCountingListener(l)) { + for (int i = 0; i < taskCount; i++) { + final var taskPrefix = "tasks." + i + "."; + assertTrue(listTasksResponse.evaluate(taskPrefix + "cancellable")); + assertFalse(listTasksResponse.evaluate(taskPrefix + "cancelled")); + restClient.performRequestAsync( + getCancelTaskRequest( + listTasksResponse.evaluate(taskPrefix + "node"), + listTasksResponse.evaluate(taskPrefix + "id") + ), + wrapAsRestResponseListener(listeners.acquire(EnrichRestActionCancellationIT::assertOK)) + ); + } + } + }) + + .addListener(cancelFuture) + ); + cancelFuture.get(10, TimeUnit.SECONDS); + expectThrows(Exception.class, () -> responseFuture.get(10, TimeUnit.SECONDS)); + } + + assertAllTasksHaveFinished(actionName); + } catch (Exception e) { + fail(e); + } + } + + private static Request getListTasksRequest(String taskNode, String actionName) { + final var listTasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks"); + listTasksRequest.addParameter("nodes", taskNode); + listTasksRequest.addParameter("actions", actionName); + listTasksRequest.addParameter("group_by", "none"); + return listTasksRequest; + } + + private static Request getCancelTaskRequest(String taskNode, int taskId) { + final var cancelTaskRequest = new Request(HttpPost.METHOD_NAME, Strings.format("/_tasks/%s:%d/_cancel", taskNode, taskId)); + cancelTaskRequest.addParameter("wait_for_completion", null); + return cancelTaskRequest; + } + + public static void assertOK(Response response) { + assertThat(response.getStatusLine().getStatusCode(), oneOf(200, 201)); + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java index cff0ff60c599b..3af102e481e38 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java @@ -8,16 +8,17 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.local.TransportLocalClusterStateAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; @@ -26,32 +27,38 @@ import java.util.HashMap; import java.util.Map; -public class TransportGetEnrichPolicyAction extends TransportMasterNodeReadAction< +public class TransportGetEnrichPolicyAction extends TransportLocalClusterStateAction< GetEnrichPolicyAction.Request, GetEnrichPolicyAction.Response> { + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) + @SuppressWarnings("this-escape") @Inject - public TransportGetEnrichPolicyAction( - TransportService transportService, - ClusterService clusterService, - ThreadPool threadPool, - ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver - ) { + public TransportGetEnrichPolicyAction(TransportService transportService, ClusterService clusterService, ActionFilters actionFilters) { super( GetEnrichPolicyAction.NAME, - transportService, - clusterService, - threadPool, actionFilters, - GetEnrichPolicyAction.Request::new, - GetEnrichPolicyAction.Response::new, + transportService.getTaskManager(), + clusterService, EsExecutors.DIRECT_EXECUTOR_SERVICE ); + + transportService.registerRequestHandler( + actionName, + executor, + false, + true, + GetEnrichPolicyAction.Request::new, + (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel)) + ); } @Override - protected void masterOperation( + protected void localClusterStateOperation( Task task, GetEnrichPolicyAction.Request request, ClusterState state, @@ -71,6 +78,7 @@ protected void masterOperation( } } } + ((CancellableTask) task).ensureNotCancelled(); listener.onResponse(new GetEnrichPolicyAction.Response(policies)); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java index 2fb9f63c1eb4a..4796bfcdbfeb0 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; @@ -39,6 +40,10 @@ protected RestChannelConsumer prepareRequest(final RestRequest restRequest, fina RestUtils.getMasterNodeTimeout(restRequest), Strings.splitStringByCommaToArray(restRequest.param("name")) ); - return channel -> client.execute(GetEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( + GetEnrichPolicyAction.INSTANCE, + request, + new RestToXContentListener<>(channel) + ); } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionRequestTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionRequestTests.java deleted file mode 100644 index 051eadac48467..0000000000000 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionRequestTests.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ -package org.elasticsearch.xpack.enrich.action; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; -import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; - -public class GetEnrichPolicyActionRequestTests extends AbstractWireSerializingTestCase { - - @Override - protected GetEnrichPolicyAction.Request createTestInstance() { - return new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, generateRandomStringArray(0, 4, false)); - } - - @Override - protected GetEnrichPolicyAction.Request mutateInstance(GetEnrichPolicyAction.Request instance) { - return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 - } - - @Override - protected Writeable.Reader instanceReader() { - return GetEnrichPolicyAction.Request::new; - } -} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionResponseTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionResponseTests.java deleted file mode 100644 index c46005163fa12..0000000000000 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionResponseTests.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ -package org.elasticsearch.xpack.enrich.action; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractXContentSerializingTestCase; -import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xcontent.XContentType; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.assertEqualPolicies; -import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.randomEnrichPolicy; -import static org.hamcrest.core.IsEqual.equalTo; - -public class GetEnrichPolicyActionResponseTests extends AbstractXContentSerializingTestCase { - - @Override - protected GetEnrichPolicyAction.Response doParseInstance(XContentParser parser) throws IOException { - Map policies = new HashMap<>(); - assert parser.nextToken() == XContentParser.Token.START_OBJECT; - assert parser.nextToken() == XContentParser.Token.FIELD_NAME; - assert parser.currentName().equals("policies"); - assert parser.nextToken() == XContentParser.Token.START_ARRAY; - - XContentParser.Token token; - while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { - assert token == XContentParser.Token.START_OBJECT; - assert parser.nextToken() == XContentParser.Token.FIELD_NAME; - assert parser.currentName().equals("config"); - assert parser.nextToken() == XContentParser.Token.START_OBJECT; - EnrichPolicy.NamedPolicy policy = EnrichPolicy.NamedPolicy.fromXContent(parser); - policies.put(policy.getName(), policy.getPolicy()); - assert parser.nextToken() == XContentParser.Token.END_OBJECT; - } - - return new GetEnrichPolicyAction.Response(policies); - } - - @Override - protected GetEnrichPolicyAction.Response createTestInstance() { - Map items = new HashMap<>(); - for (int i = 0; i < randomIntBetween(0, 3); i++) { - EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON); - items.put(randomAlphaOfLength(3), policy); - } - return new GetEnrichPolicyAction.Response(items); - } - - @Override - protected GetEnrichPolicyAction.Response mutateInstance(GetEnrichPolicyAction.Response instance) { - return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 - } - - @Override - protected Writeable.Reader instanceReader() { - return GetEnrichPolicyAction.Response::new; - } - - @Override - protected void assertEqualInstances(GetEnrichPolicyAction.Response expectedInstance, GetEnrichPolicyAction.Response newInstance) { - assertNotSame(expectedInstance, newInstance); - // the tests shuffle around the policy query source xcontent type, so this is needed here - assertThat(expectedInstance.getPolicies().size(), equalTo(newInstance.getPolicies().size())); - // since the backing store is a treemap the list will be sorted so we can just check each - // instance is the same - for (int i = 0; i < expectedInstance.getPolicies().size(); i++) { - EnrichPolicy.NamedPolicy expected = expectedInstance.getPolicies().get(i); - EnrichPolicy.NamedPolicy newed = newInstance.getPolicies().get(i); - assertThat(expected.getName(), equalTo(newed.getName())); - assertEqualPolicies(expected.getPolicy(), newed.getPolicy()); - } - } -} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java index 6a3c1eb2555b1..448f6d42a992c 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java @@ -10,6 +10,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; @@ -17,6 +19,7 @@ import org.elasticsearch.xpack.enrich.EnrichPolicyLocks; import org.junit.After; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -34,7 +37,8 @@ public void cleanupPolicies() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); - ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() { + final var task = createTask(); + ActionTestUtils.execute(transportAction, task, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() { @Override public void onResponse(GetEnrichPolicyAction.Response response) { reference.set(response); @@ -43,7 +47,7 @@ public void onResponse(GetEnrichPolicyAction.Response response) { } public void onFailure(final Exception e) { - fail(); + fail(e); } }); latch.await(); @@ -74,7 +78,8 @@ public void testListPolicies() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); - ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() { + final var task = createTask(); + ActionTestUtils.execute(transportAction, task, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() { @Override public void onResponse(GetEnrichPolicyAction.Response response) { reference.set(response); @@ -83,7 +88,7 @@ public void onResponse(GetEnrichPolicyAction.Response response) { } public void onFailure(final Exception e) { - fail(); + fail(e); } }); latch.await(); @@ -101,7 +106,8 @@ public void testListEmptyPolicies() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); - ActionTestUtils.execute(transportAction, null, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() { + final var task = createTask(); + ActionTestUtils.execute(transportAction, task, new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT), new ActionListener<>() { @Override public void onResponse(GetEnrichPolicyAction.Response response) { reference.set(response); @@ -110,7 +116,7 @@ public void onResponse(GetEnrichPolicyAction.Response response) { } public void onFailure(final Exception e) { - fail(); + fail(e); } }); latch.await(); @@ -137,7 +143,7 @@ public void testGetPolicy() throws InterruptedException { final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); ActionTestUtils.execute( transportAction, - null, + createTask(), new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name), new ActionListener<>() { @Override @@ -147,7 +153,7 @@ public void onResponse(GetEnrichPolicyAction.Response response) { } public void onFailure(final Exception e) { - fail(); + fail(e); } } ); @@ -184,7 +190,7 @@ public void testGetMultiplePolicies() throws InterruptedException { final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); ActionTestUtils.execute( transportAction, - null, + createTask(), new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, name, anotherName), new ActionListener<>() { @Override @@ -194,7 +200,7 @@ public void onResponse(GetEnrichPolicyAction.Response response) { } public void onFailure(final Exception e) { - fail(); + fail(e); } } ); @@ -218,7 +224,7 @@ public void testGetPolicyThrowsError() throws InterruptedException { final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); ActionTestUtils.execute( transportAction, - null, + createTask(), new GetEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "non-exists"), new ActionListener<>() { @Override @@ -228,7 +234,7 @@ public void onResponse(GetEnrichPolicyAction.Response response) { } public void onFailure(final Exception e) { - fail(); + fail(e); } } ); @@ -236,4 +242,8 @@ public void onFailure(final Exception e) { assertNotNull(reference.get()); assertThat(reference.get().getPolicies().size(), equalTo(0)); } + + private static CancellableTask createTask() { + return new CancellableTask(randomNonNegativeLong(), "test", GetEnrichPolicyAction.NAME, "", TaskId.EMPTY_TASK_ID, Map.of()); + } }