From a8d8314f8c5c8ecfcabbaf550e1101ade9a0f006 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 12 May 2020 12:56:34 -0500 Subject: [PATCH] Watcher dont add watches post index if stopped (#56556) Watcher adds watches to the trigger service on the postIndex action for the .watches index. This has the (intentional) side effect of also adding the watches to the stats. The tests rely on these stats for their assertions. The tests also start and stop Watcher between each test for a clean slate. When Watcher executes it updates the .watches index and upon this update it will go through the postIndex method and end up added that watch to the trigger service (and stats). Functionally this is not a problem, if Watcher is stopping or stopped since Watcher is also paused and will not execute the watch. However, with specific timing and expectations of a clean slate can cause issues the test assertions against the stats. This commit ensures that the postIndex action only adds to the trigger service if the Watcher state is not stopping or stopped. When started back up it will re-read index .watches. This commit also un-mutes the tests related to #53177 and #56534 --- .../elasticsearch/xpack/watcher/Watcher.java | 2 +- .../watcher/WatcherIndexingListener.java | 14 +++++++--- .../watcher/WatcherLifeCycleService.java | 5 ++-- .../stats/TransportWatcherStatsAction.java | 2 +- .../watcher/WatcherIndexingListenerTests.java | 26 ++++++++++++++++++- .../watcher/WatcherLifeCycleServiceTests.java | 10 +++---- .../TransportWatcherStatsActionTests.java | 2 +- 7 files changed, 46 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index d927e63ba2c08..1fce2d6810283 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -423,7 +423,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) final WatcherLifeCycleService watcherLifeCycleService = new WatcherLifeCycleService(clusterService, watcherService); - listener = new WatcherIndexingListener(watchParser, getClock(), triggerService); + listener = new WatcherIndexingListener(watchParser, getClock(), triggerService, watcherLifeCycleService.getState()); clusterService.addListener(listener); return Arrays.asList(registry, inputRegistry, historyStore, triggerService, triggeredWatchParser, diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java index 2cac0632f3eab..a8a2c611c89d4 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java @@ -25,6 +25,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexingOperationListener; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.WatchParser; @@ -38,11 +39,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -66,12 +69,14 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste private final WatchParser parser; private final Clock clock; private final TriggerService triggerService; + private final Supplier watcherState; private volatile Configuration configuration = INACTIVE; - WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService) { + WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService, Supplier watcherState) { this.parser = parser; this.clock = clock; this.triggerService = triggerService; + this.watcherState = watcherState; } // package private for testing @@ -119,8 +124,9 @@ public void postIndex(ShardId shardId, Engine.Index operation, Engine.IndexResul } boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id()); - if (shouldBeTriggered) { - if (watch.status().state().isActive()) { + WatcherState currentState = watcherState.get(); + if (shouldBeTriggered && EnumSet.of(WatcherState.STOPPING, WatcherState.STOPPED).contains(currentState) == false) { + if (watch.status().state().isActive() ) { logger.debug("adding watch [{}] to trigger service", watch.id()); triggerService.add(watch); } else { @@ -128,7 +134,7 @@ public void postIndex(ShardId shardId, Engine.Index operation, Engine.IndexResul triggerService.remove(watch.id()); } } else { - logger.debug("watch [{}] should not be triggered", watch.id()); + logger.debug("watch [{}] should not be triggered. watcher state [{}]", watch.id(), currentState); } } catch (IOException e) { throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id()); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index f1dd7be196569..517dda9a4ac20 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -203,7 +204,7 @@ List shardRoutings() { return previousShardRoutings.get(); } - public WatcherState getState() { - return state.get(); + public Supplier getState(){ + return () -> state.get(); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java index e3e00db6850c1..182a8d022bf2f 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java @@ -66,7 +66,7 @@ protected WatcherStatsResponse.Node newNodeResponse(StreamInput in) throws IOExc @Override protected WatcherStatsResponse.Node nodeOperation(WatcherStatsRequest.Node request) { WatcherStatsResponse.Node statsResponse = new WatcherStatsResponse.Node(clusterService.localNode()); - statsResponse.setWatcherState(lifeCycleService.getState()); + statsResponse.setWatcherState(lifeCycleService.getState().get()); statsResponse.setThreadPoolQueueSize(executionService.executionThreadPoolQueueSize()); statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize()); if (request.includeCurrentWatches()) { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java index 031f75ac6d259..e3a822e6a09a6 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; @@ -89,7 +90,7 @@ public class WatcherIndexingListenerTests extends ESTestCase { @Before public void setup() throws Exception { clock.freeze(); - listener = new WatcherIndexingListener(parser, clock, triggerService); + listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STARTED); Map map = new HashMap<>(); map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo"))); @@ -140,6 +141,29 @@ public void testPostIndex() throws Exception { } } + public void testPostIndexWhenStopped() throws Exception { + listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STOPPED); + Map map = new HashMap<>(); + map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo"))); + listener.setConfiguration(new Configuration(Watch.INDEX, map)); + when(operation.id()).thenReturn(randomAlphaOfLength(10)); + when(operation.source()).thenReturn(BytesArray.EMPTY); + when(shardId.getIndexName()).thenReturn(Watch.INDEX); + List types = new ArrayList<>(List.of(Engine.Result.Type.values())); + types.remove(Engine.Result.Type.FAILURE); + when(result.getResultType()).thenReturn(randomFrom(types)); + + boolean watchActive = randomBoolean(); + boolean isNewWatch = randomBoolean(); + Watch watch = mockWatch("_id", watchActive, isNewWatch); + when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch); + + listener.postIndex(shardId, operation, result); + ZonedDateTime now = DateUtils.nowWithMillisResolution(clock); + verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong()); + verifyZeroInteractions(triggerService); + } + // this test emulates an index with 10 shards, and ensures that triggering only happens on a // single shard public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index cf6c2c5ac6663..25a9287e76715 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -179,9 +179,9 @@ public void testManualStartStop() { ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); verify(watcherService, times(1)) .stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture()); - assertEquals(WatcherState.STOPPING, lifeCycleService.getState()); + assertEquals(WatcherState.STOPPING, lifeCycleService.getState().get()); captor.getValue().run(); - assertEquals(WatcherState.STOPPED, lifeCycleService.getState()); + assertEquals(WatcherState.STOPPED, lifeCycleService.getState().get()); // Starting via cluster state update, as the watcher metadata block is removed/set to true reset(watcherService); @@ -480,7 +480,7 @@ public void testMasterOnlyNodeCanStart() { new HashSet<>(roles), Version.CURRENT))).build(); lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state)); - assertThat(lifeCycleService.getState(), is(WatcherState.STARTED)); + assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED)); } public void testDataNodeWithoutDataCanStart() { @@ -494,7 +494,7 @@ public void testDataNodeWithoutDataCanStart() { .build(); lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state)); - assertThat(lifeCycleService.getState(), is(WatcherState.STARTED)); + assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED)); } // this emulates a node outage somewhere in the cluster that carried a watcher shard @@ -584,7 +584,7 @@ private void startWatcher() { when(watcherService.validate(state)).thenReturn(true); lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState)); - assertThat(lifeCycleService.getState(), is(WatcherState.STARTED)); + assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED)); verify(watcherService, times(1)).reload(eq(state), anyString()); assertThat(lifeCycleService.shardRoutings(), hasSize(1)); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java index 605119c0b6404..127f372da3e4a 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java @@ -58,7 +58,7 @@ public void setupTransportAction() { when(clusterService.state()).thenReturn(clusterState); WatcherLifeCycleService watcherLifeCycleService = mock(WatcherLifeCycleService.class); - when(watcherLifeCycleService.getState()).thenReturn(WatcherState.STARTED); + when(watcherLifeCycleService.getState()).thenReturn(() -> WatcherState.STARTED); ExecutionService executionService = mock(ExecutionService.class); when(executionService.executionThreadPoolQueueSize()).thenReturn(100L);