Skip to content

Commit

Permalink
Ensure that .watcher-history-11* template is in installed prior to use (
Browse files Browse the repository at this point in the history
elastic#56734)

WatcherIndexTemplateRegistry as of elastic#52962 
requires all nodes to be on 7.7.0 before it allows the version 11 index template to be 
installed.

While in a mixed cluster, nothing prevents Watcher from running on the new
host before the all of the nodes are on 7.7.0. This will result in the
.watcher-history-11* index without the proper mappings. Without the proper
mapping a single document (for a large watch) can exceed the default 1000 field
limit and cause error to show in the logs.

This commit ensures the same logic for writing to the index is applied as for
installing the template. In a mixed cluster, the `10` index template will continue
to be written. Only once all of nodes are on 7.7.0+ will the `11` index template
be installed and used.

closes elastic#56732
  • Loading branch information
jakelandis committed May 15, 2020
1 parent bfd29fb commit f162808
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private void addTemplatesIfMissing(ClusterState state) {
if (creationCheck.compareAndSet(false, true)) {
IndexTemplateMetadata currentTemplate = state.metadata().getTemplates().get(templateName);
if (Objects.isNull(currentTemplate)) {
logger.debug("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
logger.info("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
putTemplate(newTemplate, creationCheck);
} else if (Objects.isNull(currentTemplate.getVersion()) || newTemplate.getVersion() > currentTemplate.getVersion()) {
// IndexTemplateConfig now enforces templates contain a `version` property, so if the template doesn't have one we can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.core.watcher.history;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;

Expand All @@ -14,12 +16,18 @@ public final class HistoryStoreField {

public static final String INDEX_PREFIX = ".watcher-history-";
public static final String INDEX_PREFIX_WITH_TEMPLATE = INDEX_PREFIX + WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION + "-";
public static final String INDEX_PREFIX_WITH_TEMPLATE_10 = INDEX_PREFIX +
WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION_10 + "-";
private static final DateFormatter indexTimeFormat = DateFormatter.forPattern("yyyy.MM.dd");

/**
* Calculates the correct history index name for a given time
*/
public static String getHistoryIndexNameForTime(ZonedDateTime time) {
return INDEX_PREFIX_WITH_TEMPLATE + indexTimeFormat.format(time);
public static String getHistoryIndexNameForTime(ZonedDateTime time, ClusterState state) {
if (state == null || state.nodes().getMinNodeVersion().onOrAfter(Version.V_7_7_0)) {
return INDEX_PREFIX_WITH_TEMPLATE + indexTimeFormat.format(time);
} else {
return INDEX_PREFIX_WITH_TEMPLATE_10 + indexTimeFormat.format(time);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ public final class WatcherIndexTemplateRegistryField {
// version 11: watch history indices are hidden
// Note: if you change this, also inform the kibana team around the watcher-ui
public static final int INDEX_TEMPLATE_VERSION = 11;
public static final int INDEX_TEMPLATE_VERSION_10 = 10;
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
public static final String HISTORY_TEMPLATE_NAME_10 = ".watch-history-10";
public static final String HISTORY_TEMPLATE_NAME_10 = ".watch-history-" + INDEX_TEMPLATE_VERSION_10;
public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION;
public static final String HISTORY_TEMPLATE_NAME_NO_ILM_10 = ".watch-history-no-ilm-10";
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,7 @@ public void testWatcherAdminRole() {
assertThat(role.indices().allowedIndicesMatcher(IndexAction.NAME).test("foo"), is(false));

ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now);
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now, null);
for (String index : new String[]{ Watch.INDEX, historyIndex, TriggeredWatchStoreField.INDEX_NAME }) {
assertOnlyReadAllowed(role, index);
}
Expand Down Expand Up @@ -1429,7 +1429,7 @@ public void testWatcherUserRole() {
assertThat(role.indices().allowedIndicesMatcher(IndexAction.NAME).test(TriggeredWatchStoreField.INDEX_NAME), is(false));

ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now);
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now, null);
for (String index : new String[]{ Watch.INDEX, historyIndex }) {
assertOnlyReadAllowed(role, index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
.setConcurrentRequests(SETTING_BULK_CONCURRENT_REQUESTS.get(settings))
.build();

HistoryStore historyStore = new HistoryStore(bulkProcessor);
HistoryStore historyStore = new HistoryStore(bulkProcessor, clusterService::state);

// schedulers
final Set<Schedule.Parser> scheduleParsers = new HashSet<>();
Expand Down Expand Up @@ -623,14 +623,14 @@ static void validAutoCreateIndex(Settings settings, Logger logger) {
indices.add(".watches");
indices.add(".triggered_watches");
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusDays(1)));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(1)));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(2)));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(3)));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(4)));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(5)));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(6)));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now, null));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusDays(1), null));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(1), null));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(2), null));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(3), null));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(4), null));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(5), null));
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(6), null));
for (String index : indices) {
boolean matched = false;
for (String match : matches) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge
* Any existing watchRecord will be overwritten.
*/
private void forcePutHistory(WatchRecord watchRecord) {
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterService.state());
try {
try (XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,19 @@ public class HistoryStore {
private static final Logger logger = LogManager.getLogger(HistoryStore.class);

private final BulkProcessor bulkProcessor;
private final Supplier<ClusterState> clusterStateSupplier;

public HistoryStore(BulkProcessor bulkProcessor) {
public HistoryStore(BulkProcessor bulkProcessor, Supplier<ClusterState> clusterStateSupplier) {
this.bulkProcessor = bulkProcessor;
this.clusterStateSupplier = clusterStateSupplier;
}

/**
* Stores the specified watchRecord.
* If the specified watchRecord already was stored this call will fail with a version conflict.
*/
public void put(WatchRecord watchRecord) throws Exception {
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterStateSupplier.get());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);

Expand All @@ -58,7 +60,7 @@ public void put(WatchRecord watchRecord) throws Exception {
* Any existing watchRecord will be overwritten.
*/
public void forcePut(WatchRecord watchRecord) {
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterStateSupplier.get());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);

Expand All @@ -78,7 +80,7 @@ public void forcePut(WatchRecord watchRecord) {
* @return true, if history store is ready to be started
*/
public static boolean validate(ClusterState state) {
String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC));
String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC), state);
IndexMetadata indexMetadata = WatchStoreUtils.getConcreteIndex(currentIndex, state.metadata());
return indexMetadata == null || (indexMetadata.getState() == IndexMetadata.State.OPEN &&
state.routingTable().index(indexMetadata.getIndex()).allPrimaryShardsActive());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.http.HttpStatus;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.bulk.BulkItemResponse;
Expand All @@ -16,6 +17,8 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -43,11 +46,13 @@
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Arrays;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.core.watcher.history.HistoryStoreField.getHistoryIndexNameForTime;
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION;
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION_10;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -63,24 +68,30 @@ public class HistoryStoreTests extends ESTestCase {

private HistoryStore historyStore;
private Client client;
private ClusterState clusterState;
private DiscoveryNodes discoveryNodes;

@Before
public void init() {
Settings settings = Settings.builder().put("node.name", randomAlphaOfLength(10)).build();
client = mock(Client.class);
clusterState = mock(ClusterState.class);
discoveryNodes = mock(DiscoveryNodes.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(client.settings()).thenReturn(settings);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings));
when(clusterState.nodes()).thenReturn(discoveryNodes);
when(discoveryNodes.getMinNodeVersion()).thenReturn(randomFrom(Arrays.asList(Version.V_7_0_0, Version.V_7_7_0)));
BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class);
BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build();
historyStore = new HistoryStore(bulkProcessor);
historyStore = new HistoryStore(bulkProcessor, () -> clusterState);
}

public void testPut() throws Exception {
ZonedDateTime now = Instant.ofEpochMilli(0).atZone(ZoneOffset.UTC);
Wid wid = new Wid("_name", now);
String index = getHistoryIndexNameForTime(now);
String index = getHistoryIndexNameForTime(now, clusterState);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), now, now);
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null, randomAlphaOfLength(10));

Expand All @@ -105,15 +116,11 @@ public void testPut() throws Exception {
}

public void testIndexNameGeneration() {
String indexTemplateVersion = Integer.toString(INDEX_TEMPLATE_VERSION);
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli((long) 0).atZone(ZoneOffset.UTC)),
equalTo(".watcher-history-"+ indexTemplateVersion +"-1970.01.01"));
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(100000000000L).atZone(ZoneOffset.UTC)),
equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03"));
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(1416582852000L).atZone(ZoneOffset.UTC)),
equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21"));
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(2833165811000L).atZone(ZoneOffset.UTC)),
equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12"));
when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.V_7_7_0);
assertHistoryIndexName(Integer.toString(INDEX_TEMPLATE_VERSION));

when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.V_7_0_0);
assertHistoryIndexName(Integer.toString(INDEX_TEMPLATE_VERSION_10));
}

public void testStoreWithHideSecrets() throws Exception {
Expand Down Expand Up @@ -179,4 +186,15 @@ public void testStoreWithHideSecrets() throws Exception {
assertThat(indexedJson, containsString(username));
assertThat(indexedJson, not(containsString(password)));
}

private void assertHistoryIndexName(String indexTemplateVersion){
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli((long) 0).atZone(ZoneOffset.UTC), clusterState),
equalTo(".watcher-history-" + indexTemplateVersion + "-1970.01.01"));
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(100000000000L).atZone(ZoneOffset.UTC), clusterState),
equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03"));
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(1416582852000L).atZone(ZoneOffset.UTC), clusterState),
equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21"));
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(2833165811000L).atZone(ZoneOffset.UTC), clusterState),
equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private void createWatcherIndicesOrAliases() throws Exception {
assertAcked(client().admin().indices().prepareCreate(triggeredWatchIndexName));
}

String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC));
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC), null);
assertAcked(client().admin().indices().prepareCreate(historyIndex));
logger.info("creating watch history index [{}]", historyIndex);
ensureGreen(historyIndex, watchIndexName, triggeredWatchIndexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void testLoadMalformedWatchRecord() throws Exception {
Wid wid = new Wid("_id", now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
ExecutableCondition condition = InternalAlwaysCondition.INSTANCE;
String index = HistoryStoreField.getHistoryIndexNameForTime(now);
String index = HistoryStoreField.getHistoryIndexNameForTime(now, null);
client().prepareIndex().setIndex(index).setId(wid.value())
.setSource(jsonBuilder().startObject()
.startObject(WatchRecord.TRIGGER_EVENT.getPreferredName())
Expand Down Expand Up @@ -309,7 +309,7 @@ public void testWatchRecordSavedTwice() throws Exception {
}
LocalDateTime localDateTime = LocalDateTime.of(2015, 11, 5, 0, 0, 0, 0);
ZonedDateTime triggeredTime = ZonedDateTime.of(localDateTime,ZoneOffset.UTC);
final String watchRecordIndex = HistoryStoreField.getHistoryIndexNameForTime(triggeredTime);
final String watchRecordIndex = HistoryStoreField.getHistoryIndexNameForTime(triggeredTime, null);

logger.info("Stopping watcher");
stopWatcher();
Expand Down

0 comments on commit f162808

Please sign in to comment.