From 2759c2ec38208272fc41a69c0764c50571bc25cb Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Fri, 20 May 2022 16:55:48 +0100 Subject: [PATCH 01/23] init taskManager bulk --- .../alerting/server/routes/bulk_edit_rules.ts | 9 ++++ .../server/rules_client/rules_client.ts | 33 +++++++++++- x-pack/plugins/task_manager/server/plugin.ts | 3 +- .../task_manager/server/task_scheduling.ts | 54 +++++++++++++++++-- 4 files changed, 94 insertions(+), 5 deletions(-) diff --git a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts index 6588a46e1d914..4925ee12b2bd0 100644 --- a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts +++ b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts @@ -12,6 +12,10 @@ import { ILicenseState, RuleTypeDisabledError } from '../lib'; import { verifyAccessAndContext, rewriteRule, handleDisabledApiKeysError } from './lib'; import { AlertingRequestHandlerContext, INTERNAL_BASE_ALERTING_API_PATH } from '../types'; +const scheduleSchema = schema.object({ + interval: schema.string(), +}); + const ruleActionSchema = schema.object({ group: schema.string(), id: schema.string(), @@ -34,6 +38,11 @@ const operationsSchema = schema.arrayOf( field: schema.literal('actions'), value: schema.arrayOf(ruleActionSchema), }), + schema.object({ + operation: schema.literal('set'), + field: schema.literal('schedule'), + value: scheduleSchema, + }), ]), { minSize: 1 } ); diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index 4e248412eae15..10040d3659691 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -211,7 +211,7 @@ export interface FindOptions extends IndexType { filter?: string; } -export type BulkEditFields = keyof Pick; +export type BulkEditFields = keyof Pick; export type BulkEditOperation = | { @@ -223,6 +223,11 @@ export type BulkEditOperation = operation: 'add' | 'set'; field: Extract; value: NormalizedAlertAction[]; + } + | { + operation: 'set'; + field: Extract; + value: Rule['schedule']; }; // schedule, throttle, notifyWhen is commented out before https://github.com/elastic/kibana/issues/124850 will be implemented @@ -1494,6 +1499,32 @@ export class RulesClient { ); }); + // update schedules only if schedule operation is present + const scheduleOperation = options.operations.find( + (op): op is Extract }> => + op.field === 'schedule' + ); + + if (scheduleOperation?.value) { + const taskIds = updatedRules.reduce((acc, rule) => { + if (rule.scheduledTaskId) { + acc.push(rule.scheduledTaskId); + } + return acc; + }, []); + + try { + await this.taskManager.bulkUpdateSchedules(taskIds, scheduleOperation.value); + } catch (error) { + this.auditLogger?.log( + ruleAuditEvent({ + action: RuleAuditAction.BULK_EDIT, + error, + }) + ); + } + } + return { rules: updatedRules, errors, total }; } diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 3694f01227178..0a7e25230b6d5 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -48,7 +48,7 @@ export interface TaskManagerSetupContract { export type TaskManagerStartContract = Pick< TaskScheduling, - 'schedule' | 'runNow' | 'ephemeralRunNow' | 'ensureScheduled' + 'schedule' | 'runNow' | 'ephemeralRunNow' | 'ensureScheduled' | 'bulkUpdateSchedules' > & Pick & { removeIfExists: TaskStore['remove']; @@ -238,6 +238,7 @@ export class TaskManagerPlugin schedule: (...args) => taskScheduling.schedule(...args), ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args), runNow: (...args) => taskScheduling.runNow(...args), + bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args), ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task), supportsEphemeralTasks: () => this.config.ephemeral_tasks.enabled, }; diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index a38e2d23fccec..a1653012b7adb 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -6,15 +6,16 @@ */ import { filter, take } from 'rxjs/operators'; - +import pMap from 'p-map'; import { pipe } from 'fp-ts/lib/pipeable'; import { Option, map as mapOptional, getOrElse, isSome } from 'fp-ts/lib/Option'; import uuid from 'uuid'; -import { pick } from 'lodash'; +import { pick, chunk } from 'lodash'; import { merge, Subject } from 'rxjs'; import agent from 'elastic-apm-node'; import { Logger } from '@kbn/core/server'; +import { mustBeAllOf } from './queries/query_clauses'; import { asOk, either, map, mapErr, promiseResult, isErr } from './lib/result_type'; import { isTaskRunEvent, @@ -28,6 +29,7 @@ import { TaskClaimErrorType, } from './task_events'; import { Middleware } from './lib/middleware'; +import { parseIntervalAsMillisecond } from './lib/intervals'; import { ConcreteTaskInstance, TaskInstanceWithId, @@ -36,8 +38,9 @@ import { TaskLifecycleResult, TaskStatus, EphemeralTask, + IntervalSchedule, } from './task'; -import { TaskStore } from './task_store'; +import { TaskStore, BulkUpdateResult } from './task_store'; import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields'; import { TaskLifecycleEvent, TaskPollingLifecycle } from './polling_lifecycle'; import { TaskTypeDictionary } from './task_type_dictionary'; @@ -111,6 +114,51 @@ export class TaskScheduling { }); } + /** + * Bulk updates schedules for tasks by ids. + * + * @param taskIs - list of task ids + * @param schedule - new schedule + * @returns {Promise} + */ + public async bulkUpdateSchedules( + taskIds: string[], + schedule: IntervalSchedule + ): Promise { + const tasks = await pMap(chunk(taskIds, 100), async (taskIdsChunk) => + this.store.fetch({ + query: mustBeAllOf( + { + terms: { + _id: taskIdsChunk.map((taskId) => `task:${taskId}`), + }, + }, + { + term: { + 'task.status': 'idle', + }, + } + ), + size: 100, + }) + ); + + const updatedTasks = tasks + .flatMap(({ docs }) => docs) + .map((task) => { + const oldInterval = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s'); + + const newRunAtInMs = Math.max( + Date.now(), + task.runAt.getTime() - oldInterval + parseIntervalAsMillisecond(schedule.interval) + ); + + return { ...task, schedule, runAt: new Date(newRunAtInMs) }; + }); + + return this.store.bulkUpdate(updatedTasks); + } + /** * Run task. * From d24c2f30b4b7f57b1b2af70519111e9495018cf0 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 23 May 2022 12:48:21 +0100 Subject: [PATCH 02/23] updates and fixes --- x-pack/plugins/task_manager/server/mocks.ts | 1 + .../task_manager/server/task_scheduling.ts | 37 ++++++++++--------- .../group1/tests/alerting/bulk_edit.ts | 2 +- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/x-pack/plugins/task_manager/server/mocks.ts b/x-pack/plugins/task_manager/server/mocks.ts index 2db8cdd6268c7..2870111ebafef 100644 --- a/x-pack/plugins/task_manager/server/mocks.ts +++ b/x-pack/plugins/task_manager/server/mocks.ts @@ -27,6 +27,7 @@ const createStartMock = () => { ensureScheduled: jest.fn(), removeIfExists: jest.fn(), supportsEphemeralTasks: jest.fn(), + bulkUpdateSchedules: jest.fn(), }; return mock; }; diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index a1653012b7adb..2e27ff723912f 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -125,32 +125,35 @@ export class TaskScheduling { taskIds: string[], schedule: IntervalSchedule ): Promise { - const tasks = await pMap(chunk(taskIds, 100), async (taskIdsChunk) => - this.store.fetch({ - query: mustBeAllOf( - { - terms: { - _id: taskIdsChunk.map((taskId) => `task:${taskId}`), - }, - }, - { - term: { - 'task.status': 'idle', + const tasks = await pMap( + chunk(taskIds, 100), + async (taskIdsChunk) => + this.store.fetch({ + query: mustBeAllOf( + { + terms: { + _id: taskIdsChunk.map((taskId) => `task:${taskId}`), + }, }, - } - ), - size: 100, - }) + { + term: { + 'task.status': 'idle', + }, + } + ), + size: 100, + }), + { concurrency: 10 } ); const updatedTasks = tasks .flatMap(({ docs }) => docs) .map((task) => { - const oldInterval = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s'); + const oldIntervalInMs = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s'); const newRunAtInMs = Math.max( Date.now(), - task.runAt.getTime() - oldInterval + parseIntervalAsMillisecond(schedule.interval) + task.runAt.getTime() - oldIntervalInMs + parseIntervalAsMillisecond(schedule.interval) ); return { ...task, schedule, runAt: new Date(newRunAtInMs) }; diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts index 6eeafe8472499..89c14d55ce41c 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts @@ -437,7 +437,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [delete]\n - [request body.operations.0.operation.2]: expected value to equal [set]\n- [request body.operations.0.1.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [set]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [delete]\n - [request body.operations.0.operation.2]: expected value to equal [set]\n- [request body.operations.0.1.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [set]\n- [request body.operations.0.2.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; From 6efc5fb9a8383e3ee2725319822032aa8237276d Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 23 May 2022 14:28:23 +0100 Subject: [PATCH 03/23] fix the rest of tests --- .../security_and_spaces/group1/tests/alerting/bulk_edit.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts index 89c14d55ce41c..35ea847f70ef4 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts @@ -482,7 +482,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; @@ -520,7 +520,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; From 7e1e7c77c379638b30f94f62676649bc577854a1 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Thu, 26 May 2022 17:14:22 +0100 Subject: [PATCH 04/23] add unit tests --- .../server/task_scheduling.test.ts | 94 +++++++++++++++++++ .../task_manager/server/task_scheduling.ts | 4 +- 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index 6fe368d495ade..db7db2df52ec4 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -27,6 +27,9 @@ import { TaskRunResult } from './task_running'; import { mockLogger } from './test_utils'; import { TaskTypeDictionary } from './task_type_dictionary'; import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock'; +import { mustBeAllOf } from './queries/query_clauses'; + +const ONE_HOUR_IN_MS = 60 * 60 * 1000; jest.mock('uuid', () => ({ v4: () => 'v4uuid', @@ -134,6 +137,97 @@ describe('TaskScheduling', () => { }); }); + describe('bulkUpdateSchedules', () => { + const id = '01ddff11-e88a-4d13-bc4e-256164e755e2'; + + beforeEach(() => {}); + + test('should search for tasks by ids and idle status', async () => { + mockTaskStore.fetch.mockResolvedValue({ docs: [] }); + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + + await taskScheduling.bulkUpdateSchedules([id], { interval: '1h' }); + + expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1); + expect(mockTaskStore.fetch).toHaveBeenCalledWith({ + query: mustBeAllOf( + { + terms: { + _id: [`task:${id}`], + }, + }, + { + term: { + 'task.status': 'idle', + }, + } + ), + size: 100, + }); + }); + + test('should split search on chunks when input ids array too large', async () => { + mockTaskStore.fetch.mockResolvedValue({ docs: [] }); + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + + await taskScheduling.bulkUpdateSchedules(Array.from({ length: 1250 }), { interval: '1h' }); + + expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13); + }); + + test('should postpone task run if new interval is greater than previous', async () => { + // task set to be run in one 1hr from now + const runInTwoHrs = new Date(Date.now() + 2 * ONE_HOUR_IN_MS); + const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs }); + + mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + await taskScheduling.bulkUpdateSchedules([id], { interval: '5h' }); + + const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; + + expect(bulkUpdatePayload).toHaveLength(1); + expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '5h' }); + // if we update rule with schedule of '5h' and prev interval was 3h, task will be run in 2 hours later + expect(bulkUpdatePayload[0].runAt.getTime() - runInTwoHrs.getTime()).toBe(2 * ONE_HOUR_IN_MS); + }); + + test('should set task run sooner if new interval is lesser than previous', async () => { + // task set to be run in one 2hrs from now + const runInTwoHrs = new Date(Date.now() + 2 * ONE_HOUR_IN_MS); + const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs }); + + mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + await taskScheduling.bulkUpdateSchedules([id], { interval: '2h' }); + + const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; + + expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '2h' }); + // if we update rule with schedule of '2h' and prev interval was 3h, task will be run in 1 hour sooner + expect(runInTwoHrs.getTime() - bulkUpdatePayload[0].runAt.getTime()).toBe(ONE_HOUR_IN_MS); + }); + + test('should set task run to now if time that passed from last run is greater than new interval', async () => { + // task set to be run in one 1hr from now + const runInOneHr = new Date(Date.now() + ONE_HOUR_IN_MS); + const task = mockTask({ id, schedule: { interval: '2h' }, runAt: runInOneHr }); + + mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + await taskScheduling.bulkUpdateSchedules([id], { interval: '30m' }); + + const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; + + expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '30m' }); + + // if time that passed from last rule task is greater than new interval, task should be set to run at now time + expect(bulkUpdatePayload[0].runAt.getTime()).toBeLessThanOrEqual(Date.now()); + }); + }); describe('runNow', () => { test('resolves when the task run succeeds', () => { const events$ = new Subject(); diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 2e27ff723912f..98ec77aeb6fb6 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -117,8 +117,8 @@ export class TaskScheduling { /** * Bulk updates schedules for tasks by ids. * - * @param taskIs - list of task ids - * @param schedule - new schedule + * @param taskIss string[] - list of task ids + * @param schedule IntervalSchedule - new schedule * @returns {Promise} */ public async bulkUpdateSchedules( From 815854b08994e3a6233fb5d27e8759b76ad2a523 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 30 May 2022 20:10:37 +0100 Subject: [PATCH 05/23] tests!!! --- x-pack/plugins/task_manager/server/index.ts | 2 +- .../server/task_scheduling.test.ts | 48 ++++++++++--- .../task_manager/server/task_scheduling.ts | 25 +++++-- .../sample_task_plugin/server/init_routes.ts | 25 +++++++ .../task_manager/task_management.ts | 68 ++++++++++++++++++- 5 files changed, 152 insertions(+), 16 deletions(-) diff --git a/x-pack/plugins/task_manager/server/index.ts b/x-pack/plugins/task_manager/server/index.ts index f6cb3a6e6b1d5..88a27d040b176 100644 --- a/x-pack/plugins/task_manager/server/index.ts +++ b/x-pack/plugins/task_manager/server/index.ts @@ -30,7 +30,7 @@ export { throwUnrecoverableError, isEphemeralTaskRejectedDueToCapacityError, } from './task_running'; -export type { RunNowResult } from './task_scheduling'; +export type { RunNowResult, BulkUpdateSchedulesResult } from './task_scheduling'; export { getOldestIdleActionTask } from './queries/oldest_idle_action_task'; export { IdleTaskWithExpiredRunAt, diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index db7db2df52ec4..fbc271c0ed4ee 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -7,6 +7,7 @@ import { Subject } from 'rxjs'; import { none, some } from 'fp-ts/lib/Option'; +import moment from 'moment'; import { asTaskMarkRunningEvent, @@ -29,8 +30,6 @@ import { TaskTypeDictionary } from './task_type_dictionary'; import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock'; import { mustBeAllOf } from './queries/query_clauses'; -const ONE_HOUR_IN_MS = 60 * 60 * 1000; - jest.mock('uuid', () => ({ v4: () => 'v4uuid', })); @@ -139,8 +138,11 @@ describe('TaskScheduling', () => { describe('bulkUpdateSchedules', () => { const id = '01ddff11-e88a-4d13-bc4e-256164e755e2'; - - beforeEach(() => {}); + beforeEach(() => { + mockTaskStore.bulkUpdate.mockImplementation(() => + Promise.resolve([{ tag: 'ok', value: mockTask() }]) + ); + }); test('should search for tasks by ids and idle status', async () => { mockTaskStore.fetch.mockResolvedValue({ docs: [] }); @@ -175,9 +177,31 @@ describe('TaskScheduling', () => { expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13); }); + test('should transform response into correct format', async () => { + const successfulTask = mockTask({ id: 'task-1', schedule: { interval: '1h' } }); + const failedTask = mockTask({ id: 'task-2', schedule: { interval: '1h' } }); + mockTaskStore.bulkUpdate.mockImplementation(() => + Promise.resolve([ + { tag: 'ok', value: successfulTask }, + { tag: 'err', error: { entity: failedTask, error: new Error('fail') } }, + ]) + ); + mockTaskStore.fetch.mockResolvedValue({ docs: [successfulTask, failedTask] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + const result = await taskScheduling.bulkUpdateSchedules([successfulTask.id, failedTask.id], { + interval: '1h', + }); + + expect(result).toEqual({ + tasks: [successfulTask], + errors: [{ task: failedTask, error: new Error('fail') }], + }); + }); + test('should postpone task run if new interval is greater than previous', async () => { - // task set to be run in one 1hr from now - const runInTwoHrs = new Date(Date.now() + 2 * ONE_HOUR_IN_MS); + // task set to be run in 2 hrs from now + const runInTwoHrs = new Date(Date.now() + moment.duration(2, 'hours').asMilliseconds()); const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs }); mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); @@ -190,12 +214,14 @@ describe('TaskScheduling', () => { expect(bulkUpdatePayload).toHaveLength(1); expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '5h' }); // if we update rule with schedule of '5h' and prev interval was 3h, task will be run in 2 hours later - expect(bulkUpdatePayload[0].runAt.getTime() - runInTwoHrs.getTime()).toBe(2 * ONE_HOUR_IN_MS); + expect(bulkUpdatePayload[0].runAt.getTime() - runInTwoHrs.getTime()).toBe( + moment.duration(2, 'hours').asMilliseconds() + ); }); test('should set task run sooner if new interval is lesser than previous', async () => { // task set to be run in one 2hrs from now - const runInTwoHrs = new Date(Date.now() + 2 * ONE_HOUR_IN_MS); + const runInTwoHrs = new Date(Date.now() + moment.duration(2, 'hours').asMilliseconds()); const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs }); mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); @@ -207,12 +233,14 @@ describe('TaskScheduling', () => { expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '2h' }); // if we update rule with schedule of '2h' and prev interval was 3h, task will be run in 1 hour sooner - expect(runInTwoHrs.getTime() - bulkUpdatePayload[0].runAt.getTime()).toBe(ONE_HOUR_IN_MS); + expect(runInTwoHrs.getTime() - bulkUpdatePayload[0].runAt.getTime()).toBe( + moment.duration(1, 'hour').asMilliseconds() + ); }); test('should set task run to now if time that passed from last run is greater than new interval', async () => { // task set to be run in one 1hr from now - const runInOneHr = new Date(Date.now() + ONE_HOUR_IN_MS); + const runInOneHr = new Date(Date.now() + moment.duration(1, 'hour').asMilliseconds()); const task = mockTask({ id, schedule: { interval: '2h' }, runAt: runInOneHr }); mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 98ec77aeb6fb6..2241a35a6813b 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -40,7 +40,7 @@ import { EphemeralTask, IntervalSchedule, } from './task'; -import { TaskStore, BulkUpdateResult } from './task_store'; +import { TaskStore } from './task_store'; import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields'; import { TaskLifecycleEvent, TaskPollingLifecycle } from './polling_lifecycle'; import { TaskTypeDictionary } from './task_type_dictionary'; @@ -59,6 +59,10 @@ export interface TaskSchedulingOpts { taskManagerId: string; } +export interface BulkUpdateSchedulesResult { + tasks: ConcreteTaskInstance[]; + errors: Array<{ task: ConcreteTaskInstance; error: Error }>; +} export interface RunNowResult { id: ConcreteTaskInstance['id']; state?: ConcreteTaskInstance['state']; @@ -117,14 +121,14 @@ export class TaskScheduling { /** * Bulk updates schedules for tasks by ids. * - * @param taskIss string[] - list of task ids + * @param taskIds string[] - list of task ids * @param schedule IntervalSchedule - new schedule * @returns {Promise} */ public async bulkUpdateSchedules( taskIds: string[], schedule: IntervalSchedule - ): Promise { + ): Promise { const tasks = await pMap( chunk(taskIds, 100), async (taskIdsChunk) => @@ -159,7 +163,20 @@ export class TaskScheduling { return { ...task, schedule, runAt: new Date(newRunAtInMs) }; }); - return this.store.bulkUpdate(updatedTasks); + const result: BulkUpdateSchedulesResult = { + tasks: [], + errors: [], + }; + + (await this.store.bulkUpdate(updatedTasks)).forEach((task) => { + if (task.tag === 'ok') { + result.tasks.push(task.value); + } else { + result.errors.push({ error: task.error.error, task: task.error.entity }); + } + }); + + return result; } /** diff --git a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts index 2d98a58d29d7c..539cef69d92eb 100644 --- a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts +++ b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts @@ -111,6 +111,31 @@ export function initRoutes( } ); + router.post( + { + path: `/api/sample_tasks/bulk_update_schedules`, + validate: { + body: schema.object({ + taskIds: schema.arrayOf(schema.string()), + schedule: schema.object({ interval: schema.string() }), + }), + }, + }, + async function ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ) { + const { taskIds, schedule } = req.body; + try { + const taskManager = await taskManagerStart; + return res.ok({ body: await taskManager.bulkUpdateSchedules(taskIds, schedule) }); + } catch (err) { + return res.ok({ body: { taskIds, error: `${err}` } }); + } + } + ); + router.post( { path: `/api/sample_tasks/ephemeral_run_now`, diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts index 51fb161ac0d6a..d155b76fc0101 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts @@ -5,12 +5,13 @@ * 2.0. */ +import moment from 'moment'; import { random, times } from 'lodash'; import expect from '@kbn/expect'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import TaskManagerMapping from '@kbn/task-manager-plugin/server/saved_objects/mappings.json'; import { DEFAULT_MAX_WORKERS, DEFAULT_POLL_INTERVAL } from '@kbn/task-manager-plugin/server/config'; -import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server'; +import { ConcreteTaskInstance, BulkUpdateSchedulesResult } from '@kbn/task-manager-plugin/server'; import { FtrProviderContext } from '../../ftr_provider_context'; const { @@ -177,6 +178,15 @@ export default function ({ getService }: FtrProviderContext) { .then((response) => response.body); } + function bulkUpdateSchedules(taskIds: string[], schedule: { interval: string }) { + return supertest + .post('/api/sample_tasks/bulk_update_schedules') + .set('kbn-xsrf', 'xxx') + .send({ taskIds, schedule }) + .expect(200) + .then((response: { body: BulkUpdateSchedulesResult }) => response.body); + } + // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 // function runEphemeralTaskNow(task: { // taskType: string; @@ -899,6 +909,62 @@ export default function ({ getService }: FtrProviderContext) { }); }); + it('should bulk updates schedules for multiple tasks', async () => { + const initialTime = Date.now(); + const tasks = await Promise.all([ + scheduleTask({ + taskType: 'sampleTask', + schedule: { interval: '1h' }, + params: {}, + }), + + scheduleTask({ + taskType: 'sampleTask', + schedule: { interval: '5m' }, + params: {}, + }), + ]); + + const taskIds = tasks.map(({ id }) => id); + + await retry.try(async () => { + // ensure each task has ran at least once and been rescheduled for future run + for (const task of tasks) { + const { state } = await currentTask<{ count: number }>(task.id); + expect(state.count).to.be(1); + } + + // first task to be scheduled in 1h + expect(Date.parse((await currentTask(tasks[0].id)).runAt) - initialTime).to.be.greaterThan( + moment.duration(1, 'hour').asMilliseconds() + ); + + // second task to be scheduled in 5m + expect(Date.parse((await currentTask(tasks[1].id)).runAt) - initialTime).to.be.greaterThan( + moment.duration(5, 'minutes').asMilliseconds() + ); + }); + + await retry.try(async () => { + const updates = await bulkUpdateSchedules(taskIds, { interval: '3h' }); + + expect(updates.tasks.length).to.be(2); + expect(updates.errors.length).to.be(0); + }); + + await retry.try(async () => { + const updatedTasks = (await currentTasks()).docs; + + updatedTasks.forEach((task) => { + expect(task.schedule).to.eql({ interval: '3h' }); + // should be scheduled to run in 3 hours + expect(Date.parse(task.runAt) - initialTime).to.be.greaterThan( + moment.duration(3, 'hours').asMilliseconds() + ); + }); + }); + }); + // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 // it('should return the resulting task state when asked to run an ephemeral task now', async () => { // const ephemeralTask = await runEphemeralTaskNow({ From 79fc692a4588018920b759015e0a4d50f9d9e817 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 30 May 2022 20:15:42 +0100 Subject: [PATCH 06/23] refactor it --- .../task_manager/server/task_scheduling.ts | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 2241a35a6813b..6f67d47124cd5 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -163,20 +163,18 @@ export class TaskScheduling { return { ...task, schedule, runAt: new Date(newRunAtInMs) }; }); - const result: BulkUpdateSchedulesResult = { - tasks: [], - errors: [], - }; - - (await this.store.bulkUpdate(updatedTasks)).forEach((task) => { - if (task.tag === 'ok') { - result.tasks.push(task.value); - } else { - result.errors.push({ error: task.error.error, task: task.error.entity }); - } - }); + return (await this.store.bulkUpdate(updatedTasks)).reduce( + (acc, task) => { + if (task.tag === 'ok') { + acc.tasks.push(task.value); + } else { + acc.errors.push({ error: task.error.error, task: task.error.entity }); + } - return result; + return acc; + }, + { tasks: [], errors: [] } + ); } /** From e9d44266d68eb447f5d2bd383a21c06cd680beb9 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Tue, 31 May 2022 10:54:06 +0100 Subject: [PATCH 07/23] add test to rukes_client --- .../server/rules_client/rules_client.ts | 13 ++-- .../rules_client/tests/bulk_edit.test.ts | 77 +++++++++++++++++++ 2 files changed, 82 insertions(+), 8 deletions(-) diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index 9869601dee932..6ebb675ab633a 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -230,12 +230,7 @@ export type BulkEditOperation = value: Rule['schedule']; }; -// schedule, throttle, notifyWhen is commented out before https://github.com/elastic/kibana/issues/124850 will be implemented -// | { -// operation: 'set'; -// field: Extract; -// value: Rule['schedule']; -// } +// throttle, notifyWhen is commented out before https://github.com/elastic/kibana/issues/124850 will be implemented // | { // operation: 'set'; // field: Extract; @@ -1501,8 +1496,10 @@ export class RulesClient { // update schedules only if schedule operation is present const scheduleOperation = options.operations.find( - (op): op is Extract }> => - op.field === 'schedule' + ( + operation + ): operation is Extract }> => + operation.field === 'schedule' ); if (scheduleOperation?.value) { diff --git a/x-pack/plugins/alerting/server/rules_client/tests/bulk_edit.test.ts b/x-pack/plugins/alerting/server/rules_client/tests/bulk_edit.test.ts index e878fd3f79e17..fe5f934ce4ab3 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/bulk_edit.test.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/bulk_edit.test.ts @@ -899,4 +899,81 @@ describe('bulkEdit()', () => { ); }); }); + + describe('task manager', () => { + test('should call task manager method bulkUpdateSchedules if operation set new schedules', async () => { + unsecuredSavedObjectsClient.bulkUpdate.mockResolvedValue({ + saved_objects: [ + { + id: '1', + type: 'alert', + attributes: { + enabled: true, + tags: ['foo'], + alertTypeId: 'myType', + schedule: { interval: '1m' }, + consumer: 'myApp', + scheduledTaskId: 'task-123', + params: { index: ['test-index-*'] }, + throttle: null, + notifyWhen: null, + actions: [], + }, + references: [], + version: '123', + }, + ], + }); + + await rulesClient.bulkEdit({ + operations: [ + { + field: 'schedule', + operation: 'set', + value: { interval: '10m' }, + }, + ], + }); + + expect(taskManager.bulkUpdateSchedules).toHaveBeenCalledWith(['task-123'], { + interval: '10m', + }); + }); + + test('should not call task manager method bulkUpdateSchedules if operation is not set schedule', async () => { + unsecuredSavedObjectsClient.bulkUpdate.mockResolvedValue({ + saved_objects: [ + { + id: '1', + type: 'alert', + attributes: { + enabled: true, + tags: ['foo'], + alertTypeId: 'myType', + schedule: { interval: '1m' }, + consumer: 'myApp', + params: { index: ['test-index-*'] }, + throttle: null, + notifyWhen: null, + actions: [], + }, + references: [], + version: '123', + }, + ], + }); + + await rulesClient.bulkEdit({ + operations: [ + { + field: 'tags', + operation: 'set', + value: ['test-tag'], + }, + ], + }); + + expect(taskManager.bulkUpdateSchedules).not.toHaveBeenCalled(); + }); + }); }); From 0b96f460d54adee56ce313cca7ef75dace08127b Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Tue, 31 May 2022 12:09:42 +0100 Subject: [PATCH 08/23] tests, more tests --- .../spaces_only/tests/alerting/bulk_edit.ts | 137 ++++++++++++++---- 1 file changed, 112 insertions(+), 25 deletions(-) diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts index 3150925e2e49e..aa1f6bfff2588 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts @@ -16,7 +16,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { const supertest = getService('supertest'); // FLAKY: https://github.com/elastic/kibana/issues/132195 - describe.skip('bulkEdit', () => { + describe('bulkEdit', () => { const objectRemover = new ObjectRemover(supertest); after(() => objectRemover.removeAll()); @@ -25,7 +25,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ tags: ['default'] })); + .send(getTestRuleData({ enabled: false, tags: ['default'] })); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -71,7 +71,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ tags: [`multiple-rules-edit`] })) + .send(getTestRuleData({ enabled: false, tags: [`multiple-rules-edit`] })) .expect(200) ) ) @@ -119,11 +119,11 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { }); }); - it(`shouldn't bulk edit rule from another space`, async () => { + it('should bulk edit rule with schedule operation', async () => { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ tags: ['default'] })); + .send(getTestRuleData({ enabled: false, schedule: { interval: '10m' } })); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -131,27 +131,42 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { ids: [createdRule.id], operations: [ { - operation: 'add', - field: 'tags', - value: ['tag-1'], + operation: 'set', + field: 'schedule', + value: { interval: '1h' }, }, ], }; - await supertest - .post(`${getUrlPrefix(Spaces.other.id)}/internal/alerting/rules/_bulk_edit`) + const bulkEditResponse = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) .set('kbn-xsrf', 'foo') - .send(payload) - .expect(200, { rules: [], errors: [], total: 0 }); + .send(payload); + + expect(bulkEditResponse.body.errors).to.have.length(0); + expect(bulkEditResponse.body.rules).to.have.length(1); + expect(bulkEditResponse.body.rules[0].schedule).to.eql({ interval: '1h' }); + + const { body: updatedRule } = await supertest + .get(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rule/${createdRule.id}`) + .set('kbn-xsrf', 'foo'); + + expect(updatedRule.schedule).to.eql({ interval: '1h' }); + + // Ensure AAD isn't broken + await checkAAD({ + supertest, + spaceId: Spaces.space1.id, + type: 'alert', + id: createdRule.id, + }); }); - it('should return mapped params after bulk edit', async () => { + it(`shouldn't bulk edit rule from another space`, async () => { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send( - getTestRuleData({ tags: ['default'], params: { risk_score: 40, severity: 'medium' } }) - ); + .send(getTestRuleData({ enabled: false, tags: ['default'] })); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -166,17 +181,89 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { ], }; - const bulkEditResponse = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) + await supertest + .post(`${getUrlPrefix(Spaces.other.id)}/internal/alerting/rules/_bulk_edit`) .set('kbn-xsrf', 'foo') - .send(payload); + .send(payload) + .expect(200, { rules: [], errors: [], total: 0 }); + }); - expect(bulkEditResponse.body.errors).to.have.length(0); - expect(bulkEditResponse.body.rules).to.have.length(1); - expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ - risk_score: 40, - severity: '40-medium', + // for test purpose only, will be removed + for (let i = 0; i < 100; i++) { + it(`should return mapped params after bulk edit #${i}`, async () => { + const { body: createdRule } = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send( + getTestRuleData({ + enabled: false, + tags: ['default'], + params: { risk_score: 40, severity: 'medium' }, + }) + ); + + objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); + + const payload = { + ids: [createdRule.id], + operations: [ + { + operation: 'add', + field: 'tags', + value: ['tag-1'], + }, + ], + }; + + const bulkEditResponse = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) + .set('kbn-xsrf', 'foo') + .send(payload); + + expect(bulkEditResponse.body.errors).to.have.length(0); + expect(bulkEditResponse.body.rules).to.have.length(1); + expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ + risk_score: 40, + severity: '40-medium', + }); }); - }); + + it(`should return mapped params after bulk edit ENABLED #${i}`, async () => { + const { body: createdRule } = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send( + getTestRuleData({ + tags: ['default'], + params: { risk_score: 40, severity: 'medium' }, + }) + ); + + objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); + + const payload = { + ids: [createdRule.id], + operations: [ + { + operation: 'add', + field: 'tags', + value: ['tag-1'], + }, + ], + }; + + const bulkEditResponse = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) + .set('kbn-xsrf', 'foo') + .send(payload); + + expect(bulkEditResponse.body.errors).to.have.length(0); + expect(bulkEditResponse.body.rules).to.have.length(1); + expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ + risk_score: 40, + severity: '40-medium', + }); + }); + } }); } From 9f1ccf71bf636e85a8e3b18004e88bad8f21e13b Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Tue, 31 May 2022 17:05:41 +0100 Subject: [PATCH 09/23] README, docs --- x-pack/plugins/task_manager/README.md | 30 +++++++++++++++++++ .../task_manager/server/task_scheduling.ts | 10 +++++++ .../spaces_only/tests/alerting/bulk_edit.ts | 2 +- 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/x-pack/plugins/task_manager/README.md b/x-pack/plugins/task_manager/README.md index 350a53c660bc7..f345b5647219e 100644 --- a/x-pack/plugins/task_manager/README.md +++ b/x-pack/plugins/task_manager/README.md @@ -328,6 +328,9 @@ The _Start_ Plugin api allow you to use Task Manager to facilitate your Plugin's runNow: (taskId: string) => { // ... }, + bulkUpdateSchedules: (taskIds: string[], schedule: IntervalSchedule) => { + // ... + }, ensureScheduled: (taskInstance: TaskInstanceWithId, options?: any) => { // ... }, @@ -415,6 +418,33 @@ export class Plugin { } ``` +#### bulkUpdateSchedules +Using `bulkUpdatesSchedules` you can instruct TaskManger to update interval of tasks that are in `idle` status. +When interval updated, new `runAt` will be computed and task will be updated with that value + +```js +export class Plugin { + constructor() { + } + + public setup(core: CoreSetup, plugins: { taskManager }) { + } + + public start(core: CoreStart, plugins: { taskManager }) { + try { + const bulkUpdateResults = await taskManager.bulkUpdateSchedule( + ['97c2c4e7-d850-11ec-bf95-895ffd19f959', 'a5ee24d1-dce2-11ec-ab8d-cf74da82133d'], + { interval: '10m' }, + ); + // If no error is thrown, the bulkUpdateSchedule has completed successfully. + // But some updates of some tasks can be failed, due to OCC 409 conflict for example + } catch(err: Error) { + // if error is caught, means the whole method requested has failed and tasks weren't updated + } + } +} +``` + #### more options More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time. diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 6f67d47124cd5..e4a6f8c7c12fb 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -59,8 +59,18 @@ export interface TaskSchedulingOpts { taskManagerId: string; } +/** + * return type of TaskScheduling.bulkUpdateSchedules method + */ export interface BulkUpdateSchedulesResult { + /** + * list of successfully updated tasks + */ tasks: ConcreteTaskInstance[]; + + /** + * list of failed tasks and error caused failure + */ errors: Array<{ task: ConcreteTaskInstance; error: Error }>; } export interface RunNowResult { diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts index aa1f6bfff2588..91fb33362e47d 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts @@ -189,7 +189,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { }); // for test purpose only, will be removed - for (let i = 0; i < 100; i++) { + for (let i = 0; i < 200; i++) { it(`should return mapped params after bulk edit #${i}`, async () => { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) From edc78900895cb9618b9638b6b8ae69c12b722da3 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Wed, 1 Jun 2022 09:05:06 +0100 Subject: [PATCH 10/23] skip again --- .../spaces_only/tests/alerting/bulk_edit.ts | 137 ++++-------------- 1 file changed, 25 insertions(+), 112 deletions(-) diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts index 91fb33362e47d..3150925e2e49e 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/bulk_edit.ts @@ -16,7 +16,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { const supertest = getService('supertest'); // FLAKY: https://github.com/elastic/kibana/issues/132195 - describe('bulkEdit', () => { + describe.skip('bulkEdit', () => { const objectRemover = new ObjectRemover(supertest); after(() => objectRemover.removeAll()); @@ -25,7 +25,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ enabled: false, tags: ['default'] })); + .send(getTestRuleData({ tags: ['default'] })); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -71,7 +71,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ enabled: false, tags: [`multiple-rules-edit`] })) + .send(getTestRuleData({ tags: [`multiple-rules-edit`] })) .expect(200) ) ) @@ -119,11 +119,11 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { }); }); - it('should bulk edit rule with schedule operation', async () => { + it(`shouldn't bulk edit rule from another space`, async () => { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ enabled: false, schedule: { interval: '10m' } })); + .send(getTestRuleData({ tags: ['default'] })); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -131,42 +131,27 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { ids: [createdRule.id], operations: [ { - operation: 'set', - field: 'schedule', - value: { interval: '1h' }, + operation: 'add', + field: 'tags', + value: ['tag-1'], }, ], }; - const bulkEditResponse = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) + await supertest + .post(`${getUrlPrefix(Spaces.other.id)}/internal/alerting/rules/_bulk_edit`) .set('kbn-xsrf', 'foo') - .send(payload); - - expect(bulkEditResponse.body.errors).to.have.length(0); - expect(bulkEditResponse.body.rules).to.have.length(1); - expect(bulkEditResponse.body.rules[0].schedule).to.eql({ interval: '1h' }); - - const { body: updatedRule } = await supertest - .get(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rule/${createdRule.id}`) - .set('kbn-xsrf', 'foo'); - - expect(updatedRule.schedule).to.eql({ interval: '1h' }); - - // Ensure AAD isn't broken - await checkAAD({ - supertest, - spaceId: Spaces.space1.id, - type: 'alert', - id: createdRule.id, - }); + .send(payload) + .expect(200, { rules: [], errors: [], total: 0 }); }); - it(`shouldn't bulk edit rule from another space`, async () => { + it('should return mapped params after bulk edit', async () => { const { body: createdRule } = await supertest .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ enabled: false, tags: ['default'] })); + .send( + getTestRuleData({ tags: ['default'], params: { risk_score: 40, severity: 'medium' } }) + ); objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); @@ -181,89 +166,17 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { ], }; - await supertest - .post(`${getUrlPrefix(Spaces.other.id)}/internal/alerting/rules/_bulk_edit`) + const bulkEditResponse = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) .set('kbn-xsrf', 'foo') - .send(payload) - .expect(200, { rules: [], errors: [], total: 0 }); - }); - - // for test purpose only, will be removed - for (let i = 0; i < 200; i++) { - it(`should return mapped params after bulk edit #${i}`, async () => { - const { body: createdRule } = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) - .set('kbn-xsrf', 'foo') - .send( - getTestRuleData({ - enabled: false, - tags: ['default'], - params: { risk_score: 40, severity: 'medium' }, - }) - ); - - objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); - - const payload = { - ids: [createdRule.id], - operations: [ - { - operation: 'add', - field: 'tags', - value: ['tag-1'], - }, - ], - }; - - const bulkEditResponse = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) - .set('kbn-xsrf', 'foo') - .send(payload); - - expect(bulkEditResponse.body.errors).to.have.length(0); - expect(bulkEditResponse.body.rules).to.have.length(1); - expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ - risk_score: 40, - severity: '40-medium', - }); - }); - - it(`should return mapped params after bulk edit ENABLED #${i}`, async () => { - const { body: createdRule } = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) - .set('kbn-xsrf', 'foo') - .send( - getTestRuleData({ - tags: ['default'], - params: { risk_score: 40, severity: 'medium' }, - }) - ); - - objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting'); - - const payload = { - ids: [createdRule.id], - operations: [ - { - operation: 'add', - field: 'tags', - value: ['tag-1'], - }, - ], - }; - - const bulkEditResponse = await supertest - .post(`${getUrlPrefix(Spaces.space1.id)}/internal/alerting/rules/_bulk_edit`) - .set('kbn-xsrf', 'foo') - .send(payload); + .send(payload); - expect(bulkEditResponse.body.errors).to.have.length(0); - expect(bulkEditResponse.body.rules).to.have.length(1); - expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ - risk_score: 40, - severity: '40-medium', - }); + expect(bulkEditResponse.body.errors).to.have.length(0); + expect(bulkEditResponse.body.rules).to.have.length(1); + expect(bulkEditResponse.body.rules[0].mapped_params).to.eql({ + risk_score: 40, + severity: '40-medium', }); - } + }); }); } From d20f2fd5093593fdb5831cdfa26e3cf3dbf1bcc7 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Wed, 1 Jun 2022 09:07:18 +0100 Subject: [PATCH 11/23] add rest of ops --- .../server/rules_client/rules_client.ts | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index 6ebb675ab633a..fc4c900e21f14 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -211,7 +211,10 @@ export interface FindOptions extends IndexType { filter?: string; } -export type BulkEditFields = keyof Pick; +export type BulkEditFields = keyof Pick< + Rule, + 'actions' | 'tags' | 'schedule' | 'throttle' | 'notifyWhen' +>; export type BulkEditOperation = | { @@ -228,20 +231,18 @@ export type BulkEditOperation = operation: 'set'; field: Extract; value: Rule['schedule']; + } + | { + operation: 'set'; + field: Extract; + value: Rule['throttle']; + } + | { + operation: 'set'; + field: Extract; + value: Rule['notifyWhen']; }; -// throttle, notifyWhen is commented out before https://github.com/elastic/kibana/issues/124850 will be implemented -// | { -// operation: 'set'; -// field: Extract; -// value: Rule['throttle']; -// } -// | { -// operation: 'set'; -// field: Extract; -// value: Rule['notifyWhen']; -// }; - type RuleParamsModifier = (params: Params) => Promise; export interface BulkEditOptionsFilter { From f2bc69607f24f4d4898469ef3b961209301b00b1 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Wed, 1 Jun 2022 16:51:00 +0100 Subject: [PATCH 12/23] tests --- .../alerting/server/routes/bulk_edit_rules.ts | 16 ++++++++++++++ .../group1/tests/alerting/bulk_edit.ts | 21 +++++++------------ 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts index 4925ee12b2bd0..a56acd1514011 100644 --- a/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts +++ b/x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts @@ -43,6 +43,22 @@ const operationsSchema = schema.arrayOf( field: schema.literal('schedule'), value: scheduleSchema, }), + schema.object({ + operation: schema.literal('set'), + field: schema.literal('throttle'), + value: schema.nullable(schema.string()), + }), + schema.object({ + operation: schema.literal('set'), + field: schema.literal('notifyWhen'), + value: schema.nullable( + schema.oneOf([ + schema.literal('onActionGroupChange'), + schema.literal('onActiveAlert'), + schema.literal('onThrottleInterval'), + ]) + ), + }), ]), { minSize: 1 } ); diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts index 35ea847f70ef4..6ae46cfd8d7bf 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/group1/tests/alerting/bulk_edit.ts @@ -437,7 +437,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [delete]\n - [request body.operations.0.operation.2]: expected value to equal [set]\n- [request body.operations.0.1.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [set]\n- [request body.operations.0.2.operation]: expected value to equal [set]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [delete]\n - [request body.operations.0.operation.2]: expected value to equal [set]\n- [request body.operations.0.1.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [set]\n- [request body.operations.0.2.operation]: expected value to equal [set]\n- [request body.operations.0.3.operation]: expected value to equal [set]\n- [request body.operations.0.4.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; @@ -446,21 +446,14 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { } }); - it('should handle bulk edit of rules when operation field is invalid', async () => { - const { body: createdRule } = await supertest - .post(`${getUrlPrefix(space.id)}/api/alerting/rule`) - .set('kbn-xsrf', 'foo') - .send(getTestRuleData({ tags: ['foo'] })) - .expect(200); - objectRemover.add(space.id, createdRule.id, 'rule', 'alerting'); - + it('should handle bulk edit of rules when operation value type is incorrect', async () => { const payload = { - ids: [createdRule.id], + filter: '', operations: [ { operation: 'add', - field: 'test', - value: ['test'], + field: 'tags', + value: 'not an array', }, ], }; @@ -482,7 +475,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.value]: could not parse array value from json input\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]\n- [request body.operations.0.3.operation]: expected value to equal [set]\n- [request body.operations.0.4.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; @@ -520,7 +513,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { statusCode: 400, error: 'Bad Request', message: - '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]', + '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]\n- [request body.operations.0.3.operation]: expected value to equal [set]\n- [request body.operations.0.4.operation]: expected value to equal [set]', }); expect(response.statusCode).to.eql(400); break; From 0b44ef184fed3bb48dc783a889d032c2788f4e26 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 6 Jun 2022 12:00:39 +0100 Subject: [PATCH 13/23] comments updates --- .../plugins/task_manager/server/task_scheduling.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index fbc271c0ed4ee..79773e6895516 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -213,7 +213,7 @@ describe('TaskScheduling', () => { expect(bulkUpdatePayload).toHaveLength(1); expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '5h' }); - // if we update rule with schedule of '5h' and prev interval was 3h, task will be run in 2 hours later + // if tasks updated with schedule interval of '5h' and previous interval was 3h, task will be scheduled to run in 2 hours later expect(bulkUpdatePayload[0].runAt.getTime() - runInTwoHrs.getTime()).toBe( moment.duration(2, 'hours').asMilliseconds() ); @@ -232,14 +232,14 @@ describe('TaskScheduling', () => { const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '2h' }); - // if we update rule with schedule of '2h' and prev interval was 3h, task will be run in 1 hour sooner + // if tasks updated with schedule interval of '2h' and previous interval was 3h, task will be scheduled to run in 1 hour sooner expect(runInTwoHrs.getTime() - bulkUpdatePayload[0].runAt.getTime()).toBe( moment.duration(1, 'hour').asMilliseconds() ); }); test('should set task run to now if time that passed from last run is greater than new interval', async () => { - // task set to be run in one 1hr from now + // task set to be run in one 1hr from now. With interval of '2h', it means last run happened 1 hour ago const runInOneHr = new Date(Date.now() + moment.duration(1, 'hour').asMilliseconds()); const task = mockTask({ id, schedule: { interval: '2h' }, runAt: runInOneHr }); @@ -252,7 +252,7 @@ describe('TaskScheduling', () => { expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '30m' }); - // if time that passed from last rule task is greater than new interval, task should be set to run at now time + // if time that passed from last task run is greater than new interval, task should be set to run at now time expect(bulkUpdatePayload[0].runAt.getTime()).toBeLessThanOrEqual(Date.now()); }); }); From cb5411a71d298fa37754d9e0a6fd1052e2e9d055 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 6 Jun 2022 12:09:11 +0100 Subject: [PATCH 14/23] JSDoc --- x-pack/plugins/task_manager/server/task_scheduling.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index e4a6f8c7c12fb..1fddc44c9e4b5 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -69,7 +69,7 @@ export interface BulkUpdateSchedulesResult { tasks: ConcreteTaskInstance[]; /** - * list of failed tasks and error caused failure + * list of failed tasks and errors caused failure */ errors: Array<{ task: ConcreteTaskInstance; error: Error }>; } @@ -131,9 +131,9 @@ export class TaskScheduling { /** * Bulk updates schedules for tasks by ids. * - * @param taskIds string[] - list of task ids - * @param schedule IntervalSchedule - new schedule - * @returns {Promise} + * @param {string[]} taskIds - list of task ids + * @param {IntervalSchedule} schedule - new schedule + * @returns {Promise} */ public async bulkUpdateSchedules( taskIds: string[], From bdeadf66d451e6c0097b83f1468e51a4747fada0 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Mon, 6 Jun 2022 15:01:19 +0100 Subject: [PATCH 15/23] few perf improvements --- .../task_manager/server/task_scheduling.test.ts | 13 +++++++++++++ .../plugins/task_manager/server/task_scheduling.ts | 14 +++++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index 79773e6895516..371266fc872ff 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -199,6 +199,19 @@ describe('TaskScheduling', () => { }); }); + test('should not update task if new interval is equal to previous', async () => { + const task = mockTask({ id, schedule: { interval: '3h' } }); + + mockTaskStore.fetch.mockResolvedValue({ docs: [task] }); + + const taskScheduling = new TaskScheduling(taskSchedulingOpts); + await taskScheduling.bulkUpdateSchedules([id], { interval: '3h' }); + + const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0]; + + expect(bulkUpdatePayload).toHaveLength(0); + }); + test('should postpone task run if new interval is greater than previous', async () => { // task set to be run in 2 hrs from now const runInTwoHrs = new Date(Date.now() + moment.duration(2, 'hours').asMilliseconds()); diff --git a/x-pack/plugins/task_manager/server/task_scheduling.ts b/x-pack/plugins/task_manager/server/task_scheduling.ts index 1fddc44c9e4b5..02a6f55bdc5ef 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.ts @@ -162,16 +162,24 @@ export class TaskScheduling { const updatedTasks = tasks .flatMap(({ docs }) => docs) - .map((task) => { + .reduce((acc, task) => { + // if task schedule interval is the same, no need to update it + if (task.schedule?.interval === schedule.interval) { + return acc; + } + const oldIntervalInMs = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s'); + // computing new runAt using formula: + // newRunAt = oldRunAt - oldInterval + newInterval const newRunAtInMs = Math.max( Date.now(), task.runAt.getTime() - oldIntervalInMs + parseIntervalAsMillisecond(schedule.interval) ); - return { ...task, schedule, runAt: new Date(newRunAtInMs) }; - }); + acc.push({ ...task, schedule, runAt: new Date(newRunAtInMs) }); + return acc; + }, []); return (await this.store.bulkUpdate(updatedTasks)).reduce( (acc, task) => { From 64e7adba1acefc3501114eed784ec47ae317e8a6 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Wed, 8 Jun 2022 17:28:47 +0100 Subject: [PATCH 16/23] CR: replace auditLogger with logger.error --- .../alerting/server/rules_client/rules_client.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index fc4c900e21f14..fe61148b6350d 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -1513,12 +1513,14 @@ export class RulesClient { try { await this.taskManager.bulkUpdateSchedules(taskIds, scheduleOperation.value); + this.logger.debug( + `Successfully updated schedules for underlying tasks: ${taskIds.join(', ')}` + ); } catch (error) { - this.auditLogger?.log( - ruleAuditEvent({ - action: RuleAuditAction.BULK_EDIT, - error, - }) + this.logger.error( + `Failure to update schedules for underlying tasks: ${taskIds.join( + ', ' + )}. TaskManager bulkUpdateSchedules failed with Error: ${error.message}` ); } } From f079cd7933a78050ba59609a0e99465c93c0d641 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Thu, 9 Jun 2022 12:49:39 +0100 Subject: [PATCH 17/23] [RAM] rules-client-update-move-to-new-taks-manager-api --- .../server/rules_client/rules_client.ts | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index fe61148b6350d..c993796c65a4f 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -1245,20 +1245,23 @@ export class RulesClient { (async () => { if ( updateResult.scheduledTaskId && + updateResult.schedule && !isEqual(alertSavedObject.attributes.schedule, updateResult.schedule) ) { - this.taskManager - .runNow(updateResult.scheduledTaskId) - .then(() => { - this.logger.debug( - `Alert update has rescheduled the underlying task: ${updateResult.scheduledTaskId}` - ); - }) - .catch((err: Error) => { - this.logger.error( - `Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}` - ); - }); + try { + await this.taskManager.bulkUpdateSchedules( + [updateResult.scheduledTaskId], + updateResult.schedule + ); + + this.logger.debug( + `Alert update has rescheduled the underlying task: ${updateResult.scheduledTaskId}` + ); + } catch (err) { + this.logger.error( + `Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}` + ); + } } })(), ]); From 24e9f45785c9c188a309d7e6d57b90fba610988d Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Thu, 9 Jun 2022 17:12:44 +0100 Subject: [PATCH 18/23] fix failing unti tests --- .../server/rules_client/tests/update.test.ts | 54 +++---------------- 1 file changed, 8 insertions(+), 46 deletions(-) diff --git a/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts b/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts index 1508d49fe5851..5d3187b09d3f4 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts @@ -1585,7 +1585,7 @@ describe('update()', () => { taskManager.runNow.mockReturnValueOnce(Promise.resolve({ id: alertId })); } - test('updating the alert schedule should rerun the task immediately', async () => { + test('updating the alert schedule should call taskManager.bulkUpdateSchedules', async () => { const alertId = uuid.v4(); const taskId = uuid.v4(); @@ -1614,10 +1614,10 @@ describe('update()', () => { }, }); - expect(taskManager.runNow).toHaveBeenCalledWith(taskId); + expect(taskManager.bulkUpdateSchedules).toHaveBeenCalledWith([taskId], { interval: '1m' }); }); - test('updating the alert without changing the schedule should not rerun the task', async () => { + test('updating the alert without changing the schedule should not call taskManager.bulkUpdateSchedules', async () => { const alertId = uuid.v4(); const taskId = uuid.v4(); @@ -1646,55 +1646,17 @@ describe('update()', () => { }, }); - expect(taskManager.runNow).not.toHaveBeenCalled(); + expect(taskManager.bulkUpdateSchedules).not.toHaveBeenCalled(); }); - test('updating the alert should not wait for the rerun the task to complete', async () => { + test('logs when update of schedule of an alerts underlying task fails', async () => { const alertId = uuid.v4(); const taskId = uuid.v4(); mockApiCalls(alertId, taskId, { interval: '1m' }, { interval: '30s' }); - const resolveAfterAlertUpdatedCompletes = resolvable<{ id: string }>(); - - taskManager.runNow.mockReset(); - taskManager.runNow.mockReturnValue(resolveAfterAlertUpdatedCompletes); - - await rulesClient.update({ - id: alertId, - data: { - schedule: { interval: '1m' }, - name: 'abc', - tags: ['foo'], - params: { - bar: true, - }, - throttle: null, - notifyWhen: null, - actions: [ - { - group: 'default', - id: '1', - params: { - foo: true, - }, - }, - ], - }, - }); - - expect(taskManager.runNow).toHaveBeenCalled(); - resolveAfterAlertUpdatedCompletes.resolve({ id: alertId }); - }); - - test('logs when the rerun of an alerts underlying task fails', async () => { - const alertId = uuid.v4(); - const taskId = uuid.v4(); - - mockApiCalls(alertId, taskId, { interval: '1m' }, { interval: '30s' }); - - taskManager.runNow.mockReset(); - taskManager.runNow.mockRejectedValue(new Error('Failed to run alert')); + taskManager.bulkUpdateSchedules.mockReset(); + taskManager.bulkUpdateSchedules.mockRejectedValue(new Error('Failed to run alert')); await rulesClient.update({ id: alertId, @@ -1719,7 +1681,7 @@ describe('update()', () => { }, }); - expect(taskManager.runNow).toHaveBeenCalled(); + expect(taskManager.bulkUpdateSchedules).toHaveBeenCalled(); expect(rulesClientParams.logger.error).toHaveBeenCalledWith( `Alert update failed to run its underlying task. TaskManager runNow failed with Error: Failed to run alert` From 4136dd3dd7a07391c1e8de1f9e73548aecd8fcdb Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Thu, 9 Jun 2022 16:18:37 +0000 Subject: [PATCH 19/23] [CI] Auto-commit changed files from 'node scripts/precommit_hook.js --ref HEAD~1..HEAD --fix' --- x-pack/plugins/alerting/server/rules_client/tests/update.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts b/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts index 5d3187b09d3f4..b7e19492e5cf5 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts @@ -17,7 +17,6 @@ import { RecoveredActionGroup } from '../../../common'; import { encryptedSavedObjectsMock } from '@kbn/encrypted-saved-objects-plugin/server/mocks'; import { actionsAuthorizationMock } from '@kbn/actions-plugin/server/mocks'; import { AlertingAuthorization } from '../../authorization/alerting_authorization'; -import { resolvable } from '../../test_utils'; import { ActionsAuthorization, ActionsClient } from '@kbn/actions-plugin/server'; import { TaskStatus } from '@kbn/task-manager-plugin/server'; import { auditLoggerMock } from '@kbn/security-plugin/server/audit/mocks'; From f216bbfac3d15ff8aa50cdc8a86067330e0d6b25 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko <92328789+vitaliidm@users.noreply.github.com> Date: Wed, 15 Jun 2022 18:02:56 +0100 Subject: [PATCH 20/23] Update x-pack/plugins/alerting/server/rules_client/rules_client.ts Co-authored-by: Ying Mao --- x-pack/plugins/alerting/server/rules_client/rules_client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index c993796c65a4f..f133e5e32d5d6 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -1255,7 +1255,7 @@ export class RulesClient { ); this.logger.debug( - `Alert update has rescheduled the underlying task: ${updateResult.scheduledTaskId}` + `Rule update has rescheduled the underlying task: ${updateResult.scheduledTaskId}` ); } catch (err) { this.logger.error( From d59787d4c49251a5ab9901539d40319965586785 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko <92328789+vitaliidm@users.noreply.github.com> Date: Wed, 15 Jun 2022 18:03:12 +0100 Subject: [PATCH 21/23] Update x-pack/plugins/alerting/server/rules_client/rules_client.ts Co-authored-by: Ying Mao --- x-pack/plugins/alerting/server/rules_client/rules_client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index f133e5e32d5d6..d773ce86d46e1 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -1259,7 +1259,7 @@ export class RulesClient { ); } catch (err) { this.logger.error( - `Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}` + `Rule update failed to run its underlying task. TaskManager bulkUpdateSchedules failed with Error: ${err.message}` ); } } From 1b46b3ace63b66b1c9526db948cf5652937f1e41 Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Wed, 15 Jun 2022 18:46:04 +0100 Subject: [PATCH 22/23] CR: add runAt in --- x-pack/plugins/alerting/server/rules_client/rules_client.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/alerting/server/rules_client/rules_client.ts b/x-pack/plugins/alerting/server/rules_client/rules_client.ts index d773ce86d46e1..30861ee1ab5a4 100644 --- a/x-pack/plugins/alerting/server/rules_client/rules_client.ts +++ b/x-pack/plugins/alerting/server/rules_client/rules_client.ts @@ -1249,13 +1249,13 @@ export class RulesClient { !isEqual(alertSavedObject.attributes.schedule, updateResult.schedule) ) { try { - await this.taskManager.bulkUpdateSchedules( + const { tasks } = await this.taskManager.bulkUpdateSchedules( [updateResult.scheduledTaskId], updateResult.schedule ); this.logger.debug( - `Rule update has rescheduled the underlying task: ${updateResult.scheduledTaskId}` + `Rule update has rescheduled the underlying task: ${updateResult.scheduledTaskId} to run at: ${tasks?.[0]?.runAt}` ); } catch (err) { this.logger.error( From 6bdcc9f3b4592a203945eaa8005e0ce1f156c9be Mon Sep 17 00:00:00 2001 From: Vitalii Dmyterko Date: Wed, 15 Jun 2022 20:00:40 +0100 Subject: [PATCH 23/23] fix test --- .../plugins/alerting/server/rules_client/tests/update.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts b/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts index b7e19492e5cf5..28b7ad273b456 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/update.test.ts @@ -1683,7 +1683,7 @@ describe('update()', () => { expect(taskManager.bulkUpdateSchedules).toHaveBeenCalled(); expect(rulesClientParams.logger.error).toHaveBeenCalledWith( - `Alert update failed to run its underlying task. TaskManager runNow failed with Error: Failed to run alert` + `Rule update failed to run its underlying task. TaskManager bulkUpdateSchedules failed with Error: Failed to run alert` ); }); });