Skip to content

Commit

Permalink
Using configurable running average window size
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Apr 25, 2023
1 parent b14c81f commit 1c9971f
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 8 deletions.
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ describe('config validation', () => {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
"worker_utilization_running_average_window": 5,
}
`);
});
Expand Down Expand Up @@ -95,6 +96,7 @@ describe('config validation', () => {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
"worker_utilization_running_average_window": 5,
}
`);
});
Expand Down Expand Up @@ -149,6 +151,7 @@ describe('config validation', () => {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
"worker_utilization_running_average_window": 5,
}
`);
});
Expand Down
8 changes: 8 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ export const DEFAULT_MONITORING_REFRESH_RATE = 60 * 1000;
export const DEFAULT_MONITORING_STATS_RUNNING_AVERAGE_WINDOW = 50;
export const DEFAULT_MONITORING_STATS_WARN_DELAYED_TASK_START_IN_SECONDS = 60;

// At the default poll interval of 3sec, this averages over the last 15sec.
export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;

export const taskExecutionFailureThresholdSchema = schema.object(
{
error_threshold: schema.number({
Expand Down Expand Up @@ -130,6 +133,11 @@ export const configSchema = schema.object(
}),
}),
event_loop_delay: eventLoopDelaySchema,
worker_utilization_running_average_window: schema.number({
defaultValue: DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW,
max: 100,
min: 1,
}),
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ describe('EphemeralTaskLifecycle', () => {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
...config,
},
elasticsearchAndSOAvailability$,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ describe('managed configuration', () => {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
});
logger = context.logger.get('taskManager');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,57 @@ describe('Task Run Statistics', () => {
}
});
});

test('returns a running count of load with custom window size', async () => {
const loads = [40, 80, 100, 100, 10, 10, 60, 40];
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
});

const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator(
taskPollingLifecycle,
new AdHocTaskCounter(),
pollInterval,
3
);

function expectWindowEqualsUpdate(
taskStat: AggregatedStat<BackgroundTaskUtilizationStat>,
window: number[]
) {
expect(taskStat.value.load).toEqual(mean(window));
}

return new Promise<void>((resolve) => {
BackgroundTaskUtilizationAggregator.pipe(
// skip initial stat which is just initialized data which
// ensures we don't stall on combineLatest
skip(1),
map(({ key, value }: AggregatedStat<BackgroundTaskUtilizationStat>) => ({
key,
value,
})),
take(loads.length),
bufferCount(loads.length)
).subscribe((taskStats: Array<AggregatedStat<BackgroundTaskUtilizationStat>>) => {
expectWindowEqualsUpdate(taskStats[0], loads.slice(0, 1));
expectWindowEqualsUpdate(taskStats[1], loads.slice(0, 2));
expectWindowEqualsUpdate(taskStats[2], loads.slice(0, 3));
// from the 4th value, begin to drop old values as our window is 3
expectWindowEqualsUpdate(taskStats[3], loads.slice(1, 4));
expectWindowEqualsUpdate(taskStats[4], loads.slice(2, 5));
expectWindowEqualsUpdate(taskStats[5], loads.slice(3, 6));
expectWindowEqualsUpdate(taskStats[6], loads.slice(4, 7));
expectWindowEqualsUpdate(taskStats[7], loads.slice(5, 8));
resolve();
});

for (const load of loads) {
events$.next(mockTaskStatEvent('workerUtilization', load));
}
});
});
});

describe('summarizeUtilizationStats', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import {
import { MonitoredStat } from './monitoring_stats_stream';
import { AggregatedStat, AggregatedStatProvider } from './runtime_statistics_aggregator';
import { createRunningAveragedStat } from './task_run_calcultors';
import { DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW } from '../config';

const BACKGROUND_UTILIZATION_LOAD_RUNNING_AVERAGE_WINDOW_SIZE = 5; // Average over 5 polling cycles which is 15 seconds with the default polling interval
export interface PublicBackgroundTaskUtilizationStat extends JsonObject {
load: number;
}
Expand Down Expand Up @@ -52,7 +52,8 @@ interface AdhocTaskStat extends TaskStat {
export function createBackgroundTaskUtilizationAggregator(
taskPollingLifecycle: TaskPollingLifecycle,
adHocTaskCounter: AdHocTaskCounter,
pollInterval: number
pollInterval: number,
workerUtilizationRunningAverageWindowSize: number = DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW
): AggregatedStatProvider<BackgroundTaskUtilizationStat> {
const taskRunEventToAdhocStat = createTaskRunEventToAdhocStat();
const taskRunAdhocEvents$: Observable<Pick<BackgroundTaskUtilizationStat, 'adhoc'>> =
Expand Down Expand Up @@ -82,7 +83,9 @@ export function createBackgroundTaskUtilizationAggregator(
})
);

const taskManagerUtilizationEventToLoadStat = createTaskRunEventToLoadStat();
const taskManagerUtilizationEventToLoadStat = createTaskRunEventToLoadStat(
workerUtilizationRunningAverageWindowSize
);
const taskManagerWorkerUtilizationEvent$: Observable<
Pick<BackgroundTaskUtilizationStat, 'load'>
> = taskPollingLifecycle.events.pipe(
Expand Down Expand Up @@ -239,10 +242,8 @@ function createTaskRunEventToRecurringStat() {
};
}

function createTaskRunEventToLoadStat() {
const loadQueue = createRunningAveragedStat<number>(
BACKGROUND_UTILIZATION_LOAD_RUNNING_AVERAGE_WINDOW_SIZE
);
function createTaskRunEventToLoadStat(workerUtilizationRunningAverageWindowSize: number) {
const loadQueue = createRunningAveragedStat<number>(workerUtilizationRunningAverageWindowSize);
return (load: number): Pick<BackgroundTaskUtilizationStat, 'load'> => {
const historicalLoad = loadQueue(load);
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ describe('Configuration Statistics Aggregator', () => {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
};

const managedConfig = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ describe('createMonitoringStatsStream', () => {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
};

it('returns the initial config used to configure Task Manager', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ export function createAggregators(
createBackgroundTaskUtilizationAggregator(
taskPollingLifecycle,
adHocTaskCounter,
config.poll_interval
config.poll_interval,
config.worker_utilization_running_average_window
)
);
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const pluginInitializerContextParams = {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
};

describe('TaskManagerPlugin', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ describe('TaskPollingLifecycle', () => {
monitor: true,
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
},
taskStore: mockTaskStore,
logger: taskManagerLogger,
Expand Down

0 comments on commit 1c9971f

Please sign in to comment.