Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Parent datafeed actions to the datafeed's persistent task #81143

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -53,6 +55,7 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ

private final ThreadPool threadPool;
private final Client client;
private final ClusterService clusterService;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final NamedXContentRegistry xContentRegistry;
Expand All @@ -65,13 +68,15 @@ public TransportPreviewDatafeedAction(
TransportService transportService,
ActionFilters actionFilters,
Client client,
ClusterService clusterService,
JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider,
NamedXContentRegistry xContentRegistry
) {
super(PreviewDatafeedAction.NAME, transportService, actionFilters, PreviewDatafeedAction.Request::new);
this.threadPool = threadPool;
this.client = client;
this.clusterService = clusterService;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.xContentRegistry = xContentRegistry;
Expand All @@ -84,12 +89,12 @@ public TransportPreviewDatafeedAction(
protected void doExecute(Task task, PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
ActionListener<DatafeedConfig> datafeedConfigActionListener = ActionListener.wrap(datafeedConfig -> {
if (request.getJobConfig() != null) {
previewDatafeed(datafeedConfig, request.getJobConfig().build(new Date()), listener);
previewDatafeed(task, datafeedConfig, request.getJobConfig().build(new Date()), listener);
return;
}
jobConfigProvider.getJob(
datafeedConfig.getJobId(),
ActionListener.wrap(jobBuilder -> previewDatafeed(datafeedConfig, jobBuilder.build(), listener), listener::onFailure)
ActionListener.wrap(jobBuilder -> previewDatafeed(task, datafeedConfig, jobBuilder.build(), listener), listener::onFailure)
);
}, listener::onFailure);
if (request.getDatafeedConfig() != null) {
Expand All @@ -102,7 +107,12 @@ protected void doExecute(Task task, PreviewDatafeedAction.Request request, Actio
}
}

private void previewDatafeed(DatafeedConfig datafeedConfig, Job job, ActionListener<PreviewDatafeedAction.Response> listener) {
private void previewDatafeed(
Task task,
DatafeedConfig datafeedConfig,
Job job,
ActionListener<PreviewDatafeedAction.Response> listener
) {
DatafeedConfig.Builder previewDatafeedBuilder = buildPreviewDatafeed(datafeedConfig);
useSecondaryAuthIfAvailable(securityContext, () -> {
previewDatafeedBuilder.setHeaders(filterSecurityHeaders(threadPool.getThreadContext().getHeaders()));
Expand All @@ -111,7 +121,7 @@ private void previewDatafeed(DatafeedConfig datafeedConfig, Job job, ActionListe
// requesting the preview doesn't have permission to search the relevant indices.
DatafeedConfig previewDatafeedConfig = previewDatafeedBuilder.build();
DataExtractorFactory.create(
client,
new ParentTaskAssigningClient(client, clusterService.localNode(), task),
previewDatafeedConfig,
job,
xContentRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -251,7 +252,7 @@ public void onFailure(Exception e) {
remoteAliases,
(cn) -> remoteClusterService.getConnection(cn).getVersion()
);
createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
createDataExtractor(task, job, datafeedConfigHolder.get(), params, waitForTaskListener);
}
},
e -> listener.onFailure(
Expand All @@ -264,7 +265,7 @@ public void onFailure(Exception e) {
)
);
} else {
createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
createDataExtractor(task, job, datafeedConfigHolder.get(), params, waitForTaskListener);
}
};

Expand Down Expand Up @@ -343,13 +344,14 @@ static void checkRemoteClusterVersions(

/** Creates {@link DataExtractorFactory} solely for the purpose of validation i.e. verifying that it can be created. */
private void createDataExtractor(
Task task,
Job job,
DatafeedConfig datafeed,
StartDatafeedAction.DatafeedParams params,
ActionListener<PersistentTasksCustomMetadata.PersistentTask<StartDatafeedAction.DatafeedParams>> listener
) {
DataExtractorFactory.create(
client,
new ParentTaskAssigningClient(client, clusterService.localNode(), task),
datafeed,
job,
xContentRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class DatafeedJobBuilder {
private final Supplier<Long> currentTimeSupplier;
private final JobResultsPersister jobResultsPersister;
private final boolean remoteClusterClient;
private final String nodeName;
private final ClusterService clusterService;

private volatile long delayedDataCheckFreq;

Expand All @@ -65,8 +65,8 @@ public DatafeedJobBuilder(
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
this.nodeName = clusterService.getNodeName();
this.delayedDataCheckFreq = DELAYED_DATA_CHECK_FREQ.get(settings).millis();
this.clusterService = Objects.requireNonNull(clusterService);
clusterService.getClusterSettings().addSettingsUpdateConsumer(DELAYED_DATA_CHECK_FREQ, this::setDelayedDataCheckFreq);
}

Expand All @@ -75,7 +75,7 @@ private void setDelayedDataCheckFreq(TimeValue value) {
}

void build(TransportStartDatafeedAction.DatafeedTask task, DatafeedContext context, ActionListener<DatafeedJob> listener) {
final ParentTaskAssigningClient parentTaskAssigningClient = new ParentTaskAssigningClient(client, task.getParentTaskId());
final ParentTaskAssigningClient parentTaskAssigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
final DatafeedConfig datafeedConfig = context.getDatafeedConfig();
final Job job = context.getJob();
final long latestFinalBucketEndMs = context.getRestartTimeInfo().getLatestFinalBucketTimeMs() == null
Expand Down Expand Up @@ -155,7 +155,12 @@ private void checkRemoteIndicesAreAvailable(DatafeedConfig datafeedConfig) {
List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfig.getIndices());
if (remoteIndices.isEmpty() == false) {
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH, datafeedConfig.getId(), remoteIndices, nodeName)
Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
datafeedConfig.getId(),
remoteIndices,
clusterService.getNodeName()
)
);
}
}
Expand Down