Skip to content

Commit

Permalink
Use any index specified by .watches for Watcher (#39541) (#39707)
Browse files Browse the repository at this point in the history
Previously, Watcher only attached its listener to indices that started
with the prefix `.watches`, which causes Watcher to silently fail to
schedule newly created Watches if the `.watches` alias is redirected to
an index that does not start with `.watches`.

Watcher now attaches the listener to all indices, so that Watcher can
respond to changes in which index has the `.watches` alias.

Also adjusts the tests to randomly use non-prefixed concrete indices 
for .watches and .triggered_watches.
  • Loading branch information
gwbrown authored Mar 5, 2019
1 parent 19fa399 commit 154b650
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,9 @@ public void onIndexModule(IndexModule module) {
}

assert listener != null;
// for now, we only add this index operation listener to indices starting with .watches
// this also means, that aliases pointing to this index have to follow this notation
if (module.getIndex().getName().startsWith(Watch.INDEX)) {
module.addIndexOperationListener(listener);
}
// Attach a listener to every index so that we can react to alias changes.
// This listener will be a no-op except on the index pointed to by .watches
module.addIndexOperationListener(listener);
}

static void validAutoCreateIndex(Settings settings, Logger logger) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.watcher;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;

import java.util.Locale;

import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.greaterThan;

public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase {

@Override
protected boolean timeWarped() {
return false;
}

public void testCanUseAnyConcreteIndexName() throws Exception {
String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT);
createIndex(watchResultsIndex);

stopWatcher();
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName, Watch.DOC_TYPE);
startWatcher();

PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()
.trigger(schedule(interval("3s")))
.input(noneInput())
.condition(InternalAlwaysCondition.INSTANCE)
.addAction("indexer", indexAction(watchResultsIndex, "_doc")))
.get();

assertTrue(putWatchResponse.isCreated());

assertBusy(() -> {
SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get();
assertThat((int) searchResult.getHits().getTotalHits().value, greaterThan(0));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
package org.elasticsearch.xpack.watcher.test;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.network.NetworkModule;
Expand Down Expand Up @@ -194,7 +197,7 @@ public void _setup() throws Exception {
internalCluster().setDisruptionScheme(ice);
ice.startDisrupting();
}

stopWatcher();
createWatcherIndicesOrAliases();
startWatcher();
}
Expand All @@ -221,13 +224,19 @@ private void createWatcherIndicesOrAliases() throws Exception {
// alias for .watches, setting the index template to the same as well
String watchIndexName;
String triggeredWatchIndexName;
if (rarely()) {
watchIndexName = ".watches-alias-index";
CreateIndexResponse response = client().admin().indices().prepareCreate(watchIndexName)
if (randomBoolean()) {
// Create an index to get the template
String tempIndex = ".watches" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
CreateIndexResponse response = client().admin().indices().prepareCreate(tempIndex)
.setCause("Index to test aliases with .watches index")
.addAlias(new Alias(Watch.INDEX))
.get();
assertAcked(response);

// Now replace it with a randomly named index
watchIndexName = randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT);
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, watchIndexName, Watch.DOC_TYPE);

logger.info("set alias for .watches index to [{}]", watchIndexName);
} else {
watchIndexName = Watch.INDEX;
Expand All @@ -239,13 +248,19 @@ private void createWatcherIndicesOrAliases() throws Exception {
}

// alias for .triggered-watches, ensuring the index template is set appropriately
if (rarely()) {
triggeredWatchIndexName = ".triggered_watches-alias-index";
CreateIndexResponse response = client().admin().indices().prepareCreate(triggeredWatchIndexName)
if (randomBoolean()) {
String tempIndex = ".triggered_watches-alias-index";
CreateIndexResponse response = client().admin().indices().prepareCreate(tempIndex)
.setCause("Index to test aliases with .triggered-watches index")
.addAlias(new Alias(TriggeredWatchStoreField.INDEX_NAME))
.get();
assertAcked(response);

// Now replace it with a randomly-named index
triggeredWatchIndexName = randomValueOtherThan(watchIndexName,
() -> randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT));
replaceWatcherIndexWithRandomlyNamedIndex(TriggeredWatchStoreField.INDEX_NAME, triggeredWatchIndexName,
TriggeredWatchStoreField.DOC_TYPE);
logger.info("set alias for .triggered-watches index to [{}]", triggeredWatchIndexName);
} else {
triggeredWatchIndexName = TriggeredWatchStoreField.INDEX_NAME;
Expand All @@ -259,6 +274,38 @@ private void createWatcherIndicesOrAliases() throws Exception {
}
}

public void replaceWatcherIndexWithRandomlyNamedIndex(String originalIndexOrAlias, String to, String docType) {
GetIndexResponse index = client().admin().indices().prepareGetIndex().setIndices(originalIndexOrAlias).get();
MappingMetaData mapping = index.getMappings().get(index.getIndices()[0]).get(docType);

Settings settings = index.getSettings().get(index.getIndices()[0]);
Settings.Builder newSettings = Settings.builder().put(settings);
newSettings.remove("index.provided_name");
newSettings.remove("index.uuid");
newSettings.remove("index.creation_date");
newSettings.remove("index.version.created");

CreateIndexResponse createIndexResponse = client().admin().indices().prepareCreate(to)
.addMapping(docType, mapping.sourceAsMap())
.setSettings(newSettings)
.get();
assertTrue(createIndexResponse.isAcknowledged());
ensureGreen(to);

AtomicReference<String> originalIndex = new AtomicReference<>(originalIndexOrAlias);
boolean watchesIsAlias = client().admin().indices().prepareAliasesExist(originalIndexOrAlias).get().isExists();
if (watchesIsAlias) {
GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases(originalIndexOrAlias).get();
assertEquals(1, aliasesResponse.getAliases().size());
aliasesResponse.getAliases().forEach((aliasRecord) -> {
assertEquals(1, aliasRecord.value.size());
originalIndex.set(aliasRecord.key);
});
}
client().admin().indices().prepareDelete(originalIndex.get()).get();
client().admin().indices().prepareAliases().addAlias(to, originalIndexOrAlias).get();
}

protected TimeWarp timeWarp() {
assert timeWarped() : "cannot access TimeWarp when test context is not time warped";
return timeWarp;
Expand Down

0 comments on commit 154b650

Please sign in to comment.