Skip to content

Commit 5a5f111

Browse files
committed
Read config in datafeed manager
1 parent df6857c commit 5a5f111

File tree

13 files changed

+288
-158
lines changed

13 files changed

+288
-158
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
9797
* Is there an ml anomaly detector job task for the job {@code jobId}?
9898
* @param jobId The job id
9999
* @param tasks Persistent tasks
100-
* @return
100+
* @return True if the job has a task
101101
*/
102102
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
103103
return openJobIds(tasks).contains(jobId);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@
2828
import org.elasticsearch.xpack.core.XPackPlugin;
2929
import org.elasticsearch.xpack.core.ml.MlTasks;
3030
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
31-
import org.elasticsearch.xpack.core.ml.job.config.Job;
3231
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3332
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3433

3534
import java.io.IOException;
35+
import java.util.Collections;
36+
import java.util.List;
3637
import java.util.Objects;
3738
import java.util.function.LongSupplier;
3839

@@ -196,8 +197,8 @@ public DatafeedParams(StreamInput in) throws IOException {
196197
endTime = in.readOptionalLong();
197198
timeout = TimeValue.timeValueMillis(in.readVLong());
198199
if (in.getVersion().onOrAfter(Version.CURRENT)) {
199-
datafeedConfig = in.readOptionalWriteable(DatafeedConfig::new);
200-
job = in.readOptionalWriteable(Job::new);
200+
jobId = in.readOptionalString();
201+
datafeedIndices = in.readList(StreamInput::readString);
201202
}
202203
}
203204

@@ -208,8 +209,9 @@ public DatafeedParams(StreamInput in) throws IOException {
208209
private long startTime;
209210
private Long endTime;
210211
private TimeValue timeout = TimeValue.timeValueSeconds(20);
211-
private DatafeedConfig datafeedConfig;
212-
private Job job;
212+
private List<String> datafeedIndices = Collections.emptyList();
213+
private String jobId;
214+
213215

214216
public String getDatafeedId() {
215217
return datafeedId;
@@ -239,20 +241,20 @@ public void setTimeout(TimeValue timeout) {
239241
this.timeout = timeout;
240242
}
241243

242-
public DatafeedConfig getDatafeedConfig() {
243-
return datafeedConfig;
244+
public String getJobId() {
245+
return jobId;
244246
}
245247

246-
public void setDatafeedConfig(DatafeedConfig datafeedConfig) {
247-
this.datafeedConfig = datafeedConfig;
248+
public void setJobId(String jobId) {
249+
this.jobId = jobId;
248250
}
249251

250-
public Job getJob() {
251-
return job;
252+
public List<String> getDatafeedIndices() {
253+
return datafeedIndices;
252254
}
253255

254-
public void setJob(Job job) {
255-
this.job = job;
256+
public void setDatafeedIndices(List<String> datafeedIndices) {
257+
this.datafeedIndices = datafeedIndices;
256258
}
257259

258260
@Override
@@ -272,8 +274,8 @@ public void writeTo(StreamOutput out) throws IOException {
272274
out.writeOptionalLong(endTime);
273275
out.writeVLong(timeout.millis());
274276
if (out.getVersion().onOrAfter(Version.CURRENT)) {
275-
out.writeOptionalWriteable(datafeedConfig);
276-
out.writeOptionalWriteable(job);
277+
out.writeOptionalString(jobId);
278+
out.writeStringList(datafeedIndices);
277279
}
278280
}
279281

@@ -292,7 +294,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
292294

293295
@Override
294296
public int hashCode() {
295-
return Objects.hash(datafeedId, startTime, endTime, timeout, datafeedConfig, job);
297+
return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices);
296298
}
297299

298300
@Override
@@ -308,8 +310,8 @@ public boolean equals(Object obj) {
308310
Objects.equals(startTime, other.startTime) &&
309311
Objects.equals(endTime, other.endTime) &&
310312
Objects.equals(timeout, other.timeout) &&
311-
Objects.equals(datafeedConfig, other.datafeedConfig) &&
312-
Objects.equals(job, other.job);
313+
Objects.equals(jobId, other.jobId) &&
314+
Objects.equals(datafeedIndices, other.datafeedIndices);
313315
}
314316
}
315317

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
411411
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
412412
normalizerFactory, xContentRegistry, auditor);
413413
this.autodetectProcessManager.set(autodetectProcessManager);
414-
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
414+
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
415+
auditor, System::currentTimeMillis);
415416
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
416417
System::currentTimeMillis, auditor);
417418
this.datafeedManager.set(datafeedManager);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void clusterChanged(ClusterChangedEvent event) {
8888
}
8989
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
9090
StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams();
91-
String jobId = datafeedParams.getJob() != null ? datafeedParams.getJob().getId() : null;
91+
String jobId = datafeedParams.getJobId();
9292
if (currentAssignment.getExecutorNode() == null) {
9393
String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" +
9494
currentAssignment.getExplanation() + "]";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.List;
5454
import java.util.Locale;
5555
import java.util.Map;
56+
import java.util.concurrent.atomic.AtomicReference;
5657
import java.util.function.Consumer;
5758
import java.util.function.Predicate;
5859

@@ -117,6 +118,7 @@ protected void masterOperation(StartDatafeedAction.Request request, ClusterState
117118
return;
118119
}
119120

121+
AtomicReference<DatafeedConfig> datafeedConfigHolder = new AtomicReference<>();
120122
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
121123

122124
ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>> waitForTaskListener =
@@ -139,36 +141,37 @@ public void onFailure(Exception e) {
139141
};
140142

141143
// Verify data extractor factory can be created, then start persistent task
142-
Consumer<Boolean> createDataExtrator = ok -> {
143-
if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedConfig().getIndices())) {
144+
Consumer<Job> createDataExtrator = job -> {
145+
if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) {
144146
final RemoteClusterLicenseChecker remoteClusterLicenseChecker =
145147
new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode);
146148
remoteClusterLicenseChecker.checkRemoteClusterLicenses(
147-
RemoteClusterLicenseChecker.remoteClusterAliases(params.getDatafeedConfig().getIndices()),
149+
RemoteClusterLicenseChecker.remoteClusterAliases(params.getDatafeedIndices()),
148150
ActionListener.wrap(
149151
response -> {
150152
if (response.isSuccess() == false) {
151-
listener.onFailure(createUnlicensedError(params.getDatafeedConfig().getId(), response));
153+
listener.onFailure(createUnlicensedError(params.getDatafeedId(), response));
152154
} else {
153-
createDataExtractor(params.getJob(), params.getDatafeedConfig(), params, waitForTaskListener);
155+
createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
154156
}
155157
},
156158
e -> listener.onFailure(
157159
createUnknownLicenseError(
158-
params.getDatafeedConfig().getId(),
159-
RemoteClusterLicenseChecker.remoteIndices(params.getDatafeedConfig().getIndices()), e))
160-
));
160+
params.getDatafeedId(),
161+
RemoteClusterLicenseChecker.remoteIndices(params.getDatafeedIndices()), e))
162+
)
163+
);
161164
} else {
162-
createDataExtractor(params.getJob(), params.getDatafeedConfig(), params, waitForTaskListener);
165+
createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
163166
}
164167
};
165168

166169
ActionListener<Job.Builder> jobListener = ActionListener.wrap(
167170
jobBuilder -> {
168171
try {
169-
params.setJob(jobBuilder.build());
170-
validate(params.getJob(), params.getDatafeedConfig(), tasks);
171-
createDataExtrator.accept(Boolean.TRUE);
172+
Job job = jobBuilder.build();
173+
validate(job, datafeedConfigHolder.get(), tasks);
174+
createDataExtrator.accept(job);
172175
} catch (Exception e) {
173176
listener.onFailure(e);
174177
}
@@ -177,10 +180,13 @@ public void onFailure(Exception e) {
177180
);
178181

179182
ActionListener<DatafeedConfig.Builder> datafeedListener = ActionListener.wrap(
180-
datafeedConfig -> {
183+
datafeedBuilder -> {
181184
try {
182-
params.setDatafeedConfig(datafeedConfig.build());
183-
jobConfigProvider.getJob(params.getDatafeedConfig().getJobId(), jobListener);
185+
DatafeedConfig datafeedConfig = datafeedBuilder.build();
186+
params.setDatafeedIndices(datafeedConfig.getIndices());
187+
params.setJobId(datafeedConfig.getJobId());
188+
datafeedConfigHolder.set(datafeedConfig);
189+
jobConfigProvider.getJob(datafeedConfig.getJobId(), jobListener);
184190
} catch (Exception e) {
185191
listener.onFailure(e);
186192
}
@@ -304,14 +310,13 @@ public StartDatafeedPersistentTasksExecutor(Settings settings, DatafeedManager d
304310
@Override
305311
public PersistentTasksCustomMetaData.Assignment getAssignment(StartDatafeedAction.DatafeedParams params,
306312
ClusterState clusterState) {
307-
return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedConfig()).selectNode();
313+
return new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getJobId(),
314+
params.getDatafeedIndices()).selectNode();
308315
}
309316

310317
@Override
311318
public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clusterState) {
312-
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
313-
TransportStartDatafeedAction.validate(params.getJob(), params.getDatafeedConfig(), tasks);
314-
new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedConfig())
319+
new DatafeedNodeSelector(clusterState, resolver, params.getDatafeedId(), params.getJobId(), params.getDatafeedIndices())
315320
.checkDatafeedTaskCanBeCreated();
316321
}
317322

@@ -321,7 +326,7 @@ protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTa
321326
final PersistentTaskState state) {
322327
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
323328
datafeedTask.datafeedManager = datafeedManager;
324-
datafeedManager.run(datafeedTask, params.getJob(), params.getDatafeedConfig(),
329+
datafeedManager.run(datafeedTask,
325330
(error) -> {
326331
if (error != null) {
327332
datafeedTask.markAsFailed(error);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ boolean isIsolated() {
8282
return isIsolated;
8383
}
8484

85+
public String getJobId() {
86+
return jobId;
87+
}
88+
8589
Long runLookBack(long startTime, Long endTime) throws Exception {
8690
lookbackStartTimeMs = skipToStartTime(startTime);
8791
Optional<Long> endMs = Optional.ofNullable(endTime);

0 commit comments

Comments
 (0)