Skip to content

Commit

Permalink
[Task Manager] Mark task as failed if maxAttempts has been met. (elas…
Browse files Browse the repository at this point in the history
…tic#80681)

* wip

* Adding updateFieldsAndMarkAsFailed function

* Updating UBQ

* Only updating retryAt if marking as claiming

* Updating query

* Updating query to only fail one time tasks that have exceeded max attempts

* Fixing tests

* Fixing tests

* Handling claiming tasks by id

* Removing unused function

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
ymao1 and kibanamachine committed Oct 27, 2020
1 parent 79aa050 commit 875f759
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,12 @@
*/

import _ from 'lodash';
import {
asUpdateByQuery,
shouldBeOneOf,
mustBeAllOf,
ExistsFilter,
TermFilter,
RangeFilter,
} from './query_clauses';
import { asUpdateByQuery, shouldBeOneOf, mustBeAllOf } from './query_clauses';

import {
updateFields,
updateFieldsAndMarkAsFailed,
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
TaskWithSchedule,
taskWithLessThanMaxAttempts,
SortByRunAtAndRetryAt,
} from './mark_available_tasks_as_claimed';

Expand All @@ -40,29 +31,29 @@ describe('mark_available_tasks_as_claimed', () => {
createTaskRunner: () => ({ run: () => Promise.resolve() }),
},
});
const claimTasksById = undefined;
const defaultMaxAttempts = 1;
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
};

expect(
asUpdateByQuery({
query: mustBeAllOf(
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
// Either task has an schedule or the attempts < the maximum configured
shouldBeOneOf<ExistsFilter | TermFilter | RangeFilter>(
TaskWithSchedule,
...Array.from(definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || defaultMaxAttempts)
)
)
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)
),
update: updateFieldsAndMarkAsFailed(
fieldUpdates,
claimTasksById || [],
Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => {
return { ...accumulator, [type]: maxAttempts || defaultMaxAttempts };
}, {})
),
update: updateFields({
ownerId: taskManagerId,
status: 'claiming',
retryAt: claimOwnershipUntil,
}),
sort: SortByRunAtAndRetryAt,
})
).toEqual({
Expand Down Expand Up @@ -100,42 +91,6 @@ describe('mark_available_tasks_as_claimed', () => {
],
},
},
// Either task has an recurring schedule or the attempts < the maximum configured
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'sampleTask' } },
{
range: {
'task.attempts': {
lt: 5,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'otherTask' } },
{
range: {
'task.attempts': {
lt: 1,
},
},
},
],
},
},
],
},
},
],
},
},
Expand All @@ -158,12 +113,26 @@ if (doc['task.runAt'].size()!=0) {
},
seq_no_primary_term: true,
script: {
source: `ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;`,
source: `
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
status: 'claiming',
fieldUpdates: {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById: [],
taskMaxAttempts: {
sampleTask: 5,
otherTask: 1,
},
},
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,26 @@ if (doc['task.runAt'].size()!=0) {
},
};

export const updateFields = (fieldUpdates: {
[field: string]: string | number | Date;
}): ScriptClause => ({
source: Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.${field};`)
.join(' '),
export const updateFieldsAndMarkAsFailed = (
fieldUpdates: {
[field: string]: string | number | Date;
},
claimTasksById: string[],
taskMaxAttempts: { [field: string]: number }
): ScriptClause => ({
source: `
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: fieldUpdates,
params: {
fieldUpdates,
claimTasksById,
taskMaxAttempts,
},
});
7 changes: 6 additions & 1 deletion x-pack/plugins/task_manager/server/queries/query_clauses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,12 @@ export interface ScriptClause {
source: string;
lang: string;
params: {
[field: string]: string | number | Date;
[field: string]:
| string
| number
| Date
| string[]
| { [field: string]: string | number | Date };
};
}

Expand Down
126 changes: 51 additions & 75 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,41 +415,6 @@ describe('TaskStore', () => {
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
Expand Down Expand Up @@ -501,6 +466,11 @@ if (doc['task.runAt'].size()!=0) {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const definitions = new TaskTypeDictionary(mockLogger());
const taskManagerId = uuid.v1();
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: new Date(Date.now()),
};
definitions.registerTaskDefinitions({
foo: {
title: 'foo',
Expand All @@ -514,10 +484,11 @@ if (doc['task.runAt'].size()!=0) {
});
const {
args: {
updateByQuery: { body: { query, sort } = {} },
updateByQuery: { body: { query, script, sort } = {} },
},
} = await testClaimAvailableTasks({
opts: {
taskManagerId,
maxAttempts,
definitions,
},
Expand Down Expand Up @@ -576,41 +547,6 @@ if (doc['task.runAt'].size()!=0) {
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
Expand Down Expand Up @@ -640,6 +576,30 @@ if (doc['task.runAt'].size()!=0) {
},
});

expect(script).toMatchObject({
source: `
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: {
fieldUpdates,
claimTasksById: [
'task:33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'task:a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
taskMaxAttempts: {
bar: customMaxAttempts,
foo: maxAttempts,
},
},
});

expect(sort).toMatchObject([
'_score',
{
Expand All @@ -665,6 +625,10 @@ if (doc['task.runAt'].size()!=0) {
test('it claims tasks by setting their ownerId, status and retryAt', async () => {
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
};
const {
args: {
updateByQuery: { body: { script } = {} },
Expand All @@ -679,12 +643,24 @@ if (doc['task.runAt'].size()!=0) {
},
});
expect(script).toMatchObject({
source: `ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;`,
source: `
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
status: 'claiming',
fieldUpdates,
claimTasksById: [],
taskMaxAttempts: {
dernstraight: 2,
report: 2,
yawn: 2,
},
},
});
});
Expand Down
Loading

0 comments on commit 875f759

Please sign in to comment.