From 154b65023309342a5fa938ce24ec76930a9f8d38 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Tue, 5 Mar 2019 11:45:26 -0700 Subject: [PATCH] Use any index specified by .watches for Watcher (#39541) (#39707) Previously, Watcher only attached its listener to indices that started with the prefix `.watches`, which causes Watcher to silently fail to schedule newly created Watches if the `.watches` alias is redirected to an index that does not start with `.watches`. Watcher now attaches the listener to all indices, so that Watcher can respond to changes in which index has the `.watches` alias. Also adjusts the tests to randomly use non-prefixed concrete indices for .watches and .triggered_watches. --- .../elasticsearch/xpack/watcher/Watcher.java | 8 +-- .../watcher/WatcherConcreteIndexTests.java | 54 ++++++++++++++++ .../AbstractWatcherIntegrationTestCase.java | 61 ++++++++++++++++--- 3 files changed, 111 insertions(+), 12 deletions(-) create mode 100644 x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index cf0883c7ef19b..6888019b2699c 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -574,11 +574,9 @@ public void onIndexModule(IndexModule module) { } assert listener != null; - // for now, we only add this index operation listener to indices starting with .watches - // this also means, that aliases pointing to this index have to follow this notation - if (module.getIndex().getName().startsWith(Watch.INDEX)) { - module.addIndexOperationListener(listener); - } + // Attach a listener to every index so that we can react to alias changes. + // This listener will be a no-op except on the index pointed to by .watches + module.addIndexOperationListener(listener); } static void validAutoCreateIndex(Settings settings, Logger logger) { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java new file mode 100644 index 0000000000000..b96f06e3f3c05 --- /dev/null +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.watcher; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse; +import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; +import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; + +import java.util.Locale; + +import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction; +import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput; +import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.greaterThan; + +public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase { + + @Override + protected boolean timeWarped() { + return false; + } + + public void testCanUseAnyConcreteIndexName() throws Exception { + String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT); + createIndex(watchResultsIndex); + + stopWatcher(); + replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName, Watch.DOC_TYPE); + startWatcher(); + + PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder() + .trigger(schedule(interval("3s"))) + .input(noneInput()) + .condition(InternalAlwaysCondition.INSTANCE) + .addAction("indexer", indexAction(watchResultsIndex, "_doc"))) + .get(); + + assertTrue(putWatchResponse.isCreated()); + + assertBusy(() -> { + SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get(); + assertThat((int) searchResult.getHits().getTotalHits().value, greaterThan(0)); + }); + } +} diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index f710531532b87..8a38f13a149c3 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -6,7 +6,9 @@ package org.elasticsearch.xpack.watcher.test; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; @@ -14,6 +16,7 @@ import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.network.NetworkModule; @@ -194,7 +197,7 @@ public void _setup() throws Exception { internalCluster().setDisruptionScheme(ice); ice.startDisrupting(); } - + stopWatcher(); createWatcherIndicesOrAliases(); startWatcher(); } @@ -221,13 +224,19 @@ private void createWatcherIndicesOrAliases() throws Exception { // alias for .watches, setting the index template to the same as well String watchIndexName; String triggeredWatchIndexName; - if (rarely()) { - watchIndexName = ".watches-alias-index"; - CreateIndexResponse response = client().admin().indices().prepareCreate(watchIndexName) + if (randomBoolean()) { + // Create an index to get the template + String tempIndex = ".watches" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + CreateIndexResponse response = client().admin().indices().prepareCreate(tempIndex) .setCause("Index to test aliases with .watches index") .addAlias(new Alias(Watch.INDEX)) .get(); assertAcked(response); + + // Now replace it with a randomly named index + watchIndexName = randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT); + replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, watchIndexName, Watch.DOC_TYPE); + logger.info("set alias for .watches index to [{}]", watchIndexName); } else { watchIndexName = Watch.INDEX; @@ -239,13 +248,19 @@ private void createWatcherIndicesOrAliases() throws Exception { } // alias for .triggered-watches, ensuring the index template is set appropriately - if (rarely()) { - triggeredWatchIndexName = ".triggered_watches-alias-index"; - CreateIndexResponse response = client().admin().indices().prepareCreate(triggeredWatchIndexName) + if (randomBoolean()) { + String tempIndex = ".triggered_watches-alias-index"; + CreateIndexResponse response = client().admin().indices().prepareCreate(tempIndex) .setCause("Index to test aliases with .triggered-watches index") .addAlias(new Alias(TriggeredWatchStoreField.INDEX_NAME)) .get(); assertAcked(response); + + // Now replace it with a randomly-named index + triggeredWatchIndexName = randomValueOtherThan(watchIndexName, + () -> randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT)); + replaceWatcherIndexWithRandomlyNamedIndex(TriggeredWatchStoreField.INDEX_NAME, triggeredWatchIndexName, + TriggeredWatchStoreField.DOC_TYPE); logger.info("set alias for .triggered-watches index to [{}]", triggeredWatchIndexName); } else { triggeredWatchIndexName = TriggeredWatchStoreField.INDEX_NAME; @@ -259,6 +274,38 @@ private void createWatcherIndicesOrAliases() throws Exception { } } + public void replaceWatcherIndexWithRandomlyNamedIndex(String originalIndexOrAlias, String to, String docType) { + GetIndexResponse index = client().admin().indices().prepareGetIndex().setIndices(originalIndexOrAlias).get(); + MappingMetaData mapping = index.getMappings().get(index.getIndices()[0]).get(docType); + + Settings settings = index.getSettings().get(index.getIndices()[0]); + Settings.Builder newSettings = Settings.builder().put(settings); + newSettings.remove("index.provided_name"); + newSettings.remove("index.uuid"); + newSettings.remove("index.creation_date"); + newSettings.remove("index.version.created"); + + CreateIndexResponse createIndexResponse = client().admin().indices().prepareCreate(to) + .addMapping(docType, mapping.sourceAsMap()) + .setSettings(newSettings) + .get(); + assertTrue(createIndexResponse.isAcknowledged()); + ensureGreen(to); + + AtomicReference originalIndex = new AtomicReference<>(originalIndexOrAlias); + boolean watchesIsAlias = client().admin().indices().prepareAliasesExist(originalIndexOrAlias).get().isExists(); + if (watchesIsAlias) { + GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases(originalIndexOrAlias).get(); + assertEquals(1, aliasesResponse.getAliases().size()); + aliasesResponse.getAliases().forEach((aliasRecord) -> { + assertEquals(1, aliasRecord.value.size()); + originalIndex.set(aliasRecord.key); + }); + } + client().admin().indices().prepareDelete(originalIndex.get()).get(); + client().admin().indices().prepareAliases().addAlias(to, originalIndexOrAlias).get(); + } + protected TimeWarp timeWarp() { assert timeWarped() : "cannot access TimeWarp when test context is not time warped"; return timeWarp;