77
88import org .elasticsearch .client .Request ;
99import org .elasticsearch .client .Response ;
10+ import org .elasticsearch .client .ResponseException ;
1011import org .elasticsearch .common .Strings ;
1112import org .elasticsearch .common .settings .SecureString ;
1213import org .elasticsearch .common .settings .Settings ;
1314import org .elasticsearch .common .util .concurrent .ThreadContext ;
1415import org .elasticsearch .common .xcontent .XContentBuilder ;
16+ import org .elasticsearch .common .xcontent .XContentType ;
1517import org .elasticsearch .test .rest .ESRestTestCase ;
1618import org .elasticsearch .test .rest .yaml .ObjectPath ;
1719import org .elasticsearch .xpack .test .rest .XPackRestTestConstants ;
@@ -110,7 +112,7 @@ protected Settings restAdminSettings() {
110112
111113 @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/32299" )
112114 public void testMonitorClusterHealth () throws Exception {
113- String watchId = "cluster_health_watch" ;
115+ final String watchId = "cluster_health_watch" ;
114116
115117 // get master publish address
116118 Response clusterStateResponse = adminClient ().performRequest (new Request ("GET" , "/_cluster/state" ));
@@ -124,7 +126,7 @@ public void testMonitorClusterHealth() throws Exception {
124126 assertThat (address , is (notNullValue ()));
125127 String [] splitAddress = address .split (":" , 2 );
126128 String host = splitAddress [0 ];
127- int port = Integer .valueOf (splitAddress [1 ]);
129+ int port = Integer .parseInt (splitAddress [1 ]);
128130
129131 // put watch
130132 try (XContentBuilder builder = jsonBuilder ()) {
@@ -156,11 +158,17 @@ public void testMonitorClusterHealth() throws Exception {
156158
157159 // check watch history
158160 ObjectPath objectPath = getWatchHistoryEntry (watchId );
159- boolean conditionMet = objectPath .evaluate ("hits.hits.0._source.result.condition.met" );
160- assertThat (conditionMet , is (true ));
161+ Boolean conditionMet = objectPath .evaluate ("hits.hits.0._source.result.condition.met" );
162+ String historyEntriesAsString = Strings .toString (objectPath .toXContentBuilder (XContentType .JSON .xContent ()));
163+ assertThat ("condition not met in response [" + historyEntriesAsString + "]" , conditionMet , is (true ));
161164
162165 deleteWatch (watchId );
163- assertWatchCount (0 );
166+ // Wrap inside an assertBusy(...), because watch may execute just after being deleted,
167+ // This tries to re-add the watch which fails, because of version conflict,
168+ // but for a moment the watch count from watcher stats api may be incorrect.
169+ // (via WatcherIndexingListener#preIndex)
170+ // The WatcherIndexingListener#postIndex() detects this version conflict and corrects the watch count.
171+ assertBusy (() -> assertWatchCount (0 ));
164172 }
165173
166174 private void indexWatch (String watchId , XContentBuilder builder ) throws Exception {
@@ -182,16 +190,54 @@ private void deleteWatch(String watchId) throws IOException {
182190 private ObjectPath getWatchHistoryEntry (String watchId ) throws Exception {
183191 final AtomicReference <ObjectPath > objectPathReference = new AtomicReference <>();
184192 assertBusy (() -> {
185- client ().performRequest (new Request ("POST" , "/.watcher-history-*/_refresh" ));
193+ try {
194+ client ().performRequest (new Request ("POST" , "/.watcher-history-*/_refresh" ));
195+ } catch (ResponseException e ) {
196+ final String err = "Failed to perform refresh of watcher history" ;
197+ logger .error (err , e );
198+ throw new AssertionError (err , e );
199+ }
186200
187201 try (XContentBuilder builder = jsonBuilder ()) {
188202 builder .startObject ();
189- builder .startObject ("query" ).startObject ("bool" ).startArray ("must" );
190- builder .startObject ().startObject ("term" ).startObject ("watch_id" ).field ("value" , watchId ).endObject ().endObject ()
191- .endObject ();
192- builder .endArray ().endObject ().endObject ();
193- builder .startArray ("sort" ).startObject ().startObject ("trigger_event.triggered_time" ).field ("order" , "desc" ).endObject ()
194- .endObject ().endArray ();
203+ {
204+ builder .startObject ("query" );
205+ {
206+ builder .startObject ("bool" );
207+ builder .startArray ("must" );
208+ builder .startObject ();
209+ {
210+ builder .startObject ("term" );
211+ builder .startObject ("watch_id" );
212+ builder .field ("value" , watchId );
213+ builder .endObject ();
214+ builder .endObject ();
215+ }
216+ builder .endObject ();
217+ builder .startObject ();
218+ {
219+ builder .startObject ("term" );
220+ builder .startObject ("state" );
221+ builder .field ("value" , "executed" );
222+ builder .endObject ();
223+ builder .endObject ();
224+ }
225+ builder .endObject ();
226+ builder .endArray ();
227+ builder .endObject ();
228+ }
229+ builder .endObject ();
230+ builder .startArray ("sort" );
231+ builder .startObject ();
232+ {
233+
234+ builder .startObject ("result.execution_time" );
235+ builder .field ("order" , "desc" );
236+ builder .endObject ();
237+ }
238+ builder .endObject ();
239+ builder .endArray ();
240+ }
195241 builder .endObject ();
196242
197243 Request searchRequest = new Request ("POST" , "/.watcher-history-*/_search" );
@@ -204,6 +250,10 @@ private ObjectPath getWatchHistoryEntry(String watchId) throws Exception {
204250 String watchid = objectPath .evaluate ("hits.hits.0._source.watch_id" );
205251 assertThat (watchid , is (watchId ));
206252 objectPathReference .set (objectPath );
253+ } catch (ResponseException e ) {
254+ final String err = "Failed to perform search of watcher history" ;
255+ logger .error (err , e );
256+ throw new AssertionError (err , e );
207257 }
208258 });
209259 return objectPathReference .get ();
0 commit comments