|
12 | 12 | import org.elasticsearch.cluster.service.ClusterService; |
13 | 13 | import org.elasticsearch.common.component.AbstractComponent; |
14 | 14 | import org.elasticsearch.common.settings.Settings; |
| 15 | +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; |
| 16 | +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; |
| 17 | +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; |
15 | 18 | import org.elasticsearch.threadpool.ThreadPool; |
16 | | -import org.elasticsearch.xpack.core.ml.MlMetadata; |
17 | 19 | import org.elasticsearch.xpack.core.ml.MlTasks; |
18 | 20 | import org.elasticsearch.xpack.core.ml.action.OpenJobAction; |
19 | 21 | import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; |
20 | | -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; |
21 | | -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; |
22 | | -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; |
23 | | -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; |
24 | 22 | import org.elasticsearch.xpack.ml.notifications.Auditor; |
25 | 23 |
|
26 | 24 | import java.util.Objects; |
@@ -89,16 +87,20 @@ public void clusterChanged(ClusterChangedEvent event) { |
89 | 87 | auditor.info(jobId, "Opening job on node [" + node.toString() + "]"); |
90 | 88 | } |
91 | 89 | } else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) { |
92 | | - String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId(); |
93 | | - DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId); |
| 90 | + StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams(); |
| 91 | + String jobId = datafeedParams.getJob() != null ? datafeedParams.getJob().getId() : null; |
94 | 92 | if (currentAssignment.getExecutorNode() == null) { |
95 | | - String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" + |
| 93 | + String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" + |
96 | 94 | currentAssignment.getExplanation() + "]"; |
97 | | - logger.warn("[{}] {}", datafeedConfig.getJobId(), msg); |
98 | | - auditor.warning(datafeedConfig.getJobId(), msg); |
| 95 | + logger.warn("[{}] {}", jobId, msg); |
| 96 | + if (jobId != null) { |
| 97 | + auditor.warning(jobId, msg); |
| 98 | + } |
99 | 99 | } else { |
100 | 100 | DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); |
101 | | - auditor.info(datafeedConfig.getJobId(), "Starting datafeed [" + datafeedId + "] on node [" + node + "]"); |
| 101 | + if (jobId != null) { |
| 102 | + auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]"); |
| 103 | + } |
102 | 104 | } |
103 | 105 | } |
104 | 106 | } |
|
0 commit comments