Skip to content

Commit a1838da

Browse files
committed
ML: fix delayed data annotations on secured cluster (#37193)
* changing executing context for writing annotation * adjusting user * removing unused import
1 parent b0e7036 commit a1838da

File tree

2 files changed

+17
-13
lines changed

2 files changed

+17
-13
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
3434
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
3535
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
36-
import org.elasticsearch.xpack.core.security.user.SystemUser;
36+
import org.elasticsearch.xpack.core.security.user.XPackUser;
3737
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
3838
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
3939
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -226,22 +226,24 @@ private Annotation createAnnotation(Date startTime, Date endTime, String msg) {
226226
Date currentTime = new Date(currentTimeSupplier.get());
227227
return new Annotation(msg,
228228
currentTime,
229-
SystemUser.NAME,
229+
XPackUser.NAME,
230230
startTime,
231231
endTime,
232232
jobId,
233233
currentTime,
234-
SystemUser.NAME,
234+
XPackUser.NAME,
235235
"annotation");
236236
}
237237

238238
private String addAndSetDelayedDataAnnotation(Annotation annotation) {
239239
try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
240240
IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME, ElasticsearchMappings.DOC_TYPE);
241241
request.source(xContentBuilder);
242-
IndexResponse response = client.index(request).actionGet();
243-
lastDataCheckAnnotation = annotation;
244-
return response.getId();
242+
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
243+
IndexResponse response = client.index(request).actionGet();
244+
lastDataCheckAnnotation = annotation;
245+
return response.getId();
246+
}
245247
} catch (IOException ex) {
246248
String errorMessage = "[" + jobId + "] failed to create annotation for delayed data checker.";
247249
LOGGER.error(errorMessage, ex);
@@ -252,7 +254,7 @@ private String addAndSetDelayedDataAnnotation(Annotation annotation) {
252254

253255
private void updateAnnotation(Annotation annotation) {
254256
Annotation updatedAnnotation = new Annotation(lastDataCheckAnnotation);
255-
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
257+
updatedAnnotation.setModifiedUsername(XPackUser.NAME);
256258
updatedAnnotation.setModifiedTime(new Date(currentTimeSupplier.get()));
257259
updatedAnnotation.setAnnotation(annotation.getAnnotation());
258260
updatedAnnotation.setTimestamp(annotation.getTimestamp());
@@ -261,8 +263,10 @@ private void updateAnnotation(Annotation annotation) {
261263
IndexRequest indexRequest = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME, ElasticsearchMappings.DOC_TYPE);
262264
indexRequest.id(lastDataCheckAnnotationId);
263265
indexRequest.source(xContentBuilder);
264-
client.index(indexRequest).actionGet();
265-
lastDataCheckAnnotation = updatedAnnotation;
266+
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
267+
client.index(indexRequest).actionGet();
268+
lastDataCheckAnnotation = updatedAnnotation;
269+
}
266270
} catch (IOException ex) {
267271
String errorMessage = "[" + jobId + "] failed to update annotation for delayed data checker.";
268272
LOGGER.error(errorMessage, ex);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3232
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
3333
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
34-
import org.elasticsearch.xpack.core.security.user.SystemUser;
34+
import org.elasticsearch.xpack.core.security.user.XPackUser;
3535
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
3636
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData;
3737
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -273,12 +273,12 @@ public void testRealtimeRun() throws Exception {
273273

274274
Annotation expectedAnnotation = new Annotation(msg,
275275
new Date(currentTime),
276-
SystemUser.NAME,
276+
XPackUser.NAME,
277277
bucket.getTimestamp(),
278278
new Date((bucket.getEpoch() + bucket.getBucketSpan()) * 1000),
279279
jobId,
280280
new Date(currentTime),
281-
SystemUser.NAME,
281+
XPackUser.NAME,
282282
"annotation");
283283

284284
IndexRequest request = new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME, ElasticsearchMappings.DOC_TYPE);
@@ -314,7 +314,7 @@ public void testRealtimeRun() throws Exception {
314314
Annotation updatedAnnotation = new Annotation(expectedAnnotation);
315315
updatedAnnotation.setAnnotation(msg);
316316
updatedAnnotation.setModifiedTime(new Date(currentTime));
317-
updatedAnnotation.setModifiedUsername(SystemUser.NAME);
317+
updatedAnnotation.setModifiedUsername(XPackUser.NAME);
318318
updatedAnnotation.setEndTimestamp(new Date((bucket2.getEpoch() + bucket2.getBucketSpan()) * 1000));
319319
try (XContentBuilder xContentBuilder = updatedAnnotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
320320
indexRequest.source(xContentBuilder);

0 commit comments

Comments
 (0)