diff --git a/x-pack/plugins/task_manager/server/MONITORING.md b/x-pack/plugins/task_manager/server/MONITORING.md index 4960086411e9a..3595b86317489 100644 --- a/x-pack/plugins/task_manager/server/MONITORING.md +++ b/x-pack/plugins/task_manager/server/MONITORING.md @@ -179,9 +179,21 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g /* What is the frequency of polling cycle result? Here we see 94% of "NoTasksClaimed" and 6% "PoolFilled" */ "result_frequency_percent_as_number": { + /* This tells us that the polling cycle didnt claim any new tasks */ "NoTasksClaimed": 94, - "RanOutOfCapacity": 0, /* This is a legacy result, we might want to rename - it tells us when a polling cycle resulted in claiming more tasks than we had workers for, butt he name doesn't make much sense outside of the context of the code */ - "PoolFilled": 6 + /* This is a legacy result we are renaming in 8.0.0 - + it tells us when a polling cycle resulted in claiming more tasks + than we had workers for, butt he name doesn't make much sense outside of the context of the code */ + "RanOutOfCapacity": 0, + /* This is a legacy result we are renaming in 8.0.0 - + it tells us when a polling cycle resulted in tasks being claimed but less the the available workers */ + "PoolFilled": 6, + /* This tells us when a polling cycle resulted in no tasks being claimed due to there being no available workers */ + "NoAvailableWorkers": 0, + /* This tells us when a polling cycle resulted in tasks being claimed at 100% capacity of the available workers */ + "RunningAtCapacity": 0, + /* This tells us when the poller failed to claim */ + "Failed": 0 } }, /* on average, the tasks in this deployment run 1.7s after their scheduled time */ diff --git a/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts b/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts index ebb72c3ed36d6..a2c1eb514aebc 100644 --- a/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts +++ b/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts @@ -8,6 +8,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import { fillPool } from './fill_pool'; import { TaskPoolRunResult } from '../task_pool'; +import { asOk } from './result_type'; describe('fillPool', () => { test('stops filling when pool runs all claimed tasks, even if there is more capacity', async () => { @@ -16,7 +17,7 @@ describe('fillPool', () => { [4, 5], ]; let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; + const fetchAvailableTasks = async () => asOk(tasks[index++] || []); const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks); const converter = _.identity; @@ -31,7 +32,7 @@ describe('fillPool', () => { [4, 5], ]; let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; + const fetchAvailableTasks = async () => asOk(tasks[index++] || []); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = _.identity; @@ -46,7 +47,7 @@ describe('fillPool', () => { [4, 5], ]; let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; + const fetchAvailableTasks = async () => asOk(tasks[index++] || []); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = (x: number) => x.toString(); @@ -80,7 +81,7 @@ describe('fillPool', () => { [4, 5], ]; let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; + const fetchAvailableTasks = async () => asOk(tasks[index++] || []); await fillPool(fetchAvailableTasks, converter, run); } catch (err) { @@ -95,7 +96,7 @@ describe('fillPool', () => { [4, 5], ]; let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; + const fetchAvailableTasks = async () => asOk(tasks[index++] || []); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = (x: number) => { throw new Error(`can not convert ${x}`); diff --git a/x-pack/plugins/task_manager/server/lib/fill_pool.ts b/x-pack/plugins/task_manager/server/lib/fill_pool.ts index 9e4894587203d..5ab173755662f 100644 --- a/x-pack/plugins/task_manager/server/lib/fill_pool.ts +++ b/x-pack/plugins/task_manager/server/lib/fill_pool.ts @@ -6,15 +6,19 @@ import { performance } from 'perf_hooks'; import { TaskPoolRunResult } from '../task_pool'; +import { Result, map } from './result_type'; export enum FillPoolResult { + Failed = 'Failed', + NoAvailableWorkers = 'NoAvailableWorkers', NoTasksClaimed = 'NoTasksClaimed', + RunningAtCapacity = 'RunningAtCapacity', RanOutOfCapacity = 'RanOutOfCapacity', PoolFilled = 'PoolFilled', } type BatchRun = (tasks: T[]) => Promise; -type Fetcher = () => Promise; +type Fetcher = () => Promise>; type Converter = (t: T1) => T2; /** @@ -30,33 +34,43 @@ type Converter = (t: T1) => T2; * @param converter - a function that converts task records to the appropriate task runner */ export async function fillPool( - fetchAvailableTasks: Fetcher, + fetchAvailableTasks: Fetcher, converter: Converter, run: BatchRun ): Promise { performance.mark('fillPool.start'); - const instances = await fetchAvailableTasks(); + return map>( + await fetchAvailableTasks(), + async (instances) => { + if (!instances.length) { + performance.mark('fillPool.bailNoTasks'); + performance.measure( + 'fillPool.activityDurationUntilNoTasks', + 'fillPool.start', + 'fillPool.bailNoTasks' + ); + return FillPoolResult.NoTasksClaimed; + } - if (!instances.length) { - performance.mark('fillPool.bailNoTasks'); - performance.measure( - 'fillPool.activityDurationUntilNoTasks', - 'fillPool.start', - 'fillPool.bailNoTasks' - ); - return FillPoolResult.NoTasksClaimed; - } - const tasks = instances.map(converter); + const tasks = instances.map(converter); - if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) { - performance.mark('fillPool.bailExhaustedCapacity'); - performance.measure( - 'fillPool.activityDurationUntilExhaustedCapacity', - 'fillPool.start', - 'fillPool.bailExhaustedCapacity' - ); - return FillPoolResult.RanOutOfCapacity; - } - performance.mark('fillPool.cycle'); - return FillPoolResult.PoolFilled; + switch (await run(tasks)) { + case TaskPoolRunResult.RanOutOfCapacity: + performance.mark('fillPool.bailExhaustedCapacity'); + performance.measure( + 'fillPool.activityDurationUntilExhaustedCapacity', + 'fillPool.start', + 'fillPool.bailExhaustedCapacity' + ); + return FillPoolResult.RanOutOfCapacity; + case TaskPoolRunResult.RunningAtCapacity: + performance.mark('fillPool.cycle'); + return FillPoolResult.RunningAtCapacity; + default: + performance.mark('fillPool.cycle'); + return FillPoolResult.PoolFilled; + } + }, + async (result) => result + ); } diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts index 538acd51bb792..7d5a8811dbe2b 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts @@ -427,25 +427,95 @@ describe('Task Run Statistics', () => { taskStats.map((taskStat) => taskStat.value.polling.result_frequency_percent_as_number) ).toEqual([ // NoTasksClaimed - { NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 }, + { + NoTasksClaimed: 100, + RanOutOfCapacity: 0, + PoolFilled: 0, + Failed: 0, + NoAvailableWorkers: 0, + RunningAtCapacity: 0, + }, // NoTasksClaimed, NoTasksClaimed, - { NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 }, + { + NoTasksClaimed: 100, + RanOutOfCapacity: 0, + PoolFilled: 0, + Failed: 0, + NoAvailableWorkers: 0, + RunningAtCapacity: 0, + }, // NoTasksClaimed, NoTasksClaimed, NoTasksClaimed - { NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 }, + { + NoTasksClaimed: 100, + RanOutOfCapacity: 0, + PoolFilled: 0, + Failed: 0, + NoAvailableWorkers: 0, + RunningAtCapacity: 0, + }, // NoTasksClaimed, NoTasksClaimed, NoTasksClaimed, PoolFilled - { NoTasksClaimed: 75, RanOutOfCapacity: 0, PoolFilled: 25 }, + { + NoTasksClaimed: 75, + RanOutOfCapacity: 0, + PoolFilled: 25, + Failed: 0, + NoAvailableWorkers: 0, + RunningAtCapacity: 0, + }, // NoTasksClaimed, NoTasksClaimed, NoTasksClaimed, PoolFilled, PoolFilled - { NoTasksClaimed: 60, RanOutOfCapacity: 0, PoolFilled: 40 }, + { + NoTasksClaimed: 60, + RanOutOfCapacity: 0, + PoolFilled: 40, + Failed: 0, + NoAvailableWorkers: 0, + RunningAtCapacity: 0, + }, // NoTasksClaimed, NoTasksClaimed, PoolFilled, PoolFilled, PoolFilled - { NoTasksClaimed: 40, RanOutOfCapacity: 0, PoolFilled: 60 }, + { + NoTasksClaimed: 40, + RanOutOfCapacity: 0, + PoolFilled: 60, + Failed: 0, + NoAvailableWorkers: 0, + RunningAtCapacity: 0, + }, // NoTasksClaimed, PoolFilled, PoolFilled, PoolFilled, RanOutOfCapacity - { NoTasksClaimed: 20, RanOutOfCapacity: 20, PoolFilled: 60 }, + { + NoTasksClaimed: 20, + RanOutOfCapacity: 20, + PoolFilled: 60, + Failed: 0, + NoAvailableWorkers: 0, + RunningAtCapacity: 0, + }, // PoolFilled, PoolFilled, PoolFilled, RanOutOfCapacity, RanOutOfCapacity - { NoTasksClaimed: 0, RanOutOfCapacity: 40, PoolFilled: 60 }, + { + NoTasksClaimed: 0, + RanOutOfCapacity: 40, + PoolFilled: 60, + Failed: 0, + NoAvailableWorkers: 0, + RunningAtCapacity: 0, + }, // PoolFilled, PoolFilled, RanOutOfCapacity, RanOutOfCapacity, NoTasksClaimed - { NoTasksClaimed: 20, RanOutOfCapacity: 40, PoolFilled: 40 }, + { + NoTasksClaimed: 20, + RanOutOfCapacity: 40, + PoolFilled: 40, + Failed: 0, + NoAvailableWorkers: 0, + RunningAtCapacity: 0, + }, // PoolFilled, RanOutOfCapacity, RanOutOfCapacity, NoTasksClaimed, NoTasksClaimed - { NoTasksClaimed: 40, RanOutOfCapacity: 40, PoolFilled: 20 }, + { + NoTasksClaimed: 40, + RanOutOfCapacity: 40, + PoolFilled: 20, + Failed: 0, + NoAvailableWorkers: 0, + RunningAtCapacity: 0, + }, ]); resolve(); } catch (e) { diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts index 3e0d517172d01..c1851789a769d 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -52,8 +52,11 @@ export interface TaskRunStat extends JsonObject { interface FillPoolRawStat extends JsonObject { last_successful_poll: string; result_frequency_percent_as_number: { + [FillPoolResult.Failed]: number; + [FillPoolResult.NoAvailableWorkers]: number; [FillPoolResult.NoTasksClaimed]: number; [FillPoolResult.RanOutOfCapacity]: number; + [FillPoolResult.RunningAtCapacity]: number; [FillPoolResult.PoolFilled]: number; }; } @@ -163,8 +166,11 @@ const DEFAULT_TASK_RUN_FREQUENCIES = { [TaskRunResult.Failed]: 0, }; const DEFAULT_POLLING_FREQUENCIES = { + [FillPoolResult.Failed]: 0, + [FillPoolResult.NoAvailableWorkers]: 0, [FillPoolResult.NoTasksClaimed]: 0, [FillPoolResult.RanOutOfCapacity]: 0, + [FillPoolResult.RunningAtCapacity]: 0, [FillPoolResult.PoolFilled]: 0, }; diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 1a8108e34078d..1876c52b0029e 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -12,7 +12,7 @@ import { Option, some, map as mapOptional } from 'fp-ts/lib/Option'; import { tap } from 'rxjs/operators'; import { Logger } from '../../../../src/core/server'; -import { Result, asErr, mapErr } from './lib/result_type'; +import { Result, asErr, mapErr, asOk } from './lib/result_type'; import { ManagedConfiguration } from './lib/create_managed_configuration'; import { TaskManagerConfig } from './config'; @@ -232,7 +232,7 @@ export async function claimAvailableTasks( claim: (opts: OwnershipClaimingOpts) => Promise, availableWorkers: number, logger: Logger -) { +): Promise> { if (availableWorkers > 0) { performance.mark('claimAvailableTasks_start'); @@ -260,12 +260,13 @@ export async function claimAvailableTasks( } task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})` ); } - return docs; + return asOk(docs); } catch (ex) { if (identifyEsError(ex).includes('cannot execute [inline] scripts')) { logger.warn( `Task Manager cannot operate when inline scripts are disabled in Elasticsearch` ); + return asErr(FillPoolResult.Failed); } else { throw ex; } @@ -275,6 +276,6 @@ export async function claimAvailableTasks( logger.debug( `[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers.` ); + return asErr(FillPoolResult.NoAvailableWorkers); } - return []; } diff --git a/x-pack/plugins/task_manager/server/task_pool.test.ts b/x-pack/plugins/task_manager/server/task_pool.test.ts index a174af71ef18f..95768bb2f1afa 100644 --- a/x-pack/plugins/task_manager/server/task_pool.test.ts +++ b/x-pack/plugins/task_manager/server/task_pool.test.ts @@ -92,7 +92,7 @@ describe('TaskPool', () => { ] `); - expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks); + expect(result).toEqual(TaskPoolRunResult.RunningAtCapacity); }); test('should log when running a Task fails', async () => { @@ -242,7 +242,7 @@ describe('TaskPool', () => { }, ]); - expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks); + expect(result).toEqual(TaskPoolRunResult.RunningAtCapacity); expect(pool.occupiedWorkers).toEqual(2); expect(pool.availableWorkers).toEqual(0); diff --git a/x-pack/plugins/task_manager/server/task_pool.ts b/x-pack/plugins/task_manager/server/task_pool.ts index acd2ea59ad30b..6946cd613e0a7 100644 --- a/x-pack/plugins/task_manager/server/task_pool.ts +++ b/x-pack/plugins/task_manager/server/task_pool.ts @@ -22,7 +22,11 @@ interface Opts { } export enum TaskPoolRunResult { + // This means we're running all the tasks we claimed RunningAllClaimedTasks = 'RunningAllClaimedTasks', + // This means we're running all the tasks we claimed and we're at capacity + RunningAtCapacity = 'RunningAtCapacity', + // This means we're prematurely out of capacity and have accidentally claimed more tasks than we had capacity for RanOutOfCapacity = 'RanOutOfCapacity', } @@ -123,6 +127,8 @@ export class TaskPool { return this.attemptToRun(leftOverTasks); } return TaskPoolRunResult.RanOutOfCapacity; + } else if (!this.availableWorkers) { + return TaskPoolRunResult.RunningAtCapacity; } return TaskPoolRunResult.RunningAllClaimedTasks; } diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts index 9b02b58573673..eb8e35fd871f3 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts @@ -178,6 +178,9 @@ export default function ({ getService }: FtrProviderContext) { expect(typeof polling.result_frequency_percent_as_number.NoTasksClaimed).to.eql('number'); expect(typeof polling.result_frequency_percent_as_number.RanOutOfCapacity).to.eql('number'); expect(typeof polling.result_frequency_percent_as_number.PoolFilled).to.eql('number'); + expect(typeof polling.result_frequency_percent_as_number.NoAvailableWorkers).to.eql('number'); + expect(typeof polling.result_frequency_percent_as_number.RunningAtCapacity).to.eql('number'); + expect(typeof polling.result_frequency_percent_as_number.Failed).to.eql('number'); expect(typeof drift.p50).to.eql('number'); expect(typeof drift.p90).to.eql('number');