Skip to content

Commit

Permalink
fix: findExecuteTask only return waiting task (#312)
Browse files Browse the repository at this point in the history
If multi instance access queue may return
same task id, update task attemp idempotent
for safe concurrent.
  • Loading branch information
killagu authored Sep 8, 2022
1 parent d8a27e2 commit 8fb9dd8
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 6 deletions.
16 changes: 16 additions & 0 deletions app/core/entity/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ export interface ChangesStreamTaskData extends TaskBaseData {
registryId?: string,
}

export interface TaskUpdateCondition {
taskId: string;
attempts: number;
}

export type CreateHookTask = Task<CreateHookTaskData>;
export type TriggerHookTask = Task<TriggerHookTaskData>;
export type CreateSyncPackageTask = Task<CreateSyncPackageTaskData>;
Expand Down Expand Up @@ -221,6 +226,17 @@ export class Task<T extends TaskBaseData = TaskBaseData> extends Entity {
task.logPath = `/binaries/${targetName}/syncs/${dayjs().format('YYYY/MM/DDHHmm')}-${task.taskId}.log`;
return task;
}

start(): TaskUpdateCondition {
const condition = {
taskId: this.taskId,
attempts: this.attempts,
};
this.setExecuteWorker();
this.state = TaskState.Processing;
this.attempts += 1;
return condition;
}
}

export type SyncInfo = {
Expand Down
10 changes: 6 additions & 4 deletions app/core/service/TaskService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ export class TaskService extends AbstractService {
continue;
}

task.setExecuteWorker();
task.state = TaskState.Processing;
task.attempts += 1;
await this.taskRepository.saveTask(task);
const condition = task.start();
const saveSucceed = await this.taskRepository.idempotentSaveTask(task, condition);
if (!saveSucceed) {
taskId = await this.queueAdapter.pop<string>(taskType);
continue;
}
return task;
}

Expand Down
13 changes: 12 additions & 1 deletion app/repository/TaskRepository.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import assert from 'assert';
import { AccessLevel, ContextProto, Inject } from '@eggjs/tegg';
import { ModelConvertor } from './util/ModelConvertor';
import type { Task as TaskModel } from './model/Task';
import type { HistoryTask as HistoryTaskModel } from './model/HistoryTask';
import { Task as TaskEntity } from '../core/entity/Task';
import { Task as TaskEntity, TaskUpdateCondition } from '../core/entity/Task';
import { AbstractRepository } from './AbstractRepository';
import { TaskType, TaskState } from '../../app/common/enum/Task';

Expand Down Expand Up @@ -44,6 +45,16 @@ export class TaskRepository extends AbstractRepository {
}
}

async idempotentSaveTask(task: TaskEntity, condition: TaskUpdateCondition): Promise<boolean> {
assert(task.id, 'task have no save');
const changes = ModelConvertor.convertEntityToChanges(task, this.Task);
const updateRows = await this.Task.update({
taskId: condition.taskId,
attempts: condition.attempts,
}, changes);
return updateRows === 1;
}

async saveTaskToHistory(task: TaskEntity): Promise<void> {
const model = await this.Task.findOne({ id: task.id });
if (!model) return;
Expand Down
18 changes: 18 additions & 0 deletions app/repository/util/ModelConvertor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@ export class ModelConvertor {
return model as T;
}

static convertEntityToChanges<T extends Bone>(entity: object, ModelClazz: EggProtoImplClass<T>) {
const changes = {};
const metadata = ModelMetadataUtil.getModelMetadata(ModelClazz);
if (!metadata) {
throw new Error(`Model ${ModelClazz.name} has no metadata`);
}
for (const attributeMeta of metadata.attributes) {
const modelPropertyName = attributeMeta.propertyName;
const entityPropertyName = ModelConvertorUtil.getEntityPropertyName(ModelClazz, modelPropertyName);
if (entityPropertyName === CREATED_AT) continue;
const attributeValue = _.get(entity, entityPropertyName);
changes[modelPropertyName] = attributeValue;
}
changes[UPDATED_AT] = new Date();
entity[UPDATED_AT] = changes[UPDATED_AT];
return changes;
}

// TODO: options is QueryOptions, should let leoric export it to use
// Find out which attributes changed and set `updatedAt` to now
static async saveEntityToModel<T extends Bone>(entity: object, model: T, options?): Promise<boolean> {
Expand Down
21 changes: 20 additions & 1 deletion test/core/service/TaskService/findExecuteTask.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
import assert = require('assert');
import { app } from 'egg-mock/bootstrap';
import { app, mm } from 'egg-mock/bootstrap';
import { Context } from 'egg';
import { TaskService } from 'app/core/service/TaskService';
import { PackageSyncerService } from 'app/core/service/PackageSyncerService';
import { TaskState, TaskType } from 'app/common/enum/Task';
import { RedisQueueAdapter } from '../../../../app/infra/QueueAdapter';

describe('test/core/service/TaskService/findExecuteTask.test.ts', () => {
let ctx: Context;
let taskService: TaskService;
let packageSyncerService: PackageSyncerService;
let queueAdapter: RedisQueueAdapter;

beforeEach(async () => {
ctx = await app.mockModuleContext();
taskService = await ctx.getEggObject(TaskService);
packageSyncerService = await ctx.getEggObject(PackageSyncerService);
queueAdapter = await ctx.getEggObject(RedisQueueAdapter);
});

afterEach(async () => {
mm.restore();
await app.destroyModuleContext(ctx);
});

Expand Down Expand Up @@ -92,5 +96,20 @@ describe('test/core/service/TaskService/findExecuteTask.test.ts', () => {
assert(executeTask === null);
});

it('should not task which take be other', async () => {
const task1 = await packageSyncerService.createTask('foo-1');
const task2 = await packageSyncerService.createTask('foo-2');
// mock pop get duplicate taskId
const popResult = [ task1.taskId, task1.taskId, task2.taskId ];
let times = 0;
mm(queueAdapter, 'pop', async () => {
return popResult[times++];
});
const tasks = await Promise.all([
taskService.findExecuteTask(task1.type),
taskService.findExecuteTask(task1.type),
]);
assert(tasks[0]?.taskId !== task1[1]?.taskId);
});
});
});
33 changes: 33 additions & 0 deletions test/repository/TaskRepository.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,37 @@ describe('test/repository/TaskRepository.test.ts', () => {
assert(task1.updatedAt.getTime() > lastSince.getTime());
});
});

describe('idempotentSaveTask', () => {
let task: Task;
beforeEach(async () => {
const bizId = 'mock_dup_biz_id';
const data: EasyData<TaskData<ChangesStreamTaskData>, 'taskId'> = {
type: TaskType.ChangesStream,
state: TaskState.Waiting,
targetName: 'foo',
authorId: `pid_${process.pid}`,
authorIp: os.hostname(),
data: {
taskWorker: '',
since: '',
},
bizId,
};
// 首先创建一个 task1
const newData = EntityUtil.defaultData(data, 'taskId');
task = new Task(newData);
// 持久化保存 task1
await taskRepository.saveTask(task);
});

it('should only save one', async () => {
const condition = task.start();
const [ firstSave, secondSave ] = await Promise.all([
taskRepository.idempotentSaveTask(task, condition),
taskRepository.idempotentSaveTask(task, condition),
]);
assert(firstSave !== secondSave);
});
});
});

0 comments on commit 8fb9dd8

Please sign in to comment.