Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task manager] Adds ensureScheduled api to allow safer rescheduling of existing tasks #50232

Merged
merged 15 commits into from
Nov 14, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(task-manager): added scheduleIfNotExists api
gmmorris committed Nov 11, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 090e0942faf6d278e3d52f30c03c7e46921c74e4
1 change: 1 addition & 0 deletions x-pack/legacy/plugins/task_manager/plugin.test.ts
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@ describe('Task Manager Plugin', () => {
"registerTaskDefinitions": [Function],
"remove": [Function],
"schedule": [Function],
"scheduleIfNotExists": [Function],
}
`);
});
2 changes: 2 additions & 0 deletions x-pack/legacy/plugins/task_manager/plugin.ts
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ export interface PluginSetupContract {
fetch: TaskManager['fetch'];
remove: TaskManager['remove'];
schedule: TaskManager['schedule'];
scheduleIfNotExists: TaskManager['scheduleIfNotExists'];
addMiddleware: TaskManager['addMiddleware'];
registerTaskDefinitions: TaskManager['registerTaskDefinitions'];
}
@@ -59,6 +60,7 @@ export class Plugin {
fetch: (...args) => taskManager.fetch(...args),
remove: (...args) => taskManager.remove(...args),
schedule: (...args) => taskManager.schedule(...args),
scheduleIfNotExists: (...args) => taskManager.scheduleIfNotExists(...args),
addMiddleware: (...args) => taskManager.addMiddleware(...args),
registerTaskDefinitions: (...args) => taskManager.registerTaskDefinitions(...args),
};
79 changes: 79 additions & 0 deletions x-pack/legacy/plugins/task_manager/task_manager.test.ts
Original file line number Diff line number Diff line change
@@ -121,6 +121,85 @@ describe('TaskManager', () => {
expect(savedObjectsClient.create).toHaveBeenCalled();
});

test('allows scheduling existing tasks that may have already been scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 409,
});

client.start();

const result = await client.scheduleIfNotExists({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
});

expect(result.id).toEqual('my-foo-id');
});

test('doesnt ignore failure to scheduling existing tasks for reasons other than already being scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 500,
});

client.start();

return expect(
client.scheduleIfNotExists({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
})
).rejects.toMatchObject({
statusCode: 500,
});
});

test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 409,
});

client.start();

return expect(
client.schedule({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
})
).rejects.toMatchObject({
statusCode: 409,
});
});

test('allows and queues removing tasks before starting', async () => {
const client = new TaskManager(taskManagerOpts);
savedObjectsClient.delete.mockResolvedValueOnce({});
23 changes: 23 additions & 0 deletions x-pack/legacy/plugins/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import {
TaskDefinition,
TaskDictionary,
ConcreteTaskInstance,
ExistingTaskInstance,
RunContext,
TaskInstance,
} from './task';
@@ -29,6 +30,8 @@ import {
} from './task_store';
import { identifyEsError } from './lib/identify_es_error';

const VERSION_CONFLICT_STATUS = 409;

export interface TaskManagerOpts {
logger: Logger;
config: any;
@@ -219,6 +222,26 @@ export class TaskManager {
return result;
}

/**
* Schedules a task with an Id
*
* @param task - The task being scheduled.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async scheduleIfNotExists(
taskInstance: ExistingTaskInstance,
options?: any
): Promise<ExistingTaskInstance> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess sometimes this will return a ConcreteTaskInstance (if not already scheduled), but otherwise the taskInstance sent in. Seems like it would be "nice" if it did always return a ConcreteTaskInstance, but guessing you will have to do another read to get that, and hardly seems worth it, since the caller may not need it. Seems like this should be fine as a first attempt, maybe we'd need to re-look at a way to get the ConcreteTaskInstance back later, if we find we need it for some reason.

In any case, the JSDoc comment above this should be changed to say it's returning an ExistingTaskInstance instead of ConcreteTaskInstance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I did think about that, though, the type it returns will always be TaskInstanceWithId, never an explicit ConcreteTaskInstance, as TaskInstanceWithId is essentially a subtype of ConcreteTaskInstance, but you're right that otherwise we'd have to make an additional read and it didn't seem worth it.
Feels reasonable to leave it such due to the ensure nature, no?

I'll change the JSDoc, thanks for catching 👍

try {
return await this.schedule(taskInstance, options);
} catch (err) {
if (err.statusCode === VERSION_CONFLICT_STATUS) {
return taskInstance;
}
throw err;
}
}

/**
* Fetches a paginatable list of scheduled tasks.
*
Original file line number Diff line number Diff line change
@@ -32,21 +32,34 @@ export function initRoutes(server) {
config: {
validate: {
payload: Joi.object({
taskType: Joi.string().required(),
interval: Joi.string().optional(),
params: Joi.object().required(),
state: Joi.object().optional(),
id: Joi.string().optional(),
task: Joi.object({
taskType: Joi.string().required(),
interval: Joi.string().optional(),
params: Joi.object().required(),
state: Joi.object().optional(),
id: Joi.string().optional()
}),
scheduleIfNotExists: Joi.boolean()
.default(false)
.optional(),
}),
},
},
async handler(request) {
try {
const task = await taskManager.schedule({
...request.payload,
const { scheduleIfNotExists = false, task: taskFields } = request.payload;
const task = {
...taskFields,
scope: [scope],
}, { request });
return task;
};

const taskResult = await (
scheduleIfNotExists
? taskManager.scheduleIfNotExists(task, { request })
: taskManager.schedule(task, { request })
);

return taskResult;
} catch (err) {
return err;
}
Original file line number Diff line number Diff line change
@@ -60,7 +60,15 @@ export default function ({ getService }) {
function scheduleTask(task) {
return supertest.post('/api/sample_tasks')
.set('kbn-xsrf', 'xxx')
.send(task)
.send({ task })
.expect(200)
.then((response) => response.body);
}

function scheduleTaskIfNotExists(task) {
return supertest.post('/api/sample_tasks')
.set('kbn-xsrf', 'xxx')
.send({ task, scheduleIfNotExists: true })
.expect(200)
.then((response) => response.body);
}
@@ -116,6 +124,24 @@ export default function ({ getService }) {
expect(result.id).to.be('test-task-for-sample-task-plugin-to-test-task-manager');
});

it('should allow a task with a given ID to be scheduled multiple times', async () => {
const result = await scheduleTaskIfNotExists({
id: 'test-task-to-reschedule-in-task-manager',
taskType: 'sampleTask',
params: { },
});

expect(result.id).to.be('test-task-to-reschedule-in-task-manager');

const rescheduleResult = await scheduleTaskIfNotExists({
id: 'test-task-to-reschedule-in-task-manager',
taskType: 'sampleTask',
params: { },
});

expect(rescheduleResult.id).to.be('test-task-to-reschedule-in-task-manager');
});

it('should reschedule if task errors', async () => {
const task = await scheduleTask({
taskType: 'sampleTask',