From 138702fa0199d6df08716905cf0cb9352701f53c Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 30 Nov 2021 12:21:56 +0200 Subject: [PATCH 1/2] [ML] Parent datafeed actions to the datafeed's persistent task The vast majority of a datafeed's actions are executed from the data extractor. This includes the heaviest actions, namely the searches. This commit passes a `ParentTaskAssigningClient` to `DataExtractorFactory.create`, which ensures that the client used by any extractor sets the corresponding parent task id: the action task id for preview datafeed and for the master operation stage of the start datafeed action, and the persistent task id for the datafeed operations after it has started. --- .../action/TransportPreviewDatafeedAction.java | 18 ++++++++++++++---- .../action/TransportStartDatafeedAction.java | 8 +++++--- .../xpack/ml/datafeed/DatafeedJobBuilder.java | 13 +++++++++---- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 70f6e18d0dc19..f00608a92808c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -13,6 +13,8 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ParentTaskAssigningClient; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -53,6 +55,7 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction listener) { ActionListener 
datafeedConfigActionListener = ActionListener.wrap(datafeedConfig -> { if (request.getJobConfig() != null) { - previewDatafeed(datafeedConfig, request.getJobConfig().build(new Date()), listener); + previewDatafeed(task, datafeedConfig, request.getJobConfig().build(new Date()), listener); return; } jobConfigProvider.getJob( datafeedConfig.getJobId(), - ActionListener.wrap(jobBuilder -> previewDatafeed(datafeedConfig, jobBuilder.build(), listener), listener::onFailure) + ActionListener.wrap(jobBuilder -> previewDatafeed(task, datafeedConfig, jobBuilder.build(), listener), listener::onFailure) ); }, listener::onFailure); if (request.getDatafeedConfig() != null) { @@ -102,7 +107,12 @@ protected void doExecute(Task task, PreviewDatafeedAction.Request request, Actio } } - private void previewDatafeed(DatafeedConfig datafeedConfig, Job job, ActionListener listener) { + private void previewDatafeed( + Task task, + DatafeedConfig datafeedConfig, + Job job, + ActionListener listener + ) { DatafeedConfig.Builder previewDatafeedBuilder = buildPreviewDatafeed(datafeedConfig); useSecondaryAuthIfAvailable(securityContext, () -> { previewDatafeedBuilder.setHeaders(filterSecurityHeaders(threadPool.getThreadContext().getHeaders())); @@ -111,7 +121,7 @@ private void previewDatafeed(DatafeedConfig datafeedConfig, Job job, ActionListe // requesting the preview doesn't have permission to search the relevant indices. 
DatafeedConfig previewDatafeedConfig = previewDatafeedBuilder.build(); DataExtractorFactory.create( - client, + new ParentTaskAssigningClient(client, clusterService.localNode(), task), previewDatafeedConfig, job, xContentRegistry, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 2f51c6fab26ba..62adb1ff89f70 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -251,7 +252,7 @@ public void onFailure(Exception e) { remoteAliases, (cn) -> remoteClusterService.getConnection(cn).getVersion() ); - createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener); + createDataExtractor(task, job, datafeedConfigHolder.get(), params, waitForTaskListener); } }, e -> listener.onFailure( @@ -264,7 +265,7 @@ public void onFailure(Exception e) { ) ); } else { - createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener); + createDataExtractor(task, job, datafeedConfigHolder.get(), params, waitForTaskListener); } }; @@ -343,13 +344,14 @@ static void checkRemoteClusterVersions( /** Creates {@link DataExtractorFactory} solely for the purpose of validation i.e. verifying that it can be created. 
*/ private void createDataExtractor( + Task task, Job job, DatafeedConfig datafeed, StartDatafeedAction.DatafeedParams params, ActionListener> listener ) { DataExtractorFactory.create( - client, + new ParentTaskAssigningClient(client, clusterService.localNode(), task), datafeed, job, xContentRegistry, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index d2f8d08de294e..f45404a574efc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -44,7 +44,7 @@ public class DatafeedJobBuilder { private final Supplier currentTimeSupplier; private final JobResultsPersister jobResultsPersister; private final boolean remoteClusterClient; - private final String nodeName; + private final ClusterService clusterService; private volatile long delayedDataCheckFreq; @@ -65,8 +65,8 @@ public DatafeedJobBuilder( this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); - this.nodeName = clusterService.getNodeName(); this.delayedDataCheckFreq = DELAYED_DATA_CHECK_FREQ.get(settings).millis(); + this.clusterService = Objects.requireNonNull(clusterService); clusterService.getClusterSettings().addSettingsUpdateConsumer(DELAYED_DATA_CHECK_FREQ, this::setDelayedDataCheckFreq); } @@ -75,7 +75,7 @@ private void setDelayedDataCheckFreq(TimeValue value) { } void build(TransportStartDatafeedAction.DatafeedTask task, DatafeedContext context, ActionListener listener) { - final ParentTaskAssigningClient parentTaskAssigningClient = new ParentTaskAssigningClient(client, task.getParentTaskId()); + final ParentTaskAssigningClient parentTaskAssigningClient = 
new ParentTaskAssigningClient(client, clusterService.localNode(), task); final DatafeedConfig datafeedConfig = context.getDatafeedConfig(); final Job job = context.getJob(); final long latestFinalBucketEndMs = context.getRestartTimeInfo().getLatestFinalBucketTimeMs() == null @@ -155,7 +155,12 @@ private void checkRemoteIndicesAreAvailable(DatafeedConfig datafeedConfig) { List remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfig.getIndices()); if (remoteIndices.isEmpty() == false) { throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH, datafeedConfig.getId(), remoteIndices, nodeName) + Messages.getMessage( + Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH, + datafeedConfig.getId(), + remoteIndices, + clusterService.getNodeName() + ) ); } } From 521a05603a5c4dae363cfccbd6b17251d48996c2 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 30 Nov 2021 14:54:17 +0200 Subject: [PATCH 2/2] Fix test --- .../ml/datafeed/DatafeedJobBuilderTests.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java index a99f43a7b446c..7c52b8f842bfd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java @@ -6,8 +6,14 @@ */ package org.elasticsearch.xpack.ml.datafeed; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; 
import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; @@ -38,6 +44,8 @@ import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static org.elasticsearch.test.NodeRoles.nonRemoteClusterClientNode; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -78,11 +86,25 @@ public void init() { ) ) ); + final DiscoveryNode localNode = new DiscoveryNode( + "test_node", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + Version.CURRENT + ); clusterService = new ClusterService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(), clusterSettings, threadPool ); + clusterService.getClusterApplierService() + .setInitialState( + ClusterState.builder(new ClusterName("DatafeedJobBuilderTests")) + .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())) + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) + .build() + ); datafeedJobBuilder = new DatafeedJobBuilder( client,