diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 54e9562ec62fb..9e1e89e68624b 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -44,6 +44,7 @@ describe('config validation', () => { "exclude_task_types": Array [], }, "version_conflict_threshold": 80, + "worker_utilization_running_average_window": 5, } `); }); @@ -95,6 +96,7 @@ describe('config validation', () => { "exclude_task_types": Array [], }, "version_conflict_threshold": 80, + "worker_utilization_running_average_window": 5, } `); }); @@ -149,6 +151,7 @@ describe('config validation', () => { "exclude_task_types": Array [], }, "version_conflict_threshold": 80, + "worker_utilization_running_average_window": 5, } `); }); diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index b876e414a9666..ae6c7a9fb41eb 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -21,6 +21,9 @@ export const DEFAULT_MONITORING_REFRESH_RATE = 60 * 1000; export const DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW = 50; export const DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS = 60; +// At the default poll interval of 3sec, this averages over the last 15sec. +export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5; + export const taskExecutionFailureThresholdSchema = schema.object( { error_threshold: schema.number({ @@ -130,6 +133,11 @@ export const configSchema = schema.object( }), }), event_loop_delay: eventLoopDelaySchema, + worker_utilization_running_average_window: schema.number({ + defaultValue: DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW, + max: 100, + min: 1, + }), /* These are not designed to be used by most users. Please use caution when changing these */ unsafe: schema.object({ exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }), diff --git a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts index 374a52ea84b2f..c2c0f6e3aceed 100644 --- a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts @@ -77,6 +77,7 @@ describe('EphemeralTaskLifecycle', () => { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, ...config, }, elasticsearchAndSOAvailability$, diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts index 308aa9f556797..db8981a186ab4 100644 --- a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts @@ -71,6 +71,7 @@ describe('managed configuration', () => { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, }); logger = context.logger.get('taskManager'); diff --git a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts index eda8f984b906c..cdd67a07ff9e7 100644 --- a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.test.ts @@ -473,6 +473,57 @@ describe('Task Run Statistics', () => { } }); }); + + test('returns a running count of load with custom window size', async () => { + const loads = [40, 80, 100, 100, 10, 10, 60, 40]; + const events$ = new Subject(); + const taskPollingLifecycle = taskPollingLifecycleMock.create({ + events$: events$ as Observable, + }); + + const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator( + taskPollingLifecycle, + new AdHocTaskCounter(), + pollInterval, + 3 + ); + + function expectWindowEqualsUpdate( + taskStat: AggregatedStat, + window: number[] + ) { + expect(taskStat.value.load).toEqual(mean(window)); + } + + return new Promise((resolve) => { + BackgroundTaskUtilizationAggregator.pipe( + // skip initial stat which is just initialized data which + // ensures we don't stall on combineLatest + skip(1), + map(({ key, value }: AggregatedStat) => ({ + key, + value, + })), + take(loads.length), + bufferCount(loads.length) + ).subscribe((taskStats: Array>) => { + expectWindowEqualsUpdate(taskStats[0], loads.slice(0, 1)); + expectWindowEqualsUpdate(taskStats[1], loads.slice(0, 2)); + expectWindowEqualsUpdate(taskStats[2], loads.slice(0, 3)); + // from the 4th value, begin to drop old values as our window is 3 + expectWindowEqualsUpdate(taskStats[3], loads.slice(1, 4)); + expectWindowEqualsUpdate(taskStats[4], loads.slice(2, 5)); + expectWindowEqualsUpdate(taskStats[5], loads.slice(3, 6)); + expectWindowEqualsUpdate(taskStats[6], loads.slice(4, 7)); + expectWindowEqualsUpdate(taskStats[7], loads.slice(5, 8)); + resolve(); + }); + + for (const load of loads) { + events$.next(mockTaskStatEvent('workerUtilization', load)); + } + }); + }); }); describe('summarizeUtilizationStats', () => { diff --git a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts index 15dfa6ef5a3d9..c8de438abf7ca 100644 --- a/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/background_task_utilization_statistics.ts @@ -22,8 +22,8 @@ import { import { MonitoredStat } from './monitoring_stats_stream'; import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator'; import { createRunningAveragedStat } from './task_run_calcultors'; +import { DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW } from '../config'; -const BACKGROUND_UTILIZATION_LOAD_RUNNING_AVERAGE_WINDOW_SIZE = 5; // Average over 5 polling cycles which is 15 seconds with the default polling interval export interface PublicBackgroundTaskUtilizationStat extends JsonObject { load: number; } @@ -52,7 +52,8 @@ interface AdhocTaskStat extends TaskStat { export function createBackgroundTaskUtilizationAggregator( taskPollingLifecycle: TaskPollingLifecycle, adHocTaskCounter: AdHocTaskCounter, - pollInterval: number + pollInterval: number, + workerUtilizationRunningAverageWindowSize: number = DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW ): AggregatedStatProvider { const taskRunEventToAdhocStat = createTaskRunEventToAdhocStat(); const taskRunAdhocEvents$: Observable> = @@ -82,7 +83,9 @@ export function createBackgroundTaskUtilizationAggregator( }) ); - const taskManagerUtilizationEventToLoadStat = createTaskRunEventToLoadStat(); + const taskManagerUtilizationEventToLoadStat = createTaskRunEventToLoadStat( + workerUtilizationRunningAverageWindowSize + ); const taskManagerWorkerUtilizationEvent$: Observable< Pick > = taskPollingLifecycle.events.pipe( @@ -239,10 +242,8 @@ function createTaskRunEventToRecurringStat() { }; } -function createTaskRunEventToLoadStat() { - const loadQueue = createRunningAveragedStat( - BACKGROUND_UTILIZATION_LOAD_RUNNING_AVERAGE_WINDOW_SIZE - ); +function createTaskRunEventToLoadStat(workerUtilizationRunningAverageWindowSize: number) { + const loadQueue = createRunningAveragedStat(workerUtilizationRunningAverageWindowSize); return (load: number): Pick => { const historicalLoad = loadQueue(load); return { diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts index 764b7aa15335b..b63c52893c5d8 100644 --- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts @@ -45,6 +45,7 @@ describe('Configuration Statistics Aggregator', () => { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, }; const managedConfig = { diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts index 610e0bf080b05..8d6df288e702d 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts @@ -49,6 +49,7 @@ describe('createMonitoringStatsStream', () => { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, }; it('returns the initial config used to configure Task Manager', async () => { diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts index 0a52d71bdee4a..e1ff38d1c9607 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts @@ -109,7 +109,8 @@ export function createAggregators( createBackgroundTaskUtilizationAggregator( taskPollingLifecycle, adHocTaskCounter, - config.poll_interval + config.poll_interval, + config.worker_utilization_running_average_window ) ); } diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index ad02e6d7490f5..fce648c87f6e9 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -70,6 +70,7 @@ const pluginInitializerContextParams = { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, }; describe('TaskManagerPlugin', () => { diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index 51cac494d3a67..9e62eec402b2d 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -75,6 +75,7 @@ describe('TaskPollingLifecycle', () => { monitor: true, warn_threshold: 5000, }, + worker_utilization_running_average_window: 5, }, taskStore: mockTaskStore, logger: taskManagerLogger,