Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename .ml-state index to .ml-state-000001 to support rollover #52510

Merged
merged 4 commits into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.elasticsearch.xpack.core.template.TemplateUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.function.Predicate;
import java.util.regex.Pattern;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
Expand All @@ -36,6 +38,29 @@ public final class AnomalyDetectorsIndex {
private static final String RESULTS_MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";
private static final String RESOURCE_PATH = "/org/elasticsearch/xpack/core/ml/anomalydetection/";

// Visible for testing
static final Comparator<String> STATE_INDEX_NAME_COMPARATOR = new Comparator<>() {

private final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate();

@Override
public int compare(String index1, String index2) {
String[] index1Parts = index1.split("-");
String index1Suffix = index1Parts[index1Parts.length - 1];
boolean index1HasSixDigitsSuffix = HAS_SIX_DIGIT_SUFFIX.test(index1Suffix);
String[] index2Parts = index2.split("-");
String index2Suffix = index2Parts[index2Parts.length - 1];
boolean index2HasSixDigitsSuffix = HAS_SIX_DIGIT_SUFFIX.test(index2Suffix);
if (index1HasSixDigitsSuffix && index2HasSixDigitsSuffix) {
return index1Suffix.compareTo(index2Suffix);
} else if (index1HasSixDigitsSuffix != index2HasSixDigitsSuffix) {
return Boolean.compare(index1HasSixDigitsSuffix, index2HasSixDigitsSuffix);
} else {
return index1.compareTo(index2);
}
}
};

private AnomalyDetectorsIndex() {
}

Expand Down Expand Up @@ -89,8 +114,8 @@ public static String configIndexName() {
}

/**
* Create the .ml-state index (if necessary)
* Create the .ml-state-write alias for the .ml-state index (if necessary)
* Creates the .ml-state-000001 index (if necessary)
* Creates the .ml-state-write alias for the .ml-state-000001 index (if necessary)
*/
public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, final ActionListener<Boolean> finalListener) {

Expand Down Expand Up @@ -122,12 +147,14 @@ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterSta
IndicesOptions.lenientExpandOpen(),
jobStateIndexPattern());
if (stateIndices.length > 0) {
Arrays.sort(stateIndices, Collections.reverseOrder());
createAliasListener.onResponse(stateIndices[0]);
String latestStateIndex = Arrays.stream(stateIndices).max(STATE_INDEX_NAME_COMPARATOR).get();
createAliasListener.onResponse(latestStateIndex);
} else {
// The initial index name must be suitable for rollover functionality.
String initialJobStateIndex = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001";
CreateIndexRequest createIndexRequest = client.admin()
.indices()
.prepareCreate(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)
.prepareCreate(initialJobStateIndex)
.addAlias(new Alias(jobStateIndexWriteAlias()))
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
Expand All @@ -140,7 +167,7 @@ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterSta
// Adding an alias that already exists is idempotent. So, no need to double check if the alias exists
// as well.
if (ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException) {
createAliasListener.onResponse(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
createAliasListener.onResponse(initialJobStateIndex);
} else {
finalListener.onFailure(createIndexFailure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toMap;
import static org.hamcrest.Matchers.contains;
Expand All @@ -55,7 +57,8 @@

public class AnomalyDetectorsIndexTests extends ESTestCase {

private static final String ML_STATE = ".ml-state";
private static final String LEGACY_ML_STATE = ".ml-state";
private static final String INITIAL_ML_STATE = ".ml-state-000001";
private static final String ML_STATE_WRITE_ALIAS = ".ml-state-write";

private ThreadPool threadPool;
Expand All @@ -73,9 +76,9 @@ public void setUpMocks() {
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));

indicesAdminClient = mock(IndicesAdminClient.class);
when(indicesAdminClient.prepareCreate(ML_STATE))
.thenReturn(new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE, ML_STATE));
doAnswer(withResponse(new CreateIndexResponse(true, true, ML_STATE))).when(indicesAdminClient).create(any(), any());
when(indicesAdminClient.prepareCreate(INITIAL_ML_STATE))
.thenReturn(new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE, INITIAL_ML_STATE));
doAnswer(withResponse(new CreateIndexResponse(true, true, INITIAL_ML_STATE))).when(indicesAdminClient).create(any(), any());
when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE));
doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any());

Expand All @@ -102,12 +105,12 @@ public void testCreateStateIndexAndAliasIfNecessary_CleanState() {
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, finalListener);

InOrder inOrder = inOrder(indicesAdminClient, finalListener);
inOrder.verify(indicesAdminClient).prepareCreate(ML_STATE);
inOrder.verify(indicesAdminClient).prepareCreate(INITIAL_ML_STATE);
inOrder.verify(indicesAdminClient).create(createRequestCaptor.capture(), any());
inOrder.verify(finalListener).onResponse(true);

CreateIndexRequest createRequest = createRequestCaptor.getValue();
assertThat(createRequest.index(), equalTo(ML_STATE));
assertThat(createRequest.index(), equalTo(INITIAL_ML_STATE));
assertThat(createRequest.aliases(), equalTo(Collections.singleton(new Alias(ML_STATE_WRITE_ALIAS))));
}

Expand All @@ -118,8 +121,12 @@ private void assertNoClientInteractionsWhenWriteAliasAlreadyExists(String indexN
verify(finalListener).onResponse(false);
}

public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyStateIndex() {
assertNoClientInteractionsWhenWriteAliasAlreadyExists(LEGACY_ML_STATE);
}

public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtInitialStateIndex() {
assertNoClientInteractionsWhenWriteAliasAlreadyExists(".ml-state-000001");
assertNoClientInteractionsWhenWriteAliasAlreadyExists(INITIAL_ML_STATE);
}

public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtSubsequentStateIndex() {
Expand Down Expand Up @@ -147,9 +154,14 @@ private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List<String> e
contains(AliasActions.add().alias(ML_STATE_WRITE_ALIAS).index(expectedWriteIndexName)));
}

public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyStateIndexExists() {
assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
Arrays.asList(LEGACY_ML_STATE), LEGACY_ML_STATE);
}

public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButInitialStateIndexExists() {
assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
Arrays.asList(".ml-state-000001"), ".ml-state-000001");
Arrays.asList(INITIAL_ML_STATE), INITIAL_ML_STATE);
}

public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButSubsequentStateIndicesExist() {
Expand All @@ -159,7 +171,32 @@ public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButSub

public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButBothLegacyAndNewStateIndicesDoExist() {
assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
Arrays.asList(ML_STATE, ".ml-state-000003", ".ml-state-000040", ".ml-state-000500"), ".ml-state-000500");
Arrays.asList(LEGACY_ML_STATE, ".ml-state-000003", ".ml-state-000040", ".ml-state-000500"), ".ml-state-000500");
}

public void testStateIndexNameComparator() {
Comparator<String> comparator = AnomalyDetectorsIndex.STATE_INDEX_NAME_COMPARATOR;
assertThat(
Stream.of(".ml-state-000001").max(comparator).get(),
equalTo(".ml-state-000001"));
assertThat(
Stream.of(".ml-state-000002", ".ml-state-000001").max(comparator).get(),
equalTo(".ml-state-000002"));
assertThat(
Stream.of(".ml-state-000003", ".ml-state-000040", ".ml-state-000500").max(comparator).get(),
equalTo(".ml-state-000500"));
assertThat(
Stream.of(".ml-state-000042", ".ml-state-000049", ".ml-state-000038").max(comparator).get(),
equalTo(".ml-state-000049"));
assertThat(
Stream.of(".ml-state", ".ml-state-000003", ".ml-state-000040", ".ml-state-000500").max(comparator).get(),
equalTo(".ml-state-000500"));
assertThat(
Stream.of(".reindexed-6-ml-state", ".ml-state-000042").max(comparator).get(),
equalTo(".ml-state-000042"));
assertThat(
Stream.of(".a-000002", ".b-000001").max(comparator).get(),
equalTo(".a-000002"));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
// Now calling the _delete_expired_data API should remove unused state
assertThat(deleteExpiredData().isDeleted(), is(true));

SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state*").execute().actionGet();
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
// Now calling the _delete_expired_data API should remove unused state
assertThat(deleteExpiredData().isDeleted(), is(true));

SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state*").execute().actionGet();
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ record = new HashMap<>();
}

private Quantiles getQuantiles(String jobId) {
SearchResponse response = client().prepareSearch(".ml-state")
SearchResponse response = client().prepareSearch(".ml-state*")
.setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(jobId)))
.setSize(1)
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
Expand All @@ -22,7 +21,6 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;

import java.util.List;

Expand All @@ -45,9 +43,6 @@ public void testVerifyIndicesPrimaryShardsAreActive() {
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
);
if (indexName.equals(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)) {
indexMetaData.putAlias(new AliasMetaData.Builder(AnomalyDetectorsIndex.jobStateIndexWriteAlias()));
}
metaData.put(indexMetaData);
Index index = new Index(indexName, "_uuid");
ShardId shardId = new ShardId(index, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
index:
index: .ml-state
index: .ml-state-000001
id: "delete_foo_regression_state#1"
body: >
{
Expand All @@ -970,7 +970,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
index:
index: .ml-state
index: .ml-state-000001
id: "data_frame_analytics-delete_foo-progress"
body: >
{
Expand All @@ -980,11 +980,11 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
indices.refresh:
index: .ml-state
index: .ml-state*

- do:
search:
index: .ml-state
index: .ml-state*
body:
size: 0
query:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
index:
index: .ml-state
index: .ml-state-000001
id: "delete-model-snapshot_model_state_inactive-snapshot#1"
body: >
{
Expand All @@ -66,7 +66,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
index:
index: .ml-state
index: .ml-state-000001
id: "delete-model-snapshot_model_state_inactive-snapshot#2"
body: >
{
Expand Down Expand Up @@ -118,7 +118,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
indices.refresh:
index: .ml-state
index: .ml-state*

- do:
headers:
Expand Down Expand Up @@ -159,7 +159,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
count:
index: .ml-state
index: .ml-state*

- match: { count: 3 }

Expand All @@ -179,7 +179,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
indices.refresh:
index: .ml-state
index: .ml-state*

- do:
ml.get_model_snapshots:
Expand All @@ -191,7 +191,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
count:
index: .ml-state
index: .ml-state*

- match: { count: 1 }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ setup:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
Content-Type: application/json
index:
index: .ml-state
index: .ml-state-000001
id: "get-model-snapshots_model_state_1#1"
body: >
{
Expand All @@ -62,7 +62,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
index:
index: .ml-state
index: .ml-state-000001
id: "get-model-snapshots_model_state_2#1"
body: >
{
Expand All @@ -72,7 +72,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
index:
index: .ml-state
index: .ml-state-000001
id: "get-model-snapshots_model_state_2#2"
body: >
{
Expand All @@ -82,7 +82,7 @@ setup:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
indices.refresh:
index: [.ml-anomalies-get-model-snapshots,.ml-state]
index: [.ml-anomalies-get-model-snapshots,.ml-state*]

---
"Test get model snapshots API with no params":
Expand Down
Loading