Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.11] [Alerting] Shift polling interval by random amount when Task Manager experiences consistent claim version conflicts (#88020) #88114

Merged
merged 1 commit into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM
- `max_poll_inactivity_cycles` - How many poll intervals is work allowed to block polling for before it's timed out. This does not include task execution, as task execution does not block the polling, but rather includes work needed to manage Task Manager's state.
- `index` - **deprecated** The name of the index that the task_manager will use. This is deprecated, and will be removed starting in 8.0
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
- `version_conflict_threshold` - The threshold percentage for workers experiencing version conflicts for shifting the polling interval
- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security.
- `override_num_workers`: An object of `taskType: number` that overrides the `num_workers` for tasks
- For example: `task_manager.override_num_workers.reporting: 2` would override the number of workers occupied by tasks of type `reporting`
Expand Down Expand Up @@ -521,4 +522,4 @@ The task manager's public API is create / delete / list. Updates aren't directly

Task Manager exposes runtime statistics which enable basic observability into its inner workings and makes it possible to monitor the system from external services.

Learn More: [./MONITORING](./MONITORING.MD)
Learn More: [./MONITORING](./MONITORING.MD)
2 changes: 2 additions & 0 deletions x-pack/plugins/task_manager/server/MONITORING.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g
"polling": {
/* When was the last polling cycle? */
"last_successful_poll": "2020-10-05T17:57:55.411Z",
/* When was the last time Task Manager adjusted it's polling delay? */
"last_polling_delay": "2020-10-05T17:57:55.411Z",
/* Running average of polling duration measuring the time from the scheduled polling cycle
start until all claimed tasks are marked as running */
"duration": {
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"version_conflict_threshold": 80,
}
`);
});
Expand Down Expand Up @@ -74,6 +75,7 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"version_conflict_threshold": 80,
}
`);
});
Expand Down Expand Up @@ -113,6 +115,7 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"version_conflict_threshold": 80,
}
`);
});
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;

// Monitoring Constants
// ===================
Expand Down Expand Up @@ -76,6 +77,12 @@ export const configSchema = schema.object(
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
}),
/* The threshold percenatge for workers experiencing version conflicts for shifting the polling interval. */
version_conflict_threshold: schema.number({
defaultValue: DEFAULT_VERSION_CONFLICT_THRESHOLD,
min: 50,
max: 100,
}),
/* The rate at which we emit fresh monitored stats. By default we'll use the poll_interval (+ a slight buffer) */
monitored_stats_required_freshness: schema.number({
defaultValue: (config?: unknown) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ describe('managed configuration', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_required_freshness: 4000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('Configuration Statistics Aggregator', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
monitored_stats_required_freshness: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ describe('createMonitoringStatsStream', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
monitored_stats_required_freshness: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { combineLatest, Observable } from 'rxjs';
import { combineLatest, merge, Observable, of } from 'rxjs';
import { filter, startWith, map } from 'rxjs/operators';
import { JsonObject } from 'src/plugins/kibana_utils/common';
import { isNumber, mapValues } from 'lodash';
Expand Down Expand Up @@ -36,6 +36,7 @@ import { TaskExecutionFailureThreshold, TaskManagerConfig } from '../config';

interface FillPoolStat extends JsonObject {
last_successful_poll: string;
last_polling_delay: string;
duration: number[];
claim_conflicts: number[];
claim_mismatches: number[];
Expand All @@ -51,11 +52,13 @@ export interface TaskRunStat extends JsonObject {
drift: number[];
load: number[];
execution: ExecutionStat;
polling: FillPoolStat | Omit<FillPoolStat, 'last_successful_poll'>;
polling: Omit<FillPoolStat, 'last_successful_poll' | 'last_polling_delay'> &
Pick<Partial<FillPoolStat>, 'last_successful_poll' | 'last_polling_delay'>;
}

interface FillPoolRawStat extends JsonObject {
last_successful_poll: string;
last_polling_delay: string;
result_frequency_percent_as_number: {
[FillPoolResult.Failed]: number;
[FillPoolResult.NoAvailableWorkers]: number;
Expand Down Expand Up @@ -123,37 +126,61 @@ export function createTaskRunAggregator(
const pollingDurationQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimConflictsQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimMismatchesQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const taskPollingEvents$: Observable<
Pick<TaskRunStat, 'polling'>
> = taskPollingLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskPollingCycleEvent(taskEvent) && isOk<ClaimAndFillPoolResult, unknown>(taskEvent.event)
const taskPollingEvents$: Observable<Pick<TaskRunStat, 'polling'>> = combineLatest([
// get latest polling stats
taskPollingLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskPollingCycleEvent(taskEvent) &&
isOk<ClaimAndFillPoolResult, unknown>(taskEvent.event)
),
map((taskEvent: TaskLifecycleEvent) => {
const {
result,
stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {},
} = ((taskEvent.event as unknown) as Ok<ClaimAndFillPoolResult>).value;
const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0);
return {
polling: {
last_successful_poll: new Date().toISOString(),
// Track how long the polling cycle took from begining until all claimed tasks were marked as running
duration: duration ? pollingDurationQueue(duration) : pollingDurationQueue(),
// Track how many version conflicts occured during polling
claim_conflicts: isNumber(tasksConflicted)
? claimConflictsQueue(tasksConflicted)
: claimConflictsQueue(),
// Track how much of a mismatch there is between claimed and updated
claim_mismatches:
isNumber(tasksClaimed) && isNumber(tasksUpdated)
? claimMismatchesQueue(tasksUpdated - tasksClaimed)
: claimMismatchesQueue(),
result_frequency_percent_as_number: resultFrequencyQueue(result),
},
};
})
),
map((taskEvent: TaskLifecycleEvent) => {
const {
result,
stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {},
} = ((taskEvent.event as unknown) as Ok<ClaimAndFillPoolResult>).value;
const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0);
return {
polling: {
last_successful_poll: new Date().toISOString(),
// Track how long the polling cycle took from begining until all claimed tasks were marked as running
duration: duration ? pollingDurationQueue(duration) : pollingDurationQueue(),
// Track how many version conflicts occured during polling
claim_conflicts: isNumber(tasksConflicted)
? claimConflictsQueue(tasksConflicted)
: claimConflictsQueue(),
// Track how much of a mismatch there is between claimed and updated
claim_mismatches:
isNumber(tasksClaimed) && isNumber(tasksUpdated)
? claimMismatchesQueue(tasksUpdated - tasksClaimed)
: claimMismatchesQueue(),
result_frequency_percent_as_number: resultFrequencyQueue(result),
},
};
})
// get DateTime of latest polling delay refresh
merge(
/**
* as `combineLatest` hangs until it has its first value and we're not likely to reconfigure the delay in normal deployments, we needed some initial value.
I've used _now_ (`new Date().toISOString()`) as it made the most sense (it would be the time Kibana started), but it _could_ be confusing in the future.
*/
of(new Date().toISOString()),
taskPollingLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskManagerStatEvent(taskEvent) && taskEvent.id === 'pollingDelay'
),
map(() => new Date().toISOString())
)
),
]).pipe(
map(([{ polling }, pollingDelay]) => ({
polling: {
last_polling_delay: pollingDelay,
...polling,
},
}))
);

return combineLatest([
Expand Down Expand Up @@ -234,6 +261,8 @@ export function summarizeTaskRunStat(
polling: {
// eslint-disable-next-line @typescript-eslint/naming-convention
last_successful_poll,
// eslint-disable-next-line @typescript-eslint/naming-convention
last_polling_delay,
duration: pollingDuration,
result_frequency_percent_as_number: pollingResultFrequency,
claim_conflicts: claimConflicts,
Expand All @@ -249,6 +278,7 @@ export function summarizeTaskRunStat(
value: {
polling: {
...(last_successful_poll ? { last_successful_poll } : {}),
...(last_polling_delay ? { last_polling_delay } : {}),
duration: calculateRunningAverage(pollingDuration as number[]),
claim_conflicts: calculateRunningAverage(claimConflicts as number[]),
claim_mismatches: calculateRunningAverage(claimMismatches as number[]),
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ describe('TaskManagerPlugin', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down Expand Up @@ -49,6 +50,7 @@ describe('TaskManagerPlugin', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down
Loading