Skip to content

Commit

Permalink
[ML] ensure the ml-config index (#36792)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle authored Dec 18, 2018
1 parent c1ed462 commit 0cd088b
Show file tree
Hide file tree
Showing 11 changed files with 303 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
*/
public final class AnomalyDetectorsIndex {

public static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000;

private AnomalyDetectorsIndex() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,9 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting))
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
.put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(),
AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW))
.version(Version.CURRENT.id)
.putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(configMapping))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -58,23 +57,23 @@ public void clusterChanged(ClusterChangedEvent event) {
return;
}

if (event.metaDataChanged() == false) {
return;
}
PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())),
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)),
e -> {
logger.error("error migrating ml configurations", e);
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event));
}
));
}

private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous,
ClusterState state) {
private void auditChangesToMlTasks(ClusterChangedEvent event) {

if (event.metaDataChanged() == false) {
return;
}

PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

if (Objects.equals(previous, current)) {
return;
Expand All @@ -92,7 +91,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis
if (currentAssignment.getExecutorNode() == null) {
auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
} else {
DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode());
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
}
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
Expand All @@ -106,7 +105,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis
auditor.warning(jobId, msg);
}
} else {
DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode());
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
if (jobId != null) {
auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

/**
* Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds)
Expand All @@ -37,10 +39,12 @@ private void setConfigMigrationEnabled(boolean configMigrationEnabled) {
this.isConfigMigrationEnabled = configMigrationEnabled;
}


/**
* Can migration start? Returns:
* False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION}
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* False if the .ml-config index shards are not active
* True otherwise
* @param clusterState The cluster state
* @return A boolean that dictates if config migration can start
Expand All @@ -54,12 +58,26 @@ public boolean canStartMigration(ClusterState clusterState) {
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}

return mlConfigIndexIsAllocated(clusterState);
}

static boolean mlConfigIndexIsAllocated(ClusterState clusterState) {
if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) {
return false;
}

IndexRoutingTable routingTable = clusterState.getRoutingTable().index(AnomalyDetectorsIndex.configIndexName());
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
return false;
}
return true;
}

/**
* Is the job a eligible for migration? Returns:
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
* False if the job is not in the cluster state
* False if the {@link Job#isDeleting()}
* False if the job has a persistent task
* True otherwise i.e. the job is present, not deleting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
Expand All @@ -21,6 +23,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -29,6 +32,7 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
Expand Down Expand Up @@ -126,19 +130,11 @@ public MlConfigMigrator(Settings settings, Client client, ClusterService cluster
* @param listener The success listener
*/
public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener<Boolean> listener) {

if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
listener.onResponse(false);
return;
}

if (migrationInProgress.compareAndSet(false, true) == false) {
listener.onResponse(Boolean.FALSE);
return;
}

logger.debug("migrating ml configurations");

ActionListener<Boolean> unMarkMigrationInProgress = ActionListener.wrap(
response -> {
migrationInProgress.set(false);
Expand All @@ -150,19 +146,34 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
}
);

List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
if (batches.isEmpty()) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}

if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) {
createConfigIndex(ActionListener.wrap(
response -> {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
},
unMarkMigrationInProgress::onFailure
));
return;
}

if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}

snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
// We have successfully snapshotted the ML configs so we don't need to try again
tookConfigSnapshot.set(true);

List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
if (batches.isEmpty()) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}
migrateBatches(batches, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
response -> {
// We have successfully snapshotted the ML configs so we don't need to try again
tookConfigSnapshot.set(true);
migrateBatches(batches, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
));
}

Expand Down Expand Up @@ -296,13 +307,15 @@ static RemovalResult removeJobsAndDatafeeds(List<String> jobsToRemove, List<Stri
private void addJobIndexRequests(Collection<Job> jobs, BulkRequestBuilder bulkRequestBuilder) {
ToXContent.Params params = new ToXContent.MapParams(JobConfigProvider.TO_XCONTENT_PARAMS);
for (Job job : jobs) {
logger.debug("adding job to migrate: " + job.getId());
bulkRequestBuilder.add(indexRequest(job, Job.documentId(job.getId()), params));
}
}

private void addDatafeedIndexRequests(Collection<DatafeedConfig> datafeedConfigs, BulkRequestBuilder bulkRequestBuilder) {
ToXContent.Params params = new ToXContent.MapParams(DatafeedConfigProvider.TO_XCONTENT_PARAMS);
for (DatafeedConfig datafeedConfig : datafeedConfigs) {
logger.debug("adding datafeed to migrate: " + datafeedConfig.getId());
bulkRequestBuilder.add(indexRequest(datafeedConfig, DatafeedConfig.documentId(datafeedConfig.getId()), params));
}
}
Expand All @@ -318,7 +331,6 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To
return indexRequest;
}


// public for testing
public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listener) {

Expand Down Expand Up @@ -361,6 +373,30 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
);
}

private void createConfigIndex(ActionListener<Boolean> listener) {
logger.info("creating the .ml-config index");
CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.configIndexName());
try
{
createIndexRequest.settings(
Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
);
createIndexRequest.mapping(ElasticsearchMappings.DOC_TYPE, ElasticsearchMappings.configMapping());
} catch (Exception e) {
logger.error("error writing the .ml-config mappings", e);
listener.onFailure(e);
return;
}

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, createIndexRequest,
ActionListener.<CreateIndexResponse>wrap(
r -> listener.onResponse(r.isAcknowledged()),
listener::onFailure
), client.admin().indices()::create);
}

public static Job updateJobForMigration(Job job) {
Job.Builder builder = new Job.Builder(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* This class implements CRUD operation for the
* datafeed configuration document
*
* The number of datafeeds returned in a search it limited to
* {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}.
* In most cases we expect 10s or 100s of datafeeds to be defined and
* a search for all datafeeds should return all.
*/
public class DatafeedConfigProvider {

private static final Logger logger = LogManager.getLogger(DatafeedConfigProvider.class);
Expand All @@ -88,13 +97,6 @@ public class DatafeedConfigProvider {
TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable);
}

/**
* In most cases we expect 10s or 100s of datafeeds to be defined and
* a search for all datafeeds should return all.
* TODO this is a temporary fix
*/
public int searchSize = 1000;

public DatafeedConfigProvider(Client client, NamedXContentRegistry xContentRegistry) {
this.client = client;
this.xContentRegistry = xContentRegistry;
Expand Down Expand Up @@ -433,7 +435,7 @@ private SearchRequest buildExpandDatafeedIdsSearch(String expression) {
return client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();
}

Expand All @@ -458,7 +460,7 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, A
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();

ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds);
Expand Down Expand Up @@ -514,7 +516,7 @@ public void expandDatafeedConfigsWithoutMissingCheck(String expression, ActionLi
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
Expand Down
Loading

0 comments on commit 0cd088b

Please sign in to comment.