-
Notifications
You must be signed in to change notification settings - Fork 8.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Task Manager] time out work when it overruns in poller #74980
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { timeoutPromiseAfter } from './timeout_promise_after'; | ||
|
||
/**
 * Test helper: produces a promise that fulfils with `result` once `ms`
 * milliseconds have elapsed.
 */
const delay = (ms: number, result: unknown) =>
  new Promise((fulfil) => {
    // setTimeout forwards its trailing arguments to the callback
    setTimeout(fulfil, ms, result);
  });
|
||
/**
 * Test helper: produces a promise that rejects with `result` once `ms`
 * milliseconds have elapsed. The promise never fulfils, so the executor's
 * first argument is deliberately left unused.
 */
const delayRejection = (ms: number, result: unknown) =>
  new Promise((_resolve, reject) => setTimeout(() => reject(result), ms));
|
||
// Covers the three possible outcomes of timeoutPromiseAfter: the wrapped
// promise fulfils first, the wrapped promise rejects first, or the timeout
// elapses first.
describe('Promise Timeout', () => {
  test('resolves when wrapped promise resolves', async () => {
    // wrapped promise (100ms) settles well before the timeout (1000ms)
    return expect(
      timeoutPromiseAfter(delay(100, 'OK'), 1000, () => 'TIMEOUT ERR')
    ).resolves.toMatchInlineSnapshot(`"OK"`);
  });

  test('rejects when wrapped promise rejects', async () => {
    // the wrapped promise's own rejection reason is surfaced, not the timeout value
    return expect(
      timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000, () => 'TIMEOUT ERR')
    ).rejects.toMatchInlineSnapshot(`"ERR"`);
  });

  test('rejects if the timeout elapses', async () => {
    // timeout (100ms) is shorter than the wrapped promise (1000ms), so the
    // value produced by the onTimeout callback becomes the rejection reason
    return expect(
      timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'TIMEOUT ERR')
    ).rejects.toMatchInlineSnapshot(`"TIMEOUT ERR"`);
  });
});
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
/**
 * Races `future` against a timer of `ms` milliseconds.
 *
 * - If `future` fulfils first, the returned promise fulfils with its value.
 * - If `future` rejects first, the returned promise rejects with its reason.
 * - If the timer fires first, `onTimeout` is invoked and the returned promise
 *   rejects with whatever it returns.
 *
 * The wrapped promise is not cancelled on timeout; its eventual outcome is
 * simply ignored because the returned promise has already settled.
 *
 * @param future the promise to guard with a timeout
 * @param ms how long to wait, in milliseconds, before giving up on `future`
 * @param onTimeout produces the rejection reason used when the timer fires
 * @returns a promise mirroring `future`, or rejecting once the timeout elapses
 */
export function timeoutPromiseAfter<T, G>(
  future: Promise<T>,
  ms: number,
  onTimeout: () => G
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(onTimeout()), ms);

    // Once `future` settles the timer is no longer needed: clearing it stops
    // `onTimeout` from being invoked for nothing and avoids holding the event
    // loop open until `ms` has elapsed.
    future.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (reason) => {
        clearTimeout(timer);
        reject(reason);
      }
    );
  });
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,7 @@ import { Subject } from 'rxjs'; | |
import { Option, none, some } from 'fp-ts/lib/Option'; | ||
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller'; | ||
import { fakeSchedulers } from 'rxjs-marbles/jest'; | ||
import { sleep, resolvable } from './test_utils'; | ||
import { sleep, resolvable, Resolvable } from './test_utils'; | ||
import { asOk, asErr } from './lib/result_type'; | ||
|
||
describe('TaskPoller', () => { | ||
|
@@ -243,6 +243,7 @@ describe('TaskPoller', () => { | |
}, | ||
getCapacity: () => 5, | ||
pollRequests$, | ||
workTimeout: pollInterval * 5, | ||
}).subscribe(handler); | ||
|
||
pollRequests$.next(some('one')); | ||
|
@@ -272,6 +273,68 @@ describe('TaskPoller', () => { | |
}) | ||
); | ||
|
||
test( | ||
'work times out whe nit exceeds a predefined amount of time', | ||
fakeSchedulers(async (advance) => { | ||
const pollInterval = 100; | ||
const workTimeout = pollInterval * 2; | ||
const bufferCapacity = 2; | ||
|
||
const handler = jest.fn(); | ||
|
||
type ResolvableTupple = [string, PromiseLike<void> & Resolvable]; | ||
const pollRequests$ = new Subject<Option<ResolvableTupple>>(); | ||
createTaskPoller<[string, Resolvable], string[]>({ | ||
pollInterval, | ||
bufferCapacity, | ||
work: async (...resolvables) => { | ||
await Promise.all(resolvables.map(([, future]) => future)); | ||
return resolvables.map(([name]) => name); | ||
}, | ||
getCapacity: () => 5, | ||
pollRequests$, | ||
workTimeout, | ||
}).subscribe(handler); | ||
|
||
const one: ResolvableTupple = ['one', resolvable()]; | ||
pollRequests$.next(some(one)); | ||
|
||
// split these into two payloads | ||
advance(pollInterval); | ||
|
||
const two: ResolvableTupple = ['two', resolvable()]; | ||
const three: ResolvableTupple = ['three', resolvable()]; | ||
pollRequests$.next(some(two)); | ||
pollRequests$.next(some(three)); | ||
|
||
advance(workTimeout); | ||
await sleep(workTimeout); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we account for a bit of slop here? Seems like it shouldn't be possible for this to resolve before the workTimeout, but you know ... node, time, etc, heh. Maybe just add 100ms or so? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wanted to start here and address if it does introduce flakiness. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ya, I'm guessing even if we did see some timing flakiness, it would be fairly obvious that was the problem, given the context, so WORKSFORME |
||
|
||
// one resolves too late! | ||
one[1].resolve(); | ||
|
||
expect(handler).toHaveBeenCalledWith( | ||
asErr( | ||
new PollingError<string>( | ||
'Failed to poll for work: Error: work has timed out', | ||
PollingErrorType.WorkError, | ||
none | ||
) | ||
) | ||
); | ||
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError); | ||
|
||
// two and three in time | ||
two[1].resolve(); | ||
three[1].resolve(); | ||
|
||
advance(pollInterval); | ||
await sleep(pollInterval); | ||
|
||
expect(handler).toHaveBeenCalledWith(asOk(['two', 'three'])); | ||
}) | ||
); | ||
|
||
test( | ||
'returns an error when polling for work fails', | ||
fakeSchedulers(async (advance) => { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ import { | |
asErr, | ||
promiseResult, | ||
} from './lib/result_type'; | ||
import { timeoutPromiseAfter } from './lib/timeout_promise_after'; | ||
|
||
type WorkFn<T, H> = (...params: T[]) => Promise<H>; | ||
|
||
|
@@ -34,6 +35,7 @@ interface Opts<T, H> { | |
getCapacity: () => number; | ||
pollRequests$: Observable<Option<T>>; | ||
work: WorkFn<T, H>; | ||
workTimeout?: number; | ||
} | ||
|
||
/** | ||
|
@@ -55,6 +57,7 @@ export function createTaskPoller<T, H>({ | |
pollRequests$, | ||
bufferCapacity, | ||
work, | ||
workTimeout, | ||
}: Opts<T, H>): Observable<Result<H, PollingError<T>>> { | ||
const hasCapacity = () => getCapacity() > 0; | ||
|
||
|
@@ -89,11 +92,15 @@ export function createTaskPoller<T, H>({ | |
concatMap(async (set: Set<T>) => { | ||
closeSleepPerf(); | ||
return mapResult<H, Error, Result<H, PollingError<T>>>( | ||
await promiseResult<H, Error>(work(...pullFromSet(set, getCapacity()))), | ||
await promiseResult<H, Error>( | ||
timeoutPromiseAfter<H, Error>( | ||
work(...pullFromSet(set, getCapacity())), | ||
workTimeout ?? pollInterval, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the case for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm it was really just a default... but yeah, you're right that if someone sets the interval to be lower than the latency of talking to Elasticsearch this will behave badly 🤔 Not sure about the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As the default config is set to |
||
() => new Error(`work has timed out`) | ||
) | ||
), | ||
(workResult) => asOk(workResult), | ||
(err: Error) => { | ||
return asPollingError<T>(err, PollingErrorType.WorkError); | ||
} | ||
(err: Error) => asPollingError<T>(err, PollingErrorType.WorkError) | ||
); | ||
}), | ||
tap(openSleepPerf), | ||
|
@@ -129,6 +136,7 @@ function pushOptionalIntoSet<T>( | |
|
||
export enum PollingErrorType { | ||
WorkError, | ||
WorkTimeout, | ||
RequestCapacityReached, | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious why you used a "count" as the config bit here, instead of a duration - my guess is the "count" makes this setting "move" if the poll interval changes, which I think is a nice pattern - I'd been thinking some of our other explicit timeouts could be "relative" like this as well - eg, throttle based on alert interval (eg, throttle for 10 intervals, not 10 minutes, or probably allow both).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, Mike asked the same. :)
It's exactly that - to make it relative to the polling interval.
It also reduces the scenarios you need to support, such as a work duration that's lower than polling interval, and the potential complexity that introduces.