Skip to content

Commit

Permalink
[Task manager] Prevents edge case where already running tasks are res…
Browse files Browse the repository at this point in the history
…chedule every polling interval (elastic#74606)

Fixes flaky tests in Task Manager and Alerting.

The fix in elastic#73244 was correct, but it missed an edge case which causes the already running task to be rescheduled over and over.

This prevents that edge case which was effecting both TM in general and Alerting specifically.
  • Loading branch information
gmmorris committed Aug 13, 2020
1 parent 0c21024 commit 06fb025
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 38 deletions.
104 changes: 100 additions & 4 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ if (doc['task.runAt'].size()!=0) {
});
});

test('it returns task objects', async () => {
test('it filters out running tasks', async () => {
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
const runAt = new Date();
Expand All @@ -641,7 +641,7 @@ if (doc['task.runAt'].size()!=0) {
taskType: 'foo',
schedule: undefined,
attempts: 0,
status: 'idle',
status: 'claiming',
params: '{ "hello": "world" }',
state: '{ "baby": "Henhen" }',
user: 'jimbo',
Expand Down Expand Up @@ -715,7 +715,103 @@ if (doc['task.runAt'].size()!=0) {
runAt,
scope: ['reporting'],
state: { baby: 'Henhen' },
status: 'idle',
status: 'claiming',
taskType: 'foo',
user: 'jimbo',
ownerId: taskManagerId,
},
]);
});

test('it returns task objects', async () => {
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
const runAt = new Date();
const tasks = [
{
_id: 'aaa',
_source: {
type: 'task',
task: {
runAt,
taskType: 'foo',
schedule: undefined,
attempts: 0,
status: 'claiming',
params: '{ "hello": "world" }',
state: '{ "baby": "Henhen" }',
user: 'jimbo',
scope: ['reporting'],
ownerId: taskManagerId,
},
},
_seq_no: 1,
_primary_term: 2,
sort: ['a', 1],
},
{
_id: 'bbb',
_source: {
type: 'task',
task: {
runAt,
taskType: 'bar',
schedule: { interval: '5m' },
attempts: 2,
status: 'claiming',
params: '{ "shazm": 1 }',
state: '{ "henry": "The 8th" }',
user: 'dabo',
scope: ['reporting', 'ceo'],
ownerId: taskManagerId,
},
},
_seq_no: 3,
_primary_term: 4,
sort: ['b', 2],
},
];
const {
result: { docs },
args: {
search: {
body: { query },
},
},
} = await testClaimAvailableTasks({
opts: {
taskManagerId,
},
claimingOpts: {
claimOwnershipUntil,
size: 10,
},
hits: tasks,
});

expect(query.bool.must).toContainEqual({
bool: {
must: [
{
term: {
'task.ownerId': taskManagerId,
},
},
{ term: { 'task.status': 'claiming' } },
],
},
});

expect(docs).toMatchObject([
{
attempts: 0,
id: 'aaa',
schedule: undefined,
params: { hello: 'world' },
runAt,
scope: ['reporting'],
state: { baby: 'Henhen' },
status: 'claiming',
taskType: 'foo',
user: 'jimbo',
ownerId: taskManagerId,
Expand All @@ -728,7 +824,7 @@ if (doc['task.runAt'].size()!=0) {
runAt,
scope: ['reporting', 'ceo'],
state: { henry: 'The 8th' },
status: 'running',
status: 'claiming',
taskType: 'bar',
user: 'dabo',
ownerId: taskManagerId,
Expand Down
55 changes: 23 additions & 32 deletions x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,48 +216,39 @@ export class TaskStore {
claimTasksByIdWithRawIds,
size
);

const docs =
numberOfTasksClaimed > 0
? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size)
: [];

// emit success/fail events for claimed tasks by id
if (claimTasksById && claimTasksById.length) {
const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) =>
claimTasksById.includes(doc.id)
);

const [documentsClaimedById, documentsRequestedButNotClaimed] = partition(
documentsReturnedById,
// we filter the schduled tasks down by status is 'claiming' in the esearch,
// but we do not apply this limitation on tasks claimed by ID so that we can
// provide more detailed error messages when we fail to claim them
(doc) => doc.status === TaskStatus.Claiming
);

const documentsRequestedButNotReturned = difference(
claimTasksById,
map(documentsReturnedById, 'id')
);
const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) =>
claimTasksById.includes(doc.id)
);

this.emitEvents(
[...documentsClaimedById, ...documentsClaimedBySchedule].map((doc) =>
asTaskClaimEvent(doc.id, asOk(doc))
)
);
const [documentsClaimedById, documentsRequestedButNotClaimed] = partition(
documentsReturnedById,
// we filter the schduled tasks down by status is 'claiming' in the esearch,
// but we do not apply this limitation on tasks claimed by ID so that we can
// provide more detailed error messages when we fail to claim them
(doc) => doc.status === TaskStatus.Claiming
);

this.emitEvents(
documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc))))
);
const documentsRequestedButNotReturned = difference(
claimTasksById,
map(documentsReturnedById, 'id')
);

this.emitEvents(
documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none)))
);
}
this.emitEvents([
...documentsClaimedById.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))),
...documentsClaimedBySchedule.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))),
...documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))),
...documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))),
]);

return {
claimedTasks: numberOfTasksClaimed,
docs,
claimedTasks: documentsClaimedById.length + documentsClaimedBySchedule.length,
docs: docs.filter((doc) => doc.status === TaskStatus.Claiming),
};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ export default function ({ getService }) {
const testHistoryIndex = '.kibana_task_manager_test_result';
const supertest = supertestAsPromised(url.format(config.get('servers.kibana')));

// FLAKY: https://github.com/elastic/kibana/issues/71390
describe.skip('scheduling and running tasks', () => {
describe('scheduling and running tasks', () => {
beforeEach(
async () => await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200)
);
Expand Down

0 comments on commit 06fb025

Please sign in to comment.