Skip to content

Commit

Permalink
[ML] ensure the ml-config index (#36792) (#36832)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle authored Dec 19, 2018
1 parent f2a5373 commit d43cbda
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
*/
public final class AnomalyDetectorsIndex {

public static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000;

private AnomalyDetectorsIndex() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,9 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting))
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
.put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(),
AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW))
.version(Version.CURRENT.id)
.putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(configMapping))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

/**
* Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds)
Expand All @@ -37,10 +39,12 @@ private void setConfigMigrationEnabled(boolean configMigrationEnabled) {
this.isConfigMigrationEnabled = configMigrationEnabled;
}


/**
* Can migration start? Returns:
* False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION}
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* False if the .ml-config index shards are not active
* True otherwise
* @param clusterState The cluster state
* @return A boolean that dictates if config migration can start
Expand All @@ -54,12 +58,26 @@ public boolean canStartMigration(ClusterState clusterState) {
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}

return mlConfigIndexIsAllocated(clusterState);
}

static boolean mlConfigIndexIsAllocated(ClusterState clusterState) {
if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) {
return false;
}

IndexRoutingTable routingTable = clusterState.getRoutingTable().index(AnomalyDetectorsIndex.configIndexName());
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
return false;
}
return true;
}

/**
* Is the job a eligible for migration? Returns:
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
* False if the job is not in the cluster state
* False if the {@link Job#isDeleting()}
* False if the job has a persistent task
* True otherwise i.e. the job is present, not deleting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
Expand All @@ -21,6 +23,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -29,6 +32,7 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
Expand Down Expand Up @@ -126,19 +130,11 @@ public MlConfigMigrator(Settings settings, Client client, ClusterService cluster
* @param listener The success listener
*/
public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener<Boolean> listener) {

if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
listener.onResponse(false);
return;
}

if (migrationInProgress.compareAndSet(false, true) == false) {
listener.onResponse(Boolean.FALSE);
return;
}

logger.debug("migrating ml configurations");

ActionListener<Boolean> unMarkMigrationInProgress = ActionListener.wrap(
response -> {
migrationInProgress.set(false);
Expand All @@ -150,19 +146,34 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
}
);

List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
if (batches.isEmpty()) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}

if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) {
createConfigIndex(ActionListener.wrap(
response -> {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
},
unMarkMigrationInProgress::onFailure
));
return;
}

if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}

snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
// We have successfully snapshotted the ML configs so we don't need to try again
tookConfigSnapshot.set(true);

List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
if (batches.isEmpty()) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}
migrateBatches(batches, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
response -> {
// We have successfully snapshotted the ML configs so we don't need to try again
tookConfigSnapshot.set(true);
migrateBatches(batches, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
));
}

Expand Down Expand Up @@ -296,13 +307,15 @@ static RemovalResult removeJobsAndDatafeeds(List<String> jobsToRemove, List<Stri
private void addJobIndexRequests(Collection<Job> jobs, BulkRequestBuilder bulkRequestBuilder) {
ToXContent.Params params = new ToXContent.MapParams(JobConfigProvider.TO_XCONTENT_PARAMS);
for (Job job : jobs) {
logger.debug("adding job to migrate: " + job.getId());
bulkRequestBuilder.add(indexRequest(job, Job.documentId(job.getId()), params));
}
}

private void addDatafeedIndexRequests(Collection<DatafeedConfig> datafeedConfigs, BulkRequestBuilder bulkRequestBuilder) {
ToXContent.Params params = new ToXContent.MapParams(DatafeedConfigProvider.TO_XCONTENT_PARAMS);
for (DatafeedConfig datafeedConfig : datafeedConfigs) {
logger.debug("adding datafeed to migrate: " + datafeedConfig.getId());
bulkRequestBuilder.add(indexRequest(datafeedConfig, DatafeedConfig.documentId(datafeedConfig.getId()), params));
}
}
Expand All @@ -318,7 +331,6 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To
return indexRequest;
}


// public for testing
public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listener) {

Expand Down Expand Up @@ -361,6 +373,30 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
);
}

private void createConfigIndex(ActionListener<Boolean> listener) {
logger.info("creating the .ml-config index");
CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.configIndexName());
try
{
createIndexRequest.settings(
Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
);
createIndexRequest.mapping(ElasticsearchMappings.DOC_TYPE, ElasticsearchMappings.configMapping());
} catch (Exception e) {
logger.error("error writing the .ml-config mappings", e);
listener.onFailure(e);
return;
}

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, createIndexRequest,
ActionListener.<CreateIndexResponse>wrap(
r -> listener.onResponse(r.isAcknowledged()),
listener::onFailure
), client.admin().indices()::create);
}

public static Job updateJobForMigration(Job job) {
Job.Builder builder = new Job.Builder(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* This class implements CRUD operation for the
* datafeed configuration document
*
* The number of datafeeds returned in a search it limited to
* {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}.
* In most cases we expect 10s or 100s of datafeeds to be defined and
* a search for all datafeeds should return all.
*/
public class DatafeedConfigProvider {

private static final Logger logger = LogManager.getLogger(DatafeedConfigProvider.class);
Expand All @@ -87,13 +96,6 @@ public class DatafeedConfigProvider {
TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable);
}

/**
* In most cases we expect 10s or 100s of datafeeds to be defined and
* a search for all datafeeds should return all.
* TODO this is a temporary fix
*/
public int searchSize = 1000;

public DatafeedConfigProvider(Client client, NamedXContentRegistry xContentRegistry) {
this.client = client;
this.xContentRegistry = xContentRegistry;
Expand Down Expand Up @@ -368,7 +370,9 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Actio

SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder).request();
.setSource(sourceBuilder)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();

ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds);

Expand Down Expand Up @@ -407,7 +411,6 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Actio
* wildcard then setting this true will not suppress the exception
* @param listener The expanded datafeed config listener
*/
// NORELEASE datafeed configs should be paged or have a mechanism to return all jobs if there are many of them
public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ActionListener<List<DatafeedConfig.Builder>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens));
Expand All @@ -416,7 +419,7 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, A
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();

ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@
/**
* This class implements CRUD operation for the
* anomaly detector job configuration document
*
* The number of jobs returned in a search it limited to
* {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}.
* In most cases we expect 10s or 100s of jobs to be defined and
* a search for all jobs should return all.
*/
public class JobConfigProvider {

Expand All @@ -101,13 +106,6 @@ public class JobConfigProvider {
TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable);
}

/**
* In most cases we expect 10s or 100s of jobs to be defined and
* a search for all jobs should return all.
* TODO this is a temporary fix
*/
private int searchSize = 1000;

private final Client client;

public JobConfigProvider(Client client) {
Expand Down Expand Up @@ -565,7 +563,7 @@ public void expandJobsIds(String expression, boolean allowNoJobs, boolean exclud
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();

ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs);
Expand Down Expand Up @@ -599,6 +597,21 @@ public void expandJobsIds(String expression, boolean allowNoJobs, boolean exclud

}

private SearchRequest makeExpandIdsSearchRequest(String expression, boolean excludeDeleting) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
sourceBuilder.sort(Job.ID.getPreferredName());
sourceBuilder.fetchSource(false);
sourceBuilder.docValueField(Job.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT);
sourceBuilder.docValueField(Job.GROUPS.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT);

return client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();
}

/**
* The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but
* the full anomaly detector job configuration is returned.
Expand All @@ -612,7 +625,6 @@ public void expandJobsIds(String expression, boolean allowNoJobs, boolean exclud
* @param excludeDeleting If true exclude jobs marked as deleting
* @param listener The expanded jobs listener
*/
// NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them
public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<List<Job.Builder>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
Expand All @@ -621,7 +633,7 @@ public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDe
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();

ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs);
Expand Down Expand Up @@ -679,7 +691,7 @@ public void expandGroupIds(List<String> groupIds, ActionListener<SortedSet<Strin
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
Expand Down Expand Up @@ -741,7 +753,7 @@ public void findJobsWithCustomRules(ActionListener<List<Job>> listener) {
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
Expand Down
Loading

0 comments on commit d43cbda

Please sign in to comment.