Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RAM] adds bulkUpdatesSchedules method to Task Manager API #132637

Merged
merged 29 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2759c2e
init taskManager bulk
vitaliidm May 20, 2022
d24c2f3
updates and fixes
vitaliidm May 23, 2022
ff55e01
Merge branch 'main' into task-manager-bulk-schedules
vitaliidm May 23, 2022
6efc5fb
fix the rest of tests
vitaliidm May 23, 2022
655a753
Merge branch 'main' into task-manager-bulk-schedules
vitaliidm May 23, 2022
74c5a7c
Merge branch 'main' into task-manager-bulk-schedules
vitaliidm May 26, 2022
7e1e7c7
add unit tests
vitaliidm May 26, 2022
815854b
tests!!!
vitaliidm May 30, 2022
79fc692
refactor it
vitaliidm May 30, 2022
8197409
Merge branch 'main' into task-manager-bulk-schedules
vitaliidm May 30, 2022
e9d4426
add test to rukes_client
vitaliidm May 31, 2022
51469cc
Merge branch 'task-manager-bulk-schedules' of https://github.com/vita…
vitaliidm May 31, 2022
0b96f46
tests, more tests
vitaliidm May 31, 2022
9f1ccf7
README, docs
vitaliidm May 31, 2022
edc7890
skip again
vitaliidm Jun 1, 2022
d20f2fd
add rest of ops
vitaliidm Jun 1, 2022
f572388
Merge branch 'main' into task-manager-bulk-schedules
vitaliidm Jun 1, 2022
f2bc696
tests
vitaliidm Jun 1, 2022
0b44ef1
comments updates
vitaliidm Jun 6, 2022
cb5411a
JSDoc
vitaliidm Jun 6, 2022
bdeadf6
few perf improvements
vitaliidm Jun 6, 2022
0563f88
Merge branch 'main' into task-manager-bulk-schedules
vitaliidm Jun 6, 2022
bd36d9a
Merge branch 'main' into task-manager-bulk-schedules
vitaliidm Jun 8, 2022
64e7adb
CR: replace auditLogger with logger.error
vitaliidm Jun 8, 2022
6fb6bee
Merge branch 'main' into task-manager-bulk-schedules
vitaliidm Jun 9, 2022
fc2f119
CR: minor suggestions addressed
vitaliidm Jun 13, 2022
eb4af87
Merge branch 'main' into task-manager-bulk-schedules
vitaliidm Jun 13, 2022
0e2d63a
CR: fix tests
vitaliidm Jun 13, 2022
52b794a
CR: add functional test for task in running status
vitaliidm Jun 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions x-pack/plugins/alerting/server/routes/bulk_edit_rules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import { ILicenseState, RuleTypeDisabledError } from '../lib';
import { verifyAccessAndContext, rewriteRule, handleDisabledApiKeysError } from './lib';
import { AlertingRequestHandlerContext, INTERNAL_BASE_ALERTING_API_PATH } from '../types';

const scheduleSchema = schema.object({
interval: schema.string(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a schema config validator we use for intervals, as seen here:

interval: schema.string({ validate: validateDurationSchema }),

Using that, a bad interval sent will "fail fast" and not somewhere else, deeper in the framework.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, I've replaced it with validateDurationSchema

});

const ruleActionSchema = schema.object({
group: schema.string(),
id: schema.string(),
Expand All @@ -34,6 +38,11 @@ const operationsSchema = schema.arrayOf(
field: schema.literal('actions'),
value: schema.arrayOf(ruleActionSchema),
}),
schema.object({
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently functional tests are skipped due to #132195
I plan to fix them and add additional tests in the following PR, so won't mix everything together.

I can do it in this PR, if it preferable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers:
I believe this is the follow up PR @vitaliidm is referring to 😉
https://github.com/elastic/kibana/pull/133635/files

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to see them here, but TBH doesn't make sense if they're skipped, so ok in the follow-up. I added a comment to that PR as a reminder.

operation: schema.literal('set'),
field: schema.literal('schedule'),
value: scheduleSchema,
}),
]),
{ minSize: 1 }
);
Expand Down
33 changes: 32 additions & 1 deletion x-pack/plugins/alerting/server/rules_client/rules_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ export interface FindOptions extends IndexType {
filter?: string;
}

export type BulkEditFields = keyof Pick<Rule, 'actions' | 'tags'>;
export type BulkEditFields = keyof Pick<Rule, 'actions' | 'tags' | 'schedule'>;

export type BulkEditOperation =
| {
Expand All @@ -223,6 +223,11 @@ export type BulkEditOperation =
operation: 'add' | 'set';
field: Extract<BulkEditFields, 'actions'>;
value: NormalizedAlertAction[];
}
| {
operation: 'set';
field: Extract<BulkEditFields, 'schedule'>;
value: Rule['schedule'];
};

// schedule, throttle, notifyWhen is commented out before https://github.com/elastic/kibana/issues/124850 will be implemented
Expand Down Expand Up @@ -1494,6 +1499,32 @@ export class RulesClient {
);
});

// update schedules only if schedule operation is present
const scheduleOperation = options.operations.find(
(op): op is Extract<BulkEditOperation, { field: Extract<BulkEditFields, 'schedule'> }> =>
op.field === 'schedule'
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what other operations should affect rescheduling of tasks?

Should notifyWhen or throttle trigger this bulkUpdateSchedules as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only logic that causes a rule's task to get rescheduled at this time is when the rule interval changes. We should be ok to skip the other operations.


if (scheduleOperation?.value) {
const taskIds = updatedRules.reduce<string[]>((acc, rule) => {
if (rule.scheduledTaskId) {
acc.push(rule.scheduledTaskId);
}
return acc;
}, []);

try {
await this.taskManager.bulkUpdateSchedules(taskIds, scheduleOperation.value);
} catch (error) {
this.auditLogger?.log(
ymao1 marked this conversation as resolved.
Show resolved Hide resolved
ruleAuditEvent({
action: RuleAuditAction.BULK_EDIT,
error,
})
);
}
}

return { rules: updatedRules, errors, total };
}

Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export interface TaskManagerSetupContract {

export type TaskManagerStartContract = Pick<
TaskScheduling,
'schedule' | 'runNow' | 'ephemeralRunNow' | 'ensureScheduled'
'schedule' | 'runNow' | 'ephemeralRunNow' | 'ensureScheduled' | 'bulkUpdateSchedules'
> &
Pick<TaskStore, 'fetch' | 'get' | 'remove'> & {
removeIfExists: TaskStore['remove'];
Expand Down Expand Up @@ -238,6 +238,7 @@ export class TaskManagerPlugin
schedule: (...args) => taskScheduling.schedule(...args),
ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args),
runNow: (...args) => taskScheduling.runNow(...args),
bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args),
ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task),
supportsEphemeralTasks: () => this.config.ephemeral_tasks.enabled,
};
Expand Down
54 changes: 51 additions & 3 deletions x-pack/plugins/task_manager/server/task_scheduling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
*/

import { filter, take } from 'rxjs/operators';

import pMap from 'p-map';
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, map as mapOptional, getOrElse, isSome } from 'fp-ts/lib/Option';

import uuid from 'uuid';
import { pick } from 'lodash';
import { pick, chunk } from 'lodash';
import { merge, Subject } from 'rxjs';
import agent from 'elastic-apm-node';
import { Logger } from '@kbn/core/server';
import { mustBeAllOf } from './queries/query_clauses';
import { asOk, either, map, mapErr, promiseResult, isErr } from './lib/result_type';
import {
isTaskRunEvent,
Expand All @@ -28,6 +29,7 @@ import {
TaskClaimErrorType,
} from './task_events';
import { Middleware } from './lib/middleware';
import { parseIntervalAsMillisecond } from './lib/intervals';
import {
ConcreteTaskInstance,
TaskInstanceWithId,
Expand All @@ -36,8 +38,9 @@ import {
TaskLifecycleResult,
TaskStatus,
EphemeralTask,
IntervalSchedule,
} from './task';
import { TaskStore } from './task_store';
import { TaskStore, BulkUpdateResult } from './task_store';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
import { TaskLifecycleEvent, TaskPollingLifecycle } from './polling_lifecycle';
import { TaskTypeDictionary } from './task_type_dictionary';
Expand Down Expand Up @@ -111,6 +114,51 @@ export class TaskScheduling {
});
}

/**
* Bulk updates schedules for tasks by ids.
*
* @param taskIs - list of task ids
* @param schedule - new schedule
* @returns {Promise<ConcreteTaskInstance[]>}
*/
public async bulkUpdateSchedules(
taskIds: string[],
schedule: IntervalSchedule
): Promise<BulkUpdateResult[]> {
const tasks = await pMap(chunk(taskIds, 100), async (taskIdsChunk) =>
this.store.fetch({
query: mustBeAllOf(
{
terms: {
_id: taskIdsChunk.map((taskId) => `task:${taskId}`),
},
},
{
term: {
'task.status': 'idle',
},
}
),
size: 100,
})
);

const updatedTasks = tasks
.flatMap(({ docs }) => docs)
.map((task) => {
const oldInterval = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s');

const newRunAtInMs = Math.max(
Date.now(),
task.runAt.getTime() - oldInterval + parseIntervalAsMillisecond(schedule.interval)
);

return { ...task, schedule, runAt: new Date(newRunAtInMs) };
});

return this.store.bulkUpdate(updatedTasks);
}

/**
* Run task.
*
Expand Down