diff --git a/x-pack/plugins/task_manager/server/README.md b/x-pack/plugins/task_manager/server/README.md index c3d45be5d8f22..fd2409a7db0a5 100644 --- a/x-pack/plugins/task_manager/server/README.md +++ b/x-pack/plugins/task_manager/server/README.md @@ -41,6 +41,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM - `max_attempts` - The maximum number of times a task will be attempted before being abandoned as failed - `poll_interval` - How often the background worker should check the task_manager index for more work +- `max_poll_inactivity_cycles` - How many poll intervals is work allowed to block polling for before it's timed out. This does not include task execution, as task execution does not block the polling, but rather includes work needed to manage Task Manager's state. - `index` - The name of the index that the task_manager - `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10) - `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security. diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 8e877f696a2fc..d5bbbe65582f1 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -13,6 +13,7 @@ describe('config validation', () => { "enabled": true, "index": ".kibana_task_manager", "max_attempts": 3, + "max_poll_inactivity_cycles": 10, "max_workers": 10, "poll_interval": 3000, "request_capacity": 1000, diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index e3af12eca8a49..aa78cf3baa96d 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -8,6 +8,7 @@ import { schema, TypeOf } from '@kbn/config-schema'; export const DEFAULT_MAX_WORKERS = 10; export const DEFAULT_POLL_INTERVAL = 3000; +export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10; export const configSchema = schema.object({ enabled: schema.boolean({ defaultValue: true }), @@ -21,6 +22,11 @@ export const configSchema = schema.object({ defaultValue: DEFAULT_POLL_INTERVAL, min: 100, }), + /* How many poll interval cycles can work take before it's timed out. */ + max_poll_inactivity_cycles: schema.number({ + defaultValue: DEFAULT_MAX_POLL_INACTIVITY_CYCLES, + min: 1, + }), /* How many requests can Task Manager buffer before it rejects new requests. */ request_capacity: schema.number({ // a nice round contrived number, feel free to change as we learn how it behaves diff --git a/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts b/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts new file mode 100644 index 0000000000000..3e88269671dcc --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { timeoutPromiseAfter } from './timeout_promise_after'; + +const delay = (ms: number, result: unknown) => + new Promise((resolve) => setTimeout(() => resolve(result), ms)); + +const delayRejection = (ms: number, result: unknown) => + new Promise((resolve, reject) => setTimeout(() => reject(result), ms)); + +describe('Promise Timeout', () => { + test('resolves when wrapped promise resolves', async () => { + return expect( + timeoutPromiseAfter(delay(100, 'OK'), 1000, () => 'TIMEOUT ERR') + ).resolves.toMatchInlineSnapshot(`"OK"`); + }); + + test('reject when wrapped promise rejects', async () => { + return expect( + timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000, () => 'TIMEOUT ERR') + ).rejects.toMatchInlineSnapshot(`"ERR"`); + }); + + test('reject it the timeout elapses', async () => { + return expect( + timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'TIMEOUT ERR') + ).rejects.toMatchInlineSnapshot(`"TIMEOUT ERR"`); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts b/x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts new file mode 100644 index 0000000000000..2f99bde26ca41 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export function timeoutPromiseAfter( + future: Promise, + ms: number, + onTimeout: () => G +): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => reject(onTimeout()), ms); + future.then(resolve).catch(reject); + }); +} diff --git a/x-pack/plugins/task_manager/server/task_manager.test.ts b/x-pack/plugins/task_manager/server/task_manager.test.ts index 7035971ad6061..cf7f9e2a7cff3 100644 --- a/x-pack/plugins/task_manager/server/task_manager.test.ts +++ b/x-pack/plugins/task_manager/server/task_manager.test.ts @@ -40,6 +40,7 @@ describe('TaskManager', () => { index: 'foo', max_attempts: 9, poll_interval: 6000000, + max_poll_inactivity_cycles: 10, request_capacity: 1000, }; const taskManagerOpts = { diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 7165fd28678c1..851a6353739ac 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -159,6 +159,11 @@ export class TaskManager { getCapacity: () => this.pool.availableWorkers, pollRequests$: this.claimRequests$, work: this.pollForWork, + // Time out the `work` phase if it takes longer than a certain number of polling cycles + // The `work` phase includes the prework needed *before* executing a task + // (such as polling for new work, marking tasks as running etc.) but does not + // include the time of actually running the task + workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles, }); } diff --git a/x-pack/plugins/task_manager/server/task_poller.test.ts b/x-pack/plugins/task_manager/server/task_poller.test.ts index 4b0ecef7ff917..98e6d0f9388a4 100644 --- a/x-pack/plugins/task_manager/server/task_poller.test.ts +++ b/x-pack/plugins/task_manager/server/task_poller.test.ts @@ -9,7 +9,7 @@ import { Subject } from 'rxjs'; import { Option, none, some } from 'fp-ts/lib/Option'; import { createTaskPoller, PollingError, PollingErrorType } from './task_poller'; import { fakeSchedulers } from 'rxjs-marbles/jest'; -import { sleep, resolvable } from './test_utils'; +import { sleep, resolvable, Resolvable } from './test_utils'; import { asOk, asErr } from './lib/result_type'; describe('TaskPoller', () => { @@ -243,6 +243,7 @@ describe('TaskPoller', () => { }, getCapacity: () => 5, pollRequests$, + workTimeout: pollInterval * 5, }).subscribe(handler); pollRequests$.next(some('one')); @@ -272,6 +273,68 @@ describe('TaskPoller', () => { }) ); + test( + 'work times out whe nit exceeds a predefined amount of time', + fakeSchedulers(async (advance) => { + const pollInterval = 100; + const workTimeout = pollInterval * 2; + const bufferCapacity = 2; + + const handler = jest.fn(); + + type ResolvableTupple = [string, PromiseLike & Resolvable]; + const pollRequests$ = new Subject>(); + createTaskPoller<[string, Resolvable], string[]>({ + pollInterval, + bufferCapacity, + work: async (...resolvables) => { + await Promise.all(resolvables.map(([, future]) => future)); + return resolvables.map(([name]) => name); + }, + getCapacity: () => 5, + pollRequests$, + workTimeout, + }).subscribe(handler); + + const one: ResolvableTupple = ['one', resolvable()]; + pollRequests$.next(some(one)); + + // split these into two payloads + advance(pollInterval); + + const two: ResolvableTupple = ['two', resolvable()]; + const three: ResolvableTupple = ['three', resolvable()]; + pollRequests$.next(some(two)); + pollRequests$.next(some(three)); + + advance(workTimeout); + await sleep(workTimeout); + + // one resolves too late! + one[1].resolve(); + + expect(handler).toHaveBeenCalledWith( + asErr( + new PollingError( + 'Failed to poll for work: Error: work has timed out', + PollingErrorType.WorkError, + none + ) + ) + ); + expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError); + + // two and three in time + two[1].resolve(); + three[1].resolve(); + + advance(pollInterval); + await sleep(pollInterval); + + expect(handler).toHaveBeenCalledWith(asOk(['two', 'three'])); + }) + ); + test( 'returns an error when polling for work fails', fakeSchedulers(async (advance) => { diff --git a/x-pack/plugins/task_manager/server/task_poller.ts b/x-pack/plugins/task_manager/server/task_poller.ts index 3e1a04a366b0e..88511f42f96fb 100644 --- a/x-pack/plugins/task_manager/server/task_poller.ts +++ b/x-pack/plugins/task_manager/server/task_poller.ts @@ -25,6 +25,7 @@ import { asErr, promiseResult, } from './lib/result_type'; +import { timeoutPromiseAfter } from './lib/timeout_promise_after'; type WorkFn = (...params: T[]) => Promise; @@ -34,6 +35,7 @@ interface Opts { getCapacity: () => number; pollRequests$: Observable>; work: WorkFn; + workTimeout?: number; } /** @@ -55,6 +57,7 @@ export function createTaskPoller({ pollRequests$, bufferCapacity, work, + workTimeout, }: Opts): Observable>> { const hasCapacity = () => getCapacity() > 0; @@ -89,11 +92,15 @@ export function createTaskPoller({ concatMap(async (set: Set) => { closeSleepPerf(); return mapResult>>( - await promiseResult(work(...pullFromSet(set, getCapacity()))), + await promiseResult( + timeoutPromiseAfter( + work(...pullFromSet(set, getCapacity())), + workTimeout ?? pollInterval, + () => new Error(`work has timed out`) + ) + ), (workResult) => asOk(workResult), - (err: Error) => { - return asPollingError(err, PollingErrorType.WorkError); - } + (err: Error) => asPollingError(err, PollingErrorType.WorkError) ); }), tap(openSleepPerf), @@ -129,6 +136,7 @@ function pushOptionalIntoSet( export enum PollingErrorType { WorkError, + WorkTimeout, RequestCapacityReached, } diff --git a/x-pack/plugins/task_manager/server/test_utils/index.ts b/x-pack/plugins/task_manager/server/test_utils/index.ts index 3f000a9564ba3..6f43a60ff42d2 100644 --- a/x-pack/plugins/task_manager/server/test_utils/index.ts +++ b/x-pack/plugins/task_manager/server/test_utils/index.ts @@ -23,7 +23,7 @@ export function mockLogger() { }; } -interface Resolvable { +export interface Resolvable { resolve: () => void; }