Skip to content

Commit

Permalink
[ML] Persist data counts in DFA final step with ML origin
Browse files Browse the repository at this point in the history
In elastic#67623 I moved persisting the data counts at the end of a
data frame analytics job into a `FinalStep` class. However,
I forgot to execute the index request with ML origin resulting
in authentication problems if the user that runs the DFA job
does not have read privileges in the ML stats index.

This commit fixes this by executing that index request with ML
origin.
  • Loading branch information
dimitris-athanasiou committed Jan 19, 2021
1 parent 7bb676a commit a193566
Showing 1 changed file with 5 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -36,6 +36,7 @@
import java.util.Collections;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* The final step of a data frame analytics job.
Expand Down Expand Up @@ -80,7 +81,7 @@ private void indexDataCounts(ActionListener<IndexResponse> listener) {
.id(DataCounts.documentId(config.getId()))
.setRequireAlias(true)
.source(builder);
parentTaskClient().index(indexRequest, listener);
executeAsyncWithOrigin(parentTaskClient(), ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener);
} catch (IOException e) {
listener.onFailure(ExceptionsHelper.serverError("[{}] Error persisting final data counts", e, config.getId()));
}
Expand All @@ -97,10 +98,7 @@ private void refreshIndices(ActionListener<RefreshResponse> listener) {
LOGGER.debug(() -> new ParameterizedMessage("[{}] Refreshing indices {}", config.getId(),
Arrays.toString(refreshRequest.indices())));

ParentTaskAssigningClient parentTaskClient = parentTaskClient();
try (ThreadContext.StoredContext ignore = parentTaskClient.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
parentTaskClient.admin().indices().refresh(refreshRequest, listener);
}
executeAsyncWithOrigin(parentTaskClient(), ML_ORIGIN, RefreshAction.INSTANCE, refreshRequest, listener);
}

@Override
Expand Down

0 comments on commit a193566

Please sign in to comment.