Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] fixes testWatchdog test verifying matcher is interrupted on timeout #62391

Merged
merged 5 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public synchronized void close() {
* @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time.
*/
public void check(String where) {

if (timeoutExceeded) {
throw new ElasticsearchTimeoutException("Aborting " + operation + " during [" + where +
"] as it has taken longer than the timeout of [" + timeout + "]");
Expand All @@ -101,7 +100,6 @@ public void check(String where) {
* @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time.
*/
public Map<String, Object> grokCaptures(Grok grok, String text, String where) {

try {
return grok.captures(text);
} finally {
Expand Down Expand Up @@ -137,12 +135,15 @@ void add(Thread thread, TimeValue timeout) {
}

@Override
public void register(Matcher matcher) {
public synchronized void register(Matcher matcher) {
WatchDogEntry value = registry.get(Thread.currentThread());
if (value != null) {
boolean wasFalse = value.registered.compareAndSet(false, true);
assert wasFalse;
value.matchers.add(matcher);
if (value.isTimedOut()) {
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
matcher.interrupt();
}
}
}

Expand All @@ -167,8 +168,9 @@ void remove(Thread thread) {
assert previousValue != null;
}

void interruptLongRunningThreadIfRegistered(Thread thread) {
synchronized void interruptLongRunningThreadIfRegistered(Thread thread) {
WatchDogEntry value = registry.get(thread);
value.timedOut();
if (value.registered.get()) {
for (Matcher matcher : value.matchers) {
matcher.interrupt();
Expand All @@ -181,12 +183,21 @@ static class WatchDogEntry {
final TimeValue timeout;
final AtomicBoolean registered;
final Collection<Matcher> matchers;
boolean timedOut;

WatchDogEntry(TimeValue timeout) {
this.timeout = timeout;
this.registered = new AtomicBoolean(false);
this.matchers = new CopyOnWriteArrayList<>();
}

private void timedOut() {
timedOut = true;
}

private boolean isTimedOut() {
return timedOut;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,19 @@ public void testCheckTimeoutExceeded() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48861")
public void testWatchdog() throws Exception {
TimeValue timeout = TimeValue.timeValueMillis(500);
final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(10, 500));
try (TimeoutChecker timeoutChecker = new TimeoutChecker("watchdog test", timeout, scheduler)) {
TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog;

final TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog;
Matcher matcher = mock(Matcher.class);
TimeoutChecker.watchdog.register(matcher);
watchdog.register(matcher);
assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(1));
try {
assertBusy(() -> {
verify(matcher).interrupt();
});
} finally {
TimeoutChecker.watchdog.unregister(matcher);
watchdog.unregister(matcher);
assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(0));
}
}
Expand Down