diff --git a/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker b/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker index 3123aa7f391dfb..7eaae8b09629f4 100755 --- a/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker +++ b/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker @@ -413,6 +413,7 @@ kibana_vars=( xpack.task_manager.version_conflict_threshold xpack.task_manager.event_loop_delay.monitor xpack.task_manager.event_loop_delay.warn_threshold + xpack.task_manager.worker_utilization_running_average_window xpack.uptime.index serverless ) diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 54e9562ec62fb3..9e1e89e68624b3 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -44,6 +44,7 @@ describe('config validation', () => { "exclude_task_types": Array [], }, "version_conflict_threshold": 80, + "worker_utilization_running_average_window": 5, } `); }); @@ -95,6 +96,7 @@ describe('config validation', () => { "exclude_task_types": Array [], }, "version_conflict_threshold": 80, + "worker_utilization_running_average_window": 5, } `); }); @@ -149,6 +151,7 @@ describe('config validation', () => { "exclude_task_types": Array [], }, "version_conflict_threshold": 80, + "worker_utilization_running_average_window": 5, } `); }); diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index 40b915e8979a2d..ae6c7a9fb41eb9 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -18,9 +18,12 @@ export const DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY = MAX_WORKERS_LIMIT; // =================== // Refresh aggregated monitored stats at a default rate of once a minute export const DEFAULT_MONITORING_REFRESH_RATE = 60 * 1000; -export const DEFAULT_MONITORING_STATS_RUNNING_AVERGAE_WINDOW = 50; +export const DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW = 50; export const DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS = 60; +// At the default poll interval of 3sec, this averages over the last 15sec. +export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5; + export const taskExecutionFailureThresholdSchema = schema.object( { error_threshold: schema.number({ @@ -98,7 +101,7 @@ export const configSchema = schema.object( }), /* The size of the running average window for monitored stats. */ monitored_stats_running_average_window: schema.number({ - defaultValue: DEFAULT_MONITORING_STATS_RUNNING_AVERGAE_WINDOW, + defaultValue: DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW, max: 100, min: 10, }), @@ -130,6 +133,11 @@ export const configSchema = schema.object( }), }), event_loop_delay: eventLoopDelaySchema, + worker_utilization_running_average_window: schema.number({ + defaultValue: DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW, + max: 100, + min: 1, + }), /* These are not designed to be used by most users. Please use caution when changing these */ unsafe: schema.object({ exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }), diff --git a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts index 374a52ea84b2f8..c2c0f6e3aceed4 100644 --- a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts @@ -77,6 +77,7 @@ describe('EphemeralTaskLifecycle', () => { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, ...config, }, elasticsearchAndSOAvailability$, diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts index 308aa9f556797f..db8981a186ab4c 100644 --- a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts @@ -71,6 +71,7 @@ describe('managed configuration', () => { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, }); logger = context.logger.get('taskManager'); diff --git a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts index 45b29c44b1a4b5..cdd67a07ff9e71 100644 --- a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts @@ -9,7 +9,13 @@ import { v4 as uuidv4 } from 'uuid'; import { Subject, Observable } from 'rxjs'; import { take, bufferCount, skip, map } from 'rxjs/operators'; import { ConcreteTaskInstance, TaskStatus } from '../task'; -import { asTaskRunEvent, TaskTiming, TaskPersistence } from '../task_events'; +import { + asTaskRunEvent, + TaskTiming, + TaskPersistence, + asTaskManagerStatEvent, + TaskManagerStats, +} from '../task_events'; import { asOk } from '../lib/result_type'; import { TaskLifecycleEvent } from '../polling_lifecycle'; import { TaskRunResult } from '../task_running'; @@ -18,9 +24,10 @@ import { taskPollingLifecycleMock } from '../polling_lifecycle.mock'; import { BackgroundTaskUtilizationStat, createBackgroundTaskUtilizationAggregator, + summarizeUtilizationStats, } from './background_task_utilization_statistics'; import { AdHocTaskCounter } from '../lib/adhoc_task_counter'; -import { sum } from 'lodash'; +import { sum, mean } from 'lodash'; describe('Task Run Statistics', () => { const pollInterval = 3000; @@ -29,404 +36,586 @@ describe('Task Run Statistics', () => { jest.resetAllMocks(); }); - test('returns a running count of adhoc actual service_time', async () => { - const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; - const events$ = new Subject(); - const taskPollingLifecycle = taskPollingLifecycleMock.create({ - events$: events$ as Observable, - }); - const adHocTaskCounter = new AdHocTaskCounter(); - - const runningAverageWindowSize = 5; - const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( - taskPollingLifecycle, - runningAverageWindowSize, - adHocTaskCounter, - pollInterval - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.adhoc.ran.service_time.actual).toEqual(sum(window)); - } - - return new Promise((resolve) => { - const events = []; - const now = Date.now(); - for (const time of serviceTimes) { - events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + describe('createBackgroundTaskUtilizationAggregator', () => { + test('returns a running count of adhoc actual service_time', async () => { + const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.adhoc.ran.service_time.actual).toEqual(sum(window)); } - BackgroundTaskUtilizationAggregator.pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeUtilizationStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value, - })), - take(serviceTimes.length), - bufferCount(serviceTimes.length) - ).subscribe((taskStats: Array>) => { - expectWindowEqualsUpdate(taskStats[0], serviceTimes.slice(0, 1)); - expectWindowEqualsUpdate(taskStats[1], serviceTimes.slice(0, 2)); - expectWindowEqualsUpdate(taskStats[2], serviceTimes.slice(0, 3)); - expectWindowEqualsUpdate(taskStats[3], serviceTimes.slice(0, 4)); - expectWindowEqualsUpdate(taskStats[4], serviceTimes.slice(0, 5)); - // from the 6th value, begin to drop old values as out window is 5 - expectWindowEqualsUpdate(taskStats[5], serviceTimes.slice(0, 6)); - expectWindowEqualsUpdate(taskStats[6], serviceTimes.slice(0, 7)); - expectWindowEqualsUpdate(taskStats[7], serviceTimes.slice(0, 8)); - resolve(); + + return new Promise((resolve) => { + const events = []; + const now = Date.now(); + for (const time of serviceTimes) { + events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + } + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value, + })), + take(serviceTimes.length), + bufferCount(serviceTimes.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], serviceTimes.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], serviceTimes.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], serviceTimes.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], serviceTimes.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], serviceTimes.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], serviceTimes.slice(0, 6)); + expectWindowEqualsUpdate(taskStats[6], serviceTimes.slice(0, 7)); + expectWindowEqualsUpdate(taskStats[7], serviceTimes.slice(0, 8)); + resolve(); + }); + + for (const event of events) { + events$.next(mockTaskRunEvent({}, event)); + } }); + }); - for (const event of events) { - events$.next(mockTaskRunEvent({}, event)); + test('returns a running count of adhoc adjusted service_time', async () => { + const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.adhoc.ran.service_time.adjusted).toEqual(sum(window)); } - }); - }); - test('returns a running count of adhoc adjusted service_time', async () => { - const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; - const events$ = new Subject(); - const taskPollingLifecycle = taskPollingLifecycleMock.create({ - events$: events$ as Observable, + return new Promise((resolve) => { + const events = []; + const now = Date.now(); + for (const time of serviceTimes) { + events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + } + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value, + })), + take(serviceTimes.length), + bufferCount(serviceTimes.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], roundUpToNearestSec(serviceTimes.slice(0, 1), 3)); + expectWindowEqualsUpdate(taskStats[1], roundUpToNearestSec(serviceTimes.slice(0, 2), 3)); + expectWindowEqualsUpdate(taskStats[2], roundUpToNearestSec(serviceTimes.slice(0, 3), 3)); + expectWindowEqualsUpdate(taskStats[3], roundUpToNearestSec(serviceTimes.slice(0, 4), 3)); + expectWindowEqualsUpdate(taskStats[4], roundUpToNearestSec(serviceTimes.slice(0, 5), 3)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], roundUpToNearestSec(serviceTimes.slice(0, 6), 3)); + expectWindowEqualsUpdate(taskStats[6], roundUpToNearestSec(serviceTimes.slice(0, 7), 3)); + expectWindowEqualsUpdate(taskStats[7], roundUpToNearestSec(serviceTimes.slice(0, 8), 3)); + resolve(); + }); + + for (const event of events) { + events$.next(mockTaskRunEvent({}, event)); + } + }); }); - const adHocTaskCounter = new AdHocTaskCounter(); - - const runningAverageWindowSize = 5; - const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( - taskPollingLifecycle, - runningAverageWindowSize, - adHocTaskCounter, - pollInterval - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.adhoc.ran.service_time.adjusted).toEqual(sum(window)); - } - - return new Promise((resolve) => { - const events = []; - const now = Date.now(); - for (const time of serviceTimes) { - events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + + test('returns a running count of adhoc task_counter', async () => { + const tasks = [0, 0, 0, 0, 0, 0, 0, 0]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.adhoc.ran.service_time.task_counter).toEqual(window.length); } - BackgroundTaskUtilizationAggregator.pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeUtilizationStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value, - })), - take(serviceTimes.length), - bufferCount(serviceTimes.length) - ).subscribe((taskStats: Array>) => { - expectWindowEqualsUpdate(taskStats[0], roundUpToNearestSec(serviceTimes.slice(0, 1), 3)); - expectWindowEqualsUpdate(taskStats[1], roundUpToNearestSec(serviceTimes.slice(0, 2), 3)); - expectWindowEqualsUpdate(taskStats[2], roundUpToNearestSec(serviceTimes.slice(0, 3), 3)); - expectWindowEqualsUpdate(taskStats[3], roundUpToNearestSec(serviceTimes.slice(0, 4), 3)); - expectWindowEqualsUpdate(taskStats[4], roundUpToNearestSec(serviceTimes.slice(0, 5), 3)); - // from the 6th value, begin to drop old values as out window is 5 - expectWindowEqualsUpdate(taskStats[5], roundUpToNearestSec(serviceTimes.slice(0, 6), 3)); - expectWindowEqualsUpdate(taskStats[6], roundUpToNearestSec(serviceTimes.slice(0, 7), 3)); - expectWindowEqualsUpdate(taskStats[7], roundUpToNearestSec(serviceTimes.slice(0, 8), 3)); - resolve(); + + return new Promise((resolve) => { + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value, + })), + take(tasks.length), + bufferCount(tasks.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], tasks.slice(0, 6)); + expectWindowEqualsUpdate(taskStats[6], tasks.slice(0, 7)); + expectWindowEqualsUpdate(taskStats[7], tasks.slice(0, 8)); + resolve(); + }); + + for (const task of tasks) { + events$.next(mockTaskRunEvent({}, { start: task, stop: task })); + } }); + }); - for (const event of events) { - events$.next(mockTaskRunEvent({}, event)); + test('returns a running count of adhoc created counter', async () => { + const tasks = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.adhoc.created.counter).toEqual(sum(window)); } - }); - }); - test('returns a running count of adhoc task_counter', async () => { - const tasks = [0, 0, 0, 0, 0, 0, 0, 0]; - const events$ = new Subject(); - const taskPollingLifecycle = taskPollingLifecycleMock.create({ - events$: events$ as Observable, - }); - const adHocTaskCounter = new AdHocTaskCounter(); - - const runningAverageWindowSize = 5; - const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( - taskPollingLifecycle, - runningAverageWindowSize, - adHocTaskCounter, - pollInterval - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.adhoc.ran.service_time.task_counter).toEqual(window.length); - } - - return new Promise((resolve) => { - BackgroundTaskUtilizationAggregator.pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeUtilizationStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value, - })), - take(tasks.length), - bufferCount(tasks.length) - ).subscribe((taskStats: Array>) => { - expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1)); - expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2)); - expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3)); - expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4)); - expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5)); - // from the 6th value, begin to drop old values as out window is 5 - expectWindowEqualsUpdate(taskStats[5], tasks.slice(0, 6)); - expectWindowEqualsUpdate(taskStats[6], tasks.slice(0, 7)); - expectWindowEqualsUpdate(taskStats[7], tasks.slice(0, 8)); - resolve(); + return new Promise((resolve) => { + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value, + })), + take(tasks.length), + bufferCount(tasks.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], tasks.slice(0, 6)); + expectWindowEqualsUpdate(taskStats[6], tasks.slice(0, 7)); + expectWindowEqualsUpdate(taskStats[7], tasks.slice(0, 8)); + resolve(); + }); + + for (const task of tasks) { + adHocTaskCounter.increment(task); + events$.next(mockTaskRunEvent({}, { start: 0, stop: 0 })); + } }); + }); - for (const task of tasks) { - events$.next(mockTaskRunEvent({}, { start: task, stop: task })); + test('returns a running count of recurring actual service_time', async () => { + const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.recurring.ran.service_time.actual).toEqual(sum(window)); } - }); - }); - test('returns a running count of adhoc created counter', async () => { - const tasks = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; - const events$ = new Subject(); - const taskPollingLifecycle = taskPollingLifecycleMock.create({ - events$: events$ as Observable, - }); - const adHocTaskCounter = new AdHocTaskCounter(); - - const runningAverageWindowSize = 5; - const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( - taskPollingLifecycle, - runningAverageWindowSize, - adHocTaskCounter, - pollInterval - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.adhoc.created.counter).toEqual(sum(window)); - } - - return new Promise((resolve) => { - BackgroundTaskUtilizationAggregator.pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeUtilizationStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value, - })), - take(tasks.length), - bufferCount(tasks.length) - ).subscribe((taskStats: Array>) => { - expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1)); - expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2)); - expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3)); - expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4)); - expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5)); - // from the 6th value, begin to drop old values as out window is 5 - expectWindowEqualsUpdate(taskStats[5], tasks.slice(0, 6)); - expectWindowEqualsUpdate(taskStats[6], tasks.slice(0, 7)); - expectWindowEqualsUpdate(taskStats[7], tasks.slice(0, 8)); - resolve(); + return new Promise((resolve) => { + const events = []; + const now = Date.now(); + for (const time of serviceTimes) { + events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + } + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value, + })), + take(serviceTimes.length), + bufferCount(serviceTimes.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], serviceTimes.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], serviceTimes.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], serviceTimes.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], serviceTimes.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], serviceTimes.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], serviceTimes.slice(0, 6)); + expectWindowEqualsUpdate(taskStats[6], serviceTimes.slice(0, 7)); + expectWindowEqualsUpdate(taskStats[7], serviceTimes.slice(0, 8)); + resolve(); + }); + + for (const event of events) { + events$.next(mockTaskRunEvent({ schedule: { interval: '1h' } }, event)); + } }); + }); - for (const task of tasks) { - adHocTaskCounter.increment(task); - events$.next(mockTaskRunEvent({}, { start: 0, stop: 0 })); + test('returns a running count of recurring adjusted service_time', async () => { + const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.recurring.ran.service_time.adjusted).toEqual(sum(window)); } - }); - }); - test('returns a running count of recurring actual service_time', async () => { - const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; - const events$ = new Subject(); - const taskPollingLifecycle = taskPollingLifecycleMock.create({ - events$: events$ as Observable, + return new Promise((resolve) => { + const events = []; + const now = Date.now(); + for (const time of serviceTimes) { + events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + } + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value, + })), + take(serviceTimes.length), + bufferCount(serviceTimes.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], roundUpToNearestSec(serviceTimes.slice(0, 1), 3)); + expectWindowEqualsUpdate(taskStats[1], roundUpToNearestSec(serviceTimes.slice(0, 2), 3)); + expectWindowEqualsUpdate(taskStats[2], roundUpToNearestSec(serviceTimes.slice(0, 3), 3)); + expectWindowEqualsUpdate(taskStats[3], roundUpToNearestSec(serviceTimes.slice(0, 4), 3)); + expectWindowEqualsUpdate(taskStats[4], roundUpToNearestSec(serviceTimes.slice(0, 5), 3)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], roundUpToNearestSec(serviceTimes.slice(0, 6), 3)); + expectWindowEqualsUpdate(taskStats[6], roundUpToNearestSec(serviceTimes.slice(0, 7), 3)); + expectWindowEqualsUpdate(taskStats[7], roundUpToNearestSec(serviceTimes.slice(0, 8), 3)); + resolve(); + }); + + for (const event of events) { + events$.next(mockTaskRunEvent({ schedule: { interval: '1h' } }, event)); + } + }); }); - const adHocTaskCounter = new AdHocTaskCounter(); - - const runningAverageWindowSize = 5; - const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( - taskPollingLifecycle, - runningAverageWindowSize, - adHocTaskCounter, - pollInterval - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.recurring.ran.service_time.actual).toEqual(sum(window)); - } - - return new Promise((resolve) => { - const events = []; - const now = Date.now(); - for (const time of serviceTimes) { - events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); + + test('returns a running count of recurring task_counter', async () => { + const tasks = [0, 0, 0, 0, 0, 0, 0, 0]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + const adHocTaskCounter = new AdHocTaskCounter(); + + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + adHocTaskCounter, + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.recurring.ran.service_time.task_counter).toEqual(window.length); } - BackgroundTaskUtilizationAggregator.pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeUtilizationStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value, - })), - take(serviceTimes.length), - bufferCount(serviceTimes.length) - ).subscribe((taskStats: Array>) => { - expectWindowEqualsUpdate(taskStats[0], serviceTimes.slice(0, 1)); - expectWindowEqualsUpdate(taskStats[1], serviceTimes.slice(0, 2)); - expectWindowEqualsUpdate(taskStats[2], serviceTimes.slice(0, 3)); - expectWindowEqualsUpdate(taskStats[3], serviceTimes.slice(0, 4)); - expectWindowEqualsUpdate(taskStats[4], serviceTimes.slice(0, 5)); - // from the 6th value, begin to drop old values as out window is 5 - expectWindowEqualsUpdate(taskStats[5], serviceTimes.slice(0, 6)); - expectWindowEqualsUpdate(taskStats[6], serviceTimes.slice(0, 7)); - expectWindowEqualsUpdate(taskStats[7], serviceTimes.slice(0, 8)); - resolve(); + + return new Promise((resolve) => { + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + // Use 'summarizeUtilizationStat' to receive summarize stats + map(({ key, value }: AggregatedStat) => ({ + key, + value, + })), + take(tasks.length), + bufferCount(tasks.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5)); + // from the 6th value, begin to drop old values as out window is 5 + expectWindowEqualsUpdate(taskStats[5], tasks.slice(0, 6)); + expectWindowEqualsUpdate(taskStats[6], tasks.slice(0, 7)); + expectWindowEqualsUpdate(taskStats[7], tasks.slice(0, 8)); + resolve(); + }); + + for (const task of tasks) { + events$.next( + mockTaskRunEvent({ schedule: { interval: '1h' } }, { start: task, stop: task }) + ); + } }); + }); - for (const event of events) { - events$.next(mockTaskRunEvent({ schedule: { interval: '1h' } }, event)); + test('returns a running count of load', async () => { + const loads = [40, 80, 100, 100, 10, 10, 60, 40]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + new AdHocTaskCounter(), + pollInterval + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.load).toEqual(mean(window)); } - }); - }); - test('returns a running count of recurring adjusted service_time', async () => { - const serviceTimes = [1000, 2000, 500, 300, 400, 15000, 20000, 200]; - const events$ = new Subject(); - const taskPollingLifecycle = taskPollingLifecycleMock.create({ - events$: events$ as Observable, + return new Promise((resolve) => { + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + map(({ key, value }: AggregatedStat) => ({ + key, + value, + })), + take(loads.length), + bufferCount(loads.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], loads.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], loads.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], loads.slice(0, 3)); + expectWindowEqualsUpdate(taskStats[3], loads.slice(0, 4)); + expectWindowEqualsUpdate(taskStats[4], loads.slice(0, 5)); + // from the 6th value, begin to drop old values as our window is 5 + expectWindowEqualsUpdate(taskStats[5], loads.slice(1, 6)); + expectWindowEqualsUpdate(taskStats[6], loads.slice(2, 7)); + expectWindowEqualsUpdate(taskStats[7], loads.slice(3, 8)); + resolve(); + }); + + for (const load of loads) { + events$.next(mockTaskStatEvent('workerUtilization', load)); + } + }); }); - const adHocTaskCounter = new AdHocTaskCounter(); - - const runningAverageWindowSize = 5; - const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( - taskPollingLifecycle, - runningAverageWindowSize, - adHocTaskCounter, - pollInterval - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.recurring.ran.service_time.adjusted).toEqual(sum(window)); - } - - return new Promise((resolve) => { - const events = []; - const now = Date.now(); - for (const time of serviceTimes) { - events.push({ start: runAtMillisecondsAgo(now, time).getTime(), stop: now }); - } - BackgroundTaskUtilizationAggregator.pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeUtilizationStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value, - })), - take(serviceTimes.length), - bufferCount(serviceTimes.length) - ).subscribe((taskStats: Array>) => { - expectWindowEqualsUpdate(taskStats[0], roundUpToNearestSec(serviceTimes.slice(0, 1), 3)); - expectWindowEqualsUpdate(taskStats[1], roundUpToNearestSec(serviceTimes.slice(0, 2), 3)); - expectWindowEqualsUpdate(taskStats[2], roundUpToNearestSec(serviceTimes.slice(0, 3), 3)); - expectWindowEqualsUpdate(taskStats[3], roundUpToNearestSec(serviceTimes.slice(0, 4), 3)); - expectWindowEqualsUpdate(taskStats[4], roundUpToNearestSec(serviceTimes.slice(0, 5), 3)); - // from the 6th value, begin to drop old values as out window is 5 - expectWindowEqualsUpdate(taskStats[5], roundUpToNearestSec(serviceTimes.slice(0, 6), 3)); - expectWindowEqualsUpdate(taskStats[6], roundUpToNearestSec(serviceTimes.slice(0, 7), 3)); - expectWindowEqualsUpdate(taskStats[7], roundUpToNearestSec(serviceTimes.slice(0, 8), 3)); - resolve(); + + test('returns a running count of load with custom window size', async () => { + const loads = [40, 80, 100, 100, 10, 10, 60, 40]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, }); - for (const event of events) { - events$.next(mockTaskRunEvent({ schedule: { interval: '1h' } }, event)); + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + new AdHocTaskCounter(), + pollInterval, + 3 + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.load).toEqual(mean(window)); } + + return new Promise((resolve) => { + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + map(({ key, value }: AggregatedStat) => ({ + key, + value, + })), + take(loads.length), + bufferCount(loads.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], loads.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], loads.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], loads.slice(0, 3)); + // from the 4th value, begin to drop old values as our window is 3 + expectWindowEqualsUpdate(taskStats[3], loads.slice(1, 4)); + expectWindowEqualsUpdate(taskStats[4], loads.slice(2, 5)); + expectWindowEqualsUpdate(taskStats[5], loads.slice(3, 6)); + expectWindowEqualsUpdate(taskStats[6], loads.slice(4, 7)); + expectWindowEqualsUpdate(taskStats[7], loads.slice(5, 8)); + resolve(); + }); + + for (const load of loads) { + events$.next(mockTaskStatEvent('workerUtilization', load)); + } + }); }); }); - test('returns a running count of recurring task_counter', async () => { - const tasks = [0, 0, 0, 0, 0, 0, 0, 0]; - const events$ = new Subject(); - const taskPollingLifecycle = taskPollingLifecycleMock.create({ - events$: events$ as Observable, + describe('summarizeUtilizationStats', () => { + const lastUpdate = '2023-04-02T17:34:41.371Z'; + const monitoredStats = { + timestamp: '2023-04-01T17:34:41.371Z', + value: { + adhoc: { + created: { + counter: 1, + }, + ran: { + service_time: { + actual: 10, + adjusted: 7, + task_counter: 3, + }, + }, + }, + recurring: { + ran: { + service_time: { + actual: 79, + adjusted: 66, + task_counter: 10, + }, + }, + }, + load: 63, + }, + }; + + test('should return null if monitoredStats is null', () => { + expect( + summarizeUtilizationStats({ + lastUpdate, + // @ts-expect-error + monitoredStats: null, + isInternal: false, + }) + ).toEqual({ + last_update: lastUpdate, + stats: null, + }); }); - const adHocTaskCounter = new AdHocTaskCounter(); - - const runningAverageWindowSize = 5; - const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( - taskPollingLifecycle, - runningAverageWindowSize, - adHocTaskCounter, - pollInterval - ); - - function expectWindowEqualsUpdate( - taskStat: AggregatedStat, - window: number[] - ) { - expect(taskStat.value.recurring.ran.service_time.task_counter).toEqual(window.length); - } - - return new Promise((resolve) => { - BackgroundTaskUtilizationAggregator.pipe( - // skip initial stat which is just initialized data which - // ensures we don't stall on combineLatest - skip(1), - // Use 'summarizeUtilizationStat' to receive summarize stats - map(({ key, value }: AggregatedStat) => ({ - key, - value, - })), - take(tasks.length), - bufferCount(tasks.length) - ).subscribe((taskStats: Array>) => { - expectWindowEqualsUpdate(taskStats[0], tasks.slice(0, 1)); - expectWindowEqualsUpdate(taskStats[1], tasks.slice(0, 2)); - expectWindowEqualsUpdate(taskStats[2], tasks.slice(0, 3)); - expectWindowEqualsUpdate(taskStats[3], tasks.slice(0, 4)); - expectWindowEqualsUpdate(taskStats[4], tasks.slice(0, 5)); - // from the 6th value, begin to drop old values as out window is 5 - expectWindowEqualsUpdate(taskStats[5], tasks.slice(0, 6)); - expectWindowEqualsUpdate(taskStats[6], tasks.slice(0, 7)); - expectWindowEqualsUpdate(taskStats[7], tasks.slice(0, 8)); - resolve(); + + test('should return null if monitoredStats value is not defined', () => { + expect( + summarizeUtilizationStats({ + lastUpdate, + // @ts-expect-error + monitoredStats: {}, + isInternal: false, + }) + ).toEqual({ + last_update: lastUpdate, + stats: null, }); + }); - for (const task of tasks) { - events$.next( - mockTaskRunEvent({ schedule: { interval: '1h' } }, { start: task, stop: task }) - ); - } + test('should return summary with all stats when isInternal is true', () => { + expect( + summarizeUtilizationStats({ + lastUpdate, + monitoredStats, + isInternal: true, + }) + ).toEqual({ + last_update: lastUpdate, + stats: { + timestamp: monitoredStats.timestamp, + value: monitoredStats.value, + }, + }); + }); + + test('should return summary with only public stats when isInternal is false', () => { + expect( + summarizeUtilizationStats({ + lastUpdate, + monitoredStats, + isInternal: false, + }) + ).toEqual({ + last_update: lastUpdate, + stats: { + timestamp: monitoredStats.timestamp, + value: { + load: 63, + }, + }, + }); }); }); }); @@ -440,6 +629,10 @@ function roundUpToNearestSec(duration: number[], s: number): number[] { return duration.map((d) => Math.ceil(d / pollInterval) * pollInterval); } +const mockTaskStatEvent = (type: TaskManagerStats, value: number) => { + return asTaskManagerStatEvent(type, asOk(value)); +}; + const mockTaskRunEvent = ( overrides: Partial = {}, timing: TaskTiming, diff --git a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts index 761ad4252ac831..fd116cbdd71d82 100644 --- a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts @@ -6,17 +6,29 @@ */ import { JsonObject } from '@kbn/utility-types'; -import { get } from 'lodash'; +import { get, pick } from 'lodash'; +import stats from 'stats-lite'; import { combineLatest, filter, map, Observable, startWith } from 'rxjs'; import { AdHocTaskCounter } from '../lib/adhoc_task_counter'; -import { unwrap } from '../lib/result_type'; +import { mapOk, unwrap } from '../lib/result_type'; import { TaskLifecycleEvent, TaskPollingLifecycle } from '../polling_lifecycle'; import { ConcreteTaskInstance } from '../task'; -import { isTaskRunEvent, TaskRun, TaskTiming } from '../task_events'; +import { + isTaskManagerWorkerUtilizationStatEvent, + isTaskRunEvent, + TaskRun, + TaskTiming, +} from '../task_events'; import { MonitoredStat } from './monitoring_stats_stream'; import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator'; +import { createRunningAveragedStat } from './task_run_calcultors'; +import { DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW } from '../config'; -export interface BackgroundTaskUtilizationStat extends JsonObject { +export interface PublicBackgroundTaskUtilizationStat extends JsonObject { + load: number; +} + +export interface BackgroundTaskUtilizationStat extends PublicBackgroundTaskUtilizationStat { adhoc: AdhocTaskStat; recurring: TaskStat; } @@ -39,11 +51,11 @@ interface AdhocTaskStat extends TaskStat { export function createBackgroundTaskUtilizationAggregator( taskPollingLifecycle: TaskPollingLifecycle, - runningAverageWindowSize: number, adHocTaskCounter: AdHocTaskCounter, - pollInterval: number + pollInterval: number, + workerUtilizationRunningAverageWindowSize: number = DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW ): AggregatedStatProvider { - const taskRunEventToAdhocStat = createTaskRunEventToAdhocStat(runningAverageWindowSize); + const taskRunEventToAdhocStat = createTaskRunEventToAdhocStat(); const taskRunAdhocEvents$: Observable> = taskPollingLifecycle.events.pipe( filter((taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent) && hasTiming(taskEvent)), @@ -57,7 +69,7 @@ export function createBackgroundTaskUtilizationAggregator( }) ); - const taskRunEventToRecurringStat = createTaskRunEventToRecurringStat(runningAverageWindowSize); + const taskRunEventToRecurringStat = createTaskRunEventToRecurringStat(); const taskRunRecurringEvents$: Observable> = taskPollingLifecycle.events.pipe( filter((taskEvent: TaskLifecycleEvent) => isTaskRunEvent(taskEvent) && hasTiming(taskEvent)), @@ -71,6 +83,18 @@ export function createBackgroundTaskUtilizationAggregator( }) ); + const taskManagerUtilizationEventToLoadStat = createTaskRunEventToLoadStat( + workerUtilizationRunningAverageWindowSize + ); + + const taskManagerWorkerUtilizationEvent$: Observable< + Pick + > = taskPollingLifecycle.events.pipe( + filter(isTaskManagerWorkerUtilizationStatEvent), + map((taskEvent: TaskLifecycleEvent) => taskEvent.event), + map(mapOk((num: number) => taskManagerUtilizationEventToLoadStat(num))) + ); + return combineLatest([ taskRunAdhocEvents$.pipe( startWith({ @@ -101,17 +125,24 @@ export function createBackgroundTaskUtilizationAggregator( }, }) ), + taskManagerWorkerUtilizationEvent$.pipe( + startWith({ + load: 0, + }) + ), ]).pipe( map( - ([adhoc, recurring]: [ + ([adhoc, recurring, load]: [ Pick, - Pick + Pick, + Pick ]) => { return { key: 'utilization', value: { ...adhoc, ...recurring, + ...load, }, } as AggregatedStat; } @@ -123,31 +154,41 @@ function hasTiming(taskEvent: TaskLifecycleEvent) { return !!taskEvent?.timing; } -export function summarizeUtilizationStats({ - // eslint-disable-next-line @typescript-eslint/naming-convention - last_update, - stats, -}: { - last_update: string; - stats: MonitoredStat | undefined; -}): { +interface SummarizeUtilizationStatsOpts { + lastUpdate: string; + monitoredStats: MonitoredStat | undefined; + isInternal: boolean; +} + +interface SummarizeUtilizationStatsResult { last_update: string; - stats: MonitoredStat | null; -} { - const utilizationStats = stats?.value; + stats: + | MonitoredStat + | MonitoredStat + | null; +} + +export function summarizeUtilizationStats({ + lastUpdate, + monitoredStats, + isInternal, +}: SummarizeUtilizationStatsOpts): SummarizeUtilizationStatsResult { + const utilizationStats = monitoredStats?.value; + + if (!monitoredStats || !utilizationStats) { + return { last_update: lastUpdate, stats: null }; + } + return { - last_update, - stats: - stats && utilizationStats - ? { - timestamp: stats.timestamp, - value: utilizationStats, - } - : null, + last_update: lastUpdate, + stats: { + timestamp: monitoredStats.timestamp, + value: isInternal ? utilizationStats : pick(utilizationStats, 'load'), + }, }; } -function createTaskRunEventToAdhocStat(runningAverageWindowSize: number) { +function createTaskRunEventToAdhocStat() { let createdCounter = 0; let actualCounter = 0; let adjustedCounter = 0; @@ -177,7 +218,7 @@ function createTaskRunEventToAdhocStat(runningAverageWindowSize: number) { }; } -function createTaskRunEventToRecurringStat(runningAverageWindowSize: number) { +function createTaskRunEventToRecurringStat() { let actualCounter = 0; let adjustedCounter = 0; let taskCounter = 0; @@ -201,6 +242,16 @@ function createTaskRunEventToRecurringStat(runningAverageWindowSize: number) { }; } +function createTaskRunEventToLoadStat(workerUtilizationRunningAverageWindowSize: number) { + const loadQueue = createRunningAveragedStat(workerUtilizationRunningAverageWindowSize); + return (load: number): Pick => { + const historicalLoad = loadQueue(load); + return { + load: stats.mean(historicalLoad), + }; + }; +} + function getServiceTimeStats(timing: TaskTiming, pollInterval: number) { const duration = timing!.stop - timing!.start; const adjusted = Math.ceil(duration / pollInterval) * pollInterval; diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts index 764b7aa15335b6..b63c52893c5d88 100644 --- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts @@ -45,6 +45,7 @@ describe('Configuration Statistics Aggregator', () => { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, }; const managedConfig = { diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts index 610e0bf080b051..8d6df288e702da 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts @@ -49,6 +49,7 @@ describe('createMonitoringStatsStream', () => { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, }; it('returns the initial config used to configure Task Manager', async () => { diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts index e192a72f7092df..e1ff38d1c96078 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts @@ -108,9 +108,9 @@ export function createAggregators( createTaskRunAggregator(taskPollingLifecycle, config.monitored_stats_running_average_window), createBackgroundTaskUtilizationAggregator( taskPollingLifecycle, - config.monitored_stats_running_average_window, adHocTaskCounter, - config.poll_interval + config.poll_interval, + config.worker_utilization_running_average_window ) ); } diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index ad02e6d7490f5d..fce648c87f6e9d 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -70,6 +70,7 @@ const pluginInitializerContextParams = { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, }; describe('TaskManagerPlugin', () => { diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index 51cac494d3a670..9e62eec402b2dc 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -75,6 +75,7 @@ describe('TaskPollingLifecycle', () => { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, }, taskStore: mockTaskStore, logger: taskManagerLogger, diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index ce5a142dc433e0..e9b9eaca1e5d31 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -176,8 +176,13 @@ export class TaskPollingLifecycle { const capacity = this.pool.availableWorkers; if (!capacity) { // if there isn't capacity, emit a load event so that we can expose how often - // high load causes the poller to skip work (work isn'tcalled when there is no capacity) + // high load causes the poller to skip work (work isn't called when there is no capacity) this.emitEvent(asTaskManagerStatEvent('load', asOk(this.pool.workerLoad))); + + // Emit event indicating task manager utilization + this.emitEvent( + asTaskManagerStatEvent('workerUtilization', asOk(this.pool.workerLoad)) + ); } return capacity; }, @@ -187,7 +192,16 @@ export class TaskPollingLifecycle { // (such as polling for new work, marking tasks as running etc.) but does not // include the time of actually running the task workTimeout: pollInterval * maxPollInactivityCycles, - }), + }).pipe( + tap( + mapOk(() => { + // Emit event indicating task manager utilization % at the end of a polling cycle + this.emitEvent( + asTaskManagerStatEvent('workerUtilization', asOk(this.pool.workerLoad)) + ); + }) + ) + ), { heartbeatInterval: pollInterval, // Time out the poller itself if it has failed to complete the entire stream for a certain amount of time. diff --git a/x-pack/plugins/task_manager/server/routes/background_task_utilization.test.ts b/x-pack/plugins/task_manager/server/routes/background_task_utilization.test.ts index 66052b095a2585..1ccc0d89f804a9 100644 --- a/x-pack/plugins/task_manager/server/routes/background_task_utilization.test.ts +++ b/x-pack/plugins/task_manager/server/routes/background_task_utilization.test.ts @@ -37,7 +37,7 @@ describe('backgroundTaskUtilizationRoute', () => { jest.resetAllMocks(); }); - it('registers the route', async () => { + it('registers internal and public route', async () => { const router = httpServiceMock.createRouter(); backgroundTaskUtilizationRoute({ router, @@ -51,11 +51,15 @@ describe('backgroundTaskUtilizationRoute', () => { usageCounter: mockUsageCounter, }); - const [config] = router.get.mock.calls[0]; + const [config1] = router.get.mock.calls[0]; - expect(config.path).toMatchInlineSnapshot( + expect(config1.path).toMatchInlineSnapshot( `"/internal/task_manager/_background_task_utilization"` ); + + const [config2] = router.get.mock.calls[1]; + + expect(config2.path).toMatchInlineSnapshot(`"/api/task_manager/_background_task_utilization"`); }); it('checks user privileges and increments usage counter when API is accessed', async () => { diff --git a/x-pack/plugins/task_manager/server/routes/background_task_utilization.ts b/x-pack/plugins/task_manager/server/routes/background_task_utilization.ts index a1e0f5c4281f94..0d7eabee2a7ce2 100644 --- a/x-pack/plugins/task_manager/server/routes/background_task_utilization.ts +++ b/x-pack/plugins/task_manager/server/routes/background_task_utilization.ts @@ -21,6 +21,7 @@ import { MonitoringStats } from '../monitoring'; import { TaskManagerConfig } from '../config'; import { BackgroundTaskUtilizationStat, + PublicBackgroundTaskUtilizationStat, summarizeUtilizationStats, } from '../monitoring/background_task_utilization_statistics'; import { MonitoredStat } from '../monitoring/monitoring_stats_stream'; @@ -29,7 +30,10 @@ export interface MonitoredUtilization { process_uuid: string; timestamp: string; last_update: string; - stats: MonitoredStat | null; + stats: + | MonitoredStat + | MonitoredStat + | null; } export interface BackgroundTaskUtilRouteParams { @@ -44,6 +48,12 @@ export interface BackgroundTaskUtilRouteParams { usageCounter?: UsageCounter; } +// Create an internal and public route so we can test out experimental metrics +const routeOptions = [ + { basePath: 'internal', isInternal: true }, + { basePath: 'api', isInternal: false }, +]; + export function backgroundTaskUtilizationRoute( params: BackgroundTaskUtilRouteParams ): Observable { @@ -61,10 +71,11 @@ export function backgroundTaskUtilizationRoute( const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness; - function getBackgroundTaskUtilization(monitoredStats: MonitoringStats) { + function getBackgroundTaskUtilization(monitoredStats: MonitoringStats, isInternal: boolean) { const summarizedStats = summarizeUtilizationStats({ - last_update: monitoredStats.last_update, - stats: monitoredStats.stats.utilization, + lastUpdate: monitoredStats.last_update, + monitoredStats: monitoredStats.stats.utilization, + isInternal, }); const now = Date.now(); const timestamp = new Date(now).toISOString(); @@ -83,7 +94,7 @@ export function backgroundTaskUtilizationRoute( }), // Only calculate the summarized stats (calculates all running averages and evaluates state) // when needed by throttling down to the requiredHotStatsFreshness - map((stats) => getBackgroundTaskUtilization(stats)) + map((stats) => getBackgroundTaskUtilization(stats, true)) ) .subscribe((utilizationStats) => { monitoredUtilization$.next(utilizationStats); @@ -92,58 +103,60 @@ export function backgroundTaskUtilizationRoute( } }); - router.get( - { - path: '/internal/task_manager/_background_task_utilization', - // Uncomment when we determine that we can restrict API usage to Global admins based on telemetry - // options: { tags: ['access:taskManager'] }, - validate: false, - }, - async function ( - context: RequestHandlerContext, - req: KibanaRequest, - res: KibanaResponseFactory - ): Promise { - // If we are able to count usage, we want to check whether the user has access to - // the `taskManager` feature, which is only available as part of the Global All privilege. - if (usageCounter) { - const clusterClient = await getClusterClient(); - const hasPrivilegesResponse = await clusterClient - .asScoped(req) - .asCurrentUser.security.hasPrivileges({ - body: { - application: [ - { - application: `kibana-${kibanaIndexName}`, - resources: ['*'], - privileges: [`api:${kibanaVersion}:taskManager`], - }, - ], - }, - }); + routeOptions.forEach((routeOption) => { + router.get( + { + path: `/${routeOption.basePath}/task_manager/_background_task_utilization`, + // Uncomment when we determine that we can restrict API usage to Global admins based on telemetry + // options: { tags: ['access:taskManager'] }, + validate: false, + }, + async function ( + _: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise { + // If we are able to count usage, we want to check whether the user has access to + // the `taskManager` feature, which is only available as part of the Global All privilege. + if (usageCounter) { + const clusterClient = await getClusterClient(); + const hasPrivilegesResponse = await clusterClient + .asScoped(req) + .asCurrentUser.security.hasPrivileges({ + body: { + application: [ + { + application: `kibana-${kibanaIndexName}`, + resources: ['*'], + privileges: [`api:${kibanaVersion}:taskManager`], + }, + ], + }, + }); - // Keep track of total access vs admin access - usageCounter.incrementCounter({ - counterName: `taskManagerBackgroundTaskUtilApiAccess`, - counterType: 'taskManagerBackgroundTaskUtilApi', - incrementBy: 1, - }); - if (hasPrivilegesResponse.has_all_requested) { + // Keep track of total access vs admin access usageCounter.incrementCounter({ - counterName: `taskManagerBackgroundTaskUtilApiAdminAccess`, + counterName: `taskManagerBackgroundTaskUtilApiAccess`, counterType: 'taskManagerBackgroundTaskUtilApi', incrementBy: 1, }); + if (hasPrivilegesResponse.has_all_requested) { + usageCounter.incrementCounter({ + counterName: `taskManagerBackgroundTaskUtilApiAdminAccess`, + counterType: 'taskManagerBackgroundTaskUtilApi', + incrementBy: 1, + }); + } } - } - return res.ok({ - body: lastMonitoredStats - ? getBackgroundTaskUtilization(lastMonitoredStats) - : { process_uuid: taskManagerId, timestamp: new Date().toISOString(), stats: {} }, - }); - } - ); + return res.ok({ + body: lastMonitoredStats + ? getBackgroundTaskUtilization(lastMonitoredStats, routeOption.isInternal) + : { process_uuid: taskManagerId, timestamp: new Date().toISOString(), stats: {} }, + }); + } + ); + }); return monitoredUtilization$; } diff --git a/x-pack/plugins/task_manager/server/task_events.ts b/x-pack/plugins/task_manager/server/task_events.ts index d0bcaf2724188b..e8808322b397a1 100644 --- a/x-pack/plugins/task_manager/server/task_events.ts +++ b/x-pack/plugins/task_manager/server/task_events.ts @@ -87,7 +87,8 @@ export type TaskManagerStats = | 'pollingDelay' | 'claimDuration' | 'queuedEphemeralTasks' - | 'ephemeralTaskDelay'; + | 'ephemeralTaskDelay' + | 'workerUtilization'; export type TaskManagerStat = TaskEvent; export type OkResultOf = EventType extends TaskEvent @@ -211,6 +212,11 @@ export function isTaskManagerStatEvent( ): taskEvent is TaskManagerStat { return taskEvent.type === TaskEventType.TASK_MANAGER_STAT; } +export function isTaskManagerWorkerUtilizationStatEvent( + taskEvent: TaskEvent +): taskEvent is TaskManagerStat { + return taskEvent.type === TaskEventType.TASK_MANAGER_STAT && taskEvent.id === 'workerUtilization'; +} export function isEphemeralTaskRejectedDueToCapacityEvent( taskEvent: TaskEvent ): taskEvent is EphemeralTaskRejectedDueToCapacity { diff --git a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts index f41a0598ff50c8..c8cb2787a90fbe 100644 --- a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts +++ b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.test.ts @@ -19,6 +19,8 @@ import { registerTaskManagerUsageCollector } from './task_manager_usage_collecto import { sleep } from '../test_utils'; import { TaskManagerUsage } from './types'; import { MonitoredUtilization } from '../routes/background_task_utilization'; +import { MonitoredStat } from '../monitoring/monitoring_stats_stream'; +import { BackgroundTaskUtilizationStat } from '../monitoring/background_task_utilization_statistics'; describe('registerTaskManagerUsageCollector', () => { let collector: Collector; @@ -113,18 +115,20 @@ describe('registerTaskManagerUsageCollector', () => { const mockHealth = getMockMonitoredHealth(); monitoringStats$.next(mockHealth); const mockUtilization = getMockMonitoredUtilization(); + const mockUtilizationStats = + mockUtilization.stats as MonitoredStat; monitoringUtilization$.next(mockUtilization); await sleep(1001); expect(usageCollectionMock.makeUsageCollector).toBeCalled(); const telemetry: TaskManagerUsage = (await collector.fetch(fetchContext)) as TaskManagerUsage; expect(telemetry.recurring_tasks).toEqual({ - actual_service_time: mockUtilization.stats?.value.recurring.ran.service_time.actual, - adjusted_service_time: mockUtilization.stats?.value.recurring.ran.service_time.adjusted, + actual_service_time: mockUtilizationStats?.value.recurring.ran.service_time.actual, + adjusted_service_time: mockUtilizationStats?.value.recurring.ran.service_time.adjusted, }); expect(telemetry.adhoc_tasks).toEqual({ - actual_service_time: mockUtilization.stats?.value.adhoc.ran.service_time.actual, - adjusted_service_time: mockUtilization.stats?.value.adhoc.ran.service_time.adjusted, + actual_service_time: mockUtilizationStats?.value.adhoc.ran.service_time.actual, + adjusted_service_time: mockUtilizationStats?.value.adhoc.ran.service_time.adjusted, }); }); @@ -308,6 +312,7 @@ function getMockMonitoredUtilization(overrides = {}): MonitoredUtilization { stats: { timestamp: new Date().toISOString(), value: { + load: 6, adhoc: { created: { counter: 5, diff --git a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts index a4bd3049a0b499..6c8809c5c3d982 100644 --- a/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts +++ b/x-pack/plugins/task_manager/server/usage/task_manager_usage_collector.ts @@ -9,6 +9,8 @@ import { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server'; import { MonitoredHealth } from '../routes/health'; import { TaskManagerUsage } from './types'; import { MonitoredUtilization } from '../routes/background_task_utilization'; +import { BackgroundTaskUtilizationStat } from '../monitoring/background_task_utilization_statistics'; +import { MonitoredStat } from '../monitoring/monitoring_stats_stream'; export function createTaskManagerUsageCollector( usageCollection: UsageCollectionSetup, @@ -19,12 +21,13 @@ export function createTaskManagerUsageCollector( excludeTaskTypes: string[] ) { let lastMonitoredHealth: MonitoredHealth | null = null; - let lastMonitoredUtilization: MonitoredUtilization | null = null; + let lastMonitoredUtilizationStats: MonitoredStat | null = null; combineLatest([monitoringStats$, monitoredUtilization$]) .pipe() .subscribe(([health, utilization]) => { lastMonitoredHealth = health; - lastMonitoredUtilization = utilization; + lastMonitoredUtilizationStats = + (utilization?.stats as MonitoredStat) ?? null; }); return usageCollection.makeUsageCollector({ @@ -69,15 +72,15 @@ export function createTaskManagerUsageCollector( ), recurring_tasks: { actual_service_time: - lastMonitoredUtilization?.stats?.value.recurring.ran.service_time.actual ?? 0, + lastMonitoredUtilizationStats?.value.recurring.ran.service_time.actual ?? 0, adjusted_service_time: - lastMonitoredUtilization?.stats?.value.recurring.ran.service_time.adjusted ?? 0, + lastMonitoredUtilizationStats?.value.recurring.ran.service_time.adjusted ?? 0, }, adhoc_tasks: { actual_service_time: - lastMonitoredUtilization?.stats?.value.adhoc.ran.service_time.actual ?? 0, + lastMonitoredUtilizationStats?.value.adhoc.ran.service_time.actual ?? 0, adjusted_service_time: - lastMonitoredUtilization?.stats?.value.adhoc.ran.service_time.adjusted ?? 0, + lastMonitoredUtilizationStats?.value.adhoc.ran.service_time.adjusted ?? 0, }, capacity: lastMonitoredHealth?.stats.capacity_estimation?.value.observed diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/background_task_utilization_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/background_task_utilization_route.ts index f58a5d473ca790..9c9dcbbe15126a 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/background_task_utilization_route.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/background_task_utilization_route.ts @@ -8,39 +8,11 @@ import expect from '@kbn/expect'; import url from 'url'; import supertest from 'supertest'; +import { MonitoredUtilization } from '@kbn/task-manager-plugin/server/routes/background_task_utilization'; +import { MonitoredStat } from '@kbn/task-manager-plugin/server/monitoring/monitoring_stats_stream'; +import { BackgroundTaskUtilizationStat } from '@kbn/task-manager-plugin/server/monitoring/background_task_utilization_statistics'; import { FtrProviderContext } from '../../ftr_provider_context'; -interface MonitoringStats { - last_update: string; - status: string; - stats: { - timestamp: string; - value: { - adhoc: { - created: { - counter: number; - }; - ran: { - service_time: { - actual: number; - adjusted: number; - task_counter: number; - }; - }; - }; - recurring: { - ran: { - service_time: { - actual: number; - adjusted: number; - task_counter: number; - }; - }; - }; - }; - }; -} - export default function ({ getService }: FtrProviderContext) { const config = getService('config'); const retry = getService('retry'); @@ -48,21 +20,21 @@ export default function ({ getService }: FtrProviderContext) { const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); - function getUtilizationRequest() { + function getUtilizationRequest(isInternal: boolean = true) { return request - .get('/internal/task_manager/_background_task_utilization') + .get(`/${isInternal ? 'internal' : 'api'}/task_manager/_background_task_utilization`) .set('kbn-xsrf', 'foo'); } - function getUtilization(): Promise { - return getUtilizationRequest() + function getUtilization(isInternal: boolean = true): Promise { + return getUtilizationRequest(isInternal) .expect(200) .then((response) => response.body); } - function getBackgroundTaskUtilization(): Promise { + function getBackgroundTaskUtilization(isInternal: boolean = true): Promise { return retry.try(async () => { - const utilization = await getUtilization(); + const utilization = await getUtilization(isInternal); if (utilization.stats) { return utilization; @@ -79,7 +51,8 @@ export default function ({ getService }: FtrProviderContext) { value: { recurring: { ran }, }, - } = (await getBackgroundTaskUtilization()).stats; + } = (await getBackgroundTaskUtilization(true)) + .stats as MonitoredStat; const serviceTime = ran.service_time; expect(typeof serviceTime.actual).to.eql('number'); expect(typeof serviceTime.adjusted).to.eql('number'); @@ -91,7 +64,8 @@ export default function ({ getService }: FtrProviderContext) { value: { adhoc: { created, ran }, }, - } = (await getBackgroundTaskUtilization()).stats; + } = (await getBackgroundTaskUtilization(true)) + .stats as MonitoredStat; const serviceTime = ran.service_time; expect(typeof created.counter).to.eql('number'); @@ -99,5 +73,31 @@ export default function ({ getService }: FtrProviderContext) { expect(typeof serviceTime.adjusted).to.eql('number'); expect(typeof serviceTime.task_counter).to.eql('number'); }); + + it('should include load stat', async () => { + const { + value: { load }, + } = (await getBackgroundTaskUtilization(true)) + .stats as MonitoredStat; + expect(typeof load).to.eql('number'); + }); + + it('should return expected fields for internal route', async () => { + const monitoredStat = (await getBackgroundTaskUtilization(true)).stats; + expect(monitoredStat?.timestamp).not.to.be(undefined); + expect(monitoredStat?.value).not.to.be(undefined); + expect(monitoredStat?.value?.adhoc).not.to.be(undefined); + expect(monitoredStat?.value?.recurring).not.to.be(undefined); + expect(monitoredStat?.value?.load).not.to.be(undefined); + }); + + it('should return expected fields for public route', async () => { + const monitoredStat = (await getBackgroundTaskUtilization(false)).stats; + expect(monitoredStat?.timestamp).not.to.be(undefined); + expect(monitoredStat?.value).not.to.be(undefined); + expect(monitoredStat?.value?.adhoc).to.be(undefined); + expect(monitoredStat?.value?.recurring).to.be(undefined); + expect(monitoredStat?.value?.load).not.to.be(undefined); + }); }); }