From 8fb9dd8cf4800afe3f54aba9ee4c0ae05efb4f1d Mon Sep 17 00:00:00 2001 From: killa Date: Thu, 8 Sep 2022 13:19:37 +0800 Subject: [PATCH] fix: findExecuteTask only return waiting task (#312) If multi instance access queue may return same task id, update task attemp idempotent for safe concurrent. --- app/core/entity/Task.ts | 16 +++++++++ app/core/service/TaskService.ts | 10 +++--- app/repository/TaskRepository.ts | 13 +++++++- app/repository/util/ModelConvertor.ts | 18 ++++++++++ .../TaskService/findExecuteTask.test.ts | 21 +++++++++++- test/repository/TaskRepository.test.ts | 33 +++++++++++++++++++ 6 files changed, 105 insertions(+), 6 deletions(-) diff --git a/app/core/entity/Task.ts b/app/core/entity/Task.ts index 93c371e7..f76a10db 100644 --- a/app/core/entity/Task.ts +++ b/app/core/entity/Task.ts @@ -64,6 +64,11 @@ export interface ChangesStreamTaskData extends TaskBaseData { registryId?: string, } +export interface TaskUpdateCondition { + taskId: string; + attempts: number; +} + export type CreateHookTask = Task; export type TriggerHookTask = Task; export type CreateSyncPackageTask = Task; @@ -221,6 +226,17 @@ export class Task extends Entity { task.logPath = `/binaries/${targetName}/syncs/${dayjs().format('YYYY/MM/DDHHmm')}-${task.taskId}.log`; return task; } + + start(): TaskUpdateCondition { + const condition = { + taskId: this.taskId, + attempts: this.attempts, + }; + this.setExecuteWorker(); + this.state = TaskState.Processing; + this.attempts += 1; + return condition; + } } export type SyncInfo = { diff --git a/app/core/service/TaskService.ts b/app/core/service/TaskService.ts index 463092a3..83b1c450 100644 --- a/app/core/service/TaskService.ts +++ b/app/core/service/TaskService.ts @@ -85,10 +85,12 @@ export class TaskService extends AbstractService { continue; } - task.setExecuteWorker(); - task.state = TaskState.Processing; - task.attempts += 1; - await this.taskRepository.saveTask(task); + const condition = task.start(); + const saveSucceed = await this.taskRepository.idempotentSaveTask(task, condition); + if (!saveSucceed) { + taskId = await this.queueAdapter.pop(taskType); + continue; + } return task; } diff --git a/app/repository/TaskRepository.ts b/app/repository/TaskRepository.ts index 44b62d94..105d2050 100644 --- a/app/repository/TaskRepository.ts +++ b/app/repository/TaskRepository.ts @@ -1,8 +1,9 @@ +import assert from 'assert'; import { AccessLevel, ContextProto, Inject } from '@eggjs/tegg'; import { ModelConvertor } from './util/ModelConvertor'; import type { Task as TaskModel } from './model/Task'; import type { HistoryTask as HistoryTaskModel } from './model/HistoryTask'; -import { Task as TaskEntity } from '../core/entity/Task'; +import { Task as TaskEntity, TaskUpdateCondition } from '../core/entity/Task'; import { AbstractRepository } from './AbstractRepository'; import { TaskType, TaskState } from '../../app/common/enum/Task'; @@ -44,6 +45,16 @@ export class TaskRepository extends AbstractRepository { } } + async idempotentSaveTask(task: TaskEntity, condition: TaskUpdateCondition): Promise { + assert(task.id, 'task have no save'); + const changes = ModelConvertor.convertEntityToChanges(task, this.Task); + const updateRows = await this.Task.update({ + taskId: condition.taskId, + attempts: condition.attempts, + }, changes); + return updateRows === 1; + } + async saveTaskToHistory(task: TaskEntity): Promise { const model = await this.Task.findOne({ id: task.id }); if (!model) return; diff --git a/app/repository/util/ModelConvertor.ts b/app/repository/util/ModelConvertor.ts index b64d71d1..0434c740 100644 --- a/app/repository/util/ModelConvertor.ts +++ b/app/repository/util/ModelConvertor.ts @@ -31,6 +31,24 @@ export class ModelConvertor { return model as T; } + static convertEntityToChanges(entity: object, ModelClazz: EggProtoImplClass) { + const changes = {}; + const metadata = ModelMetadataUtil.getModelMetadata(ModelClazz); + if (!metadata) { + throw new Error(`Model ${ModelClazz.name} has no metadata`); + } + for (const attributeMeta of metadata.attributes) { + const modelPropertyName = attributeMeta.propertyName; + const entityPropertyName = ModelConvertorUtil.getEntityPropertyName(ModelClazz, modelPropertyName); + if (entityPropertyName === CREATED_AT) continue; + const attributeValue = _.get(entity, entityPropertyName); + changes[modelPropertyName] = attributeValue; + } + changes[UPDATED_AT] = new Date(); + entity[UPDATED_AT] = changes[UPDATED_AT]; + return changes; + } + // TODO: options is QueryOptions, should let leoric export it to use // Find out which attributes changed and set `updatedAt` to now static async saveEntityToModel(entity: object, model: T, options?): Promise { diff --git a/test/core/service/TaskService/findExecuteTask.test.ts b/test/core/service/TaskService/findExecuteTask.test.ts index a2fd8015..c94d16d1 100644 --- a/test/core/service/TaskService/findExecuteTask.test.ts +++ b/test/core/service/TaskService/findExecuteTask.test.ts @@ -1,22 +1,26 @@ import assert = require('assert'); -import { app } from 'egg-mock/bootstrap'; +import { app, mm } from 'egg-mock/bootstrap'; import { Context } from 'egg'; import { TaskService } from 'app/core/service/TaskService'; import { PackageSyncerService } from 'app/core/service/PackageSyncerService'; import { TaskState, TaskType } from 'app/common/enum/Task'; +import { RedisQueueAdapter } from '../../../../app/infra/QueueAdapter'; describe('test/core/service/TaskService/findExecuteTask.test.ts', () => { let ctx: Context; let taskService: TaskService; let packageSyncerService: PackageSyncerService; + let queueAdapter: RedisQueueAdapter; beforeEach(async () => { ctx = await app.mockModuleContext(); taskService = await ctx.getEggObject(TaskService); packageSyncerService = await ctx.getEggObject(PackageSyncerService); + queueAdapter = await ctx.getEggObject(RedisQueueAdapter); }); afterEach(async () => { + mm.restore(); await app.destroyModuleContext(ctx); }); @@ -92,5 +96,20 @@ describe('test/core/service/TaskService/findExecuteTask.test.ts', () => { assert(executeTask === null); }); + it('should not task which take be other', async () => { + const task1 = await packageSyncerService.createTask('foo-1'); + const task2 = await packageSyncerService.createTask('foo-2'); + // mock pop get duplicate taskId + const popResult = [ task1.taskId, task1.taskId, task2.taskId ]; + let times = 0; + mm(queueAdapter, 'pop', async () => { + return popResult[times++]; + }); + const tasks = await Promise.all([ + taskService.findExecuteTask(task1.type), + taskService.findExecuteTask(task1.type), + ]); + assert(tasks[0]?.taskId !== task1[1]?.taskId); + }); }); }); diff --git a/test/repository/TaskRepository.test.ts b/test/repository/TaskRepository.test.ts index 5aa50438..a383b490 100644 --- a/test/repository/TaskRepository.test.ts +++ b/test/repository/TaskRepository.test.ts @@ -119,4 +119,37 @@ describe('test/repository/TaskRepository.test.ts', () => { assert(task1.updatedAt.getTime() > lastSince.getTime()); }); }); + + describe('idempotentSaveTask', () => { + let task: Task; + beforeEach(async () => { + const bizId = 'mock_dup_biz_id'; + const data: EasyData, 'taskId'> = { + type: TaskType.ChangesStream, + state: TaskState.Waiting, + targetName: 'foo', + authorId: `pid_${process.pid}`, + authorIp: os.hostname(), + data: { + taskWorker: '', + since: '', + }, + bizId, + }; + // 首先创建一个 task1 + const newData = EntityUtil.defaultData(data, 'taskId'); + task = new Task(newData); + // 持久化保存 task1 + await taskRepository.saveTask(task); + }); + + it('should only save one', async () => { + const condition = task.start(); + const [ firstSave, secondSave ] = await Promise.all([ + taskRepository.idempotentSaveTask(task, condition), + taskRepository.idempotentSaveTask(task, condition), + ]); + assert(firstSave !== secondSave); + }); + }); });