Skip to content

Commit

Permalink
fix: changes stream (#297)
Browse files Browse the repository at this point in the history
1. 修复 registry/sync 初始化时没有传递 registryId 问题
2. 修复 changesStream 获取时,如果没有需要同步的任务会导致 task#since 无法更新
  • Loading branch information
elrrrrrrr authored Aug 24, 2022
1 parent 304014c commit 359a150
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 10 deletions.
3 changes: 2 additions & 1 deletion app/core/entity/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class Task<T extends TaskBaseData = TaskBaseData> extends Entity {
return task;
}

public static createChangesStream(targetName: string, since = ''): ChangesStreamTask {
public static createChangesStream(targetName: string, registryId = '', since = ''): ChangesStreamTask {
const data = {
type: TaskType.ChangesStream,
state: TaskState.Waiting,
Expand All @@ -146,6 +146,7 @@ export class Task<T extends TaskBaseData = TaskBaseData> extends Entity {
data: {
// task execute worker
taskWorker: '',
registryId,
since,
},
};
Expand Down
13 changes: 12 additions & 1 deletion app/core/service/ChangesStreamService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ export class ChangesStreamService extends AbstractService {
for await (const change of stream) {
const { fullname, seq } = change as ChangesStreamChange;
lastPackage = fullname;
lastSince = seq;
const valid = await this.needSync(registry, fullname);
if (valid) {
taskCount++;
lastSince = seq;
await this.packageSyncerService.createTask(fullname, {
authorIp: HOST_NAME,
authorId: 'ChangesStreamService',
Expand All @@ -185,6 +185,17 @@ export class ChangesStreamService extends AbstractService {
}
}

// 如果没有需要同步的任务,也更新一下 taskData 里的信息
if (taskCount === 0) {
// 实时更新 task 信息
task.updateSyncData({
lastSince,
lastPackage,
taskCount,
});
await this.taskRepository.saveTask(task);
}

return { lastSince, taskCount };
}
}
2 changes: 1 addition & 1 deletion app/core/service/RegistryManagerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class RegistryManagerService extends AbstractService {

// 启动 changeStream
const targetName = `${registry.name.toUpperCase()}_WORKER`;
await this.taskService.createTask(Task.createChangesStream(targetName, since), false);
await this.taskService.createTask(Task.createChangesStream(targetName, registryId, since), false);
}

async createRegistry(createCmd: CreateRegistryCmd): Promise<Registry> {
Expand Down
4 changes: 2 additions & 2 deletions app/repository/model/HistoryTask.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Attribute, Model } from '@eggjs/tegg-orm-decorator';
import { DataTypes, Bone } from 'leoric';
import { DataTypes, Bone, LENGTH_VARIANTS } from 'leoric';
import { TaskState, TaskType } from '../../common/enum/Task';

@Model()
Expand Down Expand Up @@ -48,6 +48,6 @@ export class HistoryTask extends Bone {
@Attribute(DataTypes.INTEGER)
attempts: number;

@Attribute(DataTypes.TEXT('long'))
@Attribute(DataTypes.TEXT(LENGTH_VARIANTS.long))
error: string;
}
4 changes: 2 additions & 2 deletions app/repository/model/PackageVersionBlock.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Attribute, Model } from '@eggjs/tegg-orm-decorator';
import { DataTypes, Bone } from 'leoric';
import { DataTypes, Bone, LENGTH_VARIANTS } from 'leoric';

@Model()
export class PackageVersionBlock extends Bone {
Expand All @@ -26,6 +26,6 @@ export class PackageVersionBlock extends Bone {
@Attribute(DataTypes.STRING(256))
version: string;

@Attribute(DataTypes.TEXT('long'))
@Attribute(DataTypes.TEXT(LENGTH_VARIANTS.long))
reason: string;
}
4 changes: 2 additions & 2 deletions app/repository/model/Task.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Attribute, Model } from '@eggjs/tegg-orm-decorator';
import { DataTypes, Bone } from 'leoric';
import { DataTypes, Bone, LENGTH_VARIANTS } from 'leoric';
import { TaskState, TaskType } from '../../common/enum/Task';

@Model()
Expand Down Expand Up @@ -48,7 +48,7 @@ export class Task extends Bone {
@Attribute(DataTypes.INTEGER)
attempts: number;

@Attribute(DataTypes.TEXT('long'))
@Attribute(DataTypes.TEXT(LENGTH_VARIANTS.long))
error: string;

@Attribute(DataTypes.STRING(48), {
Expand Down
20 changes: 19 additions & 1 deletion test/core/service/ChangesStreamService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('test/core/service/ChangesStreamService.test.ts', () => {
registryManagerService = await ctx.getEggObject(RegistryManagerService);
scopeManagerService = await ctx.getEggObject(ScopeManagerService);
assert(changesStreamService);
task = Task.createChangesStream('GLOBAL_WORKER', '9527');
task = Task.createChangesStream('GLOBAL_WORKER', '', '9527');
taskService.createTask(task, false);

// create default registry
Expand Down Expand Up @@ -163,6 +163,24 @@ describe('test/core/service/ChangesStreamService.test.ts', () => {
assert(changes.taskCount === 2);
assert(changes.lastSince === '3');
});

it('should update since even not taskCount', async () => {
mock(ChangesStreamService.prototype, 'needSync', async () => {
return false;
});
mock(ctx.httpclient, 'request', async () => {
return {
res: Readable.from(`
{"seq":2,"id":"backbone.websql.deferred","changes":[{"rev":"4-f5150b238ab62cd890211fb57fc9eca5"}],"deleted":true},
{"seq":3,"id":"backbone2.websql.deferred","changes":[{"rev":"4-f6150b238ab62cd890211fb57fc9eca5"}],"deleted":true},
`),
};
});
const changes = await changesStreamService.executeSync('1', task);
assert(changes.taskCount === 0);
assert(changes.lastSince === '3');
assert(task.data.since === '3');
});
});

});
1 change: 1 addition & 0 deletions test/core/service/RegistryManagerService/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ describe('test/core/service/RegistryManagerService/index.test.ts', () => {
const targetName = 'CUSTOM_WORKER';
const task = await taskRepository.findTaskByTargetName(targetName, TaskType.ChangesStream);
assert(task);
assert((task.data as ChangesStreamTaskData).registryId === registry.registryId);
});

it('should preCheck registry', async () => {
Expand Down

0 comments on commit 359a150

Please sign in to comment.