forked from elastic/elasticsearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix Watcher deadlock that can cause in-abilty to index documents. (el…
…astic#41418) * Fix Watcher deadlock that can cause in-abilty to index documents. This commit removes the usage of the `BulkProcessor` to write history documents and delete triggered watches on a `EsRejectedExecutionException`. Since the exception could be handled on the write thread, the write thread can be blocked waiting on watcher threads (due to a synchronous method). This is problematic since those watcher threads can be blocked waiting on write threads. This commit also moves the handling of the exception to the generic threadpool to avoid submitting write requests from the write thread pool. fixes elastic#41390
- Loading branch information
1 parent
66578c3
commit 1ed0a5c
Showing
3 changed files
with
216 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
81 changes: 81 additions & 0 deletions
81
...rc/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.watcher.test.integration; | ||
|
||
import org.elasticsearch.action.search.SearchResponse; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.license.LicenseService; | ||
import org.elasticsearch.xpack.core.XPackSettings; | ||
import org.elasticsearch.xpack.core.watcher.client.WatcherClient; | ||
import org.elasticsearch.xpack.watcher.condition.CompareCondition; | ||
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest; | ||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; | ||
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.elasticsearch.index.query.QueryBuilders.termQuery; | ||
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; | ||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; | ||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; | ||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput; | ||
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest; | ||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; | ||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.greaterThanOrEqualTo; | ||
|
||
public class RejectedExecutionTests extends AbstractWatcherIntegrationTestCase { | ||
|
||
@Override | ||
protected boolean timeWarped() { | ||
//need to use the real scheduler | ||
return false; | ||
} | ||
|
||
public void testHistoryAndTriggeredOnRejection() throws Exception { | ||
WatcherClient watcherClient = watcherClient(); | ||
createIndex("idx"); | ||
client().prepareIndex("idx", "_doc").setSource("field", "a").get(); | ||
refresh(); | ||
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "a")), "idx"); | ||
watcherClient.preparePutWatch(randomAlphaOfLength(5)) | ||
.setSource(watchBuilder() | ||
.trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.SECONDS))) | ||
.input(searchInput(request)) | ||
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L)) | ||
.addAction("_logger", loggingAction("_logging") | ||
.setCategory("_category"))) | ||
.get(); | ||
|
||
assertBusy(() -> { | ||
flushAndRefresh(".watcher-history-*"); | ||
SearchResponse searchResponse = client().prepareSearch(".watcher-history-*").get(); | ||
assertThat(searchResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(2L)); | ||
}, 10, TimeUnit.SECONDS); | ||
|
||
flushAndRefresh(".triggered_watches"); | ||
SearchResponse searchResponse = client().prepareSearch(".triggered_watches").get(); | ||
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L)); | ||
} | ||
|
||
@Override | ||
protected Settings nodeSettings(int nodeOrdinal) { | ||
|
||
return Settings.builder() | ||
.put(super.nodeSettings(nodeOrdinal)) | ||
.put(XPackSettings.MONITORING_ENABLED.getKey(), false) | ||
.put(XPackSettings.SECURITY_ENABLED.getKey(), false) | ||
.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial") | ||
.put("thread_pool.write.size", 1) | ||
.put("thread_pool.write.queue_size", 1) | ||
.put("xpack.watcher.thread_pool.size", 1) | ||
.put("xpack.watcher.thread_pool.queue_size", 0) | ||
.build(); | ||
} | ||
|
||
|
||
} |