Skip to content

Commit

Permalink
[Task Manager] adds more granular polling results to monitoring stats (
Browse files Browse the repository at this point in the history
…elastic#87494)

Added the following values to the Polling stats:

- **NoAvailableWorkers**: This tells us when a polling cycle resulted in no tasks being claimed due to there being no available workers 
- **RunningAtCapacity**: This tells us when a polling cycle resulted in tasks being claimed at 100% capacity of the available workers
- **Failed**: This tells us when the poller failed to claim
  • Loading branch information
gmmorris committed Jan 6, 2021
1 parent 1c80c58 commit ac4ebf3
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 47 deletions.
16 changes: 14 additions & 2 deletions x-pack/plugins/task_manager/server/MONITORING.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,21 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g
/* What is the frequency of polling cycle result?
Here we see 94% of "NoTasksClaimed" and 6% "PoolFilled" */
"result_frequency_percent_as_number": {
/* This tells us that the polling cycle didnt claim any new tasks */
"NoTasksClaimed": 94,
"RanOutOfCapacity": 0, /* This is a legacy result, we might want to rename - it tells us when a polling cycle resulted in claiming more tasks than we had workers for, butt he name doesn't make much sense outside of the context of the code */
"PoolFilled": 6
/* This is a legacy result we are renaming in 8.0.0 -
it tells us when a polling cycle resulted in claiming more tasks
than we had workers for, butt he name doesn't make much sense outside of the context of the code */
"RanOutOfCapacity": 0,
/* This is a legacy result we are renaming in 8.0.0 -
it tells us when a polling cycle resulted in tasks being claimed but less the the available workers */
"PoolFilled": 6,
/* This tells us when a polling cycle resulted in no tasks being claimed due to there being no available workers */
"NoAvailableWorkers": 0,
/* This tells us when a polling cycle resulted in tasks being claimed at 100% capacity of the available workers */
"RunningAtCapacity": 0,
/* This tells us when the poller failed to claim */
"Failed": 0
}
},
/* on average, the tasks in this deployment run 1.7s after their scheduled time */
Expand Down
11 changes: 6 additions & 5 deletions x-pack/plugins/task_manager/server/lib/fill_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import _ from 'lodash';
import sinon from 'sinon';
import { fillPool } from './fill_pool';
import { TaskPoolRunResult } from '../task_pool';
import { asOk } from './result_type';

describe('fillPool', () => {
test('stops filling when pool runs all claimed tasks, even if there is more capacity', async () => {
Expand All @@ -16,7 +17,7 @@ describe('fillPool', () => {
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks);
const converter = _.identity;

Expand All @@ -31,7 +32,7 @@ describe('fillPool', () => {
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = _.identity;

Expand All @@ -46,7 +47,7 @@ describe('fillPool', () => {
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => x.toString();

Expand Down Expand Up @@ -80,7 +81,7 @@ describe('fillPool', () => {
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);

await fillPool(fetchAvailableTasks, converter, run);
} catch (err) {
Expand All @@ -95,7 +96,7 @@ describe('fillPool', () => {
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => {
throw new Error(`can not convert ${x}`);
Expand Down
62 changes: 38 additions & 24 deletions x-pack/plugins/task_manager/server/lib/fill_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@

import { performance } from 'perf_hooks';
import { TaskPoolRunResult } from '../task_pool';
import { Result, map } from './result_type';

export enum FillPoolResult {
Failed = 'Failed',
NoAvailableWorkers = 'NoAvailableWorkers',
NoTasksClaimed = 'NoTasksClaimed',
RunningAtCapacity = 'RunningAtCapacity',
RanOutOfCapacity = 'RanOutOfCapacity',
PoolFilled = 'PoolFilled',
}

type BatchRun<T> = (tasks: T[]) => Promise<TaskPoolRunResult>;
type Fetcher<T> = () => Promise<T[]>;
type Fetcher<T, E> = () => Promise<Result<T[], E>>;
type Converter<T1, T2> = (t: T1) => T2;

/**
Expand All @@ -30,33 +34,43 @@ type Converter<T1, T2> = (t: T1) => T2;
* @param converter - a function that converts task records to the appropriate task runner
*/
export async function fillPool<TRecord, TRunner>(
fetchAvailableTasks: Fetcher<TRecord>,
fetchAvailableTasks: Fetcher<TRecord, FillPoolResult>,
converter: Converter<TRecord, TRunner>,
run: BatchRun<TRunner>
): Promise<FillPoolResult> {
performance.mark('fillPool.start');
const instances = await fetchAvailableTasks();
return map<TRecord[], FillPoolResult, Promise<FillPoolResult>>(
await fetchAvailableTasks(),
async (instances) => {
if (!instances.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return FillPoolResult.NoTasksClaimed;
}

if (!instances.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return FillPoolResult.NoTasksClaimed;
}
const tasks = instances.map(converter);
const tasks = instances.map(converter);

if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) {
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return FillPoolResult.RanOutOfCapacity;
}
performance.mark('fillPool.cycle');
return FillPoolResult.PoolFilled;
switch (await run(tasks)) {
case TaskPoolRunResult.RanOutOfCapacity:
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return FillPoolResult.RanOutOfCapacity;
case TaskPoolRunResult.RunningAtCapacity:
performance.mark('fillPool.cycle');
return FillPoolResult.RunningAtCapacity;
default:
performance.mark('fillPool.cycle');
return FillPoolResult.PoolFilled;
}
},
async (result) => result
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,25 +427,95 @@ describe('Task Run Statistics', () => {
taskStats.map((taskStat) => taskStat.value.polling.result_frequency_percent_as_number)
).toEqual([
// NoTasksClaimed
{ NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 },
{
NoTasksClaimed: 100,
RanOutOfCapacity: 0,
PoolFilled: 0,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, NoTasksClaimed,
{ NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 },
{
NoTasksClaimed: 100,
RanOutOfCapacity: 0,
PoolFilled: 0,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, NoTasksClaimed, NoTasksClaimed
{ NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 },
{
NoTasksClaimed: 100,
RanOutOfCapacity: 0,
PoolFilled: 0,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, NoTasksClaimed, NoTasksClaimed, PoolFilled
{ NoTasksClaimed: 75, RanOutOfCapacity: 0, PoolFilled: 25 },
{
NoTasksClaimed: 75,
RanOutOfCapacity: 0,
PoolFilled: 25,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, NoTasksClaimed, NoTasksClaimed, PoolFilled, PoolFilled
{ NoTasksClaimed: 60, RanOutOfCapacity: 0, PoolFilled: 40 },
{
NoTasksClaimed: 60,
RanOutOfCapacity: 0,
PoolFilled: 40,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, NoTasksClaimed, PoolFilled, PoolFilled, PoolFilled
{ NoTasksClaimed: 40, RanOutOfCapacity: 0, PoolFilled: 60 },
{
NoTasksClaimed: 40,
RanOutOfCapacity: 0,
PoolFilled: 60,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, PoolFilled, PoolFilled, PoolFilled, RanOutOfCapacity
{ NoTasksClaimed: 20, RanOutOfCapacity: 20, PoolFilled: 60 },
{
NoTasksClaimed: 20,
RanOutOfCapacity: 20,
PoolFilled: 60,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// PoolFilled, PoolFilled, PoolFilled, RanOutOfCapacity, RanOutOfCapacity
{ NoTasksClaimed: 0, RanOutOfCapacity: 40, PoolFilled: 60 },
{
NoTasksClaimed: 0,
RanOutOfCapacity: 40,
PoolFilled: 60,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// PoolFilled, PoolFilled, RanOutOfCapacity, RanOutOfCapacity, NoTasksClaimed
{ NoTasksClaimed: 20, RanOutOfCapacity: 40, PoolFilled: 40 },
{
NoTasksClaimed: 20,
RanOutOfCapacity: 40,
PoolFilled: 40,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// PoolFilled, RanOutOfCapacity, RanOutOfCapacity, NoTasksClaimed, NoTasksClaimed
{ NoTasksClaimed: 40, RanOutOfCapacity: 40, PoolFilled: 20 },
{
NoTasksClaimed: 40,
RanOutOfCapacity: 40,
PoolFilled: 20,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
]);
resolve();
} catch (e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ export interface TaskRunStat extends JsonObject {
interface FillPoolRawStat extends JsonObject {
last_successful_poll: string;
result_frequency_percent_as_number: {
[FillPoolResult.Failed]: number;
[FillPoolResult.NoAvailableWorkers]: number;
[FillPoolResult.NoTasksClaimed]: number;
[FillPoolResult.RanOutOfCapacity]: number;
[FillPoolResult.RunningAtCapacity]: number;
[FillPoolResult.PoolFilled]: number;
};
}
Expand Down Expand Up @@ -163,8 +166,11 @@ const DEFAULT_TASK_RUN_FREQUENCIES = {
[TaskRunResult.Failed]: 0,
};
const DEFAULT_POLLING_FREQUENCIES = {
[FillPoolResult.Failed]: 0,
[FillPoolResult.NoAvailableWorkers]: 0,
[FillPoolResult.NoTasksClaimed]: 0,
[FillPoolResult.RanOutOfCapacity]: 0,
[FillPoolResult.RunningAtCapacity]: 0,
[FillPoolResult.PoolFilled]: 0,
};

Expand Down
9 changes: 5 additions & 4 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { Option, some, map as mapOptional } from 'fp-ts/lib/Option';
import { tap } from 'rxjs/operators';
import { Logger } from '../../../../src/core/server';

import { Result, asErr, mapErr } from './lib/result_type';
import { Result, asErr, mapErr, asOk } from './lib/result_type';
import { ManagedConfiguration } from './lib/create_managed_configuration';
import { TaskManagerConfig } from './config';

Expand Down Expand Up @@ -232,7 +232,7 @@ export async function claimAvailableTasks(
claim: (opts: OwnershipClaimingOpts) => Promise<ClaimOwnershipResult>,
availableWorkers: number,
logger: Logger
) {
): Promise<Result<ClaimOwnershipResult['docs'], FillPoolResult>> {
if (availableWorkers > 0) {
performance.mark('claimAvailableTasks_start');

Expand Down Expand Up @@ -260,12 +260,13 @@ export async function claimAvailableTasks(
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
);
}
return docs;
return asOk(docs);
} catch (ex) {
if (identifyEsError(ex).includes('cannot execute [inline] scripts')) {
logger.warn(
`Task Manager cannot operate when inline scripts are disabled in Elasticsearch`
);
return asErr(FillPoolResult.Failed);
} else {
throw ex;
}
Expand All @@ -275,6 +276,6 @@ export async function claimAvailableTasks(
logger.debug(
`[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers.`
);
return asErr(FillPoolResult.NoAvailableWorkers);
}
return [];
}
4 changes: 2 additions & 2 deletions x-pack/plugins/task_manager/server/task_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ describe('TaskPool', () => {
]
`);

expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
expect(result).toEqual(TaskPoolRunResult.RunningAtCapacity);
});

test('should log when running a Task fails', async () => {
Expand Down Expand Up @@ -242,7 +242,7 @@ describe('TaskPool', () => {
},
]);

expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
expect(result).toEqual(TaskPoolRunResult.RunningAtCapacity);
expect(pool.occupiedWorkers).toEqual(2);
expect(pool.availableWorkers).toEqual(0);

Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ interface Opts {
}

export enum TaskPoolRunResult {
// This means we're running all the tasks we claimed
RunningAllClaimedTasks = 'RunningAllClaimedTasks',
// This means we're running all the tasks we claimed and we're at capacity
RunningAtCapacity = 'RunningAtCapacity',
// This means we're prematurely out of capacity and have accidentally claimed more tasks than we had capacity for
RanOutOfCapacity = 'RanOutOfCapacity',
}

Expand Down Expand Up @@ -123,6 +127,8 @@ export class TaskPool {
return this.attemptToRun(leftOverTasks);
}
return TaskPoolRunResult.RanOutOfCapacity;
} else if (!this.availableWorkers) {
return TaskPoolRunResult.RunningAtCapacity;
}
return TaskPoolRunResult.RunningAllClaimedTasks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ export default function ({ getService }: FtrProviderContext) {
expect(typeof polling.result_frequency_percent_as_number.NoTasksClaimed).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.RanOutOfCapacity).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.PoolFilled).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.NoAvailableWorkers).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.RunningAtCapacity).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.Failed).to.eql('number');

expect(typeof drift.p50).to.eql('number');
expect(typeof drift.p90).to.eql('number');
Expand Down

0 comments on commit ac4ebf3

Please sign in to comment.