Skip to content

Commit

Permalink
Cleanup unused code for claiming tasks by id (#144408)
Browse files Browse the repository at this point in the history
In this PR, I'm cleaning up code within Task Manager regarding claiming
tasks by id that is no longer used. The code was previously used for
running alerting rules right away but recently `runNow` got replaced
with `runSoon` which no longer needs this code to function. For more
info on the previous change, see
#133550.
  • Loading branch information
mikecote authored Nov 9, 2022
1 parent 08eb63a commit daf9322
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 755 deletions.
1 change: 0 additions & 1 deletion x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ export class TaskManagerPlugin
taskStore,
middleware: this.middleware,
ephemeralTaskLifecycle: this.ephemeralTaskLifecycle,
definitions: this.definitions,
taskManagerId: taskStore.taskManagerId,
});

Expand Down
6 changes: 2 additions & 4 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,7 @@ describe('TaskPollingLifecycle', () => {
)
);

expect(
isOk(await getFirstAsPromise(claimAvailableTasks([], taskClaiming, logger)))
).toBeTruthy();
expect(isOk(await getFirstAsPromise(claimAvailableTasks(taskClaiming, logger)))).toBeTruthy();

expect(taskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1);
});
Expand Down Expand Up @@ -266,7 +264,7 @@ describe('TaskPollingLifecycle', () => {
})
);

const err = await getFirstAsPromise(claimAvailableTasks([], taskClaiming, logger));
const err = await getFirstAsPromise(claimAvailableTasks(taskClaiming, logger));

expect(isErr(err)).toBeTruthy();
expect((err as Err<FillPoolResult>).error).toEqual(FillPoolResult.Failed);
Expand Down
8 changes: 1 addition & 7 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,7 @@ export class TaskPollingLifecycle {
return fillPool(
// claim available tasks
() => {
return claimAvailableTasks(
tasksToClaim.splice(0, this.pool.availableWorkers),
this.taskClaiming,
this.logger
).pipe(
return claimAvailableTasks(this.taskClaiming, this.logger).pipe(
tap(
mapOk(({ timing }: ClaimOwnershipResult) => {
if (timing) {
Expand Down Expand Up @@ -313,15 +309,13 @@ export class TaskPollingLifecycle {
}

export function claimAvailableTasks(
claimTasksById: string[],
taskClaiming: TaskClaiming,
logger: Logger
): Observable<Result<ClaimOwnershipResult, FillPoolResult>> {
return new Observable((observer) => {
taskClaiming
.claimAvailableTasksIfCapacityIsAvailable({
claimOwnershipUntil: intervalFromNow('30s')!,
claimTasksById,
})
.subscribe(
(claimResult) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ describe('mark_available_tasks_as_claimed', () => {
createTaskRunner: () => ({ run: () => Promise.resolve() }),
},
});
const claimTasksById = undefined;
const defaultMaxAttempts = 1;
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
Expand All @@ -62,7 +61,6 @@ describe('mark_available_tasks_as_claimed', () => {
),
script: updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById: claimTasksById || [],
claimableTaskTypes: definitions.getAllTypes(),
skippedTaskTypes: [],
unusedTaskTypes: [],
Expand Down Expand Up @@ -140,7 +138,7 @@ if (doc['task.runAt'].size()!=0) {
script: {
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
Expand All @@ -152,15 +150,6 @@ if (doc['task.runAt'].size()!=0) {
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
Expand All @@ -173,7 +162,6 @@ if (doc['task.runAt'].size()!=0) {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById: [],
claimableTaskTypes: ['sampleTask', 'otherTask'],
skippedTaskTypes: [],
unusedTaskTypes: [],
Expand All @@ -187,79 +175,6 @@ if (doc['task.runAt'].size()!=0) {
});

describe(`script`, () => {
test('it supports claiming specific tasks by id', async () => {
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
};

const claimTasksById = [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
];

expect(
updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById,
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
},
})
).toMatchObject({
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
}`,
lang: 'painless',
params: {
now: 0,
fieldUpdates,
claimTasksById: [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
},
},
});
});

test('it marks the update as a noop if the type is skipped', async () => {
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
Expand All @@ -271,7 +186,6 @@ if (doc['task.runAt'].size()!=0) {
expect(
updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById: [],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ export interface UpdateFieldsAndMarkAsFailedOpts {
fieldUpdates: {
[field: string]: string | number | Date;
};
claimTasksById: string[];
claimableTaskTypes: string[];
skippedTaskTypes: string[];
unusedTaskTypes: string[];
Expand All @@ -129,7 +128,6 @@ export interface UpdateFieldsAndMarkAsFailedOpts {

export const updateFieldsAndMarkAsFailed = ({
fieldUpdates,
claimTasksById,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
Expand All @@ -148,13 +146,11 @@ export const updateFieldsAndMarkAsFailed = ({
return {
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
${setScheduledAtAndMarkAsClaimed}
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
${setScheduledAtAndMarkAsClaimed}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
Expand All @@ -164,7 +160,6 @@ export const updateFieldsAndMarkAsFailed = ({
params: {
now: new Date().getTime(),
fieldUpdates,
claimTasksById,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
Expand Down
12 changes: 0 additions & 12 deletions x-pack/plugins/task_manager/server/queries/query_clauses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,3 @@ export function filterDownBy(...filter: estypes.QueryDslQueryContainer[]) {
},
};
}

export function asPinnedQuery(
ids: estypes.QueryDslPinnedQuery['ids'],
organic: estypes.QueryDslPinnedQuery['organic']
): Pick<estypes.QueryDslQueryContainer, 'pinned'> {
return {
pinned: {
ids,
organic,
},
};
}
Loading

0 comments on commit daf9322

Please sign in to comment.