Skip to content

Commit

Permalink
feat: add lock for sync package
Browse files Browse the repository at this point in the history
  • Loading branch information
elrrrrrrr committed Nov 24, 2022
1 parent 396554d commit e88b781
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 8 deletions.
25 changes: 19 additions & 6 deletions app/common/adapter/CacheAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
Inject,
} from '@eggjs/tegg';
import { Redis } from 'ioredis';
import { setTimeout } from 'timers/promises';

const ONE_DAY = 3600 * 24;

Expand Down Expand Up @@ -60,16 +61,28 @@ export class CacheAdapter {
await this.redis.del(lockName);
}

async usingLock(key: string, seconds: number, func: () => Promise<void>) {
async waitForUnLock(key: string, seconds: number, func: () => Promise<void>, retry = 3) {
const lockTimestamp = await this.lock(key, seconds);
if (!lockTimestamp) return;
try {
await func();
} finally {
await this.unlock(key, lockTimestamp);
// 抢占成功
if (lockTimestamp) {
try {
await func();
} finally {
await this.unlock(key, lockTimestamp);
}
return;
}
// 抢占失败
if (retry) {
await setTimeout(seconds * 1000);
await this.waitForUnLock(key, seconds, func, retry - 1);
}
}

async usingLock(key: string, seconds: number, func: () => Promise<void>) {
await this.waitForUnLock(key, seconds, func, 0);
}

private getLockName(key: string) {
return `CNPMCORE_L_${key}`;
}
Expand Down
12 changes: 10 additions & 2 deletions app/port/schedule/SyncPackageWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EggAppConfig, EggLogger } from 'egg';
import { IntervalParams, Schedule, ScheduleType } from '@eggjs/tegg/schedule';
import { Inject } from '@eggjs/tegg';
import { PackageSyncerService } from '../../core/service/PackageSyncerService';

import { CacheAdapter } from '../../../app/common/adapter/CacheAdapter';

let executingCount = 0;

Expand All @@ -22,6 +22,9 @@ export class SyncPackageWorker {
@Inject()
private readonly logger: EggLogger;

@Inject()
private readonly cacheAdapter: CacheAdapter;

async subscribe() {
if (this.config.cnpmcore.syncMode !== 'all') return;
if (executingCount >= this.config.cnpmcore.syncPackageWorkerMaxConcurrentTasks) return;
Expand All @@ -34,7 +37,12 @@ export class SyncPackageWorker {
this.logger.info('[SyncPackageWorker:subscribe:executeTask:start][%s] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms',
executingCount, task.taskId, task.targetName, task.attempts, task.data, task.updatedAt,
startTime - task.updatedAt.getTime());
await this.packageSyncerService.executeTask(task);
// 默认独占 1 分钟
// 防止同名任务导致互相冲突
// 只需保证间隔顺序即可
await this.cacheAdapter.waitForUnLock(`${task.type}_${task.targetName}`, 60, async () => {
await this.packageSyncerService.executeTask(task);
});
const use = Date.now() - startTime;
this.logger.info('[SyncPackageWorker:subscribe:executeTask:success][%s] taskId: %s, targetName: %s, use %sms',
executingCount, task.taskId, task.targetName, use);
Expand Down
20 changes: 20 additions & 0 deletions test/common/adapter/CacheAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,25 @@ describe('test/common/adapter/CacheAdapter.test.ts', () => {
const lockId3 = await cache.lock('CNPMCORE_L_unittest', 10);
assert(lockId3);
});

describe('waitForUnLock()', () => {
it('should work', async () => {
let trigger = 0;
await cache.lock('unittest-waitForUnLock', 1);
await cache.waitForUnLock('unittest-waitForUnLock', 1, async () => {
trigger++;
});
assert(trigger === 1);
});

it('should abort after retry', async () => {
let trigger = 0;
await cache.lock('unittest-waitForUnLock', 10);
await cache.waitForUnLock('unittest-waitForUnLock', 1, async () => {
trigger++;
});
assert(trigger === 0);
});
});
});
});

0 comments on commit e88b781

Please sign in to comment.