[Response Ops][Task Manager] Resource based task scheduling (#187999)
Resolves #185043

## Summary

### Task types can define a `cost` associated with running them

- Optional definition that defaults to `Normal` cost
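
A minimal sketch of what declaring a cost on a task type might look like. The enum values, the `TaskDefinition` shape, and the registration call below are local stand-ins so the sketch type-checks on its own; they are assumptions, not verbatim from the source:

```ts
// Illustrative cost enum; these exact values are an assumption for this sketch.
enum TaskCost {
  Tiny = 1,
  Normal = 2,
  ExtraLarge = 10,
}

// Minimal stand-in for the real, richer Kibana task definition interface.
interface TaskDefinition {
  title: string;
  cost?: TaskCost; // optional; omitting it defaults to TaskCost.Normal
  createTaskRunner: () => { run(): Promise<void> };
}

declare const taskManager: {
  registerTaskDefinitions(defs: Record<string, TaskDefinition>): void;
};

taskManager.registerTaskDefinitions({
  myExpensiveTask: {
    title: 'My expensive task',
    cost: TaskCost.ExtraLarge,
    createTaskRunner: () => ({
      async run() {
        /* do the expensive work */
      },
    }),
  },
});
```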

### New `xpack.task_manager.capacity` setting

- The previous `xpack.task_manager.max_workers` setting is deprecated and changed to optional; a warning will be logged if it is used.
- A new optional `xpack.task_manager.capacity` setting is added. It represents the number of normal-cost tasks that can run at one time.
- When `xpack.task_manager.max_workers` is defined and `xpack.task_manager.capacity` is not, a deprecation warning is logged and the max workers value is used as the capacity value.
- When `xpack.task_manager.capacity` is defined and `xpack.task_manager.max_workers` is not, the capacity value is used. For the `default` claiming strategy, this capacity value is also used as the `max_workers` value.
- When both values are set, a warning is logged and the value of `xpack.task_manager.capacity` is used.
- When neither value is set, the `DEFAULT_CAPACITY` value is used.
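
Summarizing those precedence rules as a sketch; the helper function below is hypothetical, and only `DEFAULT_CAPACITY` corresponds to a constant actually added in `config.ts`:

```ts
const DEFAULT_CAPACITY = 10;
const logger = { warn: (msg: string) => console.warn(msg) };

// Hypothetical helper; the real plugin wires this through its config and
// deprecation machinery rather than a single function.
function resolveCapacity(capacity?: number, maxWorkers?: number): number {
  if (capacity !== undefined && maxWorkers !== undefined) {
    logger.warn('Both capacity and max_workers are set; using capacity.');
    return capacity;
  }
  if (capacity !== undefined) {
    // Under the `default` claiming strategy this value also serves as max_workers.
    return capacity;
  }
  if (maxWorkers !== undefined) {
    logger.warn('max_workers is deprecated; using it as the capacity value.');
    return maxWorkers;
  }
  return DEFAULT_CAPACITY;
}
```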

### Updates to `TaskPool` class

- Moves the logic that determines used and available capacity so that we can switch between capacity calculators based on the claim strategy. For the `default` claim strategy, capacity is in units of workers; for the `mget` claim strategy, capacity is in units of task cost.

### Updates to `mget` task claimer

- Updated the `taskStore.fetch` call to take a new parameter that returns a slimmer task document excluding the task state and params. This improves the I/O efficiency of returning up to 400 task docs in one query.
- Applies the capacity constraint to the candidate tasks.
- Bulk-gets the full task documents for the tasks we have capacity for, in order to update them to `claiming` status. Uses `SavedObjectsClient.bulkGet`, which uses `mget` under the hood.
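
Putting those three steps together as a simplified sketch; the interfaces and method names here are stand-ins, not the actual task store API:

```ts
// Illustrative stand-ins for the real Kibana types.
interface SlimTask {
  id: string;
  cost: number; // task state and params are excluded from the slim fetch
}

interface TaskStoreLike {
  // stand-in for taskStore.fetch(...) with the new slim-document parameter
  fetchSlim(limit: number): Promise<SlimTask[]>;
}

interface SavedObjectsClientLike {
  // stand-in for SavedObjectsClient.bulkGet, which issues an mget
  bulkGet(refs: Array<{ type: string; id: string }>): Promise<unknown[]>;
}

async function claimTasks(
  store: TaskStoreLike,
  soClient: SavedObjectsClientLike,
  availableCapacity: number
): Promise<unknown[]> {
  // 1. Slim fetch keeps I/O manageable when returning up to 400 docs.
  const candidates = await store.fetchSlim(400);

  // 2. Apply the capacity constraint, in cost units, to the candidates.
  const toClaim: SlimTask[] = [];
  let remaining = availableCapacity;
  for (const task of candidates) {
    if (task.cost > remaining) break;
    remaining -= task.cost;
    toClaim.push(task);
  }

  // 3. Bulk-get the full documents so they can be updated to `claiming`.
  return soClient.bulkGet(toClaim.map((t) => ({ type: 'task', id: t.id })));
}
```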

### Updates to the monitoring stats

- Emits the capacity config value, as well as the capacity translated into both workers and cost.
- Adds the total cost of running and overdue tasks to the health report.
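
For example, the health report might now surface something like the following; the field names and values are an assumption based on the description above, not the actual payload:

```ts
// Hypothetical excerpt of the monitoring/health payload.
const exampleHealthStats = {
  configuration: {
    capacity: {
      config: 10, // the configured (or defaulted) capacity value
      as_workers: 10, // capacity translated into worker units
      as_cost: 20, // capacity translated into cost units
    },
  },
  runtime: {
    load: 45, // percentage of capacity in use at the end of a poll cycle
  },
};
```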

## Tasks for follow-up issues

- Update mget functional tests to include tasks with different costs - #189111
- Update cost of indicator match rule to be Extra Large - #189112
- Set `xpack.task_manager.capacity` on ECH based on the node size - #189117

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
3 people authored Jul 24, 2024
1 parent e566abf commit f1af9b4
Showing 62 changed files with 4,579 additions and 1,513 deletions.
6 changes: 3 additions & 3 deletions x-pack/plugins/task_manager/server/MONITORING.md
@@ -50,7 +50,7 @@ The root `timestamp` is the time in which the summary was exposed (either to the
Follow this step-by-step guide to make sense of the stats: https://www.elastic.co/guide/en/kibana/master/task-manager-troubleshooting.html#task-manager-diagnosing-root-cause

#### The Configuration Section
The `configuration` section summarizes Task Manager's current configuration, including dynamic configurations which change over time, such as `poll_interval` and `max_workers` which adjust in reaction to changing load on the system.
The `configuration` section summarizes Task Manager's current configuration, including dynamic configurations which change over time, such as `poll_interval` and `capacity` which adjust in reaction to changing load on the system.

These are "Hot" stats which are updated whenever a change happens in the configuration.

@@ -69,8 +69,8 @@ The `runtime` tracks Task Manager's performance as it runs, making note of task
These include:
- The time it takes a task to run (p50, p90, p95 & p99, using a configurable running average window, `50` by default)
- The average _drift_ that tasks experience (p50, p90, p95 & p99, using the same configurable running average window as above). Drift tells us how long after a task's scheduled time the task typically executes.
- The average _load_ (p50, p90, p95 & p99, using the same configurable running average window as above). Load tells us what percentage of workers is in use at the end of each polling cycle.
- The polling rate (the timestamp of the last time a polling cycle completed), the polling health stats (number of version clashes and mismatches) and the result [`No tasks | Filled task pool | Unexpectedly ran out of workers`] frequency the past 50 polling cycles (using the same window size as the one used for running averages)
- The average _load_ (p50, p90, p95 & p99, using the same configurable running average window as above). Load tells us what percentage of capacity is in use at the end of each polling cycle.
- The polling rate (the timestamp of the last time a polling cycle completed), the polling health stats (number of version clashes and mismatches) and the result [`No tasks | Filled task pool | Unexpectedly ran out of capacity`] frequency over the past 50 polling cycles (using the same window size as the one used for running averages)
- The `Success | Retry | Failure ratio` by task type. This is different from the workload stats, which tell you what's in the queue but can't keep track of retries and of non-recurring tasks, as they're wiped off the index when completed.

These are "Hot" stats which are updated reactively as Tasks are executed and interacted with.
3 changes: 0 additions & 3 deletions x-pack/plugins/task_manager/server/config.test.ts
@@ -23,7 +23,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
@@ -81,7 +80,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
@@ -137,7 +135,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
16 changes: 11 additions & 5 deletions x-pack/plugins/task_manager/server/config.ts
@@ -8,6 +8,9 @@
import { schema, TypeOf } from '@kbn/config-schema';

export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_CAPACITY = 10;
export const MAX_CAPACITY = 50;
export const MIN_CAPACITY = 5;
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;
@@ -64,6 +67,8 @@ const requestTimeoutsConfig = schema.object({
export const configSchema = schema.object(
{
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
/* The number of normal cost tasks that this Kibana instance will run simultaneously */
capacity: schema.maybe(schema.number({ min: MIN_CAPACITY, max: MAX_CAPACITY })),
ephemeral_tasks: schema.object({
enabled: schema.boolean({ defaultValue: false }),
/* How many requests can Task Manager buffer before it rejects new requests. */
@@ -81,11 +86,12 @@ export const configSchema = schema.object(
min: 1,
}),
/* The maximum number of tasks that this Kibana instance will run simultaneously. */
max_workers: schema.number({
defaultValue: DEFAULT_MAX_WORKERS,
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
}),
max_workers: schema.maybe(
schema.number({
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
})
),
/* The interval at which monotonically increasing metrics counters will reset */
metrics_reset_interval: schema.number({
defaultValue: DEFAULT_METRICS_RESET_INTERVAL,
29 changes: 14 additions & 15 deletions x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts
@@ -18,7 +18,7 @@ import { v4 as uuidv4 } from 'uuid';
import { asTaskPollingCycleEvent, asTaskRunEvent, TaskPersistence } from './task_events';
import { TaskRunResult } from './task_running';
import { TaskPoolRunResult } from './task_pool';
import { TaskPoolMock } from './task_pool.mock';
import { TaskPoolMock } from './task_pool/task_pool.mock';
import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { taskManagerMock } from './mocks';

@@ -45,7 +45,6 @@ describe('EphemeralTaskLifecycle', () => {
definitions: new TaskTypeDictionary(taskManagerLogger),
executionContext,
config: {
max_workers: 10,
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
@@ -156,7 +155,7 @@
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));

poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});

lifecycleEvent$.next(
@@ -179,7 +178,7 @@
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));

poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});

lifecycleEvent$.next(
@@ -216,7 +215,7 @@
expect(ephemeralTaskLifecycle.attemptToRun(tasks[2])).toMatchObject(asOk(tasks[2]));

poolCapacity.mockReturnValue({
availableWorkers: 2,
availableCapacity: 2,
});

lifecycleEvent$.next(
Expand Down Expand Up @@ -256,9 +255,9 @@ describe('EphemeralTaskLifecycle', () => {

// pool has capacity for both
poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});
pool.getOccupiedWorkersByType.mockReturnValue(0);
pool.getUsedCapacityByType.mockReturnValue(0);

lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
Expand Down Expand Up @@ -296,10 +295,10 @@ describe('EphemeralTaskLifecycle', () => {

// pool has capacity in general
poolCapacity.mockReturnValue({
availableWorkers: 2,
availableCapacity: 2,
});
// but when we ask how many it has occupied by type - we always have one worker already occupied by that type
pool.getOccupiedWorkersByType.mockReturnValue(1);
pool.getUsedCapacityByType.mockReturnValue(1);

lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
@@ -308,7 +307,7 @@
expect(pool.run).toHaveBeenCalledTimes(0);

// now we release the worker in the pool and cause another cycle in the ephemeral queue
pool.getOccupiedWorkersByType.mockReturnValue(0);
pool.getUsedCapacityByType.mockReturnValue(0);
lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
);
@@ -356,9 +355,9 @@

// pool has capacity for all
poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});
pool.getOccupiedWorkersByType.mockReturnValue(0);
pool.getUsedCapacityByType.mockReturnValue(0);

lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));

@@ -389,19 +388,19 @@

expect(ephemeralTaskLifecycle.queuedTasks).toBe(3);
poolCapacity.mockReturnValue({
availableWorkers: 1,
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(2);

poolCapacity.mockReturnValue({
availableWorkers: 1,
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(1);

poolCapacity.mockReturnValue({
availableWorkers: 1,
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(0);
x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.ts
@@ -143,13 +143,13 @@ export class EphemeralTaskLifecycle {
taskType && this.definitions.get(taskType)?.maxConcurrency
? Math.max(
Math.min(
this.pool.availableWorkers,
this.pool.availableCapacity(),
this.definitions.get(taskType)!.maxConcurrency! -
this.pool.getOccupiedWorkersByType(taskType)
this.pool.getUsedCapacityByType(taskType)
),
0
)
: this.pool.availableWorkers;
: this.pool.availableCapacity();

private emitEvent = (event: TaskLifecycleEvent) => {
this.events$.next(event);
7 changes: 4 additions & 3 deletions x-pack/plugins/task_manager/server/index.ts
@@ -55,9 +55,6 @@ export type {

export const config: PluginConfigDescriptor<TaskManagerConfig> = {
schema: configSchema,
exposeToUsage: {
max_workers: true,
},
deprecations: ({ deprecate }) => {
return [
deprecate('ephemeral_tasks.enabled', 'a future version', {
@@ -68,6 +65,10 @@ export const config: PluginConfigDescriptor<TaskManagerConfig> = {
level: 'warning',
message: `Configuring "xpack.task_manager.ephemeral_tasks.request_capacity" is deprecated and will be removed in a future version. Remove this setting to increase task execution resiliency.`,
}),
deprecate('max_workers', 'a future version', {
level: 'warning',
message: `Configuring "xpack.task_manager.max_workers" is deprecated and will be removed in a future version. Remove this setting and use "xpack.task_manager.capacity" instead.`,
}),
(settings, fromPath, addDeprecation) => {
const taskManager = get(settings, fromPath);
if (taskManager?.index) {