Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] adds more granular polling results to monitoring stats #87494

Merged
merged 3 commits into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions x-pack/plugins/task_manager/server/MONITORING.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,21 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g
/* What is the frequency of polling cycle result?
Here we see 94% of "NoTasksClaimed" and 6% "PoolFilled" */
"result_frequency_percent_as_number": {
/* This tells us that the polling cycle didnt claim any new tasks */
"NoTasksClaimed": 94,
"RanOutOfCapacity": 0, /* This is a legacy result, we might want to rename - it tells us when a polling cycle resulted in claiming more tasks than we had workers for, butt he name doesn't make much sense outside of the context of the code */
"PoolFilled": 6
/* This is a legacy result we are renaming in 8.0.0 -
it tells us when a polling cycle resulted in claiming more tasks
than we had workers for, butt he name doesn't make much sense outside of the context of the code */
"RanOutOfCapacity": 0,
/* This is a legacy result we are renaming in 8.0.0 -
it tells us when a polling cycle resulted in tasks being claimed but less the the available workers */
"PoolFilled": 6,
/* This tells us when a polling cycle resulted in no tasks being claimed due to there being no available workers */
"NoAvailableWorkers": 0,
/* This tells us when a polling cycle resulted in tasks being claimed at 100% capacity of the available workers */
"RunningAtCapacity": 0,
/* This tells us when the poller failed to claim */
"Failed": 0
}
},
/* on average, the tasks in this deployment run 1.7s after their scheduled time */
Expand Down
11 changes: 6 additions & 5 deletions x-pack/plugins/task_manager/server/lib/fill_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import _ from 'lodash';
import sinon from 'sinon';
import { fillPool } from './fill_pool';
import { TaskPoolRunResult } from '../task_pool';
import { asOk } from './result_type';

describe('fillPool', () => {
test('stops filling when pool runs all claimed tasks, even if there is more capacity', async () => {
Expand All @@ -16,7 +17,7 @@ describe('fillPool', () => {
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks);
const converter = _.identity;

Expand All @@ -31,7 +32,7 @@ describe('fillPool', () => {
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = _.identity;

Expand All @@ -46,7 +47,7 @@ describe('fillPool', () => {
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => x.toString();

Expand Down Expand Up @@ -80,7 +81,7 @@ describe('fillPool', () => {
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);

await fillPool(fetchAvailableTasks, converter, run);
} catch (err) {
Expand All @@ -95,7 +96,7 @@ describe('fillPool', () => {
[4, 5],
];
let index = 0;
const fetchAvailableTasks = async () => tasks[index++] || [];
const fetchAvailableTasks = async () => asOk(tasks[index++] || []);
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
const converter = (x: number) => {
throw new Error(`can not convert ${x}`);
Expand Down
62 changes: 38 additions & 24 deletions x-pack/plugins/task_manager/server/lib/fill_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@

import { performance } from 'perf_hooks';
import { TaskPoolRunResult } from '../task_pool';
import { Result, map } from './result_type';

export enum FillPoolResult {
Failed = 'Failed',
NoAvailableWorkers = 'NoAvailableWorkers',
NoTasksClaimed = 'NoTasksClaimed',
RunningAtCapacity = 'RunningAtCapacity',
RanOutOfCapacity = 'RanOutOfCapacity',
PoolFilled = 'PoolFilled',
}

type BatchRun<T> = (tasks: T[]) => Promise<TaskPoolRunResult>;
type Fetcher<T> = () => Promise<T[]>;
type Fetcher<T, E> = () => Promise<Result<T[], E>>;
type Converter<T1, T2> = (t: T1) => T2;

/**
Expand All @@ -30,33 +34,43 @@ type Converter<T1, T2> = (t: T1) => T2;
* @param converter - a function that converts task records to the appropriate task runner
*/
export async function fillPool<TRecord, TRunner>(
fetchAvailableTasks: Fetcher<TRecord>,
fetchAvailableTasks: Fetcher<TRecord, FillPoolResult>,
converter: Converter<TRecord, TRunner>,
run: BatchRun<TRunner>
): Promise<FillPoolResult> {
performance.mark('fillPool.start');
const instances = await fetchAvailableTasks();
return map<TRecord[], FillPoolResult, Promise<FillPoolResult>>(
await fetchAvailableTasks(),
async (instances) => {
if (!instances.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return FillPoolResult.NoTasksClaimed;
}

if (!instances.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return FillPoolResult.NoTasksClaimed;
}
const tasks = instances.map(converter);
const tasks = instances.map(converter);

if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) {
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return FillPoolResult.RanOutOfCapacity;
}
performance.mark('fillPool.cycle');
return FillPoolResult.PoolFilled;
switch (await run(tasks)) {
case TaskPoolRunResult.RanOutOfCapacity:
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return FillPoolResult.RanOutOfCapacity;
case TaskPoolRunResult.RunningAtCapacity:
performance.mark('fillPool.cycle');
return FillPoolResult.RunningAtCapacity;
default:
performance.mark('fillPool.cycle');
return FillPoolResult.PoolFilled;
}
},
async (result) => result
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,25 +427,95 @@ describe('Task Run Statistics', () => {
taskStats.map((taskStat) => taskStat.value.polling.result_frequency_percent_as_number)
).toEqual([
// NoTasksClaimed
{ NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 },
{
NoTasksClaimed: 100,
RanOutOfCapacity: 0,
PoolFilled: 0,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, NoTasksClaimed,
{ NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 },
{
NoTasksClaimed: 100,
RanOutOfCapacity: 0,
PoolFilled: 0,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, NoTasksClaimed, NoTasksClaimed
{ NoTasksClaimed: 100, RanOutOfCapacity: 0, PoolFilled: 0 },
{
NoTasksClaimed: 100,
RanOutOfCapacity: 0,
PoolFilled: 0,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, NoTasksClaimed, NoTasksClaimed, PoolFilled
{ NoTasksClaimed: 75, RanOutOfCapacity: 0, PoolFilled: 25 },
{
NoTasksClaimed: 75,
RanOutOfCapacity: 0,
PoolFilled: 25,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, NoTasksClaimed, NoTasksClaimed, PoolFilled, PoolFilled
{ NoTasksClaimed: 60, RanOutOfCapacity: 0, PoolFilled: 40 },
{
NoTasksClaimed: 60,
RanOutOfCapacity: 0,
PoolFilled: 40,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, NoTasksClaimed, PoolFilled, PoolFilled, PoolFilled
{ NoTasksClaimed: 40, RanOutOfCapacity: 0, PoolFilled: 60 },
{
NoTasksClaimed: 40,
RanOutOfCapacity: 0,
PoolFilled: 60,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// NoTasksClaimed, PoolFilled, PoolFilled, PoolFilled, RanOutOfCapacity
{ NoTasksClaimed: 20, RanOutOfCapacity: 20, PoolFilled: 60 },
{
NoTasksClaimed: 20,
RanOutOfCapacity: 20,
PoolFilled: 60,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// PoolFilled, PoolFilled, PoolFilled, RanOutOfCapacity, RanOutOfCapacity
{ NoTasksClaimed: 0, RanOutOfCapacity: 40, PoolFilled: 60 },
{
NoTasksClaimed: 0,
RanOutOfCapacity: 40,
PoolFilled: 60,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// PoolFilled, PoolFilled, RanOutOfCapacity, RanOutOfCapacity, NoTasksClaimed
{ NoTasksClaimed: 20, RanOutOfCapacity: 40, PoolFilled: 40 },
{
NoTasksClaimed: 20,
RanOutOfCapacity: 40,
PoolFilled: 40,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
// PoolFilled, RanOutOfCapacity, RanOutOfCapacity, NoTasksClaimed, NoTasksClaimed
{ NoTasksClaimed: 40, RanOutOfCapacity: 40, PoolFilled: 20 },
{
NoTasksClaimed: 40,
RanOutOfCapacity: 40,
PoolFilled: 20,
Failed: 0,
NoAvailableWorkers: 0,
RunningAtCapacity: 0,
},
]);
resolve();
} catch (e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ export interface TaskRunStat extends JsonObject {
interface FillPoolRawStat extends JsonObject {
last_successful_poll: string;
result_frequency_percent_as_number: {
[FillPoolResult.Failed]: number;
[FillPoolResult.NoAvailableWorkers]: number;
[FillPoolResult.NoTasksClaimed]: number;
[FillPoolResult.RanOutOfCapacity]: number;
[FillPoolResult.RunningAtCapacity]: number;
[FillPoolResult.PoolFilled]: number;
};
}
Expand Down Expand Up @@ -163,8 +166,11 @@ const DEFAULT_TASK_RUN_FREQUENCIES = {
[TaskRunResult.Failed]: 0,
};
const DEFAULT_POLLING_FREQUENCIES = {
[FillPoolResult.Failed]: 0,
[FillPoolResult.NoAvailableWorkers]: 0,
[FillPoolResult.NoTasksClaimed]: 0,
[FillPoolResult.RanOutOfCapacity]: 0,
[FillPoolResult.RunningAtCapacity]: 0,
[FillPoolResult.PoolFilled]: 0,
};

Expand Down
9 changes: 5 additions & 4 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { Option, some, map as mapOptional } from 'fp-ts/lib/Option';
import { tap } from 'rxjs/operators';
import { Logger } from '../../../../src/core/server';

import { Result, asErr, mapErr } from './lib/result_type';
import { Result, asErr, mapErr, asOk } from './lib/result_type';
import { ManagedConfiguration } from './lib/create_managed_configuration';
import { TaskManagerConfig } from './config';

Expand Down Expand Up @@ -232,7 +232,7 @@ export async function claimAvailableTasks(
claim: (opts: OwnershipClaimingOpts) => Promise<ClaimOwnershipResult>,
availableWorkers: number,
logger: Logger
) {
): Promise<Result<ClaimOwnershipResult['docs'], FillPoolResult>> {
if (availableWorkers > 0) {
performance.mark('claimAvailableTasks_start');

Expand Down Expand Up @@ -260,12 +260,13 @@ export async function claimAvailableTasks(
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
);
}
return docs;
return asOk(docs);
} catch (ex) {
if (identifyEsError(ex).includes('cannot execute [inline] scripts')) {
logger.warn(
`Task Manager cannot operate when inline scripts are disabled in Elasticsearch`
);
return asErr(FillPoolResult.Failed);
} else {
throw ex;
}
Expand All @@ -275,6 +276,6 @@ export async function claimAvailableTasks(
logger.debug(
`[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers.`
);
return asErr(FillPoolResult.NoAvailableWorkers);
}
return [];
}
4 changes: 2 additions & 2 deletions x-pack/plugins/task_manager/server/task_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ describe('TaskPool', () => {
]
`);

expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
expect(result).toEqual(TaskPoolRunResult.RunningAtCapacity);
});

test('should log when running a Task fails', async () => {
Expand Down Expand Up @@ -242,7 +242,7 @@ describe('TaskPool', () => {
},
]);

expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
expect(result).toEqual(TaskPoolRunResult.RunningAtCapacity);
expect(pool.occupiedWorkers).toEqual(2);
expect(pool.availableWorkers).toEqual(0);

Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ interface Opts {
}

export enum TaskPoolRunResult {
// This means we're running all the tasks we claimed
RunningAllClaimedTasks = 'RunningAllClaimedTasks',
// This means we're running all the tasks we claimed and we're at capacity
RunningAtCapacity = 'RunningAtCapacity',
// This means we're prematurely out of capacity and have accidentally claimed more tasks than we had capacity for
RanOutOfCapacity = 'RanOutOfCapacity',
}

Expand Down Expand Up @@ -123,6 +127,8 @@ export class TaskPool {
return this.attemptToRun(leftOverTasks);
}
return TaskPoolRunResult.RanOutOfCapacity;
} else if (!this.availableWorkers) {
return TaskPoolRunResult.RunningAtCapacity;
}
return TaskPoolRunResult.RunningAllClaimedTasks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ export default function ({ getService }: FtrProviderContext) {
expect(typeof polling.result_frequency_percent_as_number.NoTasksClaimed).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.RanOutOfCapacity).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.PoolFilled).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.NoAvailableWorkers).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.RunningAtCapacity).to.eql('number');
expect(typeof polling.result_frequency_percent_as_number.Failed).to.eql('number');

expect(typeof drift.p50).to.eql('number');
expect(typeof drift.p90).to.eql('number');
Expand Down