diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java index c0fd0c7c339f7..33ad3aca2e80f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java @@ -154,7 +154,7 @@ private void addTemplatesIfMissing(ClusterState state) { if (creationCheck.compareAndSet(false, true)) { IndexTemplateMetadata currentTemplate = state.metadata().getTemplates().get(templateName); if (Objects.isNull(currentTemplate)) { - logger.debug("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin()); + logger.info("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin()); putTemplate(newTemplate, creationCheck); } else if (Objects.isNull(currentTemplate.getVersion()) || newTemplate.getVersion() > currentTemplate.getVersion()) { // IndexTemplateConfig now enforces templates contain a `version` property, so if the template doesn't have one we can diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/history/HistoryStoreField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/history/HistoryStoreField.java index 9952033747535..e682ab37108f7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/history/HistoryStoreField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/history/HistoryStoreField.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.core.watcher.history; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField; @@ -14,12 +16,18 @@ public final class HistoryStoreField { public static final String INDEX_PREFIX = ".watcher-history-"; public static final String INDEX_PREFIX_WITH_TEMPLATE = INDEX_PREFIX + WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION + "-"; + public static final String INDEX_PREFIX_WITH_TEMPLATE_10 = INDEX_PREFIX + + WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION_10 + "-"; private static final DateFormatter indexTimeFormat = DateFormatter.forPattern("yyyy.MM.dd"); /** * Calculates the correct history index name for a given time */ - public static String getHistoryIndexNameForTime(ZonedDateTime time) { - return INDEX_PREFIX_WITH_TEMPLATE + indexTimeFormat.format(time); + public static String getHistoryIndexNameForTime(ZonedDateTime time, ClusterState state) { + if (state == null || state.nodes().getMinNodeVersion().onOrAfter(Version.V_7_7_0)) { + return INDEX_PREFIX_WITH_TEMPLATE + indexTimeFormat.format(time); + } else { + return INDEX_PREFIX_WITH_TEMPLATE_10 + indexTimeFormat.format(time); + } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java index 4a2524beecf31..94fe152dc9472 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java @@ -18,8 +18,9 @@ public final class WatcherIndexTemplateRegistryField { // version 11: watch history indices are hidden // Note: if you change this, also inform the kibana team around the watcher-ui public static final int INDEX_TEMPLATE_VERSION = 11; + public static final int INDEX_TEMPLATE_VERSION_10 = 10; public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION; - public static final String HISTORY_TEMPLATE_NAME_10 = ".watch-history-10"; + public static final String HISTORY_TEMPLATE_NAME_10 = ".watch-history-" + INDEX_TEMPLATE_VERSION_10; public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION; public static final String HISTORY_TEMPLATE_NAME_NO_ILM_10 = ".watch-history-no-ilm-10"; public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches"; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java index 803bc234b5aac..2624787bf1f40 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java @@ -1395,7 +1395,7 @@ public void testWatcherAdminRole() { assertThat(role.indices().allowedIndicesMatcher(IndexAction.NAME).test("foo"), is(false)); ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now); + String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now, null); for (String index : new String[]{ Watch.INDEX, historyIndex, TriggeredWatchStoreField.INDEX_NAME }) { assertOnlyReadAllowed(role, index); } @@ -1429,7 +1429,7 @@ public void testWatcherUserRole() { assertThat(role.indices().allowedIndicesMatcher(IndexAction.NAME).test(TriggeredWatchStoreField.INDEX_NAME), is(false)); ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now); + String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now, null); for (String index : new String[]{ Watch.INDEX, historyIndex }) { assertOnlyReadAllowed(role, index); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 8378ce2045372..bf136d9070728 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -386,7 +386,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) .setConcurrentRequests(SETTING_BULK_CONCURRENT_REQUESTS.get(settings)) .build(); - HistoryStore historyStore = new HistoryStore(bulkProcessor); + HistoryStore historyStore = new HistoryStore(bulkProcessor, clusterService::state); // schedulers final Set scheduleParsers = new HashSet<>(); @@ -623,14 +623,14 @@ static void validAutoCreateIndex(Settings settings, Logger logger) { indices.add(".watches"); indices.add(".triggered_watches"); ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now)); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusDays(1))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(1))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(2))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(3))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(4))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(5))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(6))); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now, null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusDays(1), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(1), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(2), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(3), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(4), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(5), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(6), null)); for (String index : indices) { boolean matched = false; for (String match : matches) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 296c521771fe5..94a64cdb94cbb 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -452,7 +452,7 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge * Any existing watchRecord will be overwritten. */ private void forcePutHistory(WatchRecord watchRecord) { - String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); + String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterService.state()); try { try (XContentBuilder builder = XContentFactory.jsonBuilder(); ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java index 5147fb92154ef..ed43fd4171946 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java @@ -31,9 +31,11 @@ public class HistoryStore { private static final Logger logger = LogManager.getLogger(HistoryStore.class); private final BulkProcessor bulkProcessor; + private final Supplier clusterStateSupplier; - public HistoryStore(BulkProcessor bulkProcessor) { + public HistoryStore(BulkProcessor bulkProcessor, Supplier clusterStateSupplier) { this.bulkProcessor = bulkProcessor; + this.clusterStateSupplier = clusterStateSupplier; } /** @@ -41,7 +43,7 @@ public HistoryStore(BulkProcessor bulkProcessor) { * If the specified watchRecord already was stored this call will fail with a version conflict. */ public void put(WatchRecord watchRecord) throws Exception { - String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); + String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterStateSupplier.get()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); @@ -58,7 +60,7 @@ public void put(WatchRecord watchRecord) throws Exception { * Any existing watchRecord will be overwritten. */ public void forcePut(WatchRecord watchRecord) { - String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); + String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterStateSupplier.get()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); @@ -78,7 +80,7 @@ public void forcePut(WatchRecord watchRecord) { * @return true, if history store is ready to be started */ public static boolean validate(ClusterState state) { - String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC)); + String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC), state); IndexMetadata indexMetadata = WatchStoreUtils.getConcreteIndex(currentIndex, state.metadata()); return indexMetadata == null || (indexMetadata.getState() == IndexMetadata.State.OPEN && state.routingTable().index(indexMetadata.getIndex()).allPrimaryShardsActive()); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java index 59aef01f180a1..bf25d887223e5 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java @@ -7,6 +7,7 @@ import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -16,6 +17,8 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -43,11 +46,13 @@ import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.util.Arrays; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.elasticsearch.xpack.core.watcher.history.HistoryStoreField.getHistoryIndexNameForTime; import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION; +import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION_10; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; @@ -63,24 +68,30 @@ public class HistoryStoreTests extends ESTestCase { private HistoryStore historyStore; private Client client; + private ClusterState clusterState; + private DiscoveryNodes discoveryNodes; @Before public void init() { Settings settings = Settings.builder().put("node.name", randomAlphaOfLength(10)).build(); client = mock(Client.class); + clusterState = mock(ClusterState.class); + discoveryNodes = mock(DiscoveryNodes.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings)); + when(clusterState.nodes()).thenReturn(discoveryNodes); + when(discoveryNodes.getMinNodeVersion()).thenReturn(randomFrom(Arrays.asList(Version.V_7_0_0, Version.V_7_7_0))); BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class); BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build(); - historyStore = new HistoryStore(bulkProcessor); + historyStore = new HistoryStore(bulkProcessor, () -> clusterState); } public void testPut() throws Exception { ZonedDateTime now = Instant.ofEpochMilli(0).atZone(ZoneOffset.UTC); Wid wid = new Wid("_name", now); - String index = getHistoryIndexNameForTime(now); + String index = getHistoryIndexNameForTime(now, clusterState); ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), now, now); WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null, randomAlphaOfLength(10)); @@ -105,15 +116,11 @@ public void testPut() throws Exception { } public void testIndexNameGeneration() { - String indexTemplateVersion = Integer.toString(INDEX_TEMPLATE_VERSION); - assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli((long) 0).atZone(ZoneOffset.UTC)), - equalTo(".watcher-history-"+ indexTemplateVersion +"-1970.01.01")); - assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(100000000000L).atZone(ZoneOffset.UTC)), - equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03")); - assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(1416582852000L).atZone(ZoneOffset.UTC)), - equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21")); - assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(2833165811000L).atZone(ZoneOffset.UTC)), - equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12")); + when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.V_7_7_0); + assertHistoryIndexName(Integer.toString(INDEX_TEMPLATE_VERSION)); + + when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.V_7_0_0); + assertHistoryIndexName(Integer.toString(INDEX_TEMPLATE_VERSION_10)); } public void testStoreWithHideSecrets() throws Exception { @@ -179,4 +186,15 @@ public void testStoreWithHideSecrets() throws Exception { assertThat(indexedJson, containsString(username)); assertThat(indexedJson, not(containsString(password))); } + + private void assertHistoryIndexName(String indexTemplateVersion){ + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli((long) 0).atZone(ZoneOffset.UTC), clusterState), + equalTo(".watcher-history-" + indexTemplateVersion + "-1970.01.01")); + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(100000000000L).atZone(ZoneOffset.UTC), clusterState), + equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03")); + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(1416582852000L).atZone(ZoneOffset.UTC), clusterState), + equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21")); + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(2833165811000L).atZone(ZoneOffset.UTC), clusterState), + equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12")); + } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index dff1f9d39571e..322de2810591a 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -276,7 +276,7 @@ private void createWatcherIndicesOrAliases() throws Exception { assertAcked(client().admin().indices().prepareCreate(triggeredWatchIndexName)); } - String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC)); + String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC), null); assertAcked(client().admin().indices().prepareCreate(historyIndex)); logger.info("creating watch history index [{}]", historyIndex); ensureGreen(historyIndex, watchIndexName, triggeredWatchIndexName); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index 56488b12cb35c..b33e26f0c615f 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -78,7 +78,7 @@ public void testLoadMalformedWatchRecord() throws Exception { Wid wid = new Wid("_id", now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ExecutableCondition condition = InternalAlwaysCondition.INSTANCE; - String index = HistoryStoreField.getHistoryIndexNameForTime(now); + String index = HistoryStoreField.getHistoryIndexNameForTime(now, null); client().prepareIndex().setIndex(index).setId(wid.value()) .setSource(jsonBuilder().startObject() .startObject(WatchRecord.TRIGGER_EVENT.getPreferredName()) @@ -309,7 +309,7 @@ public void testWatchRecordSavedTwice() throws Exception { } LocalDateTime localDateTime = LocalDateTime.of(2015, 11, 5, 0, 0, 0, 0); ZonedDateTime triggeredTime = ZonedDateTime.of(localDateTime,ZoneOffset.UTC); - final String watchRecordIndex = HistoryStoreField.getHistoryIndexNameForTime(triggeredTime); + final String watchRecordIndex = HistoryStoreField.getHistoryIndexNameForTime(triggeredTime, null); logger.info("Stopping watcher"); stopWatcher();